Observable.ToAsyncは、引数のデリゲートが返した値を発行するobservableを返すデリゲートの作成に使う。
Observable.ToAsync Method (System.Reactive.Linq) | Microsoft Docs
Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」 - かずきのBlog@hatena
確認してみたけど、Startの場合は、Startが呼ばれた時点で非同期処理を開始する。
ToAsyncは、IObservableを返すデリゲートを返すが、実行は()やInvoke()で行う。
なので、実行を遅らせたりできる。
実行してIObservableが取得出来たら、それをSubscribeできる。
using System; using System.Collections.Generic; using System.Linq; using System.Reactive.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Observable_ { class Program { static void Main(string[] args) { // 引数のデリゲートが返した値を発行するobservableを返すデリゲートの作成. var func = Observable.ToAsync(() =>{ // funcに格納. Console.WriteLine("Observable.ToAsync 1"); // "Observable.ToAsync 1"と出力. Thread.Sleep(5000); // 5秒休止. Console.WriteLine("Observable.ToAsync 2"); // "Observable.ToAsync 2"と出力. return 1; // 1を返す. } ); Console.WriteLine("Observable.ToAsync 3"); // "Observable.ToAsync 3"と出力. var observable = func(); // funcを実行し, observableを取得. Console.WriteLine("Observable.ToAsync 4"); // "Observable.ToAsync 4"と出力. // 指定の動作をするsubscriberの取得. var subscriber = observable.Subscribe(x => { Console.WriteLine("Observable.ToAsync 5"); // "Observable.ToAsync 5"と出力. Console.WriteLine("x = " + x); Console.WriteLine("Observable.ToAsync 6"); // "Observable.ToAsync 6"と出力. }, () => { Console.WriteLine("Completed."); } ); // こういう動作を指定し, subscriberを返す. Console.WriteLine("Observable.ToAsync 7"); // "Observable.ToAsync 7"と出力. // 10秒待つ. Thread.Sleep(10000); // 10秒休止. // 購読停止. subscriber.Dispose(); // Disposeする. Console.WriteLine("Observable.ToAsync 8"); // "Observable.ToAsync 8"と出力. Console.ReadKey(); // 入力待ち. } } }
こういった場合、
Observable.ToAsync 3 Observable.ToAsync 4 Observable.ToAsync 1 Observable.ToAsync 7
最初、
Observable.ToAsync 3 Observable.ToAsync 4 Observable.ToAsync 1 Observable.ToAsync 7 Observable.ToAsync 2 Observable.ToAsync 5 x = 1 Observable.ToAsync 6 Completed.
5秒、
Observable.ToAsync 3 Observable.ToAsync 4 Observable.ToAsync 1 Observable.ToAsync 7 Observable.ToAsync 2 Observable.ToAsync 5 x = 1 Observable.ToAsync 6 Completed. Observable.ToAsync 8
10秒、
Observable.ToAsync 3 Observable.ToAsync 4 Observable.ToAsync 1 Observable.ToAsync 7 Observable.ToAsync 2 Observable.ToAsync 5 x = 1 Observable.ToAsync 6 Completed. Observable.ToAsync 8 続行するには何かキーを押してください . . .
リターンを押してこうなる。
using System; using System.Collections.Generic; using System.Linq; using System.Reactive.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Observable_ { class Program { static void Main(string[] args) { // 引数のデリゲートが返した値を発行するobservableを返すデリゲートの作成. var func = Observable.ToAsync(() =>{ // funcに格納. Console.WriteLine("Observable.ToAsync 1"); // "Observable.ToAsync 1"と出力. Thread.Sleep(5000); // 5秒休止. Console.WriteLine("Observable.ToAsync 2"); // "Observable.ToAsync 2"と出力. return 1; // 1を返す. } ); Console.WriteLine("Observable.ToAsync 3A"); // "Observable.ToAsync 3A"と出力. Thread.Sleep(2000); // 2秒休止. Console.WriteLine("Observable.ToAsync 3B"); // "Observable.ToAsync 3B"と出力. var observable = func(); // funcを実行し, observableを取得. Console.WriteLine("Observable.ToAsync 4A"); // "Observable.ToAsync 4A"と出力. Thread.Sleep(1000); // 1秒休止. Console.WriteLine("Observable.ToAsync 4B"); // "Observable.ToAsync 4B"と出力. // 指定の動作をするsubscriberの取得. var subscriber = observable.Subscribe(x => { Console.WriteLine("Observable.ToAsync 5"); // "Observable.ToAsync 5"と出力. Console.WriteLine("x = " + x); Console.WriteLine("Observable.ToAsync 6"); // "Observable.ToAsync 6"と出力. }, () => { Console.WriteLine("Completed."); } ); // こういう動作を指定し, subscriberを返す. Console.WriteLine("Observable.ToAsync 7"); // "Observable.ToAsync 7"と出力. // 10秒待つ. Thread.Sleep(10000); // 10秒休止. // 購読停止. subscriber.Dispose(); // Disposeする. Console.WriteLine("Observable.ToAsync 8"); // "Observable.ToAsync 8"と出力. Console.ReadKey(); // 入力待ち. } } }
ToAsyncのあとに2秒待って、funcのあとに1秒待って、Subscribe。
Observable.ToAsync 3A
最初、
Observable.ToAsync 3A Observable.ToAsync 3B Observable.ToAsync 4A Observable.ToAsync 1
2秒後、
Observable.ToAsync 3A Observable.ToAsync 3B Observable.ToAsync 4A Observable.ToAsync 1 Observable.ToAsync 4B Observable.ToAsync 7
3秒後、
Observable.ToAsync 3A Observable.ToAsync 3B Observable.ToAsync 4A Observable.ToAsync 1 Observable.ToAsync 4B Observable.ToAsync 7 Observable.ToAsync 2 Observable.ToAsync 5 x = 1 Observable.ToAsync 6 Completed.
7秒後、
Observable.ToAsync 3A Observable.ToAsync 3B Observable.ToAsync 4A Observable.ToAsync 1 Observable.ToAsync 4B Observable.ToAsync 7 Observable.ToAsync 2 Observable.ToAsync 5 x = 1 Observable.ToAsync 6 Completed. Observable.ToAsync 8
12秒後、
Observable.ToAsync 3A Observable.ToAsync 3B Observable.ToAsync 4A Observable.ToAsync 1 Observable.ToAsync 4B Observable.ToAsync 7 Observable.ToAsync 2 Observable.ToAsync 5 x = 1 Observable.ToAsync 6 Completed. Observable.ToAsync 8 続行するには何かキーを押してください . .
キーを押したらこうなる。
ToAsyncの中身の非同期処理が終わってから、Subscribeをもう1回やった場合でも、
Observable.ToAsync 3A Observable.ToAsync 3B Observable.ToAsync 4A Observable.ToAsync 1 Observable.ToAsync 4B Observable.ToAsync 7 Observable.ToAsync 2 Observable.ToAsync 5 x = 1 Observable.ToAsync 6 Completed. Observable.ToAsync 9 x = 1 Observable.ToAsync 10 Completed. Observable.ToAsync 8 Observable.ToAsync 11 続行するには何かキーを押してください . . .
発行される値はキャッシュされているので、2度目もその値で通知される。
これはStartの場合と同様である。
Sample/rx/Observable/ToAsync/src/Observable_ at master · bg1bgst333/Sample · GitHub