C# 関数型プログラミングの詳細 (14) 非同期関数

[C# シリーズ経由の LINQ]

[C# 関数型プログラミングの詳細シリーズ]

最新バージョン:https://weblogs.asp.net/dixin/functional-csharp-asynchronous-function

非同期関数は、アプリケーションとサービスの応答性とスケーラビリティを向上させることができます。 C# 5.0 では、async および await キーワードが導入され、非同期プログラミング モデルが大幅に簡素化されました。

Task、Task、非同期

C#/.NET 非同期プログラミング モデルでは、void を返す非同期操作を表すために System.Threading.Tasks.Task が提供され、TResult 値を返す非同期操作を表すために System.Threading.Tasks.Task が提供されます。

namespace System.Threading.Tasks
{
    public partial class Task : IAsyncResult
    {
        public Task(Action action); // () –> void

        public void Start();

        public void Wait();

        public TaskStatus Status { get; } // Created, WaitingForActivation, WaitingToRun, Running, WaitingForChildrenToComplete, RanToCompletion, Canceled, Faulted.

        public bool IsCanceled { get; }

        public bool IsCompleted { get; }

        public bool IsFaulted { get; }

        public AggregateException Exception { get; }

        Task ContinueWith(Action<Task> continuationAction);

        Task<TResult> ContinueWith<TResult>(Func<Task, TResult> continuationFunction);

        // Other members.
    }

    public partial class Task<TResult> : Task
    {
        public Task(Func<TResult> function); // () –> TResult

        public TResult Result { get; }

        public Task ContinueWith(Action<Task<TResult>> continuationAction);

        public Task<TNewResult> ContinueWith<TNewResult>(Func<Task<TResult>, TNewResult> continuationFunction);

        // Other members.
    }
}

Task と Task は () -> void 関数と () -> TResult 関数で構築でき、Start メソッドを呼び出すことで開始できます。タスクは非同期で実行され、現在のスレッドをブロックしません。そのステータスは、Status、IsCanceled、IsCompleted、IsFaulted プロパティによって照会できます。タスクは、その Wait メソッドを呼び出すことで待機できます。このメソッドは、タスクが正常に完了するか、失敗するか、キャンセルされるまで、現在のスレッドをブロックします。 Task の場合、基になる非同期操作が正常に完了すると、Result プロパティを通じて結果を取得できます。 Task または Task の場合、基になる非同期操作が例外で失敗します。例外は、Exception プロパティを通じて利用できます。 ContinueWith メソッドを呼び出すことにより、タスクを別の非同期継続操作と連鎖させることができます。タスクの実行が終了すると、指定された継続が非同期で実行を開始します。 ContinueWith メソッドが呼び出されたときにタスクの実行が既に終了している場合は、指定された継続がすぐに実行を開始します。次の例では、ファイルを読み取るタスクを構築して開始し、別の継続タスクをチェーンして別のファイルに内容を書き込みます:

internal static partial class Functions
{
    internal static void CreateTask(string readPath, string writePath)
    {
        Thread.CurrentThread.ManagedThreadId.WriteLine(); // 10
        Task<string> task = new Task<string>(() =>
        {
            Thread.CurrentThread.ManagedThreadId.WriteLine(); // 8
            return File.ReadAllText(readPath);
        });
        task.Start();
        Task continuationTask = task.ContinueWith(antecedentTask =>
        {
            Thread.CurrentThread.ManagedThreadId.WriteLine(); // 9
            object.ReferenceEquals(antecedentTask, task).WriteLine(); // True
            if (antecedentTask.IsFaulted)
            {
                antecedentTask.Exception.WriteLine();
            }
            else
            {
                File.WriteAllText(writePath, antecedentTask.Result);
            }
        });
        continuationTask.Wait();
    }
}

非同期操作として、タスクが開始されると、ラップされた関数はデフォルトで CLR/CoreCLR スレッド プールにスケジュールされて実行されるため、それらのスレッド ID は呼び出し元のスレッド ID とは異なります。

Task は、タスクを構築して自動的に開始する Run メソッドも提供します:

namespace System.Threading.Tasks
{
    public partial class Task : IAsyncResult
    {
        public static Task Run(Action action);

        public static Task<TResult> Run<TResult>(Func<TResult> function);
    }
}

次の関数を比較してください:

internal static void Write(string path, string contents) => File.WriteAllText(path, contents);

internal static string Read(string path) => File.ReadAllText(path);

internal static Task WriteAsync(string path, string contents) => 
    Task.Run(() => File.WriteAllText(path, contents));

internal static Task<string> ReadAsync(string path) => Task.Run(() => File.ReadAllText(path));

Write が呼び出されると、その実行によって現在のスレッドがブロックされます。書き込み操作が同期的に行われると、結果なしで戻り、呼び出し元スレッドは実行を継続できます。同様に、Read が呼び出されると、その実行によって現在のスレッドもブロックされます。読み取り操作が同期的に行われると、結果が返されるため、呼び出し元は結果を利用でき、呼び出し元は実行を継続できます。 WriteAsync が呼び出されると、Task.Run を呼び出して、書き込み操作で Task インスタンスを構築し、タスクを開始して、すぐにタスクを返します。その後、呼び出し元は、書き込み操作の実行によってブロックされることなく続行できます。デフォルトでは、書き込み操作はスレッド プールにスケジュールされます。書き込み操作が完了すると、書き込み操作は結果を返さず、タスクのステータスが更新されます。同様に、ReadAsync が呼び出されると、Task.Run も呼び出されて、読み取り操作で Task インスタンスが作成され、タスクが開始され、すぐにタスクが返されます。その後、呼び出し元は、読み取り操作の実行によってブロックされることなく続行できます。デフォルトでは、読み取り操作もスレッド プールにスケジュールされます。読み取り操作が完了すると、読み取り操作の結果が得られ、タスクのステータスが更新され、その結果が Result プロパティを通じて利用可能になります。

internal static void CallReadWrite(string path, string contents)
{
    Write(path, contents); // Blocking.
    // Sync operation is completed with no result.
    string result = Read(path); // Blocking.
    // Sync operation is completed with result available.

    Task writeTask = WriteAsync(path, contents); // Non blocking.
    // Async operation is scheduled to thread pool, and will be completed in the future with no result.
    Task<string> readTask = ReadAsync(path); // Non blocking.
    // Async operation is scheduled to thread pool, and will be completed in the future, then result will be available.
}

したがって、void を返す Write と結果を返す Read は同期関数です。 Task を返す WriteAsync と Task を返す ReadAsync は非同期関数であり、Task は将来の void として表示でき、Task は将来の TResult の結果として表示できます。ここで、操作をスレッド プールにオフロードするだけで、WriteAsync と ReadAsync が非同期になります。これはデモンストレーションを目的としたものであり、スケーラビリティの向上をもたらすものではありません。より良い実装については後で説明します。

名前付き非同期関数

既定では、名前付き非同期関数は Task または Task を返し、規則として名前に Async または AsyncTask 接尾辞が付きます。次の例は、同期関数呼び出しのファイル読み取りおよび書き込みワークフローです:

internal static void ReadWrite(string readPath, string writePath)
{
    string contents = Read(readPath);
    Write(writePath, contents);
}

関数の非同期バージョンを呼び出すことで、同じロジックを実装できます:

internal static async Task ReadWriteAsync(string readPath, string writePath)
{
    string contents = await ReadAsync(readPath);
    await WriteAsync(writePath, contents);
}

ここでは、各非同期関数呼び出しに await が使用され、コード構造は同期ワークフローと同じままです。関数本体で await キーワードが使用されている場合、その関数には async 修飾子が必要です。ワークフローが結果を返さないことに関して、非同期関数は Task (future void) を返します。この ReadWriteAsync 関数は非同期関数を呼び出します。それ自体も非同期関数です。これは、async 修飾子があり、Task を返すためです。 ReadWriteAsync が呼び出されると、ReadAsync および WriteAsync と同じように機能します。呼び出し元をブロックせず、スケジュールされた読み取りおよび書き込みワークフローを表すタスクをすぐに返します。

したがって、await キーワードは、タスクの基になる非同期操作が完了するのを仮想的に待機していると見なすことができます。タスクが失敗すると、例外がスローされます。タスクが正常に完了すると、await 式の直後の継続がコールバックされます。タスクに結果がある場合、await は結果を抽出できます。したがって、非同期ワークフローは、同期ワークフローと同じ外観を維持します。継続を構築するために必要な ContinueWith 呼び出しはありません。次の例は、同期関数呼び出しのより複雑なデータベース クエリ ワークフローであり、クエリ結果として int 値が返されます:

internal static int Query(DbConnection connection, StreamWriter logWriter)
{
    try
    {
        connection.Open(); // Return void.
        using (DbCommand command = connection.CreateCommand())
        {
            command.CommandText = "SELECT 1;";
            using (DbDataReader reader = command.ExecuteReader()) // Return DbDataReader.
            {
                if (reader.Read()) // Return bool.
                {
                    return (int)reader[0];
                }
                throw new InvalidOperationException("Failed to call sync functions.");
            }
        }
    }
    catch (SqlException exception)
    {
        logWriter.WriteLine(exception.ToString()); // Return void.
        throw new InvalidOperationException("Failed to call sync functions.", exception);
    }
}

ここで、DbConnection.Open、DbCommand.ExecuteReader、DbDataReader.Read、StreamWriter.WriteLine メソッドには、DbConnection.OpenAsync、DbCommand.ExecuteReaderAsync、DbDataReader.ReadAsync、StreamWriter.WriteLineAsync として提供される非同期バージョンがあります。これらは、Task または Task を返します。 async および await キーワードを使用すると、これらの非同期関数を簡単に呼び出すことができます:

internal static async Task<int> QueryAsync(DbConnection connection, StreamWriter logWriter)
{
    try
    {
        await connection.OpenAsync(); // Return Task.
        using (DbCommand command = connection.CreateCommand())
        {
            command.CommandText = "SELECT 1;";
            using (DbDataReader reader = await command.ExecuteReaderAsync()) // Return Task<DbDataReader>.
            {
                if (await reader.ReadAsync()) // Return Task<bool>.
                {
                    return (int)reader[0];
                }
                throw new InvalidOperationException("Failed to call async functions.");
            }
        }
    }
    catch (SqlException exception)
    {
        await logWriter.WriteLineAsync(exception.ToString()); // Return Task.
        throw new InvalidOperationException("Failed to call async functions.", exception);
    }
}

ここでも、非同期ワークフローは同期ワークフローと同じコード構造、try-catch を保持し、if ブロックの外観は同じです。この構文がないと、ContinueWith を呼び出して上記のワークフローを手動で構築するのは非常に複雑になります。 async 関数は int の結果を返しますが、戻り値の型は Task (future int) です。

上記の Write 関数と Read 関数は、File.WriteAllText と File.ReadAllText を呼び出して同期 I/O 操作を実行します。同期 I/O 操作は、StreamWriter.Write と StreamReader.ReadToEnd を呼び出すことによって内部的に実装されます。 async キーワードと await キーワードを使用すると、StreamWriter.WriteAsync と StreamReader.ReadToEndAsync を呼び出すことで、(基になるオペレーティング システムが非同期 I/O をサポートしている限り) WriteAsync と ReadAsync を実際の非同期 I/O として実装できます。

internal static async Task WriteAsync(string path, string contents)
{
    // File.WriteAllText:
    // using (StreamWriter writer = new StreamWriter(new FileStream(
    //    path: path, mode: FileMode.Create, access: FileAccess.Write,
    //    share: FileShare.Read, bufferSize: 4096, useAsync: false)))
    // {
    //    writer.Write(contents);
    // }
    using (StreamWriter writer = new StreamWriter(new FileStream(
        path: path, mode: FileMode.Create, access: FileAccess.Write,
        share: FileShare.Read, bufferSize: 4096, useAsync: true)))
    {
        await writer.WriteAsync(contents);
    }
}

internal static async Task<string> ReadAsync(string path)
{
    // File.ReadAllText:
    // using (StreamReader reader = new StreamReader(new FileStream(
    //    path: path, mode: FileMode.Open, access: FileAccess.Read, 
    //    share: FileShare.Read, bufferSize: 4096, useAsync: false)))
    // {
    //    return reader.ReadToEnd();
    // }
    using (StreamReader reader = new StreamReader(new FileStream(
        path: path, mode: FileMode.Open, access: FileAccess.Read, 
        share: FileShare.Read, bufferSize: 4096, useAsync: true)))
    {
        return await reader.ReadToEndAsync();
    }
}

非同期関数がタスクの代わりに void を返さなければならない特別なシナリオが 1 つあります – 非同期イベント ハンドラー。たとえば、ObservableCollection には CollectionChanged イベントがあります:

namespace System.Collections.ObjectModel
{
    public class ObservableCollection<T> : Collection<T>, INotifyCollectionChanged, INotifyPropertyChanged
    {
        public event NotifyCollectionChangedEventHandler CollectionChanged;

        // Other members.
    }
}

namespace System.Collections.Specialized
{
    public delegate void NotifyCollectionChangedEventHandler(object sender, NotifyCollectionChangedEventArgs e);
}

このイベントでは、ハンドラーが型 (オブジェクト、NotifyCollectionChangedEventArgs) –> void の関数である必要があります。したがって、非同期関数を上記のイベントのハンドラーとして定義する場合、その非同期関数は Task ではなく void を返す必要があります:

internal static partial class Functions
{
    private static StringBuilder logs = new StringBuilder();

    private static StringWriter logWriter = new StringWriter(logs);

    private static async void CollectionChangedAsync(object sender, NotifyCollectionChangedEventArgs e) =>
        await logWriter.WriteLineAsync(e.Action.ToString());

    internal static void EventHandler()
    {
        ObservableCollection<int> collection = new ObservableCollection<int>();
        collection.CollectionChanged += CollectionChangedAsync;
        collection.Add(1); // Fires CollectionChanged event.
    }
}

非同期関数によって返されるタスクに加えて、await キーワードは任意の Task および Task インスタンスで機能します。

internal static async Task AwaitTasks(string path)
{
    // string contents = await ReadAsync(path);
    Task<string> task1 = ReadAsync(path);
    string contents = await task1;

    // await WriteAsync(path, contents);
    Task task2 = WriteAsync(path, contents);
    await task2;

    // await Task.Run(() => { });
    Task task3 = Task.Run(() => { });
    await task3;

    // int result = await Task.Run(() => 0);
    Task<int> task4 = Task.Run(() => 0);
    int result = await task4;

    // await Task.Delay(TimeSpan.FromSeconds(10));
    Task task5 = Task.Delay(TimeSpan.FromSeconds(10));
    await task5;

    // result = await Task.FromResult(result);
    Task<int> task6 = Task.FromResult(result);
    result = await task6;
}

タスクが開始されない場合、実行が終了することはありません。 await 式の後のコードはコールバックされません:

internal static async Task HotColdTasks(string path)
{
    Task hotTask = new Task(() => { });
    hotTask.Start();
    await hotTask;
    hotTask.Status.WriteLine();

    Task coldTask = new Task(() => { });
    await coldTask;
    coldTask.Status.WriteLine(); // Never executes.
}

まだ開始されていないタスクをコールドタスク、開始済みのタスクをホットタスクと呼びます。慣例として、タスクを返す関数は常にホット タスクを返す必要があります。すべての .NET API はこの規則に従います。

awaitable-awaiter パターン

C# は、awaitable-awaiter パターンを使用して await 式をコンパイルします。 Task と Task に加えて、await キーワードは、待機可能な任意の型で使用できます。 awaitable 型には、awaiter を返す GetAwaiter インスタンスまたは拡張メソッドがあります。 awaiter 型は System.Runtime.CompilerServices.INotifyCompletion インターフェイスを実装し、bool 値を返す IsCompleted プロパティと、void または結果値を返す GetResult インスタンス メソッドも備えています。次の IAwaitable および IAwaiter インターフェイスは、結果のない操作の awaitable-awaiter パターンを示しています:

public interface IAwaitable
{
    IAwaiter GetAwaiter();
}

public interface IAwaiter : INotifyCompletion
{
    bool IsCompleted { get; }

    void GetResult(); // No result.
}

また、次の IAwaitable および IAwaiter インターフェイスは、結果を伴う操作の awaitable-awaiter パターンを示しています。

public interface IAwaitable<TResult>
{
    IAwaiter<TResult> GetAwaiter();
}

public interface IAwaiter<TResult> : INotifyCompletion
{
    bool IsCompleted { get; }

    TResult GetResult(); // TResult result.
}

また、INotifyCompletion インターフェイスには、継続を連鎖させるための OnCompleted メソッドが 1 つあります。

namespace System.Runtime.CompilerServices
{
    public interface INotifyCompletion
    {
        void OnCompleted(Action continuation);
    }
}

Task と Task が awaitable-awaiter パターンを実装する方法を次に示します。タスクは仮想的に IAwaitable の実装と見なすことができます。これには、System.Runtime.CompilerServices.TaskAwaiter を返す GetAwaiter インスタンス メソッドがあり、仮想的に IAwaiter の実装と見なすことができます。同様に、Task は仮想的に IAwaitable の実装と見なすことができます。これには System.Runtime.CompilerServices.TaskAwaiter を返す GetAwaiter メソッドがあり、仮想的に IAwaiter の実装と見なすことができます。

namespace System.Threading.Tasks
{
    public partial class Task : IAsyncResult
    {
        public TaskAwaiter GetAwaiter();
    }

    public partial class Task<TResult> : Task
    {
        public TaskAwaiter<TResult> GetAwaiter();
    }
}

namespace System.Runtime.CompilerServices
{
    public struct TaskAwaiter : ICriticalNotifyCompletion, INotifyCompletion
    {
        public bool IsCompleted { get; }

        public void GetResult(); // No result.

        public void OnCompleted(Action continuation);

        // Other members.
    }

    public struct TaskAwaiter<TResult> : ICriticalNotifyCompletion, INotifyCompletion
    {
        public bool IsCompleted { get; }

        public TResult GetResult(); // TResult result.

        public void OnCompleted(Action continuation);

        // Other members.
    }
}

awaitable-awaiter パターンが実装されている限り、他の型を await キーワードと共に使用できます。 Action を例にとると、上記の TaskAwaiter を再利用することで、GetAwaiter メソッドを拡張メソッドとして簡単に実装できます。

public static partial class ActionExtensions
{
    public static TaskAwaiter GetAwaiter(this Action action) => Task.Run(action).GetAwaiter();
}

同様に、TaskAwaiter:

を再利用することで、このパターンを Func に実装できます。
public static partial class FuncExtensions
{
    public static TaskAwaiter<TResult> GetAwaiter<TResult>(this Func<TResult> function) =>
        Task.Run(function).GetAwaiter();
}

await キーワードを関数で直接使用できるようになりました:

internal static async Task AwaitFunctions(string readPath, string writePath)
{
    Func<string> read = () => File.ReadAllText(readPath);
    string contents = await read;

    Action write = () => File.WriteAllText(writePath, contents);
    await write;
}

非同期ステート マシン

前述のように、async および await キーワードを使用すると、非同期関数は非ブロックになります。コンパイル時に、非同期関数のワークフローが非同期ステート マシンにコンパイルされます。実行時にこの非同期関数が呼び出されると、コンパイラによって生成された非同期ステート マシンが開始され、非同期ステート マシンのワークフローを表すタスクがすぐに返されます。これを実証するために、次の非同期メソッドを定義します:

internal static async Task<T> Async<T>(T value)
{
    T value1 = Start(value);
    T result1 = await Async1(value1);
    T value2 = Continuation1(result1);
    T result2 = await Async2(value2);
    T value3 = Continuation2(result2);
    T result3 = await Async3(value3);
    T result = Continuation3(result3);
    return result;
}

internal static T Start<T>(T value) => value;

internal static Task<T> Async1<T>(T value) => Task.Run(() => value);

internal static T Continuation1<T>(T value) => value;

internal static Task<T> Async2<T>(T value) => Task.FromResult(value);

internal static T Continuation2<T>(T value) => value;

internal static Task<T> Async3<T>(T value) => Task.Run(() => value);

internal static T Continuation3<T>(T value) => value;

コンパイル後、async 修飾子はなくなりました。 async 関数は、async ステート マシンを開始するための通常の関数になります:

[AsyncStateMachine(typeof(AsyncStateMachine<>))]
internal static Task<T> CompiledAsync<T>(T value)
{
    AsyncStateMachine<T> asyncStateMachine = new AsyncStateMachine<T>()
    {
        Value = value,
        Builder = AsyncTaskMethodBuilder<T>.Create(),
        State = -1 // -1 means start.
    };
    asyncStateMachine.Builder.Start(ref asyncStateMachine);
    return asyncStateMachine.Builder.Task;
}

生成された非同期ステート マシンは、リリース ビルドでは構造体であり、デバッグ ビルドではクラスです:

[CompilerGenerated]
[StructLayout(LayoutKind.Auto)]
private struct AsyncStateMachine<TResult> : IAsyncStateMachine
{
    public int State;

    public AsyncTaskMethodBuilder<TResult> Builder;

    public TResult Value;

    private TaskAwaiter<TResult> awaiter;

    void IAsyncStateMachine.MoveNext()
    {
        TResult result;
        try
        {
            switch (this.State)
            {
                case -1: // Start code from the beginning to the 1st await.
                    // Workflow begins.
                    TResult value1 = Start(this.Value);
                    this.awaiter = Async1(value1).GetAwaiter();
                    if (this.awaiter.IsCompleted)
                    {
                        // If the task returned by Async1 is already completed, immediately execute the continuation.
                        goto case 0;
                    }
                    else
                    {
                        this.State = 0;
                        // If the task returned by Async1 is not completed, specify the continuation as its callback.
                        this.Builder.AwaitUnsafeOnCompleted(ref this.awaiter, ref this);
                        // Later when the task returned by Async1 is completed, it calls back MoveNext, where State is 0.
                        return;
                    }
                case 0: // Continuation code from after the 1st await to the 2nd await.
                    // The task returned by Async1 is completed. The result is available immediately through GetResult.
                    TResult result1 = this.awaiter.GetResult();
                    TResult value2 = Continuation1(result1);
                    this.awaiter = Async2(value2).GetAwaiter();
                    if (this.awaiter.IsCompleted)
                    {
                        // If the task returned by Async2 is already completed, immediately execute the continuation.
                        goto case 1;
                    }
                    else
                    {
                        this.State = 1;
                        // If the task returned by Async2 is not completed, specify the continuation as its callback.
                        this.Builder.AwaitUnsafeOnCompleted(ref this.awaiter, ref this);
                        // Later when the task returned by Async2 is completed, it calls back MoveNext, where State is 1.
                        return;
                    }
                case 1: // Continuation code from after the 2nd await to the 3rd await.
                    // The task returned by Async2 is completed. The result is available immediately through GetResult.
                    TResult result2 = this.awaiter.GetResult();
                    TResult value3 = Continuation2(result2);
                    this.awaiter = Async3(value3).GetAwaiter();
                    if (this.awaiter.IsCompleted)
                    {
                        // If the task returned by Async3 is already completed, immediately execute the continuation.
                        goto case 2;
                    }
                    else
                    {
                        this.State = 2;
                        // If the task returned by Async3 is not completed, specify the continuation as its callback.
                        this.Builder.AwaitUnsafeOnCompleted(ref this.awaiter, ref this);
                        // Later when the task returned by Async3 is completed, it calls back MoveNext, where State is 1.
                        return;
                    }
                case 2: // Continuation code from after the 3rd await to the end.
                    // The task returned by Async3 is completed. The result is available immediately through GetResult.
                    TResult result3 = this.awaiter.GetResult();
                    result = Continuation3(result3);
                    this.State = -2; // -2 means end.
                    this.Builder.SetResult(result);
                    // Workflow ends.
                    return;
            }
        }
        catch (Exception exception)
        {
            this.State = -2; // -2 means end.
            this.Builder.SetException(exception);
        }
    }

    [DebuggerHidden]
    void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine asyncStateMachine) =>
        this.Builder.SetStateMachine(asyncStateMachine);
}

生成された非同期ステート マシンは有限ステート マシンです:

ワークフローはその MoveNext メソッドにコンパイルされ、ワー​​クフローは 3 つの await キーワードによって 4 つのブロックに分割されます。ワークフローのパラメーターはステート マシンのフィールドとしてコンパイルされるため、MoveNext 内のワークフローからアクセスできます。ステート マシンが初期化されると、その初期状態は –1 であり、これは開始を意味します。ステート マシンが開始されると、MoveNext が呼び出され、case –1 ブロックが実行されます。このブロックには、ワークフローの最初から最初の await 式までのコードが含まれ、GetAwaiter 呼び出しにコンパイルされます。 awaiter がすでに完了している場合は、継続をすぐに実行する必要があるため、次のケース 0 ブロックが実行されます。 awaiter が完了していない場合、継続 (次の状態が 0 の MoveNext 呼び出し) は、将来の完了時に awaiter のコールバックとして指定されます。どちらの場合も、case 0 ブロックのコードが実行されると、前の awaiter は既に完了しており、その結果は GetResult メソッドを通じてすぐに利用できます。ケース 2 の最後のブロックが実行されるまで、同じパターンで実行が続きます。

ランタイム コンテキスト キャプチャ

各 await 式では、待機中のタスクがまだ完了していない場合、完了時に継続がコールバックとしてスケジュールされます。その結果、最初の呼び出し元スレッドとは異なるスレッドによって継続が実行される可能性があります。デフォルトでは、初期スレッドのランタイム コンテキスト情報がキャプチャされ、継続を実行するために再利用されます。これを実証するために、上記の Action の awaitable-awaiter パターンをカスタム awaiter で再実装できます。

public static partial class ActionExtensions
{
    public static IAwaiter GetAwaiter(this Action action) => new ActionAwaiter(Task.Run(action));
}

public class ActionAwaiter : IAwaiter
{
    private readonly (SynchronizationContext, TaskScheduler, ExecutionContext) runtimeContext =
        RuntimeContext.Capture();

    private readonly Task task;

    public ActionAwaiter(Task task) => this.task = task;

    public bool IsCompleted => this.task.IsCompleted;

    public void GetResult() => this.task.Wait();

    public void OnCompleted(Action continuation) => this.task.ContinueWith(task =>
        this.runtimeContext.Execute(continuation));
}

awaiter が構築されると、現在のスレッドの System.Threading.SynchronizationContext、System.Threading.Tasks.TaskScheduler、System.Threading.ExecutionContext などのランタイム コンテキスト情報が取得されます。次に、OnCompleted で継続がコールバックされると、以前にキャプチャされたランタイム コンテキスト情報を使用して実行されます。カスタム awaiter は、同じパターンで Func に実装できます:

public static partial class FuncExtensions
{
    public static IAwaiter<TResult> GetAwaiter<TResult>(this Func<TResult> function) =>
        new FuncAwaiter<TResult>(Task.Run(function));
}

public class FuncAwaiter<TResult> : IAwaiter<TResult>
{
    private readonly (SynchronizationContext, TaskScheduler, ExecutionContext) runtimeContext =
        RuntimeContext.Capture();

    private readonly Task<TResult> task;

    public FuncAwaiter(Task<TResult> task) => this.task = task;

    public bool IsCompleted => this.task.IsCompleted;

    public TResult GetResult() => this.task.Result;

    public void OnCompleted(Action continuation) => this.task.ContinueWith(task =>
        this.runtimeContext.Execute(continuation));
}

以下は、ランタイム コンテキストのキャプチャと再開の基本的な実装です。

public static class RuntimeContext
{
    public static (SynchronizationContext, TaskScheduler, ExecutionContext) Capture() =>
        (SynchronizationContext.Current, TaskScheduler.Current, ExecutionContext.Capture());

    public static void Execute(
        this (SynchronizationContext, TaskScheduler, ExecutionContext) runtimeContext, Action continuation)
    {
        var (synchronizationContext, taskScheduler, executionContext) = runtimeContext;
        if (synchronizationContext != null && synchronizationContext.GetType() != typeof(SynchronizationContext))
        {
            if (synchronizationContext == SynchronizationContext.Current)
            {
                executionContext.Run(continuation);
            }
            else
            {
                executionContext.Run(() => synchronizationContext.Post(
                    d: state => continuation(), state: null));
            }
            return;
        }
        if (taskScheduler != null && taskScheduler != TaskScheduler.Default)
        {
            Task continuationTask = new Task(continuation);
            continuationTask.Start(taskScheduler);
            return;
        }
        executionContext.Run(continuation);
    }

    public static void Run(this ExecutionContext executionContext, Action continuation)
    {
        if (executionContext != null)
        {
            ExecutionContext.Run(
                executionContext: executionContext, 
                callback: executionContextState => continuation(), 
                state: null);
        }
        else
        {
            continuation();
        }
    }
}

継続が実行されると、最初に以前にキャプチャされた SynchronizationContext がチェックされます。特殊化された SynchronizationContext がキャプチャされ、それが現在の SynchronizationContext と異なる場合、キャプチャされた SynchronizationContext および ExecutionContext を使用して継続が実行されます。キャプチャされた特殊な SynchronizationContext がない場合、TaskScheduler がチェックされます。特殊化された TaskScheduler がキャプチャされると、継続をタスクとしてスケジュールするために使用されます。他のすべてのケースでは、継続はキャプチャされた ExecutionContext で実行されます。

Task と Task は、継続が以前にキャプチャされたランタイム コンテキストにマーシャリングされるかどうかを指定する ConfigureAwait メソッドを提供します。

namespace System.Threading.Tasks
{
    public partial class Task : IAsyncResult
    {
        public ConfiguredTaskAwaitable ConfigureAwait(bool continueOnCapturedContext);
    }

    public partial class Task<TResult> : Task
    {
        public ConfiguredTaskAwaitable<TResult> ConfigureAwait(bool continueOnCapturedContext);
    }
}

ランタイム コンテキスト キャプチャのデモを行うには、バックグラウンド スレッドを開始して各タスクを実行するカスタム タスク スケジューラを定義します。

public class BackgroundThreadTaskScheduler : TaskScheduler
{
    protected override IEnumerable<Task> GetScheduledTasks() => throw new NotImplementedException();

    protected override void QueueTask(Task task) =>
        new Thread(() => this.TryExecuteTask(task)) { IsBackground = true }.Start();

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) =>
        this.TryExecuteTask(task);
}

