前言
最近在解析银联对账单,因为银联对账单很大,我们按照一定大小切割银联对账单,然后多线程读取银联对账单,用到了CountDownLatch,每个线程解析结束切割好银联对账,CountDownLatch减一。在与乐刷对接读取回盘文件,回盘可能存在多个,用到了CyclicBarrier,每个回盘文件读取结束,CyclicBarrier加一,读取结束,进行后续业务逻辑处理。
读取银联对账单多线程主要代码:
//切割银联对账单个数作为CountDownLatch的个数 CountDownLatch platformLatch = new CountDownLatch(csvFiles.size()); //银联对账单切割的个数,也就是提交给线程池任务个数 for (File file : csvFiles) { //解析银联对账单的主要逻辑类LoadDataThread实现Runnable LoadDataThread importer = new LoadDataThread(dbName, date, platformLatch, new File(file.getAbsolutePath()) , CharsetEnum.ALIPAY_PLATFORM_CSV_CHARSET.getCharset() , fileType, orderFinanceDOList, DataSetContants.aliPlatformSetMold, integerMap); //任务提交给线程处理 platformFileExecutor.execute(importer); } //线程池处理超时,异常抛出 if (!platformLatch.await(ComConstans.threadTimeout, TimeUnit.MINUTES)) { return ResultModel.commonError("loadPayData >> 加载平台方支付宝数据,线程等待超时,超过" + ComConstans.threadTimeout + "分钟"); }
CyclicBarrier和CountDownLatch区别
CountDownLatch | CyclicBarrier |
---|---|
减计数方式 | 加计数方式 |
计算为0时释放所有等待的线程 | 计数达到指定值时释放所有等待线程 |
计数为0时,无法重置 | 计数达到指定值时,计数置为0重新开始 |
调用countDown()方法计数减一,调用await()方法只进行阻塞,对计数没任何影响 | 调用await()方法计数加1,若加1后的值不等于构造方法的值,则线程阻塞 |
不可重复利用 | 可重复利用 |
CyclicBarrier和CountDownLatch的API
CountDownLatch类只提供了一个构造器
public CountDownLatch(int count) { }; //参数count为计数值
然后下面这3个方法是CountDownLatch类中最重要的方法:
public void await() throws InterruptedException { }; //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行 public void countDown() { }; //将count值减1
CyclicBarrier提供2个构造器:
public CyclicBarrier(int parties, Runnable barrierAction) { } public CyclicBarrier(int parties) { }
参数parties指让多少个线程或者任务等待至barrier状态;参数barrierAction为当这些线程都达到barrier状态时会执行的内容。
CyclicBarrier中最重要的方法就是await方法:
public int await() throws InterruptedException, BrokenBarrierException { };//挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务; public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };//让这些线程等待至一定的时间,如果还有线程没有到达barrier状态就直接让到达barrier的线程执行后续任务
CyclicBarrier和CountDownLatch的用例
public class CountDownLatchExampleTwo { public static void main(String[] args) throws InterruptedException { ExecutorService pool = Executors.newCachedThreadPool(); CountDownLatch countDownLatch = new CountDownLatch(3); int size = 3; for (int i = 0; i < size; i++) { int index = i; pool.submit(() -> { try { TimeUnit.SECONDS.sleep(index); countDownLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第" + index + "位运动员准备好了"); }); } countDownLatch.await(); System.out.println(size + "位运动员都准备好了,可以起跑!"); } }
输出结果:
第1位运动员准备好了
第2位运动员准备好了
3位运动员都准备好了,可以起跑!
第0位运动员准备好了
public class CyclicBarrierExampleTwo { public static void main(String[] args) { ExecutorService pool = Executors.newCachedThreadPool(); int size = 3; CyclicBarrier cyclicBarrier = new CyclicBarrier(size, () -> { System.out.println(size + "位运动员都准备好了,可以起跑!"); pool.shutdownNow(); }); for (int i = 0; i < size; i++) { int index = i; pool.submit(() -> { try { TimeUnit.SECONDS.sleep(index); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第" + index + "位运动员准备好了"); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); } } }
输出结果:
第0位运动员准备好了
第1位运动员准备好了
第2位运动员准备好了
3位运动员都准备好了,可以起跑!
参考文章
参考文献地址:
CountDownLatch CyclicBarrier 原理 总结
CountDownLatch CyclicBarrier 原理 总结