返回

IO限制的异步操作

Windows如何执行I/O操作

上图是连接了几个硬件设备的计算机系统.

  1. 程序通过构造FileStream对象来打开磁盘文件

    调用Read方法从文件中读取数据,将你的线程从托管代码转变为本机/用户模式代码Read内部调用Win32ReadFile函数

  2. ReadFile分配一个小的数据结构,称为I/O请求包(I/O Request Packet,IRP)

    IRP结构初始化后包含的内容有: 文件句柄,文件中的偏移量(从这个位置开始读取字符),一个Byte[]数组的地址(数组用读取的字节来填充),要传输的字节数以及其他常规性内容.

  3. ReadFile将你的线程从本机/用户模式代码转变成本机/内核模式代码, 并传递IRP数据结构,从而调用Window内核

  4. 根据IRP中的设备句柄,Windows内核知道I/O操作要传送给哪个硬件设备,因此Windows将IRP传送给对应的设备驱动程序的IRP队列

  5. 每个设备都维护自己的IRP队列, 其中包含了机器上运行的所有进程发出的I/O请求

    IRP数据包到达时, 设备驱动程序将IRP信息传给物理硬件设备, 硬件设备执行请求的I/O操作

  6. 注意一个重要问题, 在硬件设备执行I/O操作期间,发出了I/O请求的线程将无事可做, 所以Windows将线程变为睡眠状态,防止它浪费CPU时间

    虽然线程不浪费时间,但其仍然浪费空间(内存),因为它的用户模式栈,内核模式栈,线程环境块,其他数据结构都还在内存中,完全没有谁去访问这些东西。

  7. 最终,硬件设备完成I/O操作,然后Windows会唤醒你的线程,把它调度给一个CPU,使它从内核模式返回用户模式, 再返回至托管代码⑥⑦⑧

    FileStream对象Read方法现在返回一个Int32,指明从文件中读取的实际字节数, 使你知道在传给Read的Byte[]中, 实际能检索到多少个字节.

上面的步骤看起来很不错,但是依旧存在两个问题:

  • 请求的数量越来越多,创建的线程就越来越多,那么被阻塞的线程就会越来越多,这样会更浪费内存。
  • 用执行结果来响应请求,如果请求的数量非常多,那么解锁的阻塞线程也就很多,而且机器上的线程数都会远远大于CPU数,所以在阻塞线程被集中解锁期间CPU很有可能会频繁地发生上下文切换,损害性能。

上图展示Windows如何异步读取I/O操作,仍然使用FileStream来构建对象,但是需要传递FileOptions.Asynchronous标志,告诉Windows希望文件的读/写以异步的方式进行, 上图删除了除硬盘外的硬件设备, 引入了CLR的线程池, 稍微修改了代码. 传递了FileOptions.Asynchronous标志.

  1. 现在调用ReadAsync而不是Read从文件中读取数据.ReadAsync. 在ReadAsync内部分配一个Task<Int32>对象来代表用于完成的读取操作的代码。然后ReadAsync调用Win32 ReadFile函数
  2. ReadFile分配IRP数据包
  3. 然后将其传递给Windows内核
  4. Windows内核把IRP数据包添加到IRP队列中
  5. 此时线程不会再阻塞,而是可以直接运行返回至你的代码。所以线程能够立即从ReadAsync调用中返回. ⑤⑥⑦

当然, 此时IRP可能尚未处理好, 所以不能够在ReadAsync之后的代码中访问传递的Byte[]中的字节.

那么, 什么时候以及用什么方式处理最终读取的数据呢? 在调用ReadAsync后返回一个Task<Int32>对象,可以在该对象上调用ContinueWith来登记任务完成时执行的回调方法,然后在回调方法中处理数据。

  1. 当硬件设备处理好IRP后(步骤a)。

  2. 硬件设备会把IRP放到CLR的线程池中队列中(步骤b)。

  3. 将来某个时候,一个线程池会提取完成的IRP并执行任务的代码

    • 最终要么设置异常(如果发生异常),

    • 要么返回结果(步骤c)。

在知道这些之后,就知道使用异步I/O可以尽量的减少同步I/O访问存在的那些问题。

异步方式执行I/O的优势:

  • 将资源利用率降到最低
  • 减少上下文切换
  • 增快垃圾回收运行速度
  • 增强调试性能
  • 减少操作时间
  • 不会堵塞线程

C#的异步函数

private static async Task<String> IssueClientRequestAsync(String serverName, String message)
{
    using (var pipe = new NamedPipeClientStream(serverName, "PipeName", PipeDirection.InOut,
        PipeOptions.Asynchronous | PipeOptions.WriteThrough))
    {
        pipe.Connect(); // 必须在设置ReadMode之前连接
        // 将传入的消息转换成一个Byte[]
        pipe.ReadMode = PipeTransmissionMode.Message;

        // 将数据异步发送给服务器
        Byte[] request = Encoding.UTF8.GetBytes(message);
        // WriteAsync内部分配一个Task对象, 返回给此方法
        // 此时await操作符实际会在Task对象上调用ContinueWith
        // 向它传递用于恢复状态机的方法
        // 然后线程从IssueClientRequestAsync返回至调用处
        await pipe.WriteAsync(request, 0, request.Length);

        // 异步读取服务器的响应
        Byte[] response  = new Byte[1000];
        Int32  bytesRead = await pipe.ReadAsync(response, 0, response.Length);
        return Encoding.UTF8.GetString(response, 0, bytesRead);
    } // 关闭管道
}

一旦将方法标记为async, 编译器就会将方法的代码转换成实现了状态机的一个类型。 这就允许线程执行状态机中的一些代码并返回, 方法不需要一直执行到结束。

