程序员的知识教程库

网站首页 > 教程分享 正文

编程深水区之并发⑦:C#的Task(c++高并发编程)

henian88 2024-08-15 05:07:06 教程分享 10 ℃ 0 评论

我们很少直接使用ThreadPool,因为它有一些限制,比如你无法知道异步任务什么时候完成,也无法让异步操作返回值。而Task能够解决这些技术问题,它建立在ThreadPool基础之上,仍然是使用CLR线程池。除此之外,还提供了Parallel、PLINQ、Timer、async/await等并行任务API,都是建立在线程池之上。

一、Task

1.1 TaskScheduler任务调度器

Thread和ThreadPool,都没有内建的机制来返回值或让你知道异步操作什么时候完成、完成的怎么样,而Task解决了这个问题。Task引入了TaskScheduler对象,负责异步任务的调度。默认情况下,使用线程池调度器(TaskScheduler.Default),但除此之外,.NET还提供了更多调度器,比如同步上下文任务调度器(将异步任务交给GUI线程)、IO调度器(将任务交给线程池中的I/O线程)、ThreadPer调度器(启动一个单独的线程而不使用线程池)等。

1.2 Task的创建

有两种方式,构造函数new Task()和静态方法Task.Run(),两者都有多个重载。

//1、使用Task的两种方式==========================================
//注意:本例中都是向Task传入具体值,而非变量。传入变量,存在数据竞争的问题。
public class Program
{
    static void Main()
    {
        var currentThread = Thread.CurrentThread;
        Console.WriteLine(#34;{currentThread.ManagedThreadId}:主线程开始");
      
        //(1)ThreadPool.QueueUserWorkItem方式
        ThreadPool.QueueUserWorkItem(CountWorker,1);
      
        //(2)new Task()方式
        //多个重载,传递Action、CancellationToken、State(参数)等
        var t = new Task(CountWorker,2);
        t.Start();
      
        //(3)Task.Run()方式
        //直接Start,更常用
        //多个重载,传递Action、CancellationToken
        Task.Run(() => CountWorker(3));
      
        Console.ReadKey();
    }
    static void CountWorker(object state)
    {
        var ct = Thread.CurrentThread;
        Console.WriteLine(#34;{ct.ManagedThreadId}:Worker线程开始;参数:{state}");
    }
}
/*输出,三个Worker线程的数据不一定
1:主线程开始
8:Worker线程开始;参数:2
7:Worker线程开始;参数:1
9:Worker线程开始;参数:3
*/


//2、传入TaskCreationOptions枚举值================================
//任务调度器会根据传入的枚举值,调整调度行为
//Task有多个重载,其中一个重载可以传入TaskCreationOptions枚举值
new Task(CountWorker,2, TaskCreationOptions.LongRunning).Start();
/*TaskCreationOptions的枚举值
None,默认
PreferFairness,希望任务尽快执行,但不一定采纳
LongRunning,建议尽可能采用线程池,但不一定采纳
AttachedToParent,将一个Task与父Task关联,一定会采纳
DenyChildAttach,拒绝子Task的关联(作为父Task),一定会采纳
HideScheduler,强迫子任务使用默认调度器,而不使用父Task的,一定会采纳
*/


//4、new Task也可以使用Lambda======================================
new Task((state) => 
{ 
  Console.WriteLine(state); 
}, 100)
.Start();


//3、再展开说Task.Run()============================================
//3.1 以下代码会报错
//因为new Task(Callback,state),Callback的参数必须是object类型
//实参state传入Callback后,会有装箱拆箱操作
public class Program
{
    static void Main()
    {
        var t = new Task(CountWorker,2);
    }
    static void CountWorker(int num)
    {
        Console.WriteLine(#34;参数:{num}");
    }
}

//3.2 以下代码正常执行
//因为此时 ()=>CountWorker(3) 整体作为Callback
//CountWorker(3)只是在Callback的方法体内运行
public class Program
{
    static void Main()
    {
        Task.Run(() => CountWorker(3));
    }
    static void CountWorker(int num)
    {
        Console.WriteLine(#34;参数:{num}");
    }
}

//3.3 改写一下3.2代码,两段代码等效
public class Program
{
    static void Main()
    {
        Task.Run(() => 
        {
            Console.WriteLine("参数:3");
        });
        Console.ReadKey();
    }
}

1.3 等待Task完成和获取Task结果

无论是等待完成Wait()方法,还是获取结果Result属性,都会造成当前线程程序代码的等待,且可能阻塞当前线程。等待和非阻塞是两个概念,要注意区别。等待是一定会发生的,而是否阻塞,是指等待的时候,当前线程能不能回到线程池,然后被调度到其它地方干活,从而避免需要创建新的线程,以提升性能。Task是可能造成阻塞的,所以在Task的基础上,提供了async/await,可以实现非阻塞的异步操作,这部分内容放到下个章节。

//1、Wait()方法等待Task完成=====================================
/*调用Wait方法时:
1)如果Task已由线程池中的线程执行,则当前线程会发生阻塞,等待Task完成
2)如果还没有开始执行,则当前线程会去执行Task,完成后再回来执行剩下的代码
   -从程序代码执行的角度来说,代码执行在Wait()时,开始等待
   -但从整个系统或线程池角度来说,没有线程被阻塞,资源没有被浪费
   -注:如果主线程已获得一个同步锁,此时Task也会试图获取同一个,可能造成死锁
*/
public class Program
{
    static void Main()
    {
        Console.WriteLine("主线程开始");
        var t = Task.Run(() => 
        {
            Console.WriteLine("异步线程开始");
            Thread.Sleep(1000); //模拟耗时操作
            Console.WriteLine("异步线程结束");
        });
        t.Wait(); //等待异步任务完成后,才会执行下面代码
        Console.WriteLine("主线程结束");
        Console.ReadKey();
    }
}
/*输出
主线程开始
异步线程开始
异步线程结束
主线程结束
*/


//2、获取Task的结果================================================
//2.1 通过调用Task对象的Result属性,可以获得Task的返回值
//注意:Result内部会调用Wait,也可能造成阻塞
public class Program
{
    static void Main()
    {
        var t = Task.Run(() => 
        {
            return 10;
        });
        //t.Wait();//此处有没有Wait,结果都一样
        Console.WriteLine(t.Result);
        Console.ReadKey();
    }
}

//2.2 使用new Task的方式
//使用new Task<T>泛型版本构造函数,T为返回值类型
var t = new Task<int>(() =>
{
    return 10;
});
t.Start();
Console.WriteLine(t.Result);
Console.ReadKey();

//2.3 发生异常结果
/*
1)Task可能发生异常,此时调用Wait或Result时,会抛出异常对象,线程退回线程池
2)异常对象为AggregateException,该对象的InnerExceptions属性是具体异常的集合
3)如果我们一直没有调用Wait或者Result,就不知道是否发生了异常,此时可以向
  TaskScheduler的UnobservedTaskException事件,注册一个异常回调来捕获异常
*/

//2.4 Task对象的其它API
var t = Task.Run(...)
/*Task对象t,除了Wait()和Result外,其它几个常用API
1)Status属性,获取Task执行的状态
2)IsCalceled/IsCompleted/IsCompletedSuccessfully/IsFaulted,Task状态的简化版
3)Id属性,Task的唯一Id,主要用于调试
*/

1.4 使用CancellationToken终止任务

和前面章节说的CancellationToken使用方式类似,但使用了ThrowIfCancellationRequested方法。如果收到终止信号,则抛出异常。为什么要抛出异常呢?因为本例中的Task没有返回值,如果不抛出异常,外界无法知道异步任务的执行情况。

public class Program
{
    static void Main()
    {
        //创建一个 CancellationTokenSource
        CancellationTokenSource cts = new CancellationTokenSource();

        //启动一个任务,并传递 token
        //使用了Task.Run(cb,token)的重载方法,第二个参数为Token
        //此处使用了闭包的方式,将Token传入Callback方法体中
        Task task = Task.Run(() => DoWork(cts.Token), cts.Token);

        //等待一段时间,然后请求取消
        Thread.Sleep(2000);
        Console.WriteLine("请求取消任务");
        cts.Cancel();

        //等待Task完成,并捕获异常
        try
        {
            task.Wait();
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("任务被取消了");
        }
        finally
        {
            cts.Dispose(); //手动释放CancellationTokenSource
        }
    }

    static void DoWork(CancellationToken token)
    {
        int i = 0;
        while (true)
        {
            //如果获取到终止操作,则抛出异常
            //如果不抛出异常,外界无法获知异步操作的执行情况
            token.ThrowIfCancellationRequested();
            Console.WriteLine(#34;正在执行任务... {i++}");
            Thread.Sleep(500); //模拟耗时工作
        }
    }
}

1.5 编排多个Task的执行

(1)WaitAny()和WaitAll(),会阻塞调用线程

//2、Task的静态方法WaitAny和WaitAll================================
Task task1 = Task.Run(...);
Task task2 = Task.Run(...);
//阻塞调用线程,直到参数中任何一个Task完成,返回值为已完成任务的索引值
int who = Task.WaitAny(task1, task2);
//阻塞调用线程,直到参数中所有Task完成,如果都完成了,返回true
bool isAllCompleted = Task.WaitAll(task1, task2);

(2)任务完成时启动新任务,不会阻塞

//1、ContinueWith的基本使用
//Task.Run()或者new Task()的返回值,仍然是一个Task===============
public class Program
{
    static void Main()
    {
        Task t1 = Task.Run(() =>
        {
            Console.WriteLine("任务1正在执行...");
            Thread.Sleep(1000);//模拟耗时操作
        });
        //ContinueWith的Callback的参数,就是(完成态的)t1对象
        Task t2 = t1.ContinueWith((task) =>
        {
            Console.WriteLine(task.IsCompleted); //判断t1是否已经完成
            Console.WriteLine("任务2在任务1完成后开始执行...");
            
            Thread.Sleep(1000);//模拟耗时操作
        });

        Console.ReadKey();
    }
}

//2、ContinueWith的链式调用======================================
public class Program
{
    static void Main()
    {
        Task.Run(() =>
        {
            Console.WriteLine("任务1正在执行...");
            Thread.Sleep(1000);//模拟耗时操作
            return 1;
        })
        .ContinueWith((task) => 
        {
            var num = task.Result;
            num++;
            Console.WriteLine("任务2正在执行:"+ num);
            Thread.Sleep(1000);//模拟耗时操作
            return num;
        })
        .ContinueWith((task) =>
         {
             var num = task.Result;
             num++;
             Console.WriteLine("任务3正在执行:" + num);
             Thread.Sleep(1000);//模拟耗时操作
             return num;
         });

        Console.ReadKey();
    }
}

//3、ContinueWith有多个重载,可传入TaskContinuationOptions枚举=====
//略

(3)任务可以启动子任务

//父子任务最好使用new Task()
//因为Task.Run()没有提供传入TaskCreationOptions的重载
public class Program
{
    static void Main()
    {
        // 创建父任务
        var parentTask = new Task(() =>
        {
            Console.WriteLine("父任务开始");

            // 创建并启动子任务
            var childTask = new Task(() =>
            {
                Console.WriteLine("子任务开始");
                Thread.Sleep(1000); // 模拟工作
                Console.WriteLine("子任务完成");
            }, TaskCreationOptions.AttachedToParent); //关联到父任务
            childTask.Start();

            // 父任务的其他工作
            Console.WriteLine("父任务进行其他工作");
        });

        parentTask.Start();
        parentTask.Wait(); // 等待父任务和子任务都完成

        Console.WriteLine("父任务和所有子任务已完成");
        Console.ReadKey(); // 阻止主线程提前退出
    }
}

//Task还提供一个方法Task.Factory.StartNew(),可以传入TaskCreationOptions
//Task.Run实际上是Task.Factory.StartNew的简写
//但要实现父子任务,Task.Run和Task.Factory.StartNew仍然不能混用
//Task.Factory.StartNew版本如下:
var parentTask = Task.Factory.StartNew(() =>
{
    Console.WriteLine("父任务开始");

    Task.Factory.StartNew(() =>
    {
        Console.WriteLine("子任务开始");
        Thread.Sleep(1000); 
        Console.WriteLine("子任务完成");
    }, TaskCreationOptions.AttachedToParent);

    // 父任务的其他工作
    Console.WriteLine("父任务进行其他工作");
});
parentTask.Wait(); // 等待父任务和子任务都完成
Console.WriteLine("父任务和所有子任务已完成");
Console.ReadKey(); // 阻止主线程提前退出

二、Parallel

2.1 静态For、Foreach和Invoke方法

Parallel可以使用线程池的线程,并行运行异步任务,内部使用的是Task对象

//1、Parallel.For()=============================================
public class Program
{
    static void Main()
    {
        //同步执行for循环
        for (int i = 0; i < 10; i++)
        {
            var threadID = Thread.CurrentThread.ManagedThreadId;
            Console.WriteLine(#34;主线程的循环 {i},线程ID: {threadID}");
        }

        //异步并行执行for循环
        Parallel.For(0, 10, i =>
        {
            Console.WriteLine(#34;异步Task {i},TaskID: {Task.CurrentId}");
        });
    }
}

//2、Parallel.ForEach()=========================================
public class Program
{
    static void Main()
    {
        List<int> numbers = new List<int> { 1, 2, 3, 4, 5 };
        //异步并行执行foreach循环
        Parallel.ForEach(numbers, number =>
        {
            Console.WriteLine(#34;Number: {number},TaskID: {Task.CurrentId}");
        });
    }
}

//3、Parallel.Invoke()=========================================
public class Program
{
    static void Main()
    {
        //异步并行执行多个任务
        Parallel.Invoke(
            () => Console.WriteLine("Task 1 running"),
            () => Console.WriteLine("Task 2 running"),
            () => Console.WriteLine("Task 3 running")
        );
    }
}

2.2 Parallel并行方法的ParallelOptions

For、Foreach或Invoke方法,都有接受ParallelOptions对象参数的重载方法,可以配置并行调度的方式。

public class Program
{
    static void Main()
    {
        var options = new ParallelOptions()
        {
            MaxDegreeOfParallelism = 2, // 限制同时运行的最大任务数
            //CancellationToken = CancellationToken.None, //终止线程
            //TaskScheduler = TaskScheduler.Default //使用哪个调试器
        };

        Parallel.For(0, 10, options, i =>
        {
            Console.WriteLine(#34;Task {i} , TaskID: {Task.CurrentId}");
        });
    }
}

2.3 处理异常

在并行执行任务时,出现异常的任务,异常会被聚合到AggregateException中;未出现异常的任务,继续执行。

class Program
{
    static void Main()
    {
        try
        {
            Parallel.Invoke(
                () => { throw new InvalidOperationException("非常操作!"); },
                () => Console.WriteLine("This task will complete")
            );
        }
        catch (AggregateException ex)
        {
            foreach (var e in ex.InnerExceptions)
            {
                Console.WriteLine(#34;Caught exception: {e.Message}");
            }
        }
    }
}

2.4 终止并行任务

需要借助ParallelOptions对象,传入CancellationToken,Parallel内部会自动调用CancellationToken的方法ThrowIfCancellationRequested()

public class Program
{
    static void Main()
    {
        var cts = new CancellationTokenSource();
        var options = new ParallelOptions()
        {
            CancellationToken = cts.Token
        };

        Task.Run(() =>
        {
            Thread.Sleep(1000); // 模拟一些工作
            cts.Cancel(); // 取消并行操作
        });

        try
        {
            Parallel.For(0, 10, options, i =>
            {
                Console.WriteLine(#34;Task {i} ,TaskID: {Task.CurrentId}");
                Thread.Sleep(2000); // 模拟工作量
            });
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Operation was canceled");
        }
    }
}

2.5 使用Parallel的注意事项

  • Parallel本身也有开销,如果只有区区几个异步任务,或者任务处理非常快,反而可能会降低性能。
  • Parallel需要大量可用线程来处理异步任务,如果调用时,线程池的线程数量比较少,可能需要创建大量线程,结果并不一定能提升性能,所以使用Parallel最好手动配置MaxDegreeOfParallelism。
  • 在Parallel中要避免修改共享数据,虽然可以使用同步锁,但那就失去了使用Parallel执行并行任务的意义。

三、PLINQ

使用LINQ时,只有一个线程顺序处理数据集合中的所有项。类似Parallel,可以使用并行查询,以提高处理性能。PLINQ在内部使用Task(默认由TaskScheduler调度),将集合中的数据分散到多个线程上处理。


class Program
{
    static void Main()
    {
        int[] numbers = Enumerable.Range(1, 100).ToArray();

        var evenNumbers = numbers
            .AsParallel() //并行处理
            //.AsOrdered() //保留原始顺序,会影响并行的性能
            //.WithCancellation(cts.Token) //支持取消操作
            .WithDegreeOfParallelism(4) //设置并行操作最大任务数,最好设置为CPU核数
            .Where(n => n % 2 == 0)
            .ToArray();

        foreach (var num in evenNumbers)
        {
            Console.WriteLine(num);
        }
    }
}

四、Timer

可以让一个线程池的线程,定时调用一个Callback。线程池为所有Timer对象,准备了一个专用线程,这个线程知道下一个Timer对象什么时候到期,到期时,这个线程被唤醒,并调用ThreadPool的QueueUserWorkItem,将Callback添加到线程池的任务队列中,线程池会调用一个空闲线程来执行。针对不同环境,.NET提供了多种Timer类型,比如适合后端的System.Threading.Timer和WPF的System.Windows.Threading.DispatcherTimer。

//1、适合后端环境的System.Threading.Timer=========================
//1.1 基本使用
public class Program
{
    public static void Main()
    {
        //创建Timer,每隔2秒执行一次回调方法
        /*参数说明:
        TimerCallback:定时回调的方法,方法签名必须符合要求
        null:传入Callback的参数,object类型,如果没有参数,可传入null
        0:定时器立即执行的间隔时间,0表示立即执行,Timeout.Infinite表示不执行
        2000:定时器每间隔多长时间调用,Timeout.Infinite表示不执行
        */
        Timer timer = new Timer(TimerCallback, null, 0, 2000);

        Console.WriteLine("Timer started. Press Enter to stop...");
        Console.ReadLine();
    }
  
    //注意TimerCallback的签名,返回值为void,参数为object
    private static void TimerCallback(object state)
    {
        Console.WriteLine(#34;Timer callback executed at {DateTime.Now}");
    }
}

//1.2 使用Change方法手动启动
/*如果Timer的Callback执行时间很长:
  -可能出现上次还没执行完,下一个Timer又来了
  -有多个线程同时执行Callback
  -使用Change方法可以指定一个新的立即执行时间,从而避免以上情况
*/
public class Program
{
    //全局的Timer对象
    private static Timer timer;
    public static void Main()
    {
        //初始化全局Timer对象,但不立即启动
        timer = new Timer(TimerCallback, null, 
                              Timeout.Infinite, Timeout.Infinite);
        //立即启动Timer
        //参数1为立即启动的间隔时间,0表示立即启动
        //参数2为每隔多长时间重新启动,Timeout.Infinite表示不启动
        timer.Change(0,Timeout.Infinite)
    }
  
    private static void TimerCallback(object state)
    {
        ......
        //在Callback中调用Change,2秒后立即执行
        //实现循环套娃
        timer.Change(2000,Timeout.Infinite)
    }
}


//2、适合WPF的System.Windows.Threading.DispatcherTimer============
public class MainWindow : Window
{
    private DispatcherTimer dispatcherTimer;

    public MainWindow()
    {
        //创建DispatcherTimer对象
        dispatcherTimer = new DispatcherTimer();
        dispatcherTimer.Interval = TimeSpan.FromSeconds(2); //设置间隔为2秒
        dispatcherTimer.Tick += DispatcherTimer_Tick; //订阅Tick事件,Callback
        dispatcherTimer.Start(); //启动 Timer
    }

    private void DispatcherTimer_Tick(object sender, EventArgs e)
    {
        // 更新UI
        this.Title = #34;Timer ticked at {DateTime.Now}";
    }

    ......
}

五、后记

本章节内容已经比较多,async/await放到下个章节。async/await是Task的非阻塞版本,非常适合用来操作I/O。


*这是一个系列文章,将全面介绍多线程、协程和单线程事件循环机制,建议收藏、点赞哦!
*你在并发编程过程中碰到了哪些难题?欢迎评论区交流~~~


我是functionMC > function MyClass(){...}
C#/TS/鸿蒙/AI等技术问题,以及如何写Bug、防脱发、送外卖等高深问题,都可以私信提问哦!

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表