Observable.Startは、引数で渡したデリゲートの非同期処理を開始し、その戻り値を発行する。
Observable.Start Method (Action, IScheduler) (System.Reactive.Linq) | Microsoft Docs
Reactive Extensions入門 4「Observableを作成する便利なメソッド」 - かずきのBlog@hatena
Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」 - かずきのBlog@hatena
ここからちょっとややこしくなってくる。
using System; using System.Collections.Generic; using System.Linq; using System.Reactive.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Observable_ { class Program { static void Main(string[] args) { // 引数のデリゲートを実行し, 返した値が発行されるobservableの作成. var observable = Observable.Start(() => { Console.WriteLine("Observable.Start 1"); // "Observable.Start 1"と出力. Thread.Sleep(5000); // 5秒休止. Console.WriteLine("Observable.Start 2"); // "Observable.Start 2"と出力. return 1; // 1を返す. } ); Console.WriteLine("Observable.Start 3"); // "Observable.Start 3"と出力. // 指定の動作をするsubscriberの取得. var subscriber = observable.Subscribe(x => { Console.WriteLine("Observable.Start 4"); // "Observable.Start 4"と出力. Console.WriteLine("x = " + x); Console.WriteLine("Observable.Start 5"); // "Observable.Start 5"と出力. }, () => { Console.WriteLine("Completed."); } ); // こういう動作を指定し, subscriberを返す. Console.WriteLine("Observable.Start 6"); // "Observable.Start 6"と出力. // 購読停止. subscriber.Dispose(); // Disposeする. Console.WriteLine("Observable.Start 7"); // "Observable.Start 7"と出力. Console.ReadKey(); // 入力待ち. } } }
非同期処理内で5秒休止。
これぐらいログ入れないと流れがきちんと捉えられない。
最初は、
Observable.Start 3 Observable.Start 1 Observable.Start 6 Observable.Start 7
こうなって、
Observable.Start 3 Observable.Start 1 Observable.Start 6 Observable.Start 7 Observable.Start 2
5秒後にこうなって、
Observable.Start 3 Observable.Start 1 Observable.Start 6 Observable.Start 7 Observable.Start 2 続行するには何かキーを押してください . . .
キーを押したら、こうなる。
通知来てないなあとおもったら、5秒の処理が終わる前にDisposeしてるからだった・・・。
using System; using System.Collections.Generic; using System.Linq; using System.Reactive.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Observable_ { class Program { static void Main(string[] args) { // 引数のデリゲートを実行し, 返した値が発行されるobservableの作成. var observable = Observable.Start(() => { Console.WriteLine("Observable.Start 1"); // "Observable.Start 1"と出力. Thread.Sleep(5000); // 5秒休止. Console.WriteLine("Observable.Start 2"); // "Observable.Start 2"と出力. return 1; // 1を返す. } ); Console.WriteLine("Observable.Start 3"); // "Observable.Start 3"と出力. // 指定の動作をするsubscriberの取得. var subscriber = observable.Subscribe(x => { Console.WriteLine("Observable.Start 4"); // "Observable.Start 4"と出力. Console.WriteLine("x = " + x); Console.WriteLine("Observable.Start 5"); // "Observable.Start 5"と出力. }, () => { Console.WriteLine("Completed."); } ); // こういう動作を指定し, subscriberを返す. Console.WriteLine("Observable.Start 6"); // "Observable.Start 6"と出力. // 10秒待つ. Thread.Sleep(10000); // 10秒休止. // 購読停止. subscriber.Dispose(); // Disposeする. Console.WriteLine("Observable.Start 7"); // "Observable.Start 7"と出力. Console.ReadKey(); // 入力待ち. } } }
10秒待ってからDisposeする。
そうすると、
Observable.Start 1 Observable.Start 3 Observable.Start 6
で始まり、
Observable.Start 3 Observable.Start 1 Observable.Start 6 Observable.Start 2 Observable.Start 4 x = 1 Observable.Start 5 Completed.
5秒後にはこうなっていて、
Observable.Start 3 Observable.Start 1 Observable.Start 6 Observable.Start 2 Observable.Start 4 x = 1 Observable.Start 5 Completed. Observable.Start 7
10秒だとこう。
Observable.Start 3 Observable.Start 1 Observable.Start 6 Observable.Start 2 Observable.Start 4 x = 1 Observable.Start 5 Completed. Observable.Start 7 続行するには何かキーを押してください . . .
キーを押してこうなる。
これでわかるのは、Subscribeをしておいて、処理が終わったら、通知が来るというところ。
using System; using System.Collections.Generic; using System.Linq; using System.Reactive.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Observable_ { class Program { static void Main(string[] args) { // 引数のデリゲートを実行し, 返した値が発行されるobservableの作成. var observable = Observable.Start(() => { Console.WriteLine("Observable.Start 1"); // "Observable.Start 1"と出力. Thread.Sleep(5000); // 5秒休止. Console.WriteLine("Observable.Start 2"); // "Observable.Start 2"と出力. return 1; // 1を返す. } ); Console.WriteLine("Observable.Start 3"); // "Observable.Start 3"と出力. // 指定の動作をするsubscriberの取得. var subscriber = observable.Subscribe(x => { Console.WriteLine("Observable.Start 4"); // "Observable.Start 4"と出力. Console.WriteLine("x = " + x); Console.WriteLine("Observable.Start 5"); // "Observable.Start 5"と出力. }, () => { Console.WriteLine("Completed."); } ); // こういう動作を指定し, subscriberを返す. Console.WriteLine("Observable.Start 6"); // "Observable.Start 6"と出力. // もう一度Subscribe. var subscriber2 = observable.Subscribe(x => { Console.WriteLine("Observable.Start 8"); // "Observable.Start 8"と出力. Console.WriteLine("x = " + x); Console.WriteLine("Observable.Start 9"); // "Observable.Start 9"と出力. }, () => { Console.WriteLine("Completed."); } ); // こういう動作を指定し, subscriber2を返す. // 10秒待つ. Thread.Sleep(10000); // 10秒休止. // 購読停止. subscriber.Dispose(); // Disposeする. subscriber2.Dispose(); // Disposeする. Console.WriteLine("Observable.Start 7"); // "Observable.Start 7"と出力. Console.ReadKey(); // 入力待ち. } } }
Subscribeを2回してみる。
Observable.Start 3 Observable.Start 1 Observable.Start 6
最初、
Observable.Start 3 Observable.Start 1 Observable.Start 6 Observable.Start 2 Observable.Start 4 x = 1 Observable.Start 5 Completed. Observable.Start 8 x = 1 Observable.Start 9 Completed.
5秒、
Observable.Start 3 Observable.Start 1 Observable.Start 6 Observable.Start 2 Observable.Start 4 x = 1 Observable.Start 5 Completed. Observable.Start 8 x = 1 Observable.Start 9 Completed. Observable.Start 7
10秒、
Observable.Start 3 Observable.Start 1 Observable.Start 6 Observable.Start 2 Observable.Start 4 x = 1 Observable.Start 5 Completed. Observable.Start 8 x = 1 Observable.Start 9 Completed. Observable.Start 7 続行するには何かキーを押してください . . .
キーを押して、こうなる。
2個登録を先にしていても、処理が終わってから一気に2つ通知が来る。
計10秒経ってから改めてSubscribeした場合どうなるか。
Observable.Start 3 Observable.Start 1 Observable.Start 6
最初、
Observable.Start 3 Observable.Start 1 Observable.Start 6 Observable.Start 2 Observable.Start 4 x = 1 Observable.Start 5 Completed. Observable.Start 8 x = 1 Observable.Start 9 Completed.
5秒、
Observable.Start 3 Observable.Start 1 Observable.Start 6 Observable.Start 2 Observable.Start 4 x = 1 Observable.Start 5 Completed. Observable.Start 8 x = 1 Observable.Start 9 Completed. Observable.Start 10 x = 1 Observable.Start 11 Completed. Observable.Start 7
10秒、
Observable.Start 3 Observable.Start 1 Observable.Start 6 Observable.Start 2 Observable.Start 4 x = 1 Observable.Start 5 Completed. Observable.Start 8 x = 1 Observable.Start 9 Completed. Observable.Start 10 x = 1 Observable.Start 11 Completed. Observable.Start 7 続行するには何かキーを押してください . . .
キーを押してこうなる。
処理が終わってからSubscribeしても、通知される。
戻り値がキャッシュされているらしい。
Sample/rx/Observable/Start/src/Observable_ at master · bg1bgst333/Sample · GitHub