线程调用该方法时发生事情如下:

  1. 线程会构造一个NamedPipeClientStream

  2. 调用Connect,设置它的ReadMode属性

  3. 将传入的消息转换成一个Byte[],然后调用WriteAsync

  4. WriteAsync内部分配一个Task对象, 返回给此方法

    此时await操作符实际会在Task对象上调用ContinueWith,向它传递用于恢复状态机的方法

  5. 然后线程IssueClientRequestAsync返回至调用处。


  6. 将来某个时候,设备驱动程序会结束向管道的写入

    一个线程池线程会通知Task对象, 后来激活ContinueWith回调方法, 造成一个线程恢复状态机。

    具体地说, 一个线程会重新进入IssueClientRequestAsync方法, 但这次是从await操作符的位置开始。

  7. 方法现在执行编译器生成的,用于查询Task对象状态的代码。

    如果操作成功完成,await操作符会返回结果, 如果操作失败,会设置代表错误的一个异常。

    在本例中,WriteAsync返回一个Task而不是Task<TResult>,所以无返回值。

  8. 现在方法继续执行, 分配一个Byte[]并调用NamedPipeClientStream异步ReadAsync方法。方法内部创建一个Task<Int32>对象并返回它。

  9. 同样的, await操作符实际会在Task<Int32>对象上调用ContinueWith向它传递用于恢复状态机的方法

  10. 然后线程IssueClientRequestAsync返回至调用处。

  11. 将来某个时候, 服务器向客户机发送一个响应, 网络设备驱动程序获得这个响应, 一个线程池线程通知Task<Int32>对象, 恢复状态机。

  12. await操作符造成编译器生成代码来查询Task对象Result属性(含有一个Int32)并将结果赋给局部变量bytesRead; 如果操作失败则抛出异常。

  13. 然后执行IssueClientRequestAsync剩余的代码, 返回结果字符串并关闭管道.

由于异步函数在状态机执行完毕之前返回, 所有在IssueClientRequestAsync执行它的第一个await操作符之后, 调用IssueClientRequestAsync的方法会继续执行。

但是调用者如何知道IssueClientRequestAsync已经执行完毕它的状态机呢? 一旦将方法标记了async,编译器会自动生成代码, 在状态机开始执行时创建一个Task对象该Task对象在状态机执行完毕时自动完成.。

IssueClientRequestAsync方法靠近尾部的地方, 我返回了一个字符串, 这造成编译器生成的代码完成它创建的Task<String>对象, 把对象的Result属性设为返回的字符串。

异步函数存在以下限制:

  1. 不能将应用程序的Main方法转变成异步函数。构造器,属性访问器方法事件访问器方法也不能转变成异步函数.
  2. 异步函数不能使用任何outref参数
  3. 不能在catch,finally,unsafe块中使用await操作符
  4. 不能在await操作符前获得一个支持线程所有权或递归的锁,并在await之后释放它. 这是因为await操作符之前的代码由一个线程执行, 之后的代码则可能由另一个线程执行. 在lock语句中使用await会报错, 如果显式调用Monitor的Enter和Exit方法,虽然能编译,但是运行时会抛出一个SynchronzizationLockException
  5. 在查询表达式中, await操作符只能在初始from子句的第一个集合表达式中使用,或者join子句的集合表达式中使用.

以上限制会在你违反时,编译器会提醒你。

不要让线程等待一个线程同步构造从而造成线程的阻塞。相反可以等待await从SemaphoreSlim的WaitAsync方法或者我自己的OneManyLock的AcquireAsync方法所返回的任务,从而避免线程被阻塞。

编译器如何将异步函数转换成状态机

private sealed class Type1 {}
private sealed class Type2 {}
private static Task<Type1> Method1Async()
{
    // 以异步方式执行一些操作, 最后返回一个Type1对象
    return Task.Run(() =>
    {
        /*Task.Yield(); */
        return new Type1();
    });
}
private static Task<Type2> Method2Async()
{
    // 以异步方式执行一些操作, 最后返回一个Type2对象
    return Task.Run(() =>
    {
        /*Task.Yield(); */
        return new Type2();
    });
}
// 通过异步函数来使用这些简单的类型和方法
private static async Task<String> MyMethodAsync(Int32 argument)
{
    Int32 local = argument;
    try
    {
        Type1 result1 = await Method1Async();
        for (Int32 x = 0; x < 3; x++)
        {
            Type2 result2 = await Method2Async();
        }
    }
    catch (Exception)
    {
        Console.WriteLine("Catch");
    }
    finally
    {
        Console.WriteLine("Finally");
    }
    return "Done";
}

编译上述代码,对IL代码进行逆向工程以转回C#源代码。下面展示的是编译器转换后的精华代码。

// AsyncStateMachine 特性指出这是一个异步方法(对反射的工具有用)
// 类型指出实现状态机的是哪个结构
[DebuggerStepThrough, AsyncStateMachine(typeof(StateMachine))]
private static Task<String> MyMethodAsync_ActualImplementation(Int32 argument)
{
    // 创建状态机实例并初始化它
    StateMachine stateMachine = new StateMachine()
    {
        // 创建builder,这个存根方法返回Task<String>
        // Statemachine(状态机)实例访问builder来设置Task 完成/异常
        m_builder = AsyncTaskMethodBuilder<String>.Create(),

        m_state    = -1,      // 初始化状态机
        m_argument = argument // 将实参拷贝到状态机字段
    };

    // 开始执行状态机
    stateMachine.m_builder.Start(ref stateMachine);
    // 返回状态机的Task
    return stateMachine.m_builder.Task;
}


// 这是状态机结构
[CompilerGenerated, StructLayout(LayoutKind.Auto)]
private struct StateMachine : IAsyncStateMachine
{
    // 代表状态机builder(Task)及其位置的字段
    public AsyncTaskMethodBuilder<String> m_builder;
    public Int32                          m_state;

    // 实参和局部变量现在成了字段
    public Int32 m_argument, m_local, m_x;
    public Type1 m_resultType1;
    public Type2 m_resultType2;

    // 每个awaiter类型一个字段
    // 任何时候这些字段只有一个是重要的, 那个字段引用最近执行的,以异步方式完成的await
    private TaskAwaiter<Type1> m_awaiterType1;
    private TaskAwaiter<Type2> m_awaiterType2;