次の非同期関数には 2 つの await 式があり、ConfigureAwait は異なるブール値で呼び出されます:

internal static async Task ConfigureRuntimeContextCapture(string readPath, string writePath)
{
    TaskScheduler taskScheduler1 = TaskScheduler.Current;
    string contents = await ReadAsync(readPath).ConfigureAwait(continueOnCapturedContext: true);
    // Equivalent to: await ReadAsync(readPath);

    // Continuation is executed with captured runtime context.
    TaskScheduler taskScheduler2 = TaskScheduler.Current;
    object.ReferenceEquals(taskScheduler1, taskScheduler2).WriteLine(); // True
    await WriteAsync(writePath, contents).ConfigureAwait(continueOnCapturedContext: false);

    // Continuation is executed without captured runtime context.
    TaskScheduler taskScheduler3 = TaskScheduler.Current;
    object.ReferenceEquals(taskScheduler1, taskScheduler3).WriteLine(); // False
}

タスク スケジューラ キャプチャのデモを行うには、カスタム タスク スケジューラを指定して上記の非同期関数を呼び出します。

internal static async Task CallConfigureContextCapture(string readPath, string writePath)
{
    Task<Task> task = new Task<Task>(() => ConfigureRuntimeContextCapture(readPath, writePath));
    task.Start(new BackgroundThreadTaskScheduler());
    await task.Unwrap(); // Equivalent to: await await task;
}

