Observable.Defer<TValue>

Observable.Defer<TValue>は、変わった動きをする。

Observable.Defer(TValue) Method (System.Reactive.Linq) | Microsoft Docs
Reactive Extensions再入門 その3「IObservableのファクトリメソッド」 - かずきのBlog@hatena

Subscribeするたびに、渡されたラムダ式を実行する。
ラムダ式の戻り値は、IObservable<TValue>である必要がある。
大抵は、中で生成した値の発行と通知が完了したSubjectであり、それを返すとDeferの戻り値として返る。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Observable_
{
    class Program
    {
        // メインメソッド
        static void Main(string[] args)
        {
            // 3秒後にint型の値を発行して終了するobservableの作成.
            Console.WriteLine("Observable.Defer<TValue> 1");    // "Observable.Defer<TValue> 1"を出力.
            var observable = Observable.Defer<int>(() =>
                {
                    // 値の発行と完了.
                    Console.WriteLine("Observable.Defer<TValue> 2");    // "Observable.Defer<TValue> 2"を出力.
                    Thread.Sleep(3000); // 3秒休止.
                    Console.WriteLine("Observable.Defer<TValue> 3");    // "Observable.Defer<TValue> 3"を出力.
                    var subscriber = new Subject<int>();    // subscriber生成.
                    Console.WriteLine("Observable.Defer<TValue> 4");    // "Observable.Defer<TValue> 4"を出力.
                    subscriber.OnNext(1);   // 1を発行.
                    Console.WriteLine("Observable.Defer<TValue> 5");    // "Observable.Defer<TValue> 5"を出力.
                    Thread.Sleep(3000); // 3秒休止.
                    Console.WriteLine("Observable.Defer<TValue> 6");    // "Observable.Defer<TValue> 6"を出力.
                    subscriber.OnCompleted();   // 完了通知.
                    Console.WriteLine("Observable.Defer<TValue> 7");    // "Observable.Defer<TValue> 7"を出力.
                    return subscriber;  // subscriberはIObservable<int>なのでそのまま返せる.
                }
            );
            Console.WriteLine("Observable.Defer<TValue> 8");    // "Observable.Defer<TValue> 8"を出力.
            // 1度目のSubscribe.
            var subscriber1 = observable.Subscribe(x => Console.WriteLine("x = " + x), ex => Console.WriteLine("ex.Message = " + ex.Message), () => Console.WriteLine("Complete.")); // 値の通知, 例外の通知, 完了の通知をセット.
            Console.WriteLine("Observable.Defer<TValue> 9");    // "Observable.Defer<TValue> 9"を出力.
            // 2度目のSubscribe.
            var subscriber2 = observable.Subscribe(x => Console.WriteLine("x = " + x), ex => Console.WriteLine("ex.Message = " + ex.Message), () => Console.WriteLine("Complete.")); // 値の通知, 例外の通知, 完了の通知をセット.
            Console.WriteLine("Observable.Defer<TValue> 10");    // "Observable.Defer<TValue> 10"を出力.
            subscriber1.Dispose();  // subscriber1の解除.
            subscriber2.Dispose();  // subscriber2の解除.
        }
    }
}

実行すると、

Observable.Defer<TValue> 1
Observable.Defer<TValue> 8
Observable.Defer<TValue> 2

最初、

Observable.Defer<TValue> 1
Observable.Defer<TValue> 8
Observable.Defer<TValue> 2
Observable.Defer<TValue> 3
Observable.Defer<TValue> 4
Observable.Defer<TValue> 5

3秒後、

Observable.Defer<TValue> 1
Observable.Defer<TValue> 8
Observable.Defer<TValue> 2
Observable.Defer<TValue> 3
Observable.Defer<TValue> 4
Observable.Defer<TValue> 5
Observable.Defer<TValue> 6
Observable.Defer<TValue> 7
Complete.
Observable.Defer<TValue> 9
Observable.Defer<TValue> 2