    // 状态机方法本身
    void IAsyncStateMachine.MoveNext()
    {
        // Task的结果值
        String result = null;

        // 编译器插入try块来确保状态机的任务完成
        try
        {
            // 先假定逻辑上离开try块
            Boolean executeFinally = true;
            if (m_state == -1)
            {
                // 如果第一次在状态机方法中
                // 原始方法就从头开始执行
                m_local = m_argument;  // 异步方法里的第一句代码
            }

            // 原始代码中的try块
            try
            {
                TaskAwaiter<Type1> awaiterType1;
                TaskAwaiter<Type2> awaiterType2;

                switch (m_state)
                {
                    case -1: // 开始执行try块中的代码
                        // 调用'Method1Async'并获得它的awaiter
                        awaiterType1 = Method1Async().GetAwaiter();
                        if (!awaiterType1.IsCompleted)
                        {
                            // 'Method1Async'要以异步方式完成
                            m_state        = 0;
                            //保存 awaiter 以便将来返回
                            m_awaiterType1 = awaiterType1;

                            // 告诉awaiter在操作完成时调用MoveNext
                            // 引用状态机的MoveNext方法
                            m_builder.AwaitUnsafeOnCompleted(ref awaiterType1, ref this);
                            // 上述代码调用awaiterType1的OnCompleted, 它会在被等待的任务上
                            // 调用ContinueWith(t => MoveNext())
                            // 任务完成后, ContinueWith任务调用MovedNext

                            executeFinally = false; // 逻辑上不离开try块
                            return;                 // 线程返回至调用者
                        }

                        // 'Method1Async' 以同步方式完成了
                        break;

                    case 0: // 'Method1Async' 以异步方式完成了
                        awaiterType1 = m_awaiterType1; // 恢复最新的awaiter
                        break;

                    case 1: // 'Method2Async' 以异步方式完成了
                        awaiterType2 = m_awaiterType2; // 恢复最新的awaiter
                        goto ForLoopEpilog;
                }

                // 在第一个await后, 我们捕捉结果并启动for循环
                m_resultType1 = awaiterType1.GetResult(); // 获取awaiter的结果

                // for循环开场
                ForLoopPrologue:
                m_x = 0;          // for循环初始化
                goto ForLoopBody; // 跳到for循环主体ForLoopBody

                // for循环尾声
                ForLoopEpilog:
                m_resultType2 = awaiterType2.GetResult();
                m_x++; // 每次循环迭代都递增
                // 直通到for循环主体

                // for循环主体
                ForLoopBody:
                if (m_x < 3)
                {
                    // for循环测试
                    // 调用Method2Async并获取它的awaiter
                    awaiterType2 = Method2Async().GetAwaiter();
                    if (!awaiterType2.IsCompleted)
                    {
                        m_state        = 1;  // 'Method2Async' 以异步方式完成
                        m_awaiterType2 = awaiterType2; // 保存awaiter以便将来返回

                        // 告诉awaiter在操作完成时调用MoveNext
                        m_builder.AwaitUnsafeOnCompleted(ref awaiterType2, ref this);
                        executeFinally = false; // 逻辑上不离开try块
                        return; // 线程返回至调用者
                    }

                    // 'Method2Async' 以同步方式完成了
                    goto ForLoopEpilog; // 以同步方式完成就再次循环
                }
            }
            catch (Exception)
            {
                Console.WriteLine("Catch");
            }
            finally
            {
                // 只要线程物理上离开try就会执行finally
                // 我们希望在线程逻辑上离开try时才执行这些代码
                if (executeFinally)
                {
                    Console.WriteLine("Finally");
                }
            }
            // 这是最终从异步函数返回的东西
            result = "Done";
        }
        catch (Exception exception)
        {
            // 从未处理的异常: 通过设置异常来完成状态机的Task
            m_builder.SetException(exception);
            return;
        }

        // 无异常,通过返回结果来完成状态机的Task
        m_builder.SetResult(result);
    }

    [DebuggerHidden]
    void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine param0)
    {
        m_builder.SetStateMachine(param0);
    }
}

如何将被等待的对象状态机粘合起来?

任何时候需要使用await操作符,编译器都会获取操作数, 并尝试在它上面调用GetAwaiter方法. 这可能是实例方法或者扩展方法.

调用GetAwaiter方法所返回的对象称为awaiter(等待者), 正是它将被等待的对象状态机粘合起来.

状态机获得awaiter后, 会查询其IsCompleted属性.

  • 如果操作已经以同步方式完成了, 属性将返回true, 而作为一项优化措施, 状态机将继续执行并调用awaiterGetResult方法. 该方法要么抛出异常, 要么返回结果. 状态机继续执行以处理结果.

  • 如果操作以异步方式完成, IsCompleted属性将返回false. 状态机调用awaiterOnCompleted方法, 并向它传递一个委托(引用状态机的MoveNext方法).

    现在状态机允许它的线程回到原地以执行其他代码. 将来某个时候, 封装了底层任务的awaiter会在完成时调用委托以执行MoveNext. 可根据状态机中的字段知道如何到达代码中的正确位置. 使方法能从它当初离开的位置继续. 这时, 代码调用awaiterGetResult方法, 执行将从这里继续, 以便对结果进行处理.

这就是异步函数的工作原理.

异步函数的扩展性

在扩展性方面, 能用Task对象包装一个将来完成的操作, 就可以用await操作符来等待该操作. 用一个类型Task来表示各种异步操作. 可以实现组合操作(WhenAllWhenAny)和其他有用的操作. 之后会演示用Task包装一个CancellationToken,在等待异步操作的同事利用超时和取消功能.

分享一个TaskLogger类, 用它显示尚未完成的异步操作. 这在调试时特别有用, 尤其是当应用程序因为错误的请求或者未响应的服务器而挂起的时候.

