Java自学者论坛

 找回密码
 立即注册

手机号码,快捷登录

恭喜Java自学者论坛(https://www.javazxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,会员资料板块,购买链接:点击进入购买VIP会员

JAVA高级面试进阶训练营视频教程

Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程Go语言视频零基础入门到精通Java架构师3期(课件+源码)
Java开发全终端实战租房项目视频教程SpringBoot2.X入门到高级使用教程大数据培训第六期全套视频教程深度学习(CNN RNN GAN)算法原理Java亿级流量电商系统视频教程
互联网架构师视频教程年薪50万Spark2.0从入门到精通年薪50万!人工智能学习路线教程年薪50万大数据入门到精通学习路线年薪50万机器学习入门到精通教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程MySQL入门到精通教程
查看: 378|回复: 0

.NET异步多线程,Thread,ThreadPool,Task,Parallel,异常处理,线程取消

[复制链接]
  • TA的每日心情
    奋斗
    2024-4-6 11:05
  • 签到天数: 748 天

    [LV.9]以坛为家II

    2034

    主题

    2092

    帖子

    70万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    705612
    发表于 2021-5-19 09:48:18 | 显示全部楼层 |阅读模式

    今天记录一下异步多线程的进阶历史,以及简单的使用方法

    主要还是以Task,Parallel为主,毕竟用的比较多的现在就是这些了,再往前去的,除非是老项目,不然真的应该是挺少了,大概有个概念,就当了解一下进化史了

    1:委托异步多线程,所有的异步都是基于委托来实现的

    #region 委托异步多线程
    {
      //委托异步多线程
      Stopwatch watch = new Stopwatch();
      watch.Start();
      Console.WriteLine($"开始执行了,{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")},,,,{Thread.CurrentThread.ManagedThreadId}");
      Action<string> act = DoSomethingLong;
      for (int i = 0; i < 5; i++)
      {
        //int ThreadId = Thread.CurrentThread.ManagedThreadId;//获取当前线程ID
        string name = $"Async {i}";
        act.BeginInvoke(name, null, null);
      }
      watch.Stop();
      Console.WriteLine($"结束执行了,{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")},,,,{watch.ElapsedMilliseconds}");
    }
    #endregion

     

    2:多线程的最老版本:Thread(好像是2.0的时候出的?记不得了)

    //Thread
    //Thread默认属于前台线程,启动后必须完成
    //Thread有很多Api,但大多数都已经不用了
    Console.WriteLine("Thread多线程开始了");
    Stopwatch watch = new Stopwatch();
    watch.Start();
    //线程容器
    List<Thread> list = new List<Thread>();
    for (int i = 0; i < 5; i++)
    {
           //int ThreadId = Thread.CurrentThread.ManagedThreadId;//获取当前线程ID
           string name = $"Async {i}";
           ThreadStart start1 = () =>
           {
              DoSomethingLong(name);
           };
           Thread thread = new Thread(start1);
           thread.IsBackground = true;//设置为后台线程,关闭后立即退出
           thread.Start();
           list.Add(thread);
           //thread.Suspend();//暂停,已经不用了
           //thread.Resume();//恢复,已经不用了
           //thread.Abort();//销毁线程
           //停止线程靠的不是外部力量,而是线程自身,外部修改信号量,线程检测信号量
    }
    //判断当前线程状态,来做线程等待
    while (list.Count(t => t.ThreadState != System.Threading.ThreadState.Stopped) > 0)
    {
      Console.WriteLine("等待中.....");
      Thread.Sleep(
    500); } //统计正确全部耗时,通过join来做线程等待 foreach (var item in list) {   item.Join();//线程等待,表示把thread线程任务join到当前线程,也就是当前线程等着thread任务完成   //等待完成后统计时间 } watch.Stop();

    Thread的无返回值回调:

    封装一个方法

    /// <summary>
    /// 回调封装,无返回值
    /// </summary>
    /// <param name="start"></param>
    /// <param name="callback"></param>
    private static void ThreadWithCallback(ThreadStart start, Action callback)
    {
      ThreadStart nweStart = () =>
      {
        start.Invoke();
        callback.Invoke();
      };
      Thread thread = new Thread(nweStart);
      thread.Start();
    }
    //回调的委托
    Action act = () =>
    {
        Console.WriteLine("回调函数");
    };
    //要执行的异步操作
    ThreadStart start = () =>
    {
        Console.WriteLine("正常执行。。");
    };
    ThreadWithCallback(start, act);

    有返回值的回调:

    /// <summary>
    /// 回调封装,有返回值
    /// 想要获取返回值,必须要有一个等待的过程
    /// </summary>
    /// <typeparam name="T"></typeparam>
    /// <param name="func"></param>
    /// <returns></returns>
    private static Func<T> ThreadWithReturn<T>(Func<T> func)
    {
      T t = default(T);
      //ThreadStart本身也是一个委托
      ThreadStart start = () =>
        {
          t = func.Invoke();
        };
      //开启一个子线程
      Thread thread = new Thread(start);
      thread.Start();
      //返回一个委托,因为委托本身是不执行的,所以这里返回出去的是还没有执行的委托
      //等在获取结果的地方,调用该委托
      return () =>
      {
        //只是判断状态的方法
        while (thread.ThreadState != System.Threading.ThreadState.Stopped)
        {
          Thread.Sleep(500);
        }
        //使用线程等待
        //thread.Join();
        //以上两种都可以
        return t;
      };
    }
    Func<int> func = () =>
    {
      Console.WriteLine("正在执行。。。");
      Thread.Sleep(10000);
      Console.WriteLine("执行结束。。。");
      return DateTime.Now.Year;
    };
    Func<int> newfunc = ThreadWithReturn(func);
    //这里为了方便测试,只管感受回调的执行原理
    Console.WriteLine("Else.....");
    Thread.Sleep(100);
    Console.WriteLine("Else.....");
    Thread.Sleep(100);
    Console.WriteLine("Else.....");
    //执行回调函数里return的委托获取结果
    //newfunc.Invoke();
    Console.WriteLine($"有参数回调函数的回调结果:{newfunc.Invoke()}");

    关于有返回值的回调,因为委托是在执行Invoke的时候才会去调用委托的方法,所以在调用newfunc.Invoke()的时候,才会去委托里面抓取值,这里会有一个等待的过程,等待线程执行完成

     

    3:ThreadPool:线程池

    线程池是在Thread后出的,算是一种很给力的进化

    在Thread的年代,线程是直接从计算机里抓取的,而线程池的出现,就是给开发人员提供一个容器,可以从容器中抓取线程,也就是线程池

    好处就在于可以避免频繁的创建和销毁线程,直接从线程池拿线程,用完在还回去,当不够的时候,线程池在从计算机中给你分配,岂不是很爽?

    线程池的数量是可以控制的,最小线程数量:8

    ThreadPool.QueueUserWorkItem(p =>
    {
      //这里的p是没有值的
      Console.WriteLine(p);
      Thread.Sleep(2000);
      Console.WriteLine($"线程池线程,{Thread.CurrentThread.ManagedThreadId}");
    });
    ThreadPool.QueueUserWorkItem(p =>
    {
      //这里的p就是传进来的值
      Console.WriteLine(p);
      Thread.Sleep(2000);
      Console.WriteLine($"线程池线程,带参数,{Thread.CurrentThread.ManagedThreadId}");
    }, "这里是参数");

    线程池用起来还是很方便的,也可以控制线程数量

    线程池里有两种线程:普通线程,IO线程,我比较喜欢在操作IO的时候使用ThreadPool

    int workerThreads = 0;
    int completionPortThreads = 0;
    //设置线程池的最大线程数量(普通线程,IO线程)
    ThreadPool.SetMaxThreads(80, 80);
    //设置线程池的最小线程数量(普通线程,IO线程)
    ThreadPool.SetMinThreads(8, 8);
    ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads);
    Console.WriteLine($"当前可用最大普通线程:{workerThreads},IO:{completionPortThreads}");
    ThreadPool.GetMinThreads(out workerThreads, out completionPortThreads);
    Console.WriteLine($"当前可用最小普通线程:{workerThreads},IO:{completionPortThreads}");

    ThreadPool并没有Thread的Join等待接口,那么想让ThreadPool等待要这么做呢?

    ManualResetEvent:通知一个或多个正在等待的线程已发生的事件,相当于发送了一个信号

    mre.WaitOne:卡住当前主线程,一直等到信号修改为true的时候,才会接着往下跑

    //用来控制线程等待,false默认为关闭状态
    ManualResetEvent mre = new ManualResetEvent(false);
    ThreadPool.QueueUserWorkItem(p =>
    {
      DoSomethingLong("控制线程等待");
      Console.WriteLine($"线程池线程,带参数,{Thread.CurrentThread.ManagedThreadId}");
      //通知线程,修改信号为true
      mre.Set();
    });
    //阻止当前线程,直到等到信号为true在继续执行
    mre.WaitOne();
    //关闭线程,相当于设置成false
    mre.Reset();
    Console.WriteLine("信号被关闭了");
    ThreadPool.QueueUserWorkItem(p =>
    {
      Console.WriteLine("再次等待");
      mre.Set();
    });
    mre.WaitOne();

     

    4:Task,这也是现在用的最多的了,毕竟是比较新的玩意

    关于Task就介绍几个最常用的接口,基本上就够用了(一招鲜吃遍天)

    Task.Factory.StartNew:创建一个新的线程,Task的线程也是从线程池中拿的(ThreadPool)

    Task.WaitAny:等待一群线程中的其中一个完成,这里是卡主线程,一直等到一群线程中的最快的一个完成,才能继续往下执行(20年前我也差点被后面的给追上),打个简单的比方:从三个地方获取配置信息(数据库,config,IO),同时开启三个线程来访问,谁快我用谁。

    Task.WaitAll:等待所有线程完成,这里也是卡主线程,一直等待所有子线程完成任务,才能继续往下执行。

    Task.ContinueWhenAny:回调形式的,任意一个线程完成后执行的后续动作,这个就跟WaitAny差不多,只不是是回调形式的。

    Task.ContinueWhenAll:回调形式的,所有线程完成后执行的后续动作,理解同上

    //线程容器
    List<Task> taskList = new List<Task>(); Stopwatch watch = new Stopwatch(); watch.Start(); Console.WriteLine("************Task Begin**************"); //启动5个线程 for (int i = 0; i < 5; i++) {   string name = $"Task:{i}";   Task task = Task.Factory.StartNew(() =>   {     DoSomethingLong(name);   });   taskList.Add(task); } //回调形式的,任意一个完成后执行的后续动作 Task any = Task.Factory.ContinueWhenAny(taskList.ToArray(), task => {   Console.WriteLine("ContinueWhenAny"); }); //回调形式的,全部任务完成后执行的后续动作 Task all = Task.Factory.ContinueWhenAll(taskList.ToArray(), tasks => {   Console.WriteLine($"ContinueWhenAll{tasks.Length}"); }); //把两个回调也放到容器里面,包含回调一起等待 taskList.Add(any); taskList.Add(all); //WaitAny:线程等待,当前线程等待其中一个线程完成 Task.WaitAny(taskList.ToArray()); Console.WriteLine("WaitAny"); //WaitAll:线程等待,当前线程等待所有线程的完成 Task.WaitAll(taskList.ToArray()); Console.WriteLine("WaitAll"); Console.WriteLine($"************Task End**************{watch.ElapsedMilliseconds},{Thread.CurrentThread.ManagedThreadId}");

    关于Task其实只要熟练掌握这几个接口,基本上就够了,完全够用

     

    5:Parallel(并行编程):其实就是在Task基础上又进行了一次封装,当然,Parallel也有很酷炫功能

    问:首先是为什么叫做并行编程,跟Task有什么区别?

    答:使用Task开启子线程的时候,主线程是属于空闲状态,并不参与执行(我是领导,有5件事情需要处理,我安排了5个手下去做,而我本身就是观望状态 PS:到底是领导。),Parallel开启子线程的时候,主线程也会参与计算(我是领导,我有5件事情需要处理,我身为领导,但是我很勤劳,所以我做了一件事情,另外四件事情安排4个手下去完成),很明显,减少了开销。

    你以为Parallel就只有这个炫酷的功能?大错特错,更炫酷的还有;

    Parallel可以控制线程的最大并发数量,啥意思?就是不管你是脑溢血,还是心脏病,还是动脉大出血,我的手术室只有2个,同时也只能给两个人做手术,做完一个在进来一个,同时做完两个,就同时在进来两个,多了不行。

    当前,你想使用Task,或者ThreadPool来实现这样的效果也是可以的,不过这就需要你动动脑筋了,当然,有微软给封装好的接口直接使用,岂不美哉?

     //并行编程 
    Console.WriteLine($"*************Parallel start********{Thread.CurrentThread.ManagedThreadId}****");
    //一次性执行1个或多个线程,效果类似:Task WaitAll,只不过Parallel的主线程也参与了计算
    Parallel.Invoke(
      () => { DoSomethingLong("Parallel`1"); },
      () => { DoSomethingLong("Parallel`2"); },
      () => { DoSomethingLong("Parallel`3"); },
      () => { DoSomethingLong("Parallel`4"); },
      () => { DoSomethingLong("Parallel`5"); });
    //定义要执行的线程数量
    Parallel.For(0, 5, t =>
    {
      int a = t;
      DoSomethingLong($"Parallel`{a}");
    });
    {
      ParallelOptions options = new ParallelOptions()
      {
        MaxDegreeOfParallelism = 3//执行线程的最大并发数量,执行完成一个,就接着开启一个
      };
      //遍历集合,根据集合数量执行线程数量
      Parallel.ForEach(new List<string> { "a", "b", "c", "d", "e", "f", "g" }, options, t =>
      {
        DoSomethingLong($"Parallel`{t}");
      });
    }
    {
      ParallelOptions options = new ParallelOptions()
      {
        MaxDegreeOfParallelism = 3//执行线程的最大并发数量,执行完成一个,就接着开启一个
      };
      //遍历集合,根据集合数量执行线程数量
      Parallel.ForEach(new List<string> { "a", "b", "c", "d", "e", "f", "g" }, options, (t, status) =>
      {
        //status.Break();//这一次结束。
        //status.Stop();//整个Parallel结束掉,Break和Stop不可以共存
        DoSomethingLong($"Parallel`{t}");
      });
    }
    Console.WriteLine("*************Parallel end************");

    可以多了解一下Parallel的接口,里面的用法有很多,我这里也只是列出了比较常用的几个,像ParallelOptions类可以控制并发数量,当然,不可以也可以,Parallel的重载方法很多,可以自己看看

     

    6:线程取消,异常处理

    关于线程取消这块呢,要记住一点,线程只能自身终结自身,也就是除非我自杀,否则你干不掉我,不要妄想通过主线程来控制计算中的子线程。

    关于线程异常处理这块呢,想要捕获子线程的异常,最好在子线程本身抓捕,也可以使用Task的WaitAll,不过这种方法不推荐,如果用了,别忘了一点,catch里面放的是AggregateException,不是Exception,不然捕捉不到别怪我

    TaskFactory taskFactory = new TaskFactory();
    //通知线程是否取消
    CancellationTokenSource cts = new CancellationTokenSource();
    List<Task> taskList = new List<Task>();
    //想要主线程抓捕到子线程异常,必须使用Task.WaitAll,这种方法不推荐
    //想要捕获子线程的异常,最好在子线程本身抓捕
    //完全搞不懂啊,看懵逼了都
    try
    {
      for (int i = 0; i < 40; i++)
      {
        int name = i;
        //在子线程中抓捕异常
        Action<object> a = t =>
        {
          try
          {
            Thread.Sleep(2000);
            Console.WriteLine(cts.IsCancellationRequested);
            if (cts.IsCancellationRequested)
            {
              Console.WriteLine($"放弃执行{name}");
            }
            else
            {
              if (name == 1)
              {
                throw new Exception($"取消了线程{name}{t}");
              }
              if (name == 5)
              {
                throw new Exception($"取消了线程{name}{t}");
              }
              Console.WriteLine($"执行成功:{name}");
            }
          }
          catch (Exception ex)
          {
            //通知线程取消,后面的都不执行
            cts.Cancel();
            Console.WriteLine(ex.Message);
          }
        };
      taskList.Add(taskFactory.StartNew(a, name, cts.Token));
      }  
      Task.WaitAll(taskList.ToArray());
    }
    catch (AggregateException ex)
    {
      foreach (var item in ex.InnerExceptions)
      {
        Console.WriteLine(item.Message);
      }
    }
    

     

    7:给线程上个锁,防止并发的时候数据丢失,覆盖等

    //先准备三个公共变量
    private static int iIndex;
    private static object obj = new object();
    private static List<int> indexList = new List<int>();
    List<Task> taskList = new List<Task>();
    //开启1000个线程
    for (int i = 0; i < 10000; i++)
    {
      int newI = i;
      Task task = Task.Factory.StartNew(() =>
      {
      iIndex += 1;
      indexList.Add(newI);
      });
      taskList.Add(task);
    }
    //等待所有线程完成
    Task.WhenAll(taskList.ToArray());
    //打印结果
    Console.WriteLine(iIndex);
    Console.WriteLine(indexList.Count);

    给你们看一下我这里运行三次打印出的结果

    第一次:

    第二次:

    第三次:

    看的出来,还是比较稳定的只丢失那么几个数据的对吧?

    为啥会这样呢?因为当两个线程同时操作一个数据的时候,你觉得会以谁的操作为标准来保存?就好像我们操作IO的时候,不允许多多个线程同时操作一个IO,因为计算机不知道以谁的标准来保存修改

    下面给它加个锁,稍微修改一下代码:

    List<Task> taskList = new List<Task>();
    //开启1000个线程
    for (int i = 0; i < 10000; i++)
    {
        int newI = i;
        Task task = Task.Factory.StartNew(() =>
        {
            //加个锁
            lock (objLock)
            {
                iIndex += 1;
                indexList.Add(newI);
            }
        });
        taskList.Add(task);
    }
    //等待所有线程完成
    Task.WhenAll(taskList.ToArray());
    //打印结果
    Console.WriteLine(iIndex);
    Console.WriteLine(indexList.Count);

    在看一下运行结果:

    线程锁会破坏线程,增加耗时,降低效率,不要看效果很爽就到处加锁,万一你钥匙丢了呢(死锁);

    共有变量:都能访问的局部变量/全局变量/数据库的值/硬盘文件,这些都有可能是数据不安全的,如果有需求,还是得加个锁

    如果确实是要用到锁,推荐大家就使用一个:私有的,静态的,object类型的变量就可以了;

     

    漏掉了一个方法,我给补上:

    /// <summary>
    /// 一个比较耗时的方法,循环1000W次
    /// </summary>
    /// <param name="name"></param>
    public static void DoSomethingLong(string name)
    {
        int iResult = 0;
        for (int i = 0; i < 1000000000; i++)
        {
            iResult += i;
        }
        Console.WriteLine($"********************{name}*******{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")}****{Thread.CurrentThread.ManagedThreadId}****");
    }

     

    哎...今天够累的,签到来了1...
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|小黑屋|Java自学者论坛 ( 声明:本站文章及资料整理自互联网,用于Java自学者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-5-4 13:15 , Processed in 0.078282 second(s), 29 queries .

    Powered by Discuz! X3.4

    Copyright © 2001-2021, Tencent Cloud.

    快速回复 返回顶部 返回列表