6秒後、

Observable.Defer<TValue> 1
Observable.Defer<TValue> 8
Observable.Defer<TValue> 2
Observable.Defer<TValue> 3
Observable.Defer<TValue> 4
Observable.Defer<TValue> 5
Observable.Defer<TValue> 6
Observable.Defer<TValue> 7
Complete.
Observable.Defer<TValue> 9
Observable.Defer<TValue> 2
Observable.Defer<TValue> 3
Observable.Defer<TValue> 4
Observable.Defer<TValue> 5

9秒後、

Observable.Defer<TValue> 1
Observable.Defer<TValue> 8
Observable.Defer<TValue> 2
Observable.Defer<TValue> 3
Observable.Defer<TValue> 4
Observable.Defer<TValue> 5
Observable.Defer<TValue> 6
Observable.Defer<TValue> 7
Complete.
Observable.Defer<TValue> 9
Observable.Defer<TValue> 2
Observable.Defer<TValue> 3
Observable.Defer<TValue> 4
Observable.Defer<TValue> 5
Observable.Defer<TValue> 6
Observable.Defer<TValue> 7
Complete.
Observable.Defer<TValue> 10
続行するには何かキーを押してください . . .

12秒後となる。

Subscribeするたびにラムダ式を実行する。
言い換えれば、Subscribeするまではラムダ式の実行を遅らせることが出来る。
そしてラムダ式の中でSleepしているが、これも実際に起こる。
ただし、内側のSubjectについては完了した状態で返しているので、内側でOnNextしても、外側のSubscribeに対する通知は起きない。

ReplaySubjectであれば、OnNextした値はキャッシュされるので、

Observable.Defer<TValue> 1
Observable.Defer<TValue> 8
Observable.Defer<TValue> 2

最初、

Observable.Defer<TValue> 1
Observable.Defer<TValue> 8
Observable.Defer<TValue> 2
Observable.Defer<TValue> 3
Observable.Defer<TValue> 4
Observable.Defer<TValue> 5

3秒後、

Observable.Defer<TValue> 1
Observable.Defer<TValue> 8
Observable.Defer<TValue> 2
Observable.Defer<TValue> 3
Observable.Defer<TValue> 4
Observable.Defer<TValue> 5
Observable.Defer<TValue> 6
Observable.Defer<TValue> 7
x = 1
Complete.
Observable.Defer<TValue> 9
Observable.Defer<TValue> 2

6秒後、

Observable.Defer<TValue> 1
Observable.Defer<TValue> 8
Observable.Defer<TValue> 2
Observable.Defer<TValue> 3
Observable.Defer<TValue> 4
Observable.Defer<TValue> 5
Observable.Defer<TValue> 6
Observable.Defer<TValue> 7
x = 1
Complete.
Observable.Defer<TValue> 9
Observable.Defer<TValue> 2
Observable.Defer<TValue> 3
Observable.Defer<TValue> 4
Observable.Defer<TValue> 5

9秒後、

Observable.Defer<TValue> 1
Observable.Defer<TValue> 8
Observable.Defer<TValue> 2
Observable.Defer<TValue> 3
Observable.Defer<TValue> 4
Observable.Defer<TValue> 5
Observable.Defer<TValue> 6
Observable.Defer<TValue> 7
x = 1
Complete.
Observable.Defer<TValue> 9
Observable.Defer<TValue> 2
Observable.Defer<TValue> 3
Observable.Defer<TValue> 4
Observable.Defer<TValue> 5
Observable.Defer<TValue> 6
Observable.Defer<TValue> 7
x = 1
Complete.
Observable.Defer<TValue> 10
続行するには何かキーを押してください . . .

12秒後となり、外側のSubscribeに対する通知は起きる。

Sample/rx/Observable/Defer_TValue/src/Observable_ at master · bg1bgst333/Sample · GitHub