public static class TaskLogger
    {
        public static async Task Go()
        {
#if DEBUG
            // 使用TaskLogger会影响内存和性能, 所以只在调试生成中启用它
            TaskLogger.LogLevel = TaskLogger.TaskLogLevel.Pending;
#endif

            // 初始化3个任务, 为了测试TaskLogger,我们显式控制其持续时间
            var tasks = new List<Task>
            {
                Task.Delay(2000).Log("2s op"),
                Task.Delay(5000).Log("5s op"),
                Task.Delay(6000).Log("6s op")
            };

            try
            {
                // 等待全部任务, 但在3秒后取消; 只有一个任务能按时完成
                // 注意: WithCancellation扩展方法将在本章之后进行讲述
                await Task.WhenAll(tasks).WithCancellation(new CancellationTokenSource(3000).Token);
            }
            catch (OperationCanceledException)
            {
            }

            // 查询logger哪些任务尚未完成, 按照从等待时间最长到最短的顺序排序
            foreach (var op in TaskLogger.GetLogEntries().OrderBy(tle => tle.LogTime))
                Console.WriteLine(op);
        }

        public enum TaskLogLevel
        {
            None,
            Pending
        }

        public static TaskLogLevel LogLevel { get; set; }

        public sealed class TaskLogEntry
        {
            public Task     Task             { get; internal set; }
            public String   Tag              { get; internal set; }
            public DateTime LogTime          { get; internal set; }
            public String   CallerMemberName { get; internal set; }
            public String   CallerFilePath   { get; internal set; }
            public Int32    CallerLineNumber { get; internal set; }

            public override string ToString()
            {
                return String.Format("LogTime={0}, Tag={1}, Member={2}, File={3}({4})",
                    LogTime, Tag ?? "(none)", CallerMemberName, CallerFilePath, CallerLineNumber);
            }
        }

        private static readonly ConcurrentDictionary<Task, TaskLogEntry> s_log =
            new ConcurrentDictionary<Task, TaskLogEntry>();

        public static IEnumerable<TaskLogEntry> GetLogEntries()
        {
            return s_log.Values;
        }

        public static Task<TResult> Log<TResult>(this Task<TResult> task, String tag = null,
            [CallerMemberName]                        String        callerMemberName = null,
            [CallerFilePath]                          String        callerFilePath   = null,
            [CallerLineNumber]                        Int32         callerLineNumber = -1)
        {
            return (Task<TResult>) Log((Task) task, tag, callerMemberName, callerFilePath, callerLineNumber);
        }

        public static Task Log(this Task   task, String tag = null,
            [CallerMemberName]      String callerMemberName = null,
            [CallerFilePath]        String callerFilePath   = null,
            [CallerLineNumber]      Int32  callerLineNumber = -1)
        {
            if (LogLevel == TaskLogLevel.None) return task;
            var logEntry = new TaskLogEntry
            {
                Task             = task,
                LogTime          = DateTime.Now,
                Tag              = tag,
                CallerMemberName = callerMemberName,
                CallerFilePath   = callerFilePath,
                CallerLineNumber = callerLineNumber
            };
            s_log[task] = logEntry;
            task.ContinueWith(t =>
                {
                    TaskLogEntry entry;
                    s_log.TryRemove(t, out entry);
                },
                TaskContinuationOptions.ExecuteSynchronously);
            return task;
        }
    }

// 输出以下结果:
// LogTime=2019/9/27 13:30:46, Tag=6s op, Member=Go, File=D:\TD_ET\ConsoleApp1\ConsoleApp1\Program.cs(41)
// LogTime=2019/9/27 13:30:46, Tag=5s op, Member=Go, File=D:\TD_ET\ConsoleApp1\ConsoleApp1\Program.cs(40)

Callation类,用于取消正在执行的异步操作:

static class Cancellation
{
    public struct Void { }

    public static async Task WithCancellation(this Task originalTask, CancellationToken ct)
    {
        //创建在Cancellation被取消时完成的一个Task
        var cancelTask = new TaskCompletionSource<Void>();

        using (ct.Register(t => ((TaskCompletionSource<Void>)t).TrySetResult(new Void()), cancelTask)) {

            //创建在原始Task或CancellationToken Task完成时都完成的一个Task
            Task any = await Task.WhenAny(originalTask,cancelTask.Task);

            //任务Task因为CancellationToken而完成,就抛出OperationCanceledException
            if (any == cancelTask.Task)
                ct.ThrowIfCancellationRequested();
        };

        //等待原始任务;若任务失败,它将抛出一个异常
        await originalTask;
    }
}

除了增强使用Task时的灵活性, 异步函数另一个对扩展性有力的地方在于编译器可以在await的任何操作数上调用GetAwaiter. 所以操作数不一定是Task对象. 可以是任意类型, 只要提供一个可以调用一个可以调用的GetAwaiter方法. 下例展示我自己的awaiter,在异步方法的状态机和被引发的事件之间, 它扮演粘合剂的角色.

internal static class EventAwaiterDemo
{
    // 演示这一切是如何工作的
    public static void Go()
    {
        ShowExceptions();

        for (Int32 x = 0; x < 3; x++)
        {
            try
            {
                switch (x)
                {
                    case 0: throw new InvalidOperationException();
                    case 1: throw new ObjectDisposedException("");
                    case 2: throw new ArgumentOutOfRangeException();
                }
            }
            catch
            {
            }
        }
    }

    // 以下方法使用我的EventAwaiter类在事件发生时候从await操作符返回,
    // 在本例中, 一旦AppDomain中的任何线程抛出异常, 状态机就会继续
    private static async void ShowExceptions()
    {
        var eventAwaiter = new EventAwaiter<FirstChanceExceptionEventArgs>();
        AppDomain.CurrentDomain.FirstChanceException += eventAwaiter.EventRaised;
        while (true)
        {
            Console.WriteLine("AppDomain exception: {0}",
                (await eventAwaiter).Exception.GetType());
        }
    }

    public sealed class EventAwaiter<TEventArgs> : INotifyCompletion
    {
        private ConcurrentQueue<TEventArgs> m_events = new ConcurrentQueue<TEventArgs>();
        private Action                      m_continuation;

        #region 状态机调用的成员

        // 状态机先调用这个来获得awaiter:我们自己返回自己
        public EventAwaiter<TEventArgs> GetAwaiter()
        {
            return this;
        }

        // 告诉状态机是否发生了任何事件
        public Boolean IsCompleted
        {
            get { return m_events.Count > 0; }
        }

        // 状态机告诉我们最后要调用什么方法, 我们把它保存起来
        public void OnCompleted(Action continuation)
        {
            Volatile.Write(ref m_continuation, continuation);
        }

        // 状态机查询结果; 这是await操作符的结果
        public TEventArgs GetResult()
        {
            TEventArgs e;
            m_events.TryDequeue(out e);
            return e;
        }

        #endregion

        //如果引发了事件, 多个线程可能同时调用
        public void EventRaised(Object sender, TEventArgs eventArgs)
        {
            m_events.Enqueue(eventArgs); // 保存EventArgs以便从GetResult/await返回

            // 如果有一个等待进行的延续任务, 该线程会运行它
            Action continuation = Interlocked.Exchange(ref m_continuation, null);
            if (continuation != null) continuation(); // 恢复状态机
        }
    }
}

