独自の async Generate
をロールする場合 while ループをラップする代わりに、再帰的なスケジューリングを使用することをお勧めします。
public static IObservable<TResult> Generate<TResult>(
Func<Task<TResult>> initialState,
Func<TResult, bool> condition,
Func<TResult, Task<TResult>> iterate,
Func<TResult, TResult> resultSelector,
IScheduler scheduler = null)
{
var s = scheduler ?? Scheduler.Default;
return Observable.Create<TResult>(async obs => {
return s.Schedule(await initialState(), async (state, self) =>
{
if (!condition(state))
{
obs.OnCompleted();
return;
}
obs.OnNext(resultSelector(state));
self(await iterate(state));
});
});
}
これにはいくつかの利点があります。まず、これをキャンセルできます。単純な while ループを使用すると、直接キャンセルする方法がありません。実際、オブザーバブルが完了するまで、サブスクライブ関数に戻ることさえありません。第二に、これにより、各アイテムのスケジューリング/非同期を制御できます (テストが簡単になります)。これにより、全体的にライブラリに適したものになります
かなりのテストを行った後、これは組み込みの Rx 演算子を使用してうまく機能すると思います。
public static IObservable<TResult> Generate<TResult>(
Func<Task<TResult>> initialState,
Func<TResult, bool> condition,
Func<TResult, Task<TResult>> iterate,
Func<TResult, TResult> resultSelector,
IScheduler scheduler = null)
{
return Observable.Create<TResult>(o =>
{
var current = default(TResult);
return
Observable
.FromAsync(initialState)
.Select(y => resultSelector(y))
.Do(c => current = c)
.Select(x =>
Observable
.While(
() => condition(current),
Observable
.FromAsync(() => iterate(current))
.Select(y => resultSelector(y))
.Do(c => current = c))
.StartWith(x))
.Switch()
.Where(x => condition(x))
.ObserveOn(scheduler ?? Scheduler.Default)
.Subscribe(o);
});
}
このコードを次のようにテストしました:
bool Continue(IEnumerable<BrokeredMessage> prev)
{
return prev.Any();
}
Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
return
Task.FromResult(
EnumerableEx.Return(
new BrokeredMessage()
{
SequenceNumber = 1
}));
}
Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev)
{
return Task.FromResult(
prev.Last().SequenceNumber < 3
? EnumerableEx.Return(
new BrokeredMessage()
{
SequenceNumber = prev.Last().SequenceNumber + 1
})
: Enumerable.Empty<BrokeredMessage>());
}
public class BrokeredMessage
{
public int SequenceNumber;
}
そして、このシーケンスを実行します:
var ob = Generate(
async () => await ProduceFirst(),
prev => Continue(prev),
async prev => await ProduceNext(prev),
item => item);
この結果が得られました:
テスト コードでは、Reactive Extension チームの Interactive Extensions - NuGet "Ix-Main" も使用しました。