何为异步流

我们使用C#中的yield关键字可以实现迭代器,使用async和await关键字可以实现异步方法。异步流是这两种功能的结合体,它用异步方式生成和消费数据的迭代器。异步流是在C#8中引入的,它以IAsyncEnumerable<out T>IAsyncEnumerator<out T>: IAsyncDisposable两个接口为基础,这两个接口的代码如下:

public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetAsyncEnumerator ();
}
public interface IAsyncEnumerator<out T>: IAsyncDisposable
{
    T Current { get; }
    ValueTask<bool> MoveNextAsync();
}
复制代码

在上面的代码中,从序列中获取每个元素的方法MoveNextAsync是一个异步操作,元素以零散的方式到达,就是异步流。IAsyncEnumerator接口中的的ValueTask是Task类型的轻量化封装,它的类型是结构类型,使用方式和Task差不多,但是它在同步完成任务时或者在返回马上可以使用的结果时可以减少内存开销,因此它比Task更加高效。
下面我们来看一看异步流的用法,首先我们定义一个计算斐波那契数列的方法Fibonacci。其中我们使用Thread.Sleep模拟一个耗时操作。代码如下:

IEnumerable<int> Fibonacci(int count)
{
    int prev = 1;
    int curr = 1;
    for (int i = 0; i < count; i++)
    {
        yield return prev;
        Thread.Sleep(1000000);
        int temp = prev + curr;
        prev = curr;
        curr = temp;
    }
}
复制代码

在代码中Thread.Sleep(1000000)是同步的,也就是说只有Thread.Sleep(1000000)执行完成,后续代码才能继续执行。因此为了提高执行效率我们需要把Thread.Sleep(1000000)改成异步的,在这里我们就可以让它生成异步流。要生成异步流就需要同用到迭代器和异步方法。要实现这个功能就必须使用IAsyncEnumerable接口作为方法的返回类型。修改后的代码如下:

async IAsyncEnumerable<int> FibonacciAsync(int count)
{
    int prev = 1;
    int curr = 1;
    Random r = new();
    for (int i = 0; i < count; i++)
    {
        yield return prev;
        await Task.Delay(1000000);
        int temp = prev + curr;
        prev = curr;
        curr = temp;
    }
}
复制代码

经过修改后,FibonacciAsync方法允许调用方以异步方式消费生成的数列,也就是说可以使用await foreach来遍历消费这个方法的返回结果。调用如下:

await foreach (var f in FibonacciAsync(200))
    Console.Write("{0} ", f);
复制代码

修改后的代码虽然可以以一步的方式消费生成的数列,但是如果在LINQ查询语句中消费异步流是无法使用的。这时就需要引入System.Linq.Async,调用如下:

IAsyncEnumerable<int> query =
    from f in FibonacciAsync(200)
    where f % 2 == 1
    select f * 2;
await foreach (var number in query)
    Console.WriteLine(number);
复制代码

如果想在ASP.NET Core的Action中返回异步流可以像下面这样做:

[HttpGet]
public async IAsyncEnumerable<string> Get()
{
    using var dbContext = new BookContext();
    await foreach (var userName in dbContext.Users
        .Select(u => u.Name)
        .AsAsyncEnumerable())
    yield return userName;
}
复制代码

总结

异步流解决的是零散数据异步生成和消费问题。在 C#8以前一组数据只能以整体异步的方式返回给调用者。