タスク並列ライブラリ (TPL) データフロー コンストラクト

# アクションブロック

(foreach)

このクラスは、論理的には、処理対象のデータとそのデータを処理するためのタスクを組み合わせたバッファーであり、両方を管理する「データフロー ブロック」と考えることができます。最も基本的な使用法では、ActionBlock をインスタンス化し、それにデータを「投稿」できます。 ActionBlock の構築時に提供されるデリゲートは、投稿されたすべてのデータに対して非同期的に実行されます。

同期計算

var ab = new ActionBlock<TInput>(i => 
{
    Compute(i);
});
…
ab.Post(1);
ab.Post(2);
ab.Post(3);

非同期ダウンロードを同時に 5 つまでにスロットリング

var downloader = new ActionBlock<string>(async url =>
{
    byte [] imageData = await DownloadAsync(url);
    Process(imageData);
}, new DataflowBlockOptions { MaxDegreeOfParallelism = 5 }); 

downloader.Post("http://website.com/path/to/images");
downloader.Post("http://another-website.com/path/to/images");

Stephen Toub による TPL データフローの紹介

# BroadcastBlock

(アイテムをコピーし、リンクされているすべてのブロックにコピーを送信します)

BufferBlock とは異なり、BroadcastBlock の使命は、ブロックからリンクされたすべてのターゲットが公開されたすべての要素のコピーを取得できるようにすることであり、「現在の」値を伝播された値で継続的に上書きします。

さらに、BufferBlock とは異なり、BroadcastBlock は不必要にデータを保持しません。特定のデータがすべてのターゲットに提供された後、その要素は次のデータによって上書きされます (すべてのデータフロー ブロックと同様に、メッセージは FIFO 順で処理されます)。その要素はすべてのターゲットなどに提供されます。

スロットリング プロデューサーを使用した非同期プロデューサー/コンシューマー

var ui = TaskScheduler.FromCurrentSynchronizationContext();
var bb = new BroadcastBlock<ImageData>(i => i);

var saveToDiskBlock = new ActionBlock<ImageData>(item =>
    item.Image.Save(item.Path)
);

var showInUiBlock = new ActionBlock<ImageData>(item =>
    imagePanel.AddImage(item.Image), 
    new DataflowBlockOptions { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() }
);

bb.LinkTo(saveToDiskBlock);
bb.LinkTo(showInUiBlock);

エージェントからステータスを公開する

public class MyAgent
{
    public ISourceBlock<string> Status { get; private set; }
    
    public MyAgent()
    {
        Status = new BroadcastBlock<string>();
        Run();
    } 

    private void Run()
    {
        Status.Post("Starting");
        Status.Post("Doing cool stuff");
        …
        Status.Post("Done");
    }
}

Stephen Toub による TPL データフローの紹介

# BufferBlock

(FIFO キュー:入ってくるデータは出ていくデータです)

つまり、BufferBlock は、T のインスタンスを格納するための無制限または境界付きのバッファーを提供します。
T のインスタンスをブロックに「ポスト」することができます。これにより、ポストされるデータがブロックによって先入れ先出し (FIFO) の順序で格納されます。
ブロックから「受け取る」ことができます。これにより、以前に保存された、または将来利用可能な T のインスタンスを同期的または非同期的に取得できます (再び FIFO)。

スロットリング プロデューサーを使用した非同期プロデューサー/コンシューマー

// Hand-off through a bounded BufferBlock<T>
private static BufferBlock<int> _Buffer = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 10 });

// Producer
private static async void Producer()
{
    while(true)
    {
        await _Buffer.SendAsync(Produce());
    }
}

// Consumer
private static async Task Consumer()
{
    while(true)
    {
        Process(await _Buffer.ReceiveAsync());
    } 
}

// Start the Producer and Consumer
private static async Task Run()
{
    await Task.WhenAll(Producer(), Consumer());
}

Stephen Toub による TPL データフローの紹介

# JoinBlock

(2 ~ 3 個の入力を収集し、それらをタプルに結合します)

BatchBlock と同様に、JoinBlock は、複数のデータ ソースからのデータをグループ化できます。実際、それが JoinBlock の主な目的です。

たとえば、JoinBlock は ISourceBlock>.

です。

BatchBlock と同様に、JoinBlock は貪欲モードと非貪欲モードの両方で動作できます。

  • デフォルトの貪欲モードでは、タプルを形成するために必要なデータが他のターゲットにない場合でも、ターゲットに提供されたすべてのデータが受け入れられます。
  • 貪欲でないモードでは、タプルを作成するために必要なデータがすべてのターゲットに提供されるまで、ブロックのターゲットはデータを延期します。その時点で、ブロックは 2 フェーズ コミット プロトコルを実行して、必要なすべてのアイテムをアトミックに取得します。情報源。この延期により、その間に別のエンティティがデータを消費して、システム全体が前進できるようになります。

限られた数のプールされたオブジェクトでリクエストを処理する

var throttle = new JoinBlock<ExpensiveObject, Request>();
for(int i=0; i<10; i++) 
{
    requestProcessor.Target1.Post(new ExpensiveObject()); 
}

var processor = new Transform<Tuple<ExpensiveObject, Request>, ExpensiveObject>(pair =>
{
    var resource = pair.Item1;
    var request = pair.Item2;
    
    request.ProcessWith(resource);
    
    return resource;
});

throttle.LinkTo(processor);
processor.LinkTo(throttle.Target1);

Stephen Toub による TPL データフローの紹介

# WriteOnceBlock

(読み取り専用変数:最初のデータ項目を記憶し、そのコピーを出力として渡します。他のすべてのデータ項目は無視されます)

BufferBlock が TPL データフローの最も基本的なブロックである場合、WriteOnceBlock は最も単純です。
最大で 1 つの値を格納し、一度その値が設定されると、置き換えられたり上書きされたりすることはありません。

WriteOnceBlock は、C# の readonly メンバー変数に似ていると考えることができますが、コンストラクターでのみ設定可能でその後不変になるのではなく、一度だけ設定可能でその後は不変になります。

タスクの潜在的な出力を分割する

public static async void SplitIntoBlocks(this Task<T> task,
    out IPropagatorBlock<T> result, 
    out IPropagatorBlock<Exception> exception)
{
    result = new WriteOnceBlock<T>(i => i);
    exception = new WriteOnceBlock<Exception>(i => i);

    try 
    { 
        result.Post(await task); 
    }
    catch(Exception ex) 
    { 
        exception.Post(ex); 
    }
}

Stephen Toub による TPL データフローの紹介

# BatchedJoinBlock

(2 ~ 3 個の入力から一定数の合計項目を収集し、それらをデータ項目のコレクションのタプルにグループ化します)

BatchedJoinBlock は、ある意味で BatchBlock と JoinBlock の組み合わせです。
JoinBlock は各ターゲットから 1 つの入力をタプルに集約するために使用され、BatchBlock は N 個の入力をコレクションに集約するために使用され、BatchedJoinBlock は全体から N 個の入力を収集するために使用されます。すべてのターゲットをコレクションのタプルにします。

分散/収集

N 個の操作が開始されるスキャッター/ギャザーの問題を考えてみましょう。そのうちのいくつかは成功して文字列出力を生成し、他の操作は失敗して例外を生成する可能性があります。

var batchedJoin = new BatchedJoinBlock<string, Exception>(10);

for (int i=0; i<10; i++)
{
    Task.Factory.StartNew(() => {
        try { batchedJoin.Target1.Post(DoWork()); }
        catch(Exception ex) { batchJoin.Target2.Post(ex); }
    });
}

var results = await batchedJoin.ReceiveAsync();

foreach(string s in results.Item1) 
{
    Console.WriteLine(s);
}

foreach(Exception e in results.Item2) 
{
    Console.WriteLine(e);
}

Stephen Toub による TPL データフローの紹介

# TransformBlock

(選択、1 対 1)

ActionBlock と同様に、TransformBlock を使用すると、デリゲートを実行して、入力データごとに何らかのアクションを実行できます。 ActionBlock とは異なり、この処理には出力があります。 このデリゲートは Func にすることができます。この場合、その要素の処理はデリゲートが返されたときに完了したと見なされます。または、Func にすることができます。この場合、その要素の処理は完了していないと見なされます。デリゲートが返されたとき、返されたタスクが完了したとき。LINQ に精通している人にとっては、入力を受け取り、その入力を何らかの方法で変換してから出力を生成するという点で、Select() に多少似ています。

