Semaphore:信号灯
特点:控制每次执行的线程数,达到控制线程并发的效果
- 测试代码
package com.zhiwei.thread;import java.util.Random;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;/** * 信号灯:当线程空闲时自动去执行阻塞的线程,实现运行最优化 */public class SemaphoreTest {public static void main(String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool(); // 定义信号灯,一次最多能处理3个线程 Semaphore sp = new Semaphore(3); for (int i = 0; i < 20; i++) { threadPool.execute(new Runnable() { @Override public void run() { try { // 获取信号,信号灯处理阻塞线程,最多允许3个线程访问 sp.acquire(); System.out.println(Thread.currentThread().getName() + ":进入信号灯,还有" + sp.availablePermits() + "个信号"); Thread.sleep(new Random().nextInt(2000)); // 回收信号灯信号,供别的线程使用 sp.release(); System.out.println(Thread.currentThread().getName() + ":离开信号灯,还有" + sp.availablePermits() + "个信号"); } catch (InterruptedException e) { e.printStackTrace(); } } }); } threadPool.shutdown();}}
效果:
CyclicBarrier
作用:控制线程运行的任务总量同步,例如等待所有人完成工作才可以下班
测试代码
package com.zhiwei.thread;import java.util.concurrent.CyclicBarrier;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class CyclicBarrierTest { public static void main(String[] args) { // 思想:只有各个子任务都完成了采取执行下一步,如果有线程提前完成则等待 ExecutorService threadPool = Executors.newCachedThreadPool(); // 规定总的任务量:只有全部完成才会进行下一步处理 CyclicBarrier cb = new CyclicBarrier(3); for (int i = 0; i < 3; i++) { threadPool.execute(new Runnable() { @Override public void run() { try { Thread.sleep((long) (Math.random() * 10000)); System.out.println(Thread.currentThread().getName() + ":完成分任务,剩余任务:" + (2 - cb.getNumberWaiting())); //如果前面2个线程阻塞 + 正在运行的线程 = 3,表明总任务完成 if (cb.getNumberWaiting() == 2) { System.out.println("恭喜,总任务已完成!"); } //分任务完成则等待,直到搜索的任务都完成,才执行await后面的代码 cb.await(); } catch (Exception e) { e.printStackTrace(); } } }); } threadPool.shutdown();}}
效果:
CountDownLatch 发令枪
CountDownLatch:可理解为所有线程都就绪之后就一起执行,类似旅游跟团,只有所有人都到了才可以触发
测试代码:
package com.zhiwei.thread;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class CountDownLatchTest { public static void main(String[] args) { // 缓存线程池:自动创建线程执行任务,如果线程执行完成任务则保存,供下次使用,如果线程不够则动态创建 ExecutorService threadPool = Executors.newCachedThreadPool(); // 表示将完成3个任务量:任务计数器:多线程完成一些列操作 CountDownLatch ct = new CountDownLatch(3); for (int i = 0; i < 3; i++) { threadPool.execute(new Runnable() { @Override public void run() { try { Thread.sleep((long) (Math.random() * 10000)); ct.countDown(); // 减1 System.out.println(Thread.currentThread().getName() + "准备分任务,剩余任务:" + ct.getCount()); // 如果ct计数器不为0则阻塞,为0 则一起执行 ct.await(); System.out.println(Thread.currentThread().getName() + "完成分任务"); } catch (Exception e) { e.printStackTrace(); } } }); } if (ct.getCount() == 0) { System.out.println("恭喜,总任务已完成!"); } threadPool.shutdown();}}
效果:
ArrayBlockingQueue
ArrayBlockingQueue: JDK内部提供的阻塞队列,能够保证线程安全
主要同步方法:
put方法
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
take方法
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
测试代码:
package com.zhiwei.thread;import java.util.concurrent.ArrayBlockingQueue;/** * 阻塞队列:可用于处理生产消费的问题 * * 实现机制:put/take利用重入锁ReentrantLock实现同步效果 */public class ArrayBlockingQueueTest { public static void main(String[] args) { ArrayBlockingQueueabq = new ArrayBlockingQueue (3); new Thread(new Runnable() { @Override public void run() { while(true){ try { Thread.sleep(1000); abq.put("Hello Java World"); System.out.println(Thread.currentThread().getName()+":放入数据,剩余数据:"+abq.size()); } catch (Exception e) { e.printStackTrace(); } } } }).start(); new Thread(new Runnable() { @Override public void run() { while(true){ try { Thread.sleep(10000); abq.take(); System.out.println(Thread.currentThread().getName()+":取出数据,剩余数据:"+abq.size()); } catch (Exception e) { e.printStackTrace(); } } } }).start(); }}
效果:
Exchanger
作用:两个线程之间交换数据,不过要两个线程都先拿出数据,然后才能进行数据交换
测试代码
package com.zhiwei.thread;import java.util.concurrent.Exchanger;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** *特点: 一定要等双方将数据都拿出来后才能交换(只能是两个线程) * @author Yang ZhiWei * */public class ExchangerTest {public static void main(String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool(); Exchangerexchanger = new Exchanger (); threadPool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+":交换数据:"+Thread.currentThread().getName()); try { String getDate = exchanger.exchange(Thread.currentThread().getName()); System.out.println(Thread.currentThread().getName()+":收到数据:"+getDate); } catch (InterruptedException e) { e.printStackTrace(); } } }); threadPool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"交换数据:"+Thread.currentThread().getName()); try { String getDate = exchanger.exchange(Thread.currentThread().getName()); System.out.println(Thread.currentThread().getName()+"收到数据:"+getDate); } catch (InterruptedException e) { e.printStackTrace(); } } }); threadPool.shutdown();}}
效果: