多线程学习——并发流程的控制

多线程学习——并发流程的控制

Scroll Down

多线程学习——并发流程的控制

1.并发流程控制

我们使用多线程在实际开发中肯定是有业务的需求,比如有些资源需要先得到,那么就要求线程的顺序应该有我们控制,而不是交给CPU调度器,java中采用调度器来控制线程的执行顺序

2.常见的调度器

类                作用 							说明
Semaphore 	信号量,可以通过控制“许可证”的数量,来保证线程之间的配合  线程只有在拿到许可证后才能继续运行。相比于其他同步器,更灵活
CyclicBarrier   线程会等待,直到等到足够多的线程达到事先规定的数目。一旦   适合线程之间相互等处理就绪的常见
		达到触发条件,就可以进行下一步动作

Phaser         和CyclicBarrier类似,但是计数可变				JAVA7中加入

CountDownLatch 和CyclicBarrier类似,数量递减到0,触发使用			不可重复使用

Exchanger 	让两个线程在适合时交换对象				适用于当两个线程工作在同一个类的不同实例上时,用于交换数据

Condition	可以控制线程的“等待”和唤醒				是object.wait()的升级版

3.CountDownLatch(倒数栅栏)

1.用途

购物拼团,资源汇总,或者模拟并发

2.原理

倒数结束之前,一直处于等待状态,直到倒计时结束了,此线程才继续工作

3.重要方法:

CountDownLatch(int count) 
          构造一个用给定计数初始化的 CountDownLatch
void await() 
          使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。 
void countDown() 
          递减锁存器的计数,如果计数到达零,则释放所有等待的线程。 

4.示例

用途一:多个线程结束完执行另外一个线程


import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @ClassName: CountDownLatchDemo
 * @Description: 使用一 ——到达条件开始执行某个线程
 * @author: XP
 */
public class CountDownLatchDemo {
    public static void main(String[] args) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(5);
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i <5 ; i++) {
            int finalI = i;
            Runnable runnable = () -> {
                try {
                    System.out.println(finalI + 1 + "执行任务");
                    Thread.sleep(5000);
                    System.out.println(finalI + 1 + "执行任务结束");
                    countDownLatch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            executorService.submit(runnable);
        }
        countDownLatch.await();
        System.out.println("任务结束");
        executorService.shutdown();
    }
}

用途二:多个线程同时等待一个指令

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @ClassName: CountDownLatchDemo2
 * @Description: 多个线程同时等待一个指令
 * @author: XP
 */