ここでは、非同期関数 ConfigureRuntimeContextCapture が Task を返すため、非同期関数で構築されたタスクは Task 型になります。 Task を通常の Task に変換する Unwrap 拡張メソッドが提供されています:

namespace System.Threading.Tasks
{
    public static class TaskExtensions
    {
        public static Task Unwrap(this Task<Task> task);

        public static Task<TResult> Unwrap<TResult>(this Task<Task<TResult>> task);
    }
}

非同期関数 ConfigureRuntimeContextCapture が実行されると、その初期タスク スケジューラは指定されたカスタム タスク スケジューラになります。最初の await 式では、ConfigureAwait が true で呼び出されるため、ランタイム コンテキスト情報がキャプチャされ、キャプチャされたランタイム コンテキスト情報を使用して継続が実行されます。これはデフォルトの動作であるため、true を指定して ConfigureAwait を呼び出すことは、ConfigureAwait をまったく呼び出さないことと同じです。その結果、最初の継続は同じカスタム タスク スケジューラで実行されます。 2 番目の await 式では、ConfigureAwait が false で呼び出されるため、実行時のコンテキスト情報は取得されません。その結果、2 番目の継続は既定のタスク スケジューラ (System.Threading.Tasks.ThreadPoolTask​​Scheduler) で実行されます。