笔者自定义的EventAwaiter<TEventArgs>提供了GetAwaiter()isCompleted()onCompleted(Action continuation)GetResult()几个重要的方法,其实这几个方法恰好对应了第3.1中“异步函数如何转化为状态机”中状态机需要操作的各个方法,在3.1中笔者给出一张状态机执行的流程图

笔者接下来结合这个案例,说一说本例的流程:

  1. 当执行到await eventAwaiter时,会去调用eventAwaiterGetAwaiter()方法,然后得到Awaiter对象。
  2. 查询Awaiter对象和IsCompleted()方法,判断当前Awaiter是否发生了事件。
  3. Awaiter还没有发生事件,就调用OnCompleted(Action)方法,并且传递一个Action委托给OnCompleted()方法,其中的Action委托里就包含了恢复状态机的逻辑。
  4. 此时还没有线程执行恢复状态机的代码,await eventWaiter 的线程将会被阻塞。
  5. 当结合本例的程序逻辑,当出现异常时EventRaised会被调用,然后在EventRaised中会恢复状态机,唤醒await eventWaiter阻塞的线程。
  6. 状态机然后会再次调用IsCompleted方法判断是否有事件,这时m_events 已经有一个事件了,所以IsCompleted会返回true
  7. 状态机接着调用GetResult,并且将结果值赋值给await关键字的表达式。

异步函数和事件处理程序

异步函数的返回类型一般是TaskTask<TResult>, 它们代表函数的状态机完成. 但异步函数是可以返回void的. 实现异步事件处理程序时, C#编译器允许你利用这个特殊情况简化编码. 几乎所有事件处理程序都遵循以下方法签名:

void EventHandlerCallback(Object sender, EventArgs e);

要在返回void的事件处理方法中写这样的代码, C#编译器就要允许异步函数返回void,这样才能利用await操作符执行不阻塞的I/O操作. 编译器仍然为返回void的异步函数创建状态机, 但不再创建Task对象, 因为创建了也没法使用. 所以没办法知道返回void的异步函数的状态机在什么时候运行完毕.

正是这个原因, 不能将入口方法Main标记为async修饰符. 进程的主线会在遇到第一个await之后立即从Main返回, 由于调用Main的代码无法获得一个可以进行监视并等待完成的Task, 所以进程终止, 因此编译器认为这是一个错误.

FCL的异步函数

规范要求为方法名附加Async后缀.

其他基于异步的编程模型已经过时, 使用Task的新模型才是你的首要选择.

异步函数和异常处理

Windows设备驱动程序处理异步I/O请求时可能出错, Windows需要向应用程序通知这个情况.

设备驱动程序会向CLR的线程池post已完成的IRP. 一个线程池线程会完成Task对象并设置异常. 你的状态机方法恢复时, await操作符发现操作失败并引发该异常.

Task对象通常抛出一个AggregateExcetpiton可查询该异常的InnerExceptions属性来查看正在发生了什么异常. 但是, 将await用于Task时, 抛出的是第一个内部异常而不是AggregateExcetpiton. 是TaskAwaiterGetResult方法抛出的第一个内部异常.

如果状态机出现未处理的异常, 那么代表异步函数的Task对象会因为未处理的异常而完成. 然后正在等待该Task的代码会看到异常, 但异步函数也可能使用了void返回类型, 这时调用者就没有办法发现未处理异常. 所以当返回void的异步函数抛出未处理的异常时,编译器生成的代码将捕捉它, 并使用调用者的同步上下文重新抛出它. 重新抛出这种异常通常造成整个进程终止.

异步函数的其他功能(调试功能)

如果调试器在await操作符上停止,

  • F10(逐过程)

    会在异步操作完成后,在抵达下一个语句时重新由调试器接管.

    在这个时候,执行代码可能已经不是当初发起异步操作的线程

  • F11(逐语句)

    进入异步函数后, 可以按跳出(Shift+F11)函数并返回至调用者

    但是必须在位于异步函数的起始大括号的时候执行这个操作, 一旦越过大括号, 除非异步函数执行完成,否则跳出操作无法中断异步函数.

  • 要在状态机运行完毕前对调用方法进行调试,在调用方法中插入断点并运行至断点(F5) 即可.

可以利用Task静态Run方法非调用线程的其他线程中执行异步函数.

// Task.Run在GUI线程上调用
Task.Run(async ()=>
{
  // 这里的代码在一个线程池线程上运行

  // 发起异步操作
  await XxxAsync();
  // 在这里执行更多处理...
});

上述代码演示了C#的异步lambda表达式. 可以看出, 在lambda表达式前面添加async,编译器就能将lambda表达式转换成状态机方法来返回一个TaskTask<TResult>的任何Func委托变量.

如果在异步函数(async)中没有使用await操作符, C#编译器会显示警告, 为了去掉警告, 可以为异步方法返回的Task赋给一个变量. 然后忽略这个变量. 还可以定义一个扩展方法.

// 会发出警告
InnerAsyncFunc();// 此处没有添加await操作符, 会发出警告

// 不会发出警告, 需要定义一个忽略的变量
var noWarning = InnerAsyncFunc();//为异步方法返回的Task赋给一个变量. 然后忽略这个变量.
// 后续方法也会继续执行


// 更好的方法
[MethodImpl(MethodImplOptions.AggressiveInlining)] // 造成编译器优化调用
public static void NoWarning(this Task task) { }
// 这样使用
private static async Task OuterAsyncFunction()
{
    // 这里故意不添加await操作符,
    InnerAsyncFunction().NoWarning();

    // 在InnerAsyncFunction执行期间, 这里代码也继续执行
    await Task.Delay(0);
}

异步I/O操作最好的一个地方是可以同时发起许多这样的操作, 让它们并发执行, 从而显著提升应用程序的性能.

// 这会等待所有的request都完成后,才能处理responses
// 这样处理并不好
List<Task<string>> requsts = new List<Task<string>>(1000);
String[] responses = await Task.WhenAll(requests);


// 使用以下方法
// 使用while去判断是否有任务完成,
while(request.Count > 0)
{
   // 顺序处理每个完成的响应, 完成一个处理一个
   Task<String> response = await Task.WhenAny(requests);
   requests.Remove(response);// 从集合中删除已完成的任务
   // 处理响应
}

WhenAll方法在内部创建了一个Task<String[]>对象,他在列表中的所有Task对象都完成之后才完成。

如果希望收到一个响应就处理一个,而不是在完成全部后再处理,那么使用Taks的静态WhenAny方法可以轻松的实现。

应用程序及其线程处理模型

.Net Framework支持几种不同的应用程序模型, 而每种模型都可能引入它自己的线程处理模型.

GUI应用程序(windows窗体,WPF,Silverlight,WindowStore应用程序) 引入了一个线程处理模型.在这个模型中, UI元素只能由创建它的线程更新. 在GUI线程中,经常都需要生成一个异步操作,使GUI线程不至于阻塞并停止用户输入. 当异步操作完成时, 是由一个线程池线程完成Task对象并恢复状态机.

对于某些应用程序模型,这样做没什么问题, 因为非常高效, 但是对于另一些应用程序模型(比如GUI应用程序), 这个做法会造成问题, 因为一旦通过线程池线程更新UI元素就会抛出异常. 线程池线程必须以某种方式告诉GUI线程更新UI元素.

线程池生成一个异步操作后, 它可能由另一个线程池线程完成, 该线程将处理异步操作的结果. 代表原始客户端工作时, 语言文化和身份标识信息需要”流向”新的线程池线程. 这样一来,代表客户端执行的任何额外的工作才能使用客户端的语言文化和身份标识信息.

幸好,FCL定义了一个名为System.Threading.SynchronizationContext的基类. 它解决了所有这些问题. 简单的说,SynchronizationContext派生对象将应用程序模型连接到它的线程处理模型. FCL定义了几个SynchronizationContext派生类, 但你一般不直接和这些类打交道;

等待一个Task时会获取调用线程的SynchronizationContext对象, 线程池线程完成Task后, 会使用该SynchronizationContext对象, 确保为应用程序模型使用正确的线程处理模型. 所以,当一个GUI线程等待一个Task时, await操作符后面的代码保证在GUI线程上执行, 使代码能更新UI元素.

让状态机使用应用程序模型的线程处理模型来恢复, 这在多数时候都很有用, 也很方便. 但偶尔也会带来问题. 下面是WPF应用程序死锁的一个例子:

protected override void OnActivated(EventArgs e)
{
    // 发出HTTP请求, 让线程从GetHttp3线程返回
    // 查询Result属性会阻止GUI线程返回;
    // 线程阻塞等待结果
    String http = GetHttp1().Result; // 以同步方式获取字符串!
    base.OnActivated(e);
}

private async Task<String> GetHttp1()
{
    // 发出HTTP请求, 让线程从GetHttp返回
    HttpResponseMessage msg = await new HttpClient().GetAsync("http://Wintellect.com/");

    // 这里永远执行不到, GUI线程在等待这个方法结束.
    // 等待执行, 但是永远执行不到,因为 需要同一个主线程执行,但是主线程在等待该方法执行结束, 死锁!!!     
    return await msg.Content.ReadAsStringAsync();
}

尤其要注意SynchronizationContext类, 由于许多类库代码都要求不依赖特定的应用程序模型, 所以要避免因为使用SynchronizationContext对象而产生的额外开销.此外,还要帮助开发人员防止死锁。为了解决这两方面的问题, TaskTask<TResult>类提供了一个ConfigureAwait方法, 签名如下:

// 定义这个方法的Task
public ConfiguredAwaitable ConfigureAwait(Boolean continueOnCapturedContext);
//定义这个方法的Task<TResult>
public ConfiguredAwaitable ConfigureAwait<TResult>(Boolean continueOnCapturedContext);

向方法传递true相当于根本没有调用方法, 但是如果传递false, await操作符就不查询调用线程的SynchronizationContext对象, 当线程池线程结束Task时会直接完成它, await操作符后面的代码通过线程池线程执行.

通过ConfigureAwait方法解决死锁问题:

private async Task<String> GetHttp2()
{
    // 发出HTTP请求, 让线程从GetHttp返回
    // 必须将ConfigureAwait(false)应用于等待的每个Task对象
    // ConfigureAwait(false) 不依赖 线程处理模型,可用线程池线程唤醒状态机
    HttpResponseMessage msg = await new HttpClient().GetAsync("http://Wintellect.com/").ConfigureAwait(false);

    // 这里能执行到了,可用线程池线程唤醒状态机,线程池线程可以执行这里的代码,
    // 而非被迫由GUI线程执行 ,
    return await msg.Content.ReadAsStringAsync().ConfigureAwait(false);
}

必须将ConfigureAwait(false)应用于等待的每个Task对象. 这是由于异步操作可能同步完成,而且在发生这个情况时, 调用线程直接继续执行, 不会返回至它的调用者; 你根本不知道哪个操作要求忽略SynchronizationContext对象, 所以只能要求所有操作都忽略它. 这还以为着类库代码不能依赖于任何特定的应用程序模型. 也可以用一个线程池线程执行所有操作:

private Task<String> GetHttp3()
{
    return Task.Run(async () =>
    {
        // 运行一个无SynchronizationContext的线程池线程
        HttpResponseMessage msg = await new HttpClient().GetAsync("http://Wintellect.com/");
        // 这里的代码真的能执行, 因为某个线程池线程能执行这里的代码

        return await msg.Content.ReadAsStringAsync();
    });
}

这个版本中GetHttp方法不再是异步函数, 从方法签名中删除了async关键字,因为方法中没有了await操作符.

SynchronizationContext只决定await之后的代码在哪个线程运行,至于异步操作是否开了新线程跟synccontext没关系. 也就是说有context的话就能确保一段代码看起来是一个函数的时候就真的在一个线程里执行.

以异步方法实现服务器

