チェーンされたタスクから監視可能

独自の async Generate をロールする場合 while ループをラップする代わりに、再帰的なスケジューリングを使用することをお勧めします。

public static IObservable<TResult> Generate<TResult>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TResult> resultSelector,
    IScheduler scheduler = null) 
{
  var s = scheduler ?? Scheduler.Default;

  return Observable.Create<TResult>(async obs => {
    return s.Schedule(await initialState(), async (state, self) => 
    {
      if (!condition(state))
      {
        obs.OnCompleted();
        return;
      }

      obs.OnNext(resultSelector(state));

      self(await iterate(state));

    });
  });
}

これにはいくつかの利点があります。まず、これをキャンセルできます。単純な while ループを使用すると、直接キャンセルする方法がありません。実際、オブザーバブルが完了するまで、サブスクライブ関数に戻ることさえありません。第二に、これにより、各アイテムのスケジューリング/非同期を制御できます (テストが簡単になります)。これにより、全体的にライブラリに適したものになります


かなりのテストを行った後、これは組み込みの Rx 演算子を使用してうまく機能すると思います。

public static IObservable<TResult> Generate<TResult>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TResult> resultSelector,
    IScheduler scheduler = null) 
{
    return Observable.Create<TResult>(o =>
    {
        var current = default(TResult);
        return
            Observable
                .FromAsync(initialState)
                .Select(y => resultSelector(y))
                .Do(c => current = c)
                .Select(x =>
                    Observable
                        .While(
                            () => condition(current),
                            Observable
                                .FromAsync(() => iterate(current))
                                .Select(y => resultSelector(y))
                        .Do(c => current = c))
                        .StartWith(x))
                .Switch()
                .Where(x => condition(x))
                .ObserveOn(scheduler ?? Scheduler.Default)
                .Subscribe(o);
    });
}

このコードを次のようにテストしました:

bool Continue(IEnumerable<BrokeredMessage> prev)
{
    return prev.Any();
}

Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
    return
        Task.FromResult(
            EnumerableEx.Return(
                new BrokeredMessage()
                {
                    SequenceNumber = 1
                }));
}

Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev) 
{
    return Task.FromResult(
        prev.Last().SequenceNumber < 3
            ? EnumerableEx.Return(
                new BrokeredMessage()
                {
                    SequenceNumber = prev.Last().SequenceNumber + 1 
                })
            : Enumerable.Empty<BrokeredMessage>());
}

public class BrokeredMessage
{
    public int SequenceNumber;
}

そして、このシーケンスを実行します:

var ob = Generate(
    async () => await ProduceFirst(),
    prev => Continue(prev),
    async prev => await ProduceNext(prev),
    item => item);

この結果が得られました:

テスト コードでは、Reactive Extension チームの Interactive Extensions - NuGet "Ix-Main" も使用しました。