Observable.Start

Observable.Startは、引数で渡したデリゲートの非同期処理を開始し、その戻り値を発行する。

Observable.Start Method (Action, IScheduler) (System.Reactive.Linq) | Microsoft Docs
Reactive Extensions入門 4「Observableを作成する便利なメソッド」 - かずきのBlog@hatena
Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」 - かずきのBlog@hatena

ここからちょっとややこしくなってくる。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Observable_
{
    class Program
    {
        static void Main(string[] args)
        {
            // 引数のデリゲートを実行し, 返した値が発行されるobservableの作成.
            var observable = Observable.Start(() => {
                    Console.WriteLine("Observable.Start 1");    // "Observable.Start 1"と出力.
                    Thread.Sleep(5000); // 5秒休止.
                    Console.WriteLine("Observable.Start 2");    // "Observable.Start 2"と出力.
                    return 1;   // 1を返す.
                }
            );
            Console.WriteLine("Observable.Start 3");    // "Observable.Start 3"と出力.
            // 指定の動作をするsubscriberの取得.
            var subscriber = observable.Subscribe(x => {
                    Console.WriteLine("Observable.Start 4");    // "Observable.Start 4"と出力.
                    Console.WriteLine("x = " + x);
                    Console.WriteLine("Observable.Start 5");    // "Observable.Start 5"と出力.
                },
                () => {
                    Console.WriteLine("Completed.");
                }
            ); // こういう動作を指定し, subscriberを返す.
            Console.WriteLine("Observable.Start 6");    // "Observable.Start 6"と出力.
            // 購読停止.
            subscriber.Dispose();   // Disposeする.
            Console.WriteLine("Observable.Start 7");    // "Observable.Start 7"と出力.
            Console.ReadKey();  // 入力待ち.
        }
    }
}

非同期処理内で5秒休止。
これぐらいログ入れないと流れがきちんと捉えられない。
最初は、

Observable.Start 3
Observable.Start 1
Observable.Start 6
Observable.Start 7

こうなって、

Observable.Start 3
Observable.Start 1
Observable.Start 6
Observable.Start 7
Observable.Start 2

5秒後にこうなって、

Observable.Start 3
Observable.Start 1
Observable.Start 6
Observable.Start 7
Observable.Start 2
続行するには何かキーを押してください . . .

キーを押したら、こうなる。
通知来てないなあとおもったら、5秒の処理が終わる前にDisposeしてるからだった・・・。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Observable_
{
    class Program
    {
        static void Main(string[] args)
        {
            // 引数のデリゲートを実行し, 返した値が発行されるobservableの作成.
            var observable = Observable.Start(() => {
                    Console.WriteLine("Observable.Start 1");    // "Observable.Start 1"と出力.
                    Thread.Sleep(5000); // 5秒休止.
                    Console.WriteLine("Observable.Start 2");    // "Observable.Start 2"と出力.
                    return 1;   // 1を返す.
                }
            );
            Console.WriteLine("Observable.Start 3");    // "Observable.Start 3"と出力.
            // 指定の動作をするsubscriberの取得.
            var subscriber = observable.Subscribe(x => {
                    Console.WriteLine("Observable.Start 4");    // "Observable.Start 4"と出力.
                    Console.WriteLine("x = " + x);
                    Console.WriteLine("Observable.Start 5");    // "Observable.Start 5"と出力.
                },
                () => {
                    Console.WriteLine("Completed.");
                }
            ); // こういう動作を指定し, subscriberを返す.
            Console.WriteLine("Observable.Start 6");    // "Observable.Start 6"と出力.
            // 10秒待つ.
            Thread.Sleep(10000); // 10秒休止.
            // 購読停止.
            subscriber.Dispose();   // Disposeする.
            Console.WriteLine("Observable.Start 7");    // "Observable.Start 7"と出力.
            Console.ReadKey();  // 入力待ち.
        }
    }
}

10秒待ってからDisposeする。
そうすると、

Observable.Start 1
Observable.Start 3
Observable.Start 6

で始まり、

Observable.Start 3
Observable.Start 1
Observable.Start 6
Observable.Start 2
Observable.Start 4
x = 1
Observable.Start 5
Completed.

5秒後にはこうなっていて、

Observable.Start 3
Observable.Start 1
Observable.Start 6
Observable.Start 2
Observable.Start 4
x = 1
Observable.Start 5
Completed.
Observable.Start 7

10秒だとこう。