FCL内建了对伸缩性很好的一些异步服务器的支持。下面列举中MSDN文档中值的参考的地方。

  1. 要构建异步ASP.NET Web窗体,在.aspx文件中添加Async="true"的网页指令,并参考System.Web.UI.PageRegisterAsyncTask方法。
  2. 要构建异步ASP.NET MVC控制器,使你的控制器类从System.Web.Mvc.AsyncController派生,让操作方法返回一个Task<ActionResult>即可。
  3. 要构建异步ASP.NET 处理程序,使你的类从System.Web.HttpTaskAsyncHandler派生,重写其ProcessRequestAsync方法。
  4. 要构建异步WCF服务,将服务作为异步函数来实现,让它返回TaskTask<TResult>

取消I/O操作

Windows一般没有提供取消未完成I/O操作的途径,这是许多开发人员都想要的功能,实现起来却很困难。毕竟,如果向服务器请求了1000个字节,然后决定不再需要这些字节,那么其实没有办法告诉服务器忘掉你的请求。在这种情况下,只能让字节照常返回,再将他们丢弃。此外,这里还发生竞态条件-取消请求的请求可能正在服务器发送响应的时候到来,要在代码中处理这种潜在的竞态条件,决定是丢弃还是使用数据

建议实现一个WithCancellation扩展方法Task<TResult>

internal static class Cancellation
{
    public static async Task Go()
    {
        // 创建一个CancellationTokenSource,它在3秒之后将自己取消
        var cts = new CancellationTokenSource(3000);
        // 提前取消需要调用 cts.Cancel()
        var ct  = cts.Token;

        try
        {
            // 可能需要6秒后完成任务, 但是ct最多允许进行3秒,然后取消
            await Task.Delay(6000).WithCancellation(ct);
            Console.WriteLine("Task completed");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Task cancelled");
        }
        // 因此完成不了任务, 会抛出OperationCanceledException异常
        // 输出 Task cancelled
    }

    // 因为没有非泛型的TaskCompletionSource类
    private struct Void { }

    /// <summary>
    /// 扩展Task<TResult> 方法
    /// </summary>
    /// <param name="orignalTask"> 原始Task对象 </param>
    /// <param name="ct"> CancellationToken中的Task对象 </param>
    /// <typeparam name="TResult"> 返回类型 </typeparam>
    public static async Task<TResult> WithCancellation<TResult>(this Task<TResult> orignalTask, CancellationToken ct)
    {
        // 创建在CancellationToken被取消时完成的一个Task
        var cancelTask = new TaskCompletionSource<Void>();

        // 当CancellationToken取消时, 使cancelTask的Task完成
        // cancelTask对象会传入回调方法
        using (ct.Register(t => ((TaskCompletionSource<Void>) t).TrySetResult(new Void()), cancelTask))
        {
            // 创造在原始Task或CancellationToken里的Task完全时都完成一个Task
            // 哪个先完成any就引用哪个
            Task any = await Task.WhenAny(orignalTask, cancelTask.Task);

            // 如果any是cancelTask, 则说明是取消了任务, 抛出OperationCanceledException
            if (any == cancelTask.Task) ct.ThrowIfCancellationRequested();
        }

        // 等待原始任务(以同步的方式); 若任务失败,等待它抛出第一个内部异常
        //而不是抛出AggregateException
        return await orignalTask;
    }

    public static async Task WithCancellation(this Task task, CancellationToken ct)
    {
        var tcs = new TaskCompletionSource<Void>();
        using (ct.Register(t => ((TaskCompletionSource<Void>) t).TrySetResult(default(Void)), tcs))
        {
            if (await Task.WhenAny(task, tcs.Task) == tcs.Task) ct.ThrowIfCancellationRequested();
        }

        // 等待原始任务(以同步的方式); 若任务失败,等待它抛出第一个内部异常
        //而不是抛出AggregateException
        await task;

    }
}

有的I/O操作必须同步进行

有的方法不允许异步方式执行I/O, 例如Win32 CreateFile方法总是以同步方式执行. 因此FCL不能以异步方式高效地打开文件. Windos也没有提供函数以异步方式访问注册表,访问事件日志,获取目录的文件/子目录或者更改文件/目录的属性等.

FileSteam特有的问题

使用FileStream时应该想好是以同步方式还是以异步方式执行I/O操作,并指定FileOptions.Asynchronous标志来指明自己的选择。如果指定了该标志,就总是调用ReadAsync。如果没有使用这个标志,就总是调用Read。这样能够获得最佳性能。

如果想先对FileStream执行一些同步操作,再执行一些异步操作,那么更高效的做法是使用FileOptions.Asynchronous标志来构造它。另外也可针对同一个文件,创建两个FileStream对象,一个FileStream进行同步操作,另一个FileStream执行异步操作。

FileStream的辅助方法(Create,OpenOpenWrite)创建并返回FileStream对象,这些方法都没有指定FileOptions.Asynchronous标志,所以为了实现响应灵敏的、可伸缩性的应用程序,应避免使用这些方法.

I/O请求优先级

线程还要执行I/O请求以便从各种硬件设备中读写数据.

如果一个低优先级线程获得了CPU事件, 它可以在非常短的时间里轻易地将成百上千的I/O请求放入队列, 由于I/O请求需要时间来执行, 所以一个低优先级线程可能挂起高优先级线程, 使后者不能快速完成工作. 因此, 当系统执行一些耗时的低优先级服务(磁盘碎片整理程序, 病毒扫描程序,内容索引程序)时, 机器的响应能力可能会变得非常差.

Windows允许线程在发出I/O请求时指定优先级. 但是FCL还没有包含这个功能. 如果想用,可以采取P/Invoke本机Win32函数 的方式:

internal static class ThreadIO
{
    public static void Go()
    {
        using (ThreadIO.BeginBackgroundProcessing())
        {
            // 在这里执行低优先级I/O请求, 例如WirteAsync/ReadAsync
        }
    }

    public static BackgroundProcessingDisposer BeginBackgroundProcessing(Boolean process = false)
    {
        ChangeBackgroundProcessing(process, true);
        return new BackgroundProcessingDisposer(process);
    }

    public static void EndBackgroundProcessing(Boolean process = false)
    {
        ChangeBackgroundProcessing(process, false);
    }

    private static void ChangeBackgroundProcessing(Boolean process, Boolean start)
    {
        Boolean ok = process
            ? SetPriorityClass(GetCurrentWin32ProcessHandle(),
                start ? ProcessBackgroundMode.Start : ProcessBackgroundMode.End)
            : SetThreadPriority(GetCurrentWin32ThreadHandle(),
                start ? ThreadBackgroundgMode.Start : ThreadBackgroundgMode.End);
        if (!ok) throw new Win32Exception();
    }

