wordpress主题 点赞海淀区seo多少钱
1、CyclicBarrier 介绍
从字面上看 CyclicBarrier 就是 一个循环屏障,它也是一个同步助手工具,它允许多个线程
在执行完相应的操作后彼此等待共同到达一个屏障点。
CyclicBarrier可以被循环使用,当屏障点值变为0之后,可以在接下来的的使用中重置屏障点
值,而无需重新定义一个CyclicBarrier。
Cyclic循环:所有线程释放后,屏障点的数值可以被重置
Barrier屏障:让一个或多个线程到达一个屏障点,会被阻塞;屏障点会有一个数值,当每有一
个线程到达屏障点时,屏障点数值就会减1操作,并且线程阻塞在屏障点,当屏
障点数值变为0时,屏障就会打开,唤醒所有阻塞在屏障点的线程。
在释放屏障点之后,可以先执行一个任务,然后让唤醒阻塞的线程继续执行后
续任务。
CyclicBarrier是一种同步机制,允许一组线程之间互相等待,现成达到屏障点其实是基于
await方法在屏障点阻塞;等待所有线程到达屏障点后再统一唤醒
CyclicBarrier 并不是基于AQS来实现的,其是基于ReentrantLock锁的机制来实现对屏障点的
“减减” 操作以及线程的挂起。
2、CyclicBarrier核心属性&构造方法
public class CyclicBarrier {/*** 内部类*/private static class Generation {//该类用来标记是否被中断过//用来表示阻塞时当前party有没有被强制中断boolean broken = false;//某个线程由于执行了await()方法进入了阻塞状态,若该线程被执行了中断操作,那么 broken 得值就会变为true}/** 保证操作屏障值原子性的锁 */private final ReentrantLock lock = new ReentrantLock();/*** 用于阻塞线程的条件变量:若有未到party的线程,那么等待该条件变量上* 基于当前的Condition 实现线程的挂起和唤醒* */private final Condition trip = lock.newCondition();/*** 屏障数值,与count初始值一致* todo 注意:不会对 parties 进行操作,因为 parties 是final修饰,初始化后不能修改* */private final int parties;//计数器得值/** 当屏障数值count到达0时,优先执行当前任务,然后再会唤醒所有等待的线程执行后续任务* */private final Runnable barrierCommand;/*** 表示当前party是否被中断过* */private Generation generation = new Generation();/**** 屏障数值,初始值与 parties 相等,当每有一个线程到达屏障点时,就会执行count--操作*/private int count;//构造方法/*** * @param parties 屏障点数值* * @param barrierAction 当屏障点数值达到0时,优先执行该 barrierAction 任务* 若barrierAction 为null,则直接执行唤醒的线程* */public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;//到达屏障点后优先执行的任务this.barrierCommand = barrierAction;}public CyclicBarrier(int parties) {this(parties, null);}
}
3、CyclicBarrier应用场景及示例代码
3.1、将一个任务分成若干个并行的子任务,当所有的子任务全部执行结束后,再继续执行后边的
工作。
从这一点上看,CyclicBarrier 功能与CountDownLatch 的功能差不多,但他们运行方式上却
有很大区别;在 CyclicBarrier 中,每个子任务完成后,子线程调用 CyclicBarrier的await方法
使当前子线程进入阻塞状态,直到其他所有子线都完成了任务后,他们才能退出阻塞;
注意:这里CyclicBarrier并没有干预主线程的运行,所以主线程的 “运行/阻塞” 需要我们来
手动干预。所以 CyclicBarrier 更像是把“任务分片”而不是计数器,当每个分片任务
完成后都会阻塞在“屏障点”,
把前边CountDwonLatch 的示例使用 CyclicBarrier 来实现,比较 CountDwonLatch 与
CyclicBarrier 在相同场景下使用的不同,示例代码如下:
public class CylicBarrierExample1 {public static void main(String[] args) {//先获取商品编号列表int[] products = getProductsByCategoryID();//使用Stream 流,将商品编号列表中的每个商品转换为 ProductPriceList<ProductPrice> list = Arrays.stream(products).mapToObj(ProductPrice::new).collect(Collectors.toList());//定义 CyclicBarrier ,并设置子任务数CyclicBarrier barrier = new CyclicBarrier(list.size());//存放线程任务的集合final List<Thread> threads = new ArrayList<>();list.forEach(pp -> {//对每个商品都创建一个子任务来计算Thread thread = new Thread(() -> {System.out.println(pp.getProdID()+" -> start calculate price.");try {//模拟业务逻辑耗时TimeUnit.SECONDS.sleep(current().nextInt(10));if(pp.prodID %2 == 0){pp.setPrice(pp.prodID*0.9D);}else {pp.setPrice(pp.prodID*0.71D);}System.out.println(pp.getProdID()+" -> price calculate");} catch (InterruptedException e) {e.printStackTrace();}finally {try {//当前子任务线程进入阻塞状态,在这里等待所有的子任务线程都执行到共同的屏障点 barrier pointbarrier.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}});threads.add(thread);thread.start();});//遍历所有的子任务线程,让主线程等待所有的子任务线程结束threads.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});System.out.println("**************************************");System.out.println("All of price calculate finished!");list.forEach(System.out::println);}//获取商品编号列表private static int[] getProductsByCategoryID(){//商品列表编号为从1,10的数字return IntStream.rangeClosed(1,10).toArray();}//定义商品类,有2个成员变量:商品编号和商品价格private static class ProductPrice{private final int prodID;//商品编号private double price;//商品价格public ProductPrice(int prodID){this(prodID,-1);}public ProductPrice(int prodID,double price){this.prodID = prodID;this.price = price;}public int getProdID(){return this.prodID;}public void setPrice(double price){this.price = price;}@Overridepublic String toString() {return "ProductPrice{" +"prodID=" + prodID +", price=" + price +'}';}}
}
上边这段代码,有个需要优化的地方,即:既然 CyclicBarrier 中所有线程都会阻塞在屏
障点,所有任务都达到屏障点时才会往下执行,那么我们可以把主线程也作为一个任务线程
,即在定义 CyclicBarrier 屏障点数值时,在原有的数值上加1,然后在主线程中执行
CyclicBarrier的await方法,这样就不用让主线程等待每个子线程执行完成了
优化代码如下:
public static void main(String[] args) {//先获取商品编号列表int[] products = getProductsByCategoryID();//使用Stream 流,将商品编号列表中的每个商品转换为 ProductPriceList<ProductPrice> list = Arrays.stream(products).mapToObj(ProductPrice::new).collect(Collectors.toList());//定义 CyclicBarrier ,并设置子任务数CyclicBarrier barrier = new CyclicBarrier(list.size()+1);//存放线程任务的集合final List<Thread> threads = new ArrayList<>();list.forEach(pp -> {//对每个商品都创建一个子任务来计算Thread thread = new Thread(() -> {System.out.println(pp.getProdID()+" -> start calculate price.");try {//模拟业务逻辑耗时TimeUnit.SECONDS.sleep(current().nextInt(10));if(pp.prodID %2 == 0){pp.setPrice(pp.prodID*0.9D);}else {pp.setPrice(pp.prodID*0.71D);}System.out.println(pp.getProdID()+" -> price calculate");} catch (InterruptedException e) {e.printStackTrace();}finally {try {//当前子任务线程进入阻塞状态,在这里等待所有的子任务线程都执行到共同的屏障点 barrier pointbarrier.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}});threads.add(thread);thread.start();});//主线程也阻塞在屏障点barrier.await();System.out.println("**************************************");System.out.println("All of price calculate finished!");list.forEach(System.out::println);}
3.2、CyclicBarrier 循环使用
使用 CyclicBarrier 模拟旅游时导游清点人数的场景
大家报团旅游时,为了安全和避免掉队的,每次登上大巴,大巴启动前,导游都会清点人数
;在到达一个景点后,游客下车后,导游也会重复清点人数,保证所有的人都下来了后,才
会通知大巴师傅去停车场停车,下边写个demo简单模拟下这个场景,
/*** CylicBarrier 的循环使用*/
public class CylicBarrierExample2 {public static void main(String[] args) throws BrokenBarrierException, InterruptedException {//定义 CyclicBarrierfinal CyclicBarrier barrier = new CyclicBarrier(11);//创建10个线程for(int i=0;i<10;i++){//定义游客子线程,传入游客编号和 barriernew Thread(new Tourist(i,barrier)).start();}//主线程也进入阻塞,等待所有游客都上车barrier.await();System.out.println("Tour Guilder: all of Tourist get on the bus");//主线程进入阻塞,所有游客都下车barrier.await();System.out.println("Tour Guilder: all of Tourist get OFF the bus");}//定义游客线程private static class Tourist implements Runnable{private final int touristID;private final CyclicBarrier barrier;private Tourist(int touristID,CyclicBarrier barrier){this.touristID = touristID;this.barrier = barrier;}@Overridepublic void run() {System.out.printf("Tourist: %d by bus\n",touristID);//上车耗时this.spendSeveralSeconds();//上车后等待其他同伴this.waitAndPrint("Tourist: %d Get on the bus, and wait other people");//todo 注意:所有线程到达屏障点后,最后一个到达屏障点的线程会重置CyclicBarrier// 所以这里不需要手动调用reset()重置方法//下车耗时this.spendSeveralSeconds();//下车后等待其他同步全部下车this.waitAndPrint("Tourist: %d Get OFF the bus, and wait other people OFF");}//模拟乘客上车耗时private void spendSeveralSeconds(){try {TimeUnit.SECONDS.sleep(current().nextInt(10));} catch (InterruptedException e) {e.printStackTrace();}}//模拟上车后等待其他同伴private void waitAndPrint(String msg){System.out.printf(msg,touristID);System.out.println();try {//所有线程到达屏障点后,最后一个到达屏障点的线程会重置CyclicBarrierbarrier.await();} catch (InterruptedException |BrokenBarrierException e) {e.printStackTrace();}}}
}
4、CyclicBarrier 常用方法解析
在 CyclicBarrier 中常用方法就2个,即:await 和带超时时间的await ,但真正执行业务
的方法其实只有 doawait 一个方法,如下图所示:
4.1、dowait(boolean timed, long nanos) 方法
dowait 方法是CyclicBarrier 的核心方法,该方法功能是先将 CyclicBarrier 计数器count减1,
然后判断减1后的count是否等于0,若等于0,则唤醒所有阻塞在屏障点的线程,并重置
CyclicBarrier;若减1后的count不等于0,则当前线程被阻塞,直到被其他线程唤醒或过了
超时时间(有超时时间的情况)
dowait 代码如下:
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;lock.lock();try {//获取 Generation 对象的引用final Generation g = generation;//判断是否有现成中断if (g.broken) //表示当前party已经被中断throw new BrokenBarrierException();//有中断的线程混入其中,则干掉其他所有的线程重新开始if (Thread.interrupted()) {//判断当前执行线程是否被中断,若被中断则先调用 breakBarrier()方法,再抛出异常breakBarrier();throw new InterruptedException();}int index = --count;//计数器减1,对屏障点数据做--操作if (index == 0) { // tripped 当 count 为0 得时候,表示是最后一个线程,负责唤醒所有阻塞在条件变量上的线程,然后回调barrierCommandboolean ranAction = false;try {final Runnable command = barrierCommand;//当前任务线程//优先执行 barrierCommand 任务if (command != null)command.run();ranAction = true;//生成新的 Generation,并且直接返回nextGeneration();//进入下一个party,这时屏障值被重置了,等价与调用了reset()方法return 0;} finally {//如果 barrierCommand 方法发生了异常,则设置 broKen标志位if (!ranAction)//中断当前任务线程breakBarrier();}}// loop until tripped, broken, interrupted, or timed outfor (;;) {//循环等待最后一个参与party的线程,或者被中断、等待超时try {if (!timed) //表示调用的是非超时时间的await方法,则这里也是调用Condition的不带超时时间的awaittrip.await();else if (nanos > 0L)//表示调用的是带超时时间的await方法nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {//执行到这,说明线程被中断了//g == generation:查看 generation 是否被重置//若 generation 没有被重置,且没有现成被中断,则调用 breakBarrier 方法执行线程中断后的操作if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {//表示 generation 已经被重置或者 有线程已经被中断,则表示本次CyclicBarrier已经作废,则中断当前线程// We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.Thread.currentThread().interrupt();}}//执行到这里说明线程被唤醒了//查看是否因为中断唤醒,若是则抛出异常if (g.broken)throw new BrokenBarrierException();//查看当前线程是否是因为 generation 重置而被唤醒(即被reset),若是则直接返回index 数值//或者任务正常完成也会被重置if (g != generation)return index;//判断是否是因为到达超时时间被唤醒,若是则中断当前任务if (timed && nanos <= 0L) {//中断当前任务breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}}private void nextGeneration() {// signal completion of last generation 唤醒屏障点阻塞中的所有线程trip.signalAll();// set up next generation 修改 count 的值使其等于构造 CyclicBarrier 时传入的parties 值count = parties;generation = new Generation();//创建新的Generation,即生成下一代party}private void breakBarrier() {generation.broken = true; //设置为中断状态count = parties;//将计数器设置为构建 CyclicBarrer 时传入得值,即重置屏障点数值counttrip.signalAll();//唤醒其他所有等待的线程}
4.2、reset() 方法
reset() 方法功能是重置CyclicBarrier
public void reset() {final ReentrantLock lock = this.lock;lock.lock();try {//干掉当前所有的线程breakBarrier(); // break the current generation//生成下一代partynextGeneration(); // start a new generation} finally {lock.unlock();}}
4.3、getNumberWaiting() 方法
该方法功能是返回正在阻塞在屏障点的线程数
public int getNumberWaiting() {final ReentrantLock lock = this.lock;lock.lock();try {return parties - count;} finally {lock.unlock();}}