C++ のユニバーサル非同期抽象化

Executors (P0443R11 は最新のイテレーションの 1 つ) は、C++23 への最も基本的なライブラリの追加となる準備ができています。

しかし、それは何ですか?

小さなチップであろうと、数千の CPU と GPU を備えたスーパーコンピューターであろうと、その上に非同期、同時実行、および並列コードを構築できる最も基本的な構成要素を見つけることが何よりもまず求められます。

これは簡単な作業ではなく、多くの専門家や多くの企業が何年も忙しくしてきました.

これは、特に次のことを可能にする重要な作業です:

  • 並列アルゴリズム
  • ネットワーキング
  • 非同期 I/O
  • オーディオ
  • ウィンドウ システム
  • コルーチン
  • 非同期範囲
  • 異種コンピューティング

うまく構成し、多くのアルゴリズムを共有すること。

少なくとも、それが目標です.If iterator s はアルゴリズムの操作の基礎です (その範囲とビューは抽象化されています)。非同期プログラミングの操作の基礎は何ですか?

この時点で、誰かの特徴を誤解しないように最善を尽くし、できる限り技術的に正確であるように努めているにもかかわらず、遺言執行者は委員会に非常に長い歴史を持っており、私はこの全体に最近興味を持ったばかりであることを指摘しておく必要があります - 私はドメインの専門家ではなく、チャイナ ショップの雄牛です .

そうは言っても、私は P0443R11 に正確に焦点を当てるつもりはありませんが、まだ論文にはなっていない、もう少し洗練されたものに焦点を当てます.

その作業の結果は、ほとんどの場合、いくつかの概念、いくつかのカスタマイズ ポイント、およびそれらすべてを構成するためのいくつかのアルゴリズムになります。

まず最初に。

実行コンテキスト

実行コンテキストは、コードの一部を実行するコンテキストを表します (参照してください。シンプルです)。たとえば、スレッド プール、イベント ループ、グランド セントラル ディスパッチ、GPU、ベクトル化ユニットなどです (ただし、それが図にどのように適合するかはまだわかりません)または現在のスレッドでさえ - その場合、インライン実行コンテキストについて話します .

[注:インライン実行コンテキストがあるため、実行コンテキストで何らかの作業を実行しても、体系的に非同期を意味するわけではありません。 ]

レシーバー

レシーバーは、実行コンテキストで実行したいコードを表します。最も簡単に言えば、関数です。しかし、エグゼキューターの提案の設計の重要なポイントは、エラー処理とエラー管理を体系的に提供することです。そのため、必要な 3 つの関数があります。

template <typename R, typename Error, typename... Value>
concept receiver = requires(R &r Error... e, Value&&...v) {
 set_value(r, v...); // happy path
 set_error(r, e); // error
 set_done(r); // cancelation
};

これは悪いブログ コードです - 実際には receiver receiver に分割されます そして receiver_of set_value のオーバーロードを許可する .

標準はおそらく invocable をラップする受信機を提供します 、エラー時にスローされ、キャンセル時には何もしません。

したがって、次のように書くこともできます:

fugazzi_async(execution_context, as_receiver([] {
 fmt::print("Hello from an executor");
}));

fugazzi_async までに、問題が発生しました。 返された場合、ラムダはどこかでキューに入れられている可能性があります。または、すでに実行されている可能性もあります。

これを解決する従来の方法は、future と promise を使用し、ref カウントされた共有状態を共有し、heapallocated することです。

しかし、控えめに言っても、それは最適ではありません。

送信者とスケジューラ

代わりにできることは、実行コンテキストにスロットを予約するように依頼することです。 Eric Niebler はそれを怠惰な未来と呼んでいます。コンセプトに選ばれた名前は sender です .sender 結果をレシーバーに送信するためです。

素晴らしいですが、どうすれば sender を取得できますか? ?

thread_pool.give_me_a_sender() を実行するだけです 、しかし、一般性のために、そしておそらくスレッドプールをすべての人に公開したくないので、間接的なレベルを追加します(常に機能します)。また、場合によっては、実行コンテキストが決して具体化されない可能性があると思います型システムで。