ランタイム コンテキストのキャプチャは、SynchronizationContext によっても実証できます。 SynchronizationContext は、さまざまなアプリケーション モデルでさまざまな実装を持っています。例:

  • ASP.NET:System.Web.AspNetSynchronizationContext
  • WPF:System.Windows.Threading.DispatcherSynchronizationContext
  • WinForms:System.Windows.Forms.WindowsFormsSynchronizationContext
  • WinRT と Windows ユニバーサル:System.Threading.WinRTSynchronizationContext

例として、Windows ユニバーサル アプリケーションを取り上げます。 Visual Studio で、Windows ユニバーサル アプリケーションを作成し、その UI にボタンを追加します。

<Button x:Name="Button" Content="Button" HorizontalAlignment="Center" VerticalAlignment="Center" Click="ButtonClick" />

コード ビハインドで、Click イベント ハンドラーを非同期関数として実装します。

private async void ButtonClick(object sender, RoutedEventArgs e)
{
    SynchronizationContext synchronizationContext1 = SynchronizationContext.Current;
    ExecutionContext executionContext1 = ExecutionContext.Capture();
    await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(continueOnCapturedContext: true);
    // Equivalent to: await Task.Delay(TimeSpan.FromSeconds(1));
            
    // Continuation is executed with captured runtime context.
    SynchronizationContext synchronizationContext2 = SynchronizationContext.Current;
    Debug.WriteLine(synchronizationContext1 == synchronizationContext2); // True
    this.Button.Background = new SolidColorBrush(Colors.Blue); // UI update works.
    await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(continueOnCapturedContext: false);
            
    // Continuation is executed without captured runtime context.
    SynchronizationContext synchronizationContext3 = SynchronizationContext.Current;
    Debug.WriteLine(synchronizationContext1 == synchronizationContext3); // False
    this.Button.Background = new SolidColorBrush(Colors.Yellow); // UI update fails.
    // Exception: The application called an interface that was marshalled for a different thread.
}