デフォルトでは、TransformBlock は MaxDegreeOfParallelism を 1 に設定してデータを順次処理します。バッファリングされた入力を受信して​​処理するだけでなく、このブロックは処理されたすべての出力を取得し、それをバッファリングします (まだ処理されていないデータ処理された、および処理されたデータ)。

2 つのタスクがあります。1 つはデータを処理するタスクで、もう 1 つはデータを次のブロックにプッシュするタスクです。

並行パイプライン

var compressor = new TransformBlock<byte[], byte[]>(input => Compress(input));
var encryptor = new TransformBlock<byte[], byte[]>(input => Encrypt(input));

compressor.LinkTo(Encryptor); 

Stephen Toub による TPL データフローの紹介

# TransformManyBlock

(SelectMany、1-m:このマッピングの結果は、LINQ の SelectMany と同様に「フラット化」されます)

TransformManyBlock は、TransformBlock と非常によく似ています。
主な違いは、TransformBlock は各入力に対して 1 つの出力のみを生成するのに対し、TransformManyBlock は各入力に対して任意の数 (ゼロ以上) の出力を生成することです。 ActionBlock および TransformBlock と同様に、この処理は、同期処理と非同期処理の両方でデリゲートを使用して指定できます。

同期には Func が使用され、非同期には Func> が使用されます。 ActionBlock と TransformBlock の両方と同様に、TransformManyBlock は既定で順次処理に設定されますが、それ以外の構成にすることもできます。

マッピング デリゲートはアイテムのコレクションを返し、出力バッファーに個別に挿入されます。

非同期ウェブ クローラー

var downloader = new TransformManyBlock<string, string>(async url =>
{
    Console.WriteLine(“Downloading “ + url);
    try 
    { 
        return ParseLinks(await DownloadContents(url)); 
    } 
    catch{}
    
    return Enumerable.Empty<string>();
});
downloader.LinkTo(downloader);

Enumerable をその構成要素に展開する

var expanded = new TransformManyBlock<T[], T>(array => array);

1 要素から 0 要素または 1 要素へのフィルタリング

public IPropagatorBlock<T> CreateFilteredBuffer<T>(Predicate<T> filter)
{
    return new TransformManyBlock<T, T>(item =>
        filter(item) ? new [] { item } : Enumerable.Empty<T>());
}

Stephen Toub による TPL データフローの紹介

# バッチブロック

(一定数の連続したデータ項目をデータ項目のコレクションにグループ化します)

BatchBlock は、N 個の単一項目を、要素の配列として表される 1 つのバッチ項目に結合します。インスタンスは特定のバッチ サイズで作成され、ブロックはその数の要素を受け取るとすぐにバッチを作成し、バッチを出力バッファーに非同期的に出力します。

BatchBlock は貪欲モードと非貪欲モードの両方で実行できます。

  • デフォルトの貪欲モードでは、任意の数のソースからブロックに提供されたすべてのメッセージが受け入れられ、バッファリングされてバッチに変換されます。
    • 貪欲でないモードでは、バッチを作成するのに十分なソースがブロックにメッセージを提供するまで、すべてのメッセージがソースから延期されます。したがって、BatchBlock を使用して、N ソースのそれぞれから 1 つの要素、1 つのソースから N 要素、およびその間の無数のオプションを受け取ることができます。

    リクエストを 100 個のグループにまとめてデータベースに送信する

    var batchRequests = new BatchBlock<Request>(batchSize:100);
    var sendToDb = new ActionBlock<Request[]>(reqs => SubmitToDatabase(reqs));
    
    batchRequests.LinkTo(sendToDb);
    
    

    1 秒に 1 回のバッチの作成

    var batch = new BatchBlock<T>(batchSize:Int32.MaxValue);
    new Timer(() => { batch.TriggerBatch(); }).Change(1000, 1000);
    
    

    Stephen Toub による TPL データフローの紹介