    // 这个结构使C#的using语句能终止后台处理模式
    public struct BackgroundProcessingDisposer : IDisposable
    {
        private readonly Boolean m_process;

        public BackgroundProcessingDisposer(Boolean process)
        {
            m_process = process;
        }

        public void Dispose()
        {
            EndBackgroundProcessing(m_process);
        }
    }


    // See Win32’s THREAD_MODE_BACKGROUND_BEGIN and THREAD_MODE_BACKGROUND_END
    private enum ThreadBackgroundgMode
    {
        Start = 0x10000,
        End   = 0x20000
    }

    // See Win32’s PROCESS_MODE_BACKGROUND_BEGIN and PROCESS_MODE_BACKGROUND_END   
    private enum ProcessBackgroundMode
    {
        Start = 0x100000,
        End   = 0x200000
    }

    [DllImport("Kernel32", EntryPoint = "GetCurrentProcess", ExactSpelling = true)]
    private static extern SafeWaitHandle GetCurrentWin32ProcessHandle();

    [DllImport("Kernel32", ExactSpelling = true, SetLastError = true)]
    [return: MarshalAs(UnmanagedType.Bool)]
    private static extern Boolean SetPriorityClass(SafeWaitHandle hprocess, ProcessBackgroundMode mode);


    [DllImport("Kernel32", EntryPoint = "GetCurrentThread", ExactSpelling = true)]
    private static extern SafeWaitHandle GetCurrentWin32ThreadHandle();

    [DllImport("Kernel32", ExactSpelling = true, SetLastError = true)]
    [return: MarshalAs(UnmanagedType.Bool)]
    private static extern Boolean SetThreadPriority(SafeWaitHandle hthread, ThreadBackgroundgMode mode);

    // http://msdn.microsoft.com/en-us/library/aa480216.aspx
    [DllImport("Kernel32", SetLastError = true, EntryPoint = "CancelSynchronousIo")]
    [return: MarshalAs(UnmanagedType.Bool)]
    private static extern Boolean CancelSynchronousIO(SafeWaitHandle hThread);

要调用ThreadIOBeginBackgroundProcessing方法, 告诉windows你的线程要发出低优先级I/O请求, 注意, 这同时会降低线程的CPU调度优先级.

可以调用BeginBackgroundProcessing或者EndBackgroundProcessing返回的值上调用Dispose(上述代码用了using), 使线程恢复为发出普通优先级的I/O请求(以及普通CPU的调度优先级).

如果希望一个进程中的所有线程都发出低优先级I/O请求和进行低优先级的CPU调度, 可以调用BeginBackgroundProcessing,为它的process参数递true值.

线程只能影响它自己的后台处理模式. 不允许更改另一个线程的后台处理模式.

一个例子

类比通常有帮助。你在一家餐馆做饭。鸡蛋和烤面包的订单。

  • 同步:你煮鸡蛋,然后你做烤面包。
  • 异步,单线程:你开始煮鸡蛋并设置一个计时器。你开始吐司做饭,并设置一个计时器。虽然他们都在做饭,但你要清理厨房。当计时器响起时,你可以将鸡蛋从烤箱中取出,然后将烤面包机从烤面包机中取出并送达。
  • 异步,多线程:你雇两个厨师,一个煮鸡蛋,一个煮烤面包。现在你遇到了协调厨师的问题,这样他们在共享资源时就不会在厨房里相互冲突。你必须付钱给他们。

现在,多线程只是一种异步才有意义吗? 线程是关于工人的; 异步是关于任务。在多线程工作流中,您可以将任务分配给工作人员。在异步单线程工作流程中,您有一个任务图表,其中某些任务取决于其他任务的结果; 当每个任务完成时,它会调用代码来调度可以运行的下一个任务,给定刚刚完成的任务的结果。但是你(希望)只需要一个工人来执行所有任务,而不是每个任务一个工人。

它将有助于意识到许多任务不受处理器约束。对于处理器绑定的任务,雇用与处理器一样多的工作者(线程),为每个工作者分配一个任务,为每个工作者分配一个处理器,让每个处理器完成其他工作,除了将结果计算为尽快。但是对于没有在处理器上等待的任务,您根本不需要分配工作人员。您只是等待消息到达,结果可用并在您等待时执行其他操作。当该消息到达时,您可以安排继续完成的任务作为待办事项列表中的下一个要检查的事项。

有人问你要一份文件。您通过邮件发送文件,继续做其他工作。当它到达邮件时,您会收到信号,当您有这样的信息时,您会完成工作流程的其余部分 - 打开信封,支付运费,无论如何。您不需要聘请其他工作人员为您完成所有这些工作。

扩展C#中的异步方法(网络摘要)

翻译文章的地址

关于如何控制异步方法机制有3种方法:

  • System.Runtime.CompilerServices命名空间中提供你自己的async method builder
  • 使用自定义的task awaiter
  • 定义你自己的**“类任务”(task-like)** 类型

异步方法被C#编译器转换从而生成的状态机是依靠于某些预定义的类型的。但是C#编译器却并不一定要求这些众所周知的类型来自于某个特定的程序集。例如,你可以在你的项目中提供自己对AsyncVoidMethodBuilder的实现,然后C#编译器就会把异步机制“绑定”到你的自定义类型。

“类任务”(Task-like)类型

翻译文章的地址

从支持async/await的编译器的第一个版本(即C# 5)开始,就可以自定义awaiter了。这个可扩展性十分有用但是却是有限的,因为所有的异步方法都必须返回void,TaskTask<T>。从C# 7.2开始,编译器支持“类任务”类型。

“类任务”类型是一个class或者struct,它与一个通过AsyncMethodBuilderAttribute标识builder类型相关联。要使“类任务”类型有用,它应该像我们前面描述的awaiter那样是可等待的。基本上,“类任务”类型结合了前面描述的两种可扩展性的方法,并且使第一种方法得到了正式支持。

现在你还必须自己定义这个attribute,例子:github

相关异步的文档

C#中的异步任务类型

Licensed under CC BY-NC-SA 4.0
0