C++20:cppcoro による強力なコルーチン

前回の投稿「C++20:cppcoro を使用したコルーチン」で、Lewis Baker によるコルーチン ライブラリの基本的な紹介を行いました。この紹介では、基本的なコルーチン タスクとジェネレーターについて説明しました。今日、私はスレッドをタスクに追加し、強力な抽象化を実現しています。

前回の記事「C++20:コルーチンによるスレッド同期」を覚えていますか? 「いいえ」の場合、条件変数の課題を提示しました。条件変数は、センダー/レシーバーまたはプロデューサー/コンシューマー ワークフローなどのスレッド同期の従来の方法です。条件変数には大きな設計上の欠陥があり、通知なしで呼び出されたり (偽のウェイクアップ)、通知を傍受したり (ウェイクアップの喪失) する可能性があります。どちらの場合も、デッドロックが発生する可能性があります。コルーチンに基づくスレッド同期に関する次の例には、偽のウェイクアップやウェイクアップの喪失などの条件変数の固有のリスクはありませんでした。しかし、この例には別の問題がありました。複雑すぎました。

cppcoro のおかげで、両方の長所を活用できます。つまり、条件変数の設計上の欠陥がない単純なイベント メカニズムです。

single_consumer_event

single_consumer_event は、ドキュメントによると、一度に待機する単一のコルーチンのみをサポートする単純な手動リセット イベント タイプです。これはまさに、私が必要としているものです:

// cppcoroProducerConsumer.cpp

#include <cppcoro/single_consumer_event.hpp>
#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>

#include <future>
#include <iostream>
#include <string>
#include <thread>
#include <chrono>

cppcoro::single_consumer_event event; 

cppcoro::task<> consumer() {
 
 auto start = std::chrono::high_resolution_clock::now();
 
 co_await event; // suspended until some thread calls event.set()
 
 auto end = std::chrono::high_resolution_clock::now();
 std::chrono::duration<double> elapsed = end - start;
 std::cout << "Consumer waited " << elapsed.count() << " seconds." << std::endl;
 
 co_return;
}

void producer() {

 using namespace std::chrono_literals;
 std::this_thread::sleep_for(2s);
 
 event.set(); // resumes the consumer 
 
}

int main() {
 
 std::cout << std::endl;
 
 auto con = std::async([]{ cppcoro::sync_wait(consumer()); }); // (1)
 auto prod = std::async(producer); // (2)
 
 con.get(), prod.get();
 
 std::cout << std::endl;
 
}

コードは自明である必要があります。コンシューマー (1 行目) とプロデューサー (2 行目) はそれぞれのスレッドで実行されます。呼び出し cppcoro::sync_wait(consumer()) (1 行目) は、メイン関数をコルーチンにすることができないため、最上位のタスクとして機能します。呼び出しは、コルーチン コンシューマが完了するまで待機します。コルーチン コンシューマーは、誰かが event.set() を呼び出すまで、call co_await イベントで待機します。関数プロデューサーは、2 秒間のスリープ後にこのイベントを送信します。

cppcoro ライブラリのおかげで、スレッドをキャンセルできます。

キャンセル

呼び出し元と呼び出し先は、cppcoro::cancellation_token で通信します。キャンセルのリクエストを受け取った関数の呼び出し先は、2 つの方法で応答できます。

<オール>
  • キャンセルのリクエストを定期的にポーリングします。 cppcoro::cancellation_token は、この目的のために is_cancellation_requested() と throw_if_cancellation_requested() の 2 つのメンバー関数をサポートしています。
  • キャンセル リクエストの場合に実行されるコールバックを登録します。
  • 次の例は、最初の使用例を示しています。

    // cppcoroCancellation.cpp
    
    #include <chrono>
    #include <iostream>
    #include <future>
    
    #include <cppcoro/cancellation_token.hpp>
    #include <cppcoro/cancellation_source.hpp>
    
    using namespace std::chrono_literals; 
    
    int main() {
     
     std::cout << std::endl;
     
     cppcoro::cancellation_source canSource;
     cppcoro::cancellation_token canToken = canSource.token(); // (1)
    
     auto cancelSender = std::async([&canSource] { 
     std::this_thread::sleep_for(2s);
     canSource.request_cancellation(); // (3)
     std::cout << "canSource.request_cancellation() " << std::endl;
     });
     
     auto cancelReceiver = std::async([&canToken] { 
     while(true) {
     std::cout << "Wait for cancellation request" << std::endl;
     std::this_thread::sleep_for(200ms);
     if (canToken.is_cancellation_requested()) return; // (2)
     }
     });
    
     cancelSender.get(), cancelReceiver.get();
     
     std::cout << std::endl;
    
    }
    

    行 (1) は、 cancel_source によって作成された cancel_token を示しています。呼び出し元 cancelSender はキャンセル ソース canSource を取得し、呼び出し先 cancelReceiver はキャンセル トークンを取得します。呼び出し先は、2 秒後に呼び出し側が呼び出し call.request_cancellation() (行 3) を介して送信するキャンセル要求 (行 2) を永続的にポーリングします。

    2 つの興味深い観察を行いたいと思います。

    <オール>
  • キャンセルは協力的です。呼び出し先がそのキャンセル要求を無視した場合、何も起こりません。
  • C++20 では、改良された std::thread:std::jthread が得られます。 std::jthread はデストラクタに自動的に参加し、割り込みトークンを介して中断できます。改善された std::thread の詳細については、以前の記事「C++20 を使用した新しいスレッド:std::jthread」を参照してください。
  • cppcoro はミューテックスもサポートしています。

    async_mutex

    cppcoro::async_mutex などのミューテックスは、複数のスレッドによる同時アクセスから共有データを保護するための同期メカニズムです。

    // cppcoroMutex.cpp
    
    #include <cppcoro/async_mutex.hpp>
    #include <cppcoro/sync_wait.hpp>
    #include <cppcoro/task.hpp>
    
    #include <iostream>
    #include <thread>
    #include <vector>
    
    
    cppcoro::async_mutex mutex;
    
    int sum{}; // (2)
    
    cppcoro::task<> addToSum(int num) {
     cppcoro::async_mutex_lock lockSum = co_await mutex.scoped_lock_async(); // (3) 
     sum += num;
     
    } // (4)
    
    int main() {
     
     std::cout << std::endl;
     
     std::vector<std::thread> vec(10); // (1)
     
     for(auto& thr: vec) {
     thr = std::thread([]{ for(int n = 0; n < 10; ++n) cppcoro::sync_wait(addToSum(n)); } );
     }
     
     for(auto& thr: vec) thr.join();
     
     std::cout << "sum: " << sum << std::endl;
     
     std::cout << std::endl;
     
    }
    

    行 (1) は 10 個のスレッドを作成します。各スレッドは、共有合計に 0 から 9 の数字を追加します (2 行目)。関数 addToSum はコルーチンです。コルーチンは、ミューテックスが取得されるまで式 co_await mutex.scoped_lock_async() (3 行目) で待機します。ミューテックスを待機するコルーチンはブロックされませんが、中断されます。前のロック保持者は、そのロック解除呼び出しで待機中のコルーチンを再開します。その名前が示すように、mutex はスコープの終わり (4 行目) までロックされたままになります。

    次は?

    関数 cppcoro::when_all のおかげで、1 つだけでなく、より多くのコルーチンを待機できます。次の投稿では、cppcoro::when_all と cppcoro::static_thread_pool を使用して、強力なワークフローを作成します。