Observable.Start 3
Observable.Start 1
Observable.Start 6
Observable.Start 2
Observable.Start 4
x = 1
Observable.Start 5
Completed.
Observable.Start 7
続行するには何かキーを押してください . . .

キーを押してこうなる。
これでわかるのは、Subscribeをしておいて、処理が終わったら、通知が来るというところ。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Observable_
{
    class Program
    {
        static void Main(string[] args)
        {
            // 引数のデリゲートを実行し, 返した値が発行されるobservableの作成.
            var observable = Observable.Start(() => {
                    Console.WriteLine("Observable.Start 1");    // "Observable.Start 1"と出力.
                    Thread.Sleep(5000); // 5秒休止.
                    Console.WriteLine("Observable.Start 2");    // "Observable.Start 2"と出力.
                    return 1;   // 1を返す.
                }
            );
            Console.WriteLine("Observable.Start 3");    // "Observable.Start 3"と出力.
            // 指定の動作をするsubscriberの取得.
            var subscriber = observable.Subscribe(x => {
                    Console.WriteLine("Observable.Start 4");    // "Observable.Start 4"と出力.
                    Console.WriteLine("x = " + x);
                    Console.WriteLine("Observable.Start 5");    // "Observable.Start 5"と出力.
                },
                () => {
                    Console.WriteLine("Completed.");
                }
            ); // こういう動作を指定し, subscriberを返す.
            Console.WriteLine("Observable.Start 6");    // "Observable.Start 6"と出力.
            // もう一度Subscribe.
            var subscriber2 = observable.Subscribe(x => {
                    Console.WriteLine("Observable.Start 8");    // "Observable.Start 8"と出力.
                    Console.WriteLine("x = " + x);
                    Console.WriteLine("Observable.Start 9");    // "Observable.Start 9"と出力.
                },
                () => {
                    Console.WriteLine("Completed.");
                }
            ); // こういう動作を指定し, subscriber2を返す.
            // 10秒待つ.
            Thread.Sleep(10000); // 10秒休止.
            // 購読停止.
            subscriber.Dispose();   // Disposeする.
            subscriber2.Dispose();  // Disposeする.
            Console.WriteLine("Observable.Start 7");    // "Observable.Start 7"と出力.
            Console.ReadKey();  // 入力待ち.
        }
    }
}

Subscribeを2回してみる。

Observable.Start 3
Observable.Start 1
Observable.Start 6

最初、

Observable.Start 3
Observable.Start 1
Observable.Start 6
Observable.Start 2
Observable.Start 4
x = 1
Observable.Start 5
Completed.
Observable.Start 8
x = 1
Observable.Start 9
Completed.

5秒、

Observable.Start 3
Observable.Start 1
Observable.Start 6
Observable.Start 2
Observable.Start 4
x = 1
Observable.Start 5
Completed.
Observable.Start 8
x = 1
Observable.Start 9
Completed.
Observable.Start 7

10秒、

Observable.Start 3
Observable.Start 1
Observable.Start 6
Observable.Start 2
Observable.Start 4
x = 1
Observable.Start 5
Completed.
Observable.Start 8
x = 1
Observable.Start 9
Completed.
Observable.Start 7
続行するには何かキーを押してください . . .

キーを押して、こうなる。
2個登録を先にしていても、処理が終わってから一気に2つ通知が来る。

計10秒経ってから改めてSubscribeした場合どうなるか。

Observable.Start 3
Observable.Start 1
Observable.Start 6

最初、

Observable.Start 3
Observable.Start 1
Observable.Start 6
Observable.Start 2
Observable.Start 4
x = 1
Observable.Start 5
Completed.
Observable.Start 8
x = 1
Observable.Start 9
Completed.

5秒、

Observable.Start 3
Observable.Start 1
Observable.Start 6
Observable.Start 2
Observable.Start 4
x = 1
Observable.Start 5
Completed.
Observable.Start 8
x = 1
Observable.Start 9
Completed.
Observable.Start 10
x = 1
Observable.Start 11
Completed.
Observable.Start 7

10秒、

Observable.Start 3
Observable.Start 1
Observable.Start 6
Observable.Start 2
Observable.Start 4
x = 1
Observable.Start 5
Completed.
Observable.Start 8
x = 1
Observable.Start 9
Completed.
Observable.Start 10
x = 1
Observable.Start 11
Completed.
Observable.Start 7
続行するには何かキーを押してください . . .

キーを押してこうなる。
処理が終わってからSubscribeしても、通知される。
戻り値がキャッシュされているらしい。

Sample/rx/Observable/Start/src/Observable_ at master · bg1bgst333/Sample · GitHub