WinRTSynchronizationContext は、UI スレッドでのみ使用できます。ボタンがクリックされると、UI スレッドは非同期関数 ButtonClick を実行するため、初期 SynchronizationContext は WinRTSynchronizationContext です。前の例と同様に、ConfigureAwait が true で呼び出されると、継続は以前にキャプチャされた WinRTSynchronizationContext で実行されるため、継続は UI を正常に更新できます。 ConfigureAwait が true で呼び出されると、WinRTSynchronizationContext で継続が実行されず、UI の更新に失敗して例外がスローされます。

一般化された非同期戻り値の型と非同期メソッド ビルダー

C# 7 以降、async 関数は、async メソッド ビルダーが指定されている限り、待機可能な型を返すようにサポートされています。たとえば、次の FuncAwaitable は awaitable 型で、上記の FuncAwater を awaiter として再利用します:

[AsyncMethodBuilder(typeof(AsyncFuncAwaitableMethodBuilder<>))]
public class FuncAwaitable<TResult> : IAwaitable<TResult>
{
    private readonly Func<TResult> function;

    public FuncAwaitable(Func<TResult> function) => this.function = function;

    public IAwaiter<TResult> GetAwaiter() => new FuncAwaiter<TResult>(Task.Run(this.function));
}

Func は上記の GetAwaiter 拡張メソッドで既に待機可能ですが、ここではそのようなラッパー型が実装されているため、[AsyncMethodBuilder] 属性を使用して非同期メソッド ビルダーを指定できます。非同期メソッド ビルダーは次のように定義されます:

public class AsyncFuncAwaitableMethodBuilder<TResult>
{
    private AsyncTaskMethodBuilder<TResult> taskMethodBuilder;

    private TResult result;

    private bool hasResult;

    private bool useBuilder;

    public static AsyncFuncAwaitableMethodBuilder<TResult> Create() =>
        new AsyncFuncAwaitableMethodBuilder<TResult>()
        {
            taskMethodBuilder = AsyncTaskMethodBuilder<TResult>.Create()
        };

    public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine =>
        this.taskMethodBuilder.Start(ref stateMachine);

    public void SetStateMachine(IAsyncStateMachine stateMachine) =>
        this.taskMethodBuilder.SetStateMachine(stateMachine);

    public void SetResult(TResult result)
    {
        if (this.useBuilder)
        {
            this.taskMethodBuilder.SetResult(result);
        }
        else
        {
            this.result = result;
            this.hasResult = true;
        }
    }

    public void SetException(Exception exception) => this.taskMethodBuilder.SetException(exception);

    public FuncAwaitable<TResult> Task
    {
        get
        {
            if (this.hasResult)
            {
                TResult result = this.result;
                return new FuncAwaitable<TResult>(() => result);
            }
            else
            {
                this.useBuilder = true;
                Task<TResult> task = this.taskMethodBuilder.Task;
                return new FuncAwaitable<TResult>(() => task.Result);
            }
        }
    }

    public void AwaitOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
        where TAwaiter : INotifyCompletion where TStateMachine : IAsyncStateMachine
    {
        this.useBuilder = true;
        this.taskMethodBuilder.AwaitOnCompleted(ref awaiter, ref stateMachine);
    }

    public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(
        ref TAwaiter awaiter, ref TStateMachine stateMachine)
        where TAwaiter : ICriticalNotifyCompletion where TStateMachine : IAsyncStateMachine
    {
        this.useBuilder = true;
        this.taskMethodBuilder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
    }
}

FuncAwitable 型を非同期関数で返すことができるようになりました:

internal static async FuncAwaitable<T> ReturnFuncAwaitable<T>(T value)
{
    await Task.Delay(TimeSpan.FromSeconds(1));
    return value;
}

そのコンパイルは、タスクを返す非同期関数と同じパターンです。唯一の違いは、生成された非同期ステート マシンでは、ビルダー フィールドがタスクの AsyncTaskMethodBuilder ではなく、指定された AsyncFuncAwaitableMethodBuilder になることです。そして明らかに、この非同期関数は await 可能な型を返すため、await 式で呼び出すことができます:

internal static async Task CallReturnFuncAwaitable<T>(T value)
{
    T result = await ReturnFuncAwaitable(value);
}

ValueTask とパフォーマンス

一般化された非同期戻り型のサポートにより、Microsoft は System.Threading.Tasks.Extensions NuGet パッケージで System.Threading.Tasks.ValueTask awaitable 構造も提供します。

namespace System.Threading.Tasks
{
    [AsyncMethodBuilder(typeof(AsyncValueTaskMethodBuilder<>))]
    [StructLayout(LayoutKind.Auto)]
    public struct ValueTask<TResult> : IEquatable<ValueTask<TResult>>
    {
        public ValueTask(TResult result);

        public ValueTask(Task<TResult> task);

        public ValueTaskAwaiter<TResult> GetAwaiter();

        // Other members.
    }
}

その awaiter は System.Threading.Tasks.ValueTaskAwaiter であり、その非同期メソッド ビルダーは System.Runtime.CompilerServices.AsyncValueTaskMethodBuilder として指定され、同じパッケージで提供されます。値型として、ValueTask は参照型 Task よりも安価に割り当てられます。また、Func 操作のラッパーとしての Task とは異なり、ValueTask は、Func 操作または既に使用可能な TResult 結果のラッパーにすることができます。そのため、ValueTask を使用すると、非同期操作を待機する前に結果を取得できる非同期関数のパフォーマンスを向上させることができます。次の例では、指定された URI からデータをダウンロードします:

private static Dictionary<string, byte[]> cache = 
    new Dictionary<string, byte[]>(StringComparer.OrdinalIgnoreCase);

internal static async Task<byte[]> DownloadAsyncTask(string uri)
{
    if (cache.TryGetValue(uri, out byte[] cachedResult))
    {
        return cachedResult;
    }
    using (HttpClient httpClient = new HttpClient())
    {
        byte[] result = await httpClient.GetByteArrayAsync(uri);
        cache.Add(uri, result);
        return result;
    }
}

最初にキャッシュをチェックし、指定された URI のデータが既にキャッシュされている場合は、非同期操作を実行せずにキャッシュされたデータを返します。ただし、コンパイル時には関数に async 修飾子があるため、ワークフロー全体が非同期ステート マシンになります。実行時に、タスクは常にマネージ ヒープに割り当てられ、ガベージ コレクションが必要です。結果がキャッシュで利用可能で、非同期操作が必要ない場合でも、非同期ステート マシンが常に実行されます。 ValueTask を使用すると、これを簡単に最適化できます:

internal static ValueTask<byte[]> DownloadAsyncValueTask(string uri)
{
    return cache.TryGetValue(uri, out byte[] cachedResult)
        ? new ValueTask<byte[]>(cachedResult)
        : new ValueTask<byte[]>(DownloadAsync());

    async Task<byte[]> DownloadAsync()
    {
        using (HttpClient httpClient = new HttpClient())
        {
            byte[] result = await httpClient.GetByteArrayAsync(uri);
            cache.Add(uri, result);
            return result;
        }
    }
}

これで、この関数は、待機可能な ValueTask を返す同期関数になります。結果がキャッシュで利用できる場合、非同期操作や非同期ステート マシンは関与せず、マネージド ヒープに割り当てられるタスクもありません。非同期操作は、非同期ステート マシンにコンパイルされる非同期ローカル関数にカプセル化され、結果がキャッシュで利用できない場合にのみ関与します。その結果、特にキャッシュが頻繁にヒットする場合に、パフォーマンスを向上させることができます。実際には、パフォーマンスをベンチマークして、使用するパターンを決定してください。

匿名非同期関数

async および await キーワードは、ラムダ式で使用できます:

internal static async Task AsyncLambda(string readPath, string writePath)
{
    Func<string, Task<string>> readAsync = async (path) =>
    {
        using (StreamReader reader = new StreamReader(new FileStream(
            path: path, mode: FileMode.Open, access: FileAccess.Read,
            share: FileShare.Read, bufferSize: 4096, useAsync: true)))
        {
            return await reader.ReadToEndAsync();
        }
    };
    Func<string, string, Task> writeAsync = async (path, contents) =>
    {
        using (StreamWriter writer = new StreamWriter(new FileStream(
            path: path, mode: FileMode.Create, access: FileAccess.Write,
            share: FileShare.Read, bufferSize: 4096, useAsync: true)))
        {
            await writer.WriteAsync(contents);
        }
    };

    string result = await readAsync(readPath);
    await writeAsync(writePath, result); 
}

ここで、これら 2 つの非同期ラムダ式は、通常の同期ラムダ式と同じパターンで、表示クラス メソッドとしてコンパイルされます。

タスクは任意の型を返す無名関数で構築できるため、タスクを返す非同期無名関数でも構築できます:

internal static async Task AsyncAnonymous(string readPath, string writePath)
{
    Task<Task<string>> task1 = new Task<Task<string>>(async () => await ReadAsync(readPath));
    task1.Start(); // Cold task needs to be started.
    string contents = await task1.Unwrap(); // Equivalent to: string contents = await await task1;

    Task<Task> task2 = new Task<Task>(async () => await WriteAsync(writePath, null));
    task2.Start(); // Cold task needs to be started.
    await task2.Unwrap(); // Equivalent to: await await task2;
}

最初のタスクは () –> Task 型の非同期匿名関数で構築されるため、構築されたタスクは Task> 型になります。同様に、2 番目のタスクは型 () –> Task の非同期匿名関数で構築されるため、構築されたタスクは型 Task になります。前述のように、ネストされたタスクはアンラップして待機できます。このシナリオでは、Task.Run のオーバーロードが提供され、非同期関数を受け入れ、ネストされたタスクを自動的にアンラップします:

namespace System.Threading.Tasks
{
    public partial class Task : IAsyncResult
    {
        public static Task Run(Func<Task> function);

        public static Task<TResult> Run<TResult>(Func<Task<TResult>> function);
    }
}

上記の例は次のように簡略化できます:

internal static async Task RunAsync(string readPath, string writePath)
{
    Task<string> task1 = Task.Run(async () => await ReadAsync(readPath)); // Automatically unwrapped.
    string contents = await task1; // Task.Run returns hot task..

    Task task2 = Task.Run(async () => await WriteAsync(writePath, contents)); // Automatically unwrapped.
    await task2; // Task.Run returns hot task.
}