public class CountDownLatchDemo2 {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        for (int i = 0; i < 5; i++) {
            final int no=i;
            Runnable runnable=()->{
                try {
                    System.out.println(no + 1 + "接受任务");
                    countDownLatch.await();
                    System.out.println(no + 1 + "执行任务");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            executorService.submit(runnable);
        }
        Thread.sleep(5000);
        System.out.println("冲冲");
        countDownLatch.countDown();


    }

}

用途三:同时使用 1的是统一开始 多的是最终汇总


import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @ClassName: CountDownLatchDemo3
 * @Description: 同时使用 1的是统一开始 多的是最终汇总
 * @author: XP
 */
public class CountDownLatchDemo3 {
    public static void main(String[] args) throws InterruptedException {
        //同时进行的控制器
        CountDownLatch wait=new CountDownLatch(1);
        //最终汇总的控制器
        CountDownLatch end=new CountDownLatch(5);

        ExecutorService executorService = Executors.newFixedThreadPool(5);

        for (int i = 0; i < 5; i++) {
            final int intI=i;
            Runnable runnable=()->{
                System.out.println(intI+1+"领取到任务");
                try {
                    Thread.sleep(2000);
                    wait.await();
                    System.out.println(intI+1+"开始任务");
                    Thread.sleep(2000);
                    System.out.println(intI+1+"执行完任务");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    end.countDown();
                }
            };
            executorService.submit(runnable);
        }
        Thread.sleep(5000);
        System.out.println("冲冲冲");
        wait.countDown();
        try {
            end.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("都完事了");

    }
}

5.注意事项

1.可以多个线程等待多个线程完成执行后,在同时执行
2.CountDownLatch是不能够重用的,需要重新计数的话可以考虑CyclicBarrier或者重新定义CountDownLatch

2.Semaphore(信号量)

1.用途

可以用来限制或管理数量有限资源的使用情况,对于耗时的操作,限制同时操作

2.原理

Semaphore的作用是维护一个“许可证”的计数,线程可以“获取”许可证,那信号量剩余的许可证就减一,线程也可以“释放”一个许可证,那信号量剩余的许可证就加一,当信号量所拥有的数量为0,那么下一个还想获取许可证的线程,就需要等待,直到有线程释放了许可证。

3.使用流程

1.初始化Semaphore并指定许可证的数量
2.在需要被现在的代码前加acquire()或者acquireUninterruptibly()方法
3.在任务执行结束后,调用release()释放许可证

4.常用方法

Semaphore(int permits) 
          创建具有给定的许可数和非公平的公平设置的 Semaphore。 
Semaphore(int permits, boolean fair) 
          创建具有给定的许可数和给定的公平设置的 Semaphore。是否公平,如果公平就是FIFO,否则允许插队 
void acquire() 
          从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。 可以响应中断
void acquireUninterruptibly() 
          从此信号量中获取许可,在有可用的许可前将其阻塞。不可以响应中断
boolean tryAcquire() 
          仅在调用时此信号量存在一个可用许可,才从信号量获取许可。 
boolean tryAcquire(long timeout, TimeUnit unit) 
          如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可。 
void release() 
          释放一个许可,将其返回给信号量。 

5.示例


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @ClassName: SemaphoreDemo
 * @Description: 演示信号量的使用
 * @author: XP
 */
public class SemaphoreDemo {
    private static Semaphore semaphore =new Semaphore(5);
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(50);
        for (int i = 0; i < 50; i++) {
            final int finaI=i+1;
            Runnable runnable=()->{
                try {
                    //拿到许可证
                    semaphore.acquire();
                    Thread.sleep(5000);


                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                semaphore.release();
                System.out.println(finaI+"释放许可证");
            };
            executorService.submit(runnable);
        }


    }
}

6.注意

1.可以一次性获取或者释放多个许可证:列如方法A耗时多 那么分配给他的许可证就多 方法B耗时少 分配给他的许可证少 可以灵活安排数字保证方法只运行一个
2.最好释放和拿到的许可证的一致,避免程序卡死
3.最好将Semaphore的策略设置为公平,避免线程饥饿
4.并不是线程A拿到许可证,必须有A释放,也可以由线程B释放

3.Condition(等待和唤醒)

1.用途

线程1需要等待某个条件的时候,他去执行condition.await()方法,一旦执行了await()方法,线程就会进如阻塞状态
然后通常会有另外一个线程,去执行condition.signal()方法,这时候会把刚刚阻塞的线程变成runnable状态。

2.常用方法

 void signal() 
          唤醒一个等待线程。 
 void signalAll() 
          唤醒所有等待线程。 

3.示例


import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @ClassName: ConditionDemo1
 * @Description: 演示condition的基本应用
 * @author: XP
 */
public class ConditionDemo1 {
    private static Lock lock=new ReentrantLock();
    private static Condition condition=lock.newCondition();

