Java自学者论坛

 找回密码
 立即注册

手机号码,快捷登录

恭喜Java自学者论坛(https://www.javazxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,会员资料板块,购买链接:点击进入购买VIP会员

JAVA高级面试进阶训练营视频教程

Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程Go语言视频零基础入门到精通Java架构师3期(课件+源码)
Java开发全终端实战租房项目视频教程SpringBoot2.X入门到高级使用教程大数据培训第六期全套视频教程深度学习(CNN RNN GAN)算法原理Java亿级流量电商系统视频教程
互联网架构师视频教程年薪50万Spark2.0从入门到精通年薪50万!人工智能学习路线教程年薪50万大数据入门到精通学习路线年薪50万机器学习入门到精通教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程MySQL入门到精通教程
查看: 499|回复: 0

javaWeb 使用线程池+队列解决"订单并发"问题

[复制链接]
  • TA的每日心情
    奋斗
    2024-11-24 15:47
  • 签到天数: 804 天

    [LV.10]以坛为家III

    2053

    主题

    2111

    帖子

    72万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    726782
    发表于 2021-5-16 16:12:18 | 显示全部楼层 |阅读模式

    解决方式:使用线程池+队列

    项目基于Spring,如果不用spring需要自己把

    ThreadPoolManager.java

    改成单例模式

     

    1.写一个Controller(Spring mvc)

    /**
     * @author HeyS1
     * @date 2016/12/1
     * @description
     */
    @Controller public class ThreadPoolController { @Autowired ThreadPoolManager tpm; @RequestMapping("/pool") public @ResponseBody Object test() { for (int i = 0; i < 500; i++) {   //模拟并发500条记录 tpm.processOrders(Integer.toString(i)); } return "ok"; } }

     

    2.线程池管理

    /**
     * @author HeyS1 * @date 2016/12/1 * @description threadPool订单线程池, 处理订单 * scheduler 调度线程池 用于处理订单线程池由于超出线程范围和队列容量而不能处理的订单 */ @Component public class ThreadPoolManager implements BeanFactoryAware { private static Logger log = LoggerFactory.getLogger(ThreadPoolManager.class); private BeanFactory factory;//用于从IOC里取对象 // 线程池维护线程的最少数量 private final static int CORE_POOL_SIZE = 2; // 线程池维护线程的最大数量 private final static int MAX_POOL_SIZE = 10; // 线程池维护线程所允许的空闲时间 private final static int KEEP_ALIVE_TIME = 0; // 线程池所使用的缓冲队列大小 private final static int WORK_QUEUE_SIZE = 50; // 消息缓冲队列 Queue<Object> msgQueue = new LinkedList<Object>(); //用于储存在队列中的订单,防止重复提交 Map<String, Object> cacheMap = new ConcurrentHashMap<>(); //由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序 final RejectedExecutionHandler handler = new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //System.out.println("太忙了,把该订单交给调度线程池逐一处理" + ((DBThread) r).getMsg()); msgQueue.offer(((DBThread) r).getMsg()); } }; // 订单线程池 final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler); // 调度线程池。此线程池支持定时以及周期性执行任务的需求。 final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5); // 访问消息缓存的调度线程,每秒执行一次 // 查看是否有待定请求,如果有,则创建一个新的AccessDBThread,并添加到线程池中 final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { if (!msgQueue.isEmpty()) { if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) { System.out.print("调度:"); String orderId = (String) msgQueue.poll(); DBThread accessDBThread = (DBThread) factory.getBean("dBThread"); accessDBThread.setMsg(orderId); threadPool.execute(accessDBThread); } // while (msgQueue.peek() != null) { // } } } }, 0, 1, TimeUnit.SECONDS); //终止订单线程池+调度线程池 public void shutdown() { //true表示如果定时任务在执行,立即中止,false则等待任务结束后再停止 System.out.println(taskHandler.cancel(false)); scheduler.shutdown(); threadPool.shutdown(); } public Queue<Object> getMsgQueue() { return msgQueue; } //将任务加入订单线程池 public void processOrders(String orderId) { if (cacheMap.get(orderId) == null) { cacheMap.put(orderId,new Object()); DBThread accessDBThread = (DBThread) factory.getBean("dBThread"); accessDBThread.setMsg(orderId); threadPool.execute(accessDBThread); } } //BeanFactoryAware @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { factory = beanFactory; } }

    3.线程池中工作的线程

    //线程池中工作的线程
    @Component @Scope("prototype")//spring 多例 public class DBThread implements Runnable { private String msg; private Logger log = LoggerFactory.getLogger(DBThread.class); @Autowired SystemLogService systemLogService; @Override public void run() { //模拟在数据库插入数据 Systemlog systemlog = new Systemlog(); systemlog.setTime(new Date()); systemlog.setLogdescribe(msg); //systemLogService.insert(systemlog); log.info("insert->" + msg); } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } }

     

    浏览器输入地址127.0.0.1/pool

    几秒后关闭tomcat。

    模拟500条数据,订单线程池处理了117条。调度线程池处理5条

    关闭tomcat,后还有378条未处理(这里的实现需要用到spring监听器)。加起来一共500

    OK。完毕

    spring监听器,监听tomcat关闭事件:

    public class MyApplicationListener implements ApplicationListener<ApplicationEvent> { @Autowired ThreadPoolManager threadPoolManager; @Override public void onApplicationEvent(ApplicationEvent event) { if (event instanceof ContextClosedEvent) { XmlWebApplicationContext x = (XmlWebApplicationContext) event.getSource(); //防止执行两次。root application context 没有parent,他就是老大 if (x.getDisplayName().equals("Root WebApplicationContext")) { threadPoolManager.shutdown(); Queue q = threadPoolManager.getMsgQueue(); System.out.println("关闭了服务器,还有未处理的信息条数:" + q.size()); } } else if (event instanceof ContextRefreshedEvent) { // System.out.println(event.getClass().getSimpleName()+" 事件已发生!"); } else if (event instanceof ContextStartedEvent) { // System.out.println(event.getClass().getSimpleName()+" 事件已发生!"); } else if (event instanceof ContextStoppedEvent) { // System.out.println(event.getClass().getSimpleName()+" 事件已发生!"); } else { // System.out.println("有其它事件发生:"+event.getClass().getName()); } } }

    spring配置一下

    <bean id="springStartListener" class="com.temp.MyApplicationListener"></bean>
    哎...今天够累的,签到来了1...
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|小黑屋|Java自学者论坛 ( 声明:本站文章及资料整理自互联网,用于Java自学者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2025-2-3 10:20 , Processed in 0.076753 second(s), 28 queries .

    Powered by Discuz! X3.4

    Copyright © 2001-2021, Tencent Cloud.

    快速回复 返回顶部 返回列表