scheduler 実行コンテキストへの軽量ハンドルを表します。その唯一の目的は sender を作成することです

execution context -> scheduler -> sender .

thread_pool tp(std::thread::hardware_concurrency());
auto scheduler = tp.scheduler();
auto sender = scheduler.schedule();

sender を作成する方法は他にもあります たとえば、auto async_read(device, buffer) -> sender リアクター キューで読み取り操作をキューに入れることができる送信者を作成する関数にすることができます。

タイマーは送信者などにもなります

送信者ができたら、何らかの作業を行うことを考え始めることができます。注意を払っていれば、これまでのところほとんど作業を行っていません。

正当な理由があります。オペレーションを怠惰に準備することで、アルゴリズムを使用してオペレーションを構成できます。先を越さないようにしましょう。

今のところ、必要なのは sender を取る関数だけです そして receiver そして実行のために送信者を送信します。

submit(my_sender, my_receiver);

実行コンテキスト my_sender の たとえば、スレッドプールです。レシーバーはキューに入れられ、そのスレッドプールのスレッドで実行されます (my_receiver.set_value() を呼び出すことにより) ).

そして、いくつかの作業が最終的に完了します。 P0443R11 によると、そうです。

しかし、落とし穴があります。

説明のためにスレッドプールを書きましょう

class my_first_thread_pool {
 std::vector<std::any_receiver<void>> work;

 void enqueue(receiver auto r) {
 std::any_receiver<void> oh_no{std::move(r)}; // 😰
 work.emplace_back(oh_no); // 😱
 /* ... */
 }

 /* ... */
};

スレッドプールを実装するには enqueue 関数 (submit(sender, receiver) によって呼び出されます) 、型消去のコストを支払う必要があります (おそらく 割り当てを意味します)、および別の割り当て (少なくとも) 型消去されたレシーバーをヒープに配置して、それらのコンテナーを保持できるようにします)。

これは多くの人にとっては問題ありませんが、理想的ではなく、契約を破ることになる人もいます。

より良い方法はありますか?はい。しかし、この時点で、P0443R11 とは異なります。以下の内容については、私が発明したものではないことに注意してください。これらのアイデアについては、将来論文が発表される予定です。

もう 1 レベルの間接化によってすべてを改善できるので、そうしましょう。

オペレーション

submit(sender, receiver) の代わりに 作業をすぐに送信する場合、送信者と受信者を取り、両方の集計を返す関数を作成できますが、他には何もしません。その関数を connect と呼びましょう :

thread_pool tp(std::thread::hardware_concurrency());
auto scheduler = tp.scheduler();
auto sender = scheduler.schedule();

auto op = connect(std::move(sender), as_receiver([] {
 return 42;
}));

戻りオブジェクト op operation を満たす 繰り返しますが、これは後で実行コンテキストでエンキューできるワークロードを準備するだけです。

しかし、追加の間接化の良い点は、単一のオブジェクトをキューに入れ、それを待つ必要がある場合、それをスタックに割り当てることができるということです。

いくつかのコードで説明する方が良いかもしれません:

struct schedule_operation_base {
 virtual void set_value() = 0;
};

template <sender S, receiver R>
struct schedule_operation : schedule_operation_base {
 //...
 void set_value() override {
 my_receiver.set_value();
 }
private:
 R my_receiver;
};

sender type には独自の特定の操作タイプがあります。

template <receiver MyReceiverType>
auto(thread_pool::sender, MyReceiverType my_receiver)
 -> thread_pool::schedule_operation<thread_pool::sender, MyReceiverType>;

これにより、実行コンテキスト自体ではなく、各操作に実行コンテキストの関連付けられた状態を格納できます。したがって、スレッド プールに作業のコンテナーを保持する代わりに、キューに入れられたすべての操作の侵入型リンク リストを作成できます。

[注:これは operation を意味します コピーも移動もできません]

これは視覚化するのが難しいため、通常のスレッド プールがその状態 (キューに入れられた作業) を追跡する方法を視覚化したものを次に示します。

operation とは 機械ができること:

理想的なケースでは、ヒープ割り当てがなく、エンキュー/デキュー作業がいくつかのポインターを設定しています。これは、スレッド プールの作業キューを保護するミューテックスが非常に短時間保持されることを意味します。

もちろん、一度に多くの操作をキューに入れる必要がある場合や、操作が完了するのを待ちたくない場合もあります。

このような場合、ヒープを割り当てる (またはカスタム アロケータで割り当てる) 追加の関数が必要になります。ヒープ割り当て関数は spawn と呼ばれます。 .

void spawn(sender, receiver);

spawn operation を破棄する別のレシーバーでレシーバーをラップします レシーバーの 1 つのメソッドが呼び出されるたびに。

これにより、所有モデルがかなり単純になります。非同期コードの偉業。

spawn また、システム全体でアロケーターを処理する必要がある唯一のポイントでもあります (アロケーター自体は、構成された操作のために送信者に転送される必要がある可能性があります)。

少し要約すると、全体の図は次のとおりです。

すべてが送信者です

スレッドプールには通常 wait があります メソッド。

しかし、送信側/受信側モデルでは、代わりにスレッド プールが送信側を返すメソッドを提供でき、一般的な待機アルゴリズムによって、スレッド プールが空のときに呼び出される受信側にその送信側をアタッチできます。

thread_pool p;
//...
wait(p.depleted());

以下を含む多くのそのようなアルゴリズムを提供できます:

  • when_all
  • when_any
  • then / sequence
  • transform の非同期バージョン

これらのアルゴリズムは、たとえば、より良い未来を描くために使用できます。しかし、私はまだそのすべてで遊んでいません.

すべてを支配するコルーチン。

未完成の非同期機能が標準で提供されている場合、私が重要だと思う側面の 1 つは、それらがファーストクラスのコルーチンをサポートしていることです。コルーチンは、99% の人々が非同期コードを記述する方法であるべきです。足を吹き飛ばすという大幅な変化。

しかし同時に、いくつかのユースケースでは、コルーチンは適切ではありません.

そのため、コルーチンを使用したくないときに料金を支払う必要がなく、トリビアなコルーチンを使用できる方法が必要です。

複雑に見えますか?

実際には、単一の関数を定義する必要があります:

template <sender S>
auto operator co_await(S&& sender);

これで すべて sender awaitable つまり、co_await で使用できます。 式。完全なプロトタイプの実装は、約 100 loc です。悪くない。

そして今、これを書くことができます:

#include <https://gist.githubusercontent.com/cor3ntin/14b9d30e07d48f5cdd13413c4fd96398/raw/f56dff4a94e053a41a16b66542e2322401f7fdbe/corio.hpp> 名前空間 cor3ntin::corio を使用;

oneway_task task_with_coro(execution::scheduler auto s) {
 co_await s.schedule();
 printf("Hello"); //runs in thread pool
}

void task_with_spawn(execution::scheduler auto s) {
 auto sender = s.schedule();
 execution::spawn(std::move(sender), as_receiver([]{
 printf("Hello");
 }));
}

int main() {
 static_thread_pool p(std::thread::hardware_concurrency());
 task_with_coro(p.scheduler());
 task_with_spawn(p.scheduler());
 wait(p.depleted());
}

かなりの魔法です!1

実際、コルーチンの送信者/受信者とプロミス/継続の間にほぼ 1/1 のマッピングがあるため、これが可能です。

`submit` を `connect`/`start` に分解することで、より柔軟な所有権のセマンティクスが得られ、設計が概念的にコルーチンと一致し、コルーチンが送信者/送信者を表現する効率的な方法になります。

— 🇺🇦エリック ニーブラー 🇺🇦 #BLM (@ericniebler) 2019 年 10 月 22 日

15 年前、ハーブ・サッターは無料のランチは終わったと宣言しましたが、適切なプリミティブのセットがあれば、ケーキを食べて食べることもできるかもしれません.

カスタマイズ ポイント

私が言及したほとんどすべての機能はカスタマイズ ポイントです。つまり、特定の送信者または受信者に特化できることを意味します。

  • set_value(receiver)
  • set_done(receiver)
  • set_error(receiver)
  • schedule(scheduler)
  • connect(sender, receiver)
  • spawn(sender, receiver)
  • start(operation)

まだ言及していない最後の CPO は bool is_blocking(sender) です これは、送信者が現在の (インライン) 実行コンセプトで受信者を呼び出すかどうかを照会します。

これがなければ、前進しないプログラムを書くのは非常に簡単です。

カスタマイズは tag_invoke に基づいています タイプが消去されたオブジェクトが CPO 呼び出しを転送できるようにするカスタマイズ ポイント オブジェクト メカニズム。

非常に優れたアイデアですが、これは言語の問題に対するライブラリ ソリューションを提供しようとしていると思わずにはいられません。

一括実行とプロパティ

P0443R11 はまた、一括実行とエグゼキュータの動作を調整するための多数のクエリ可能なプロパティを提供します…これらは今のところ私があまり慣れていない領域であり、この記事は長くなっています。お楽しみに。

また、エグゼキューターと io_uring をどのように活用できるかについても、今後検討したいと考えています。 、Grand Central Dispatch、および Windows スレッド プール。

書けるようになりたいから

co_await socket.write("Hello");
co_await socket.read(buffer);

executor という単語が再び表示されます。 .

エグゼキュータ

P0761 説明

エグゼキュータは、特定の実行コンテキストに関連付けられたオブジェクトです。呼び出し可能な関数オブジェクトから実行エージェントを作成するための 1 つ以上の実行関数を提供します。 […] Executor 自体は、私たちの設計の主な関心事です。

しかし、レシーバーは関数よりも基本的なビルディング ブロックであるため、実行関数を簡単に実装できます。

void execute(execution_context ctx, invocable auto&& f) {
 auto sender = ctx.scheduler().schedule();
 spawn(std::move(sender), as_receiver(std::forward<decltype(f)>(f)));
}

したがって、Executor 提案の最も重要でない部分は Executor である可能性があります。

そしてそれが意味することは…

operation 操作の基本です。

謝辞

ルイス・ベイカー、エリック・ニーブラー、カーク・シュープ、デビッド・ホールマンが辛抱強く彼らの仕事を説明してくれたことに感謝します.

この記事の例をコンパイルできるツールを提供してくれた Saar Raz と Matt Godbolt に感謝します。

リソースとリファレンス

CppCon 2019:Eric Niebler、David Hollman「A Unifying Abstraction for Async in C++」

C++Now 2019:David Hollman “The Ongoing Saga of ISO-C++ Executors”

論文

P1897 - C++23 エグゼキュータに向けて:アルゴリズムの初期セット - Lee Howes

P1895 - tag_invoke:カスタマイズ可能な機能をサポートするための一般的なパターン - ルイス・ベイカー、エリック・ニーブラー、カーク・シュープ

P1341 - 標準ライブラリでの非同期 API の統合 - ルイス ベイカー

P1436 - アフィニティベースの実行のための Executor プロパティ - Gordon Brown、Ruyman Reyes、Michael Wong、H. Carter Edwards、Thomas Rodgers、Mark Hoemmen

P1660 - A Compromise Executor デザイン スケッチ (Jared Hoberock、Michael Garland、Bryce Adelstein Lelbach、Michał Dominiak、Eric Niebler、Kirk Soop、Lewis Baker、Lee Howes、David S. Hollman、Gordon Brown 作

P0443 - C++ の統合エグゼキュータの提案 - Jared Hoberock、Michael Garland、Chris Kohlhoff、Chris Mysen、Carter Edwards、Gordon Brown、David Hollman、Lee Howes、Kirk Soop、Eric Niebler

実装

Pushmi - 送信者/受信者モデルの以前の反復の Facebook/folly の実装。

Corio - 私が最近始めた非常に不完全で未熟なプロジェクト - 何かを理解する最善の方法は、それを実装することです. <オール>

  • Compiler Explorer はマルチスレッド コードの実行をサポートしていませんが、開発中です。ありがとう、マット! ↩︎