    void methodA(){
        lock.lock();
        try {
            System.out.println("A开始执行任务");
            condition.await();
            System.out.println("A执行完成任务");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }
    void methodB(){
        lock.lock();
        try {
            System.out.println("B开始执行任务");
            Thread.sleep(5000);
            condition.signal();
            System.out.println("B执行完成任务");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionDemo1 conditionDemo1 = new ConditionDemo1();
        new Thread(() -> {
            try {
                Thread.sleep(1000);
                conditionDemo1.methodB();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        Thread.sleep(1000);
        conditionDemo1.methodA();

    }



}

生产者消费者:


import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @ClassName: ConditionDemo2
 * @Description: 实现生产者消费者
 * @author: XP
 */
public class ConditionDemo2 {
    private  Integer size=10;
    private static Lock lock=new ReentrantLock();
    //仓库
    private  BlockingQueue<Integer> blockingQueue=new ArrayBlockingQueue<>(size);
    //生产者的条件
    private static Condition notFull =lock.newCondition();
    //消费者的条件
    private static Condition notEmpty =lock.newCondition();

    public static void main(String[] args) {
        ConditionDemo2 conditionDemo2 = new ConditionDemo2();
        Consumer consumer = conditionDemo2.new Consumer();
        Producer producer = conditionDemo2.new Producer();
        producer.start();
        consumer.start();
    }
     class Consumer extends Thread{

         @Override
         public void run() {
             this.consumed();
         }
         private void consumed(){
             while (true){
                 lock.lock();

                 try {
                     while (blockingQueue.size()==0){
                         System.out.println("仓库里没有东西 等待生产");
                         notEmpty.await();
                     }
                     //东西进行消费
                     blockingQueue.poll();
                     //唤醒生产者进行生产
                     notFull.signal();
                     System.out.println("从队列里面取走了一个元素,剩余"+blockingQueue.size()+"个");
                 }catch (Exception e){
                    e.printStackTrace();
                 }finally {
                     lock.unlock();
                 }
             }
         }
     }
     class  Producer extends Thread{

        @Override
        public void run() {
            this.consumed();
        }
        private void consumed(){
            while (true){
                lock.lock();

                try {
                    while (blockingQueue.size()==size){
                        System.out.println("仓库里已经满了 等待消费");
                        notFull.await();
                    }
                    //进行生产
                    blockingQueue.offer(1);
                    //唤醒消费者进行消费
                    notEmpty.signal();
                    System.out.println("我生了"+blockingQueue.size()+"个");
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    lock.unlock();
                }
            }
        }
    }

}


4.注意事项

1.condition建立在锁上,condition是等同于以前synchronized的object的wait notify
2.await方法会自动释放持有的lock锁,和object.wait一样,不需要自己手动释放锁
3.调用await时候,必须持有锁,否则会报异常

4.CyclicBarrier(循环栅栏)

1.用途

1.CyclicBarrier循环栅栏和countdownlatch相似,都能阻塞一组线程
2.当有大量线程相互配合时候分别计算不同任务,并且需要统一汇总的时候,可以使用CyclicBarrier,他可以构造一个集结点,当线程执行完自己的任务就会在集结点等待,直到所有线程到达集结点,那么栅栏就撤销,所有线程统一出发,继续执行剩下的任务

2.常用方法

CyclicBarrier(int parties) 
          创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。 
CyclicBarrier(int parties, Runnable barrierAction) 
          创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。 
int await() 
          在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。 

3.实例


import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * @ClassName: CyclicBarrierDemo1
 * @Description: 演示线程统一集合 汇入点
 * @author: XP
 */
public class CyclicBarrierDemo1 {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5,() -> {
            System.out.println("冲冲");
        });
        for (int i = 0; i <5 ; i++) {

            new Thread(new Task(cyclicBarrier,i)).start();
        }
    }

    static class Task implements Runnable{
        private  CyclicBarrier  cyclicBarrier;
        private int no;

        public Task(CyclicBarrier cyclicBarrier, int no) {
            this.cyclicBarrier = cyclicBarrier;
            this.no = no;
        }

        @Override
        public void run() {
            System.out.println("线程"+no+"前往集合点");
            try {
                Thread.sleep((long) (Math.random()*1000));
                System.out.println("线程"+no+"达到集合点,等待中");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}


4.与countdownlatch区别

countdownlatch:是针对事件的 每个执行了就计数等待 不可以循环
CyclicBarrier:是针对线程的 每个线程执行等待 可以循环

5.总结

并发流程控制:
CountDownLatch:条件等待 满足一定条件执行一个方法 或者统一等待执行一个方法 不可以循环使用
Semaphore:控制同时执行的数量
Condition:结合lock 适用于线程的相互配合
CyclicBarrier:线程等待 满足一定条件统一执行 可循环使用

祝君好梦!