java学习笔记 --21-- 并发(4)

Java / 2020-04-17

死锁

死锁:比如有三个任务,任务1正在等待任务2完成,任务2正在等待任务3完成,结果任务正在等待任务1完成,然后大家就这样一直等着,哪个任务都动不了,称为死锁


哲学家就餐问题

假设:五个哲学家一起吃饭,五筷子,圆桌子,每根筷子放在两名哲学家的中间,哲学家两个行为:思考和就餐,就餐需要使用一双(两根)筷子
每个哲学家都可能出现这种情况:拿到一根筷子,但是另一根筷子正在被使用,需要等待另一个
死锁:所有的哲学家手里都有一根筷子,都在等另一根筷子

package com.Philosophers;

/**
 * @Author redarm
 * @Date 2020/4/17 6:38 下午
 **/
public class Chopstick {

    private boolean taken = false;

    public synchronized void take() throws InterruptedException{
        while (taken){
            wait();
        }
        taken = true;
    }

    public synchronized void drop() {
        taken = false;

        notifyAll();
    }
}

定义筷子

package com.Philosophers;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @Author redarm
 * @Date 2020/4/17 6:39 下午
 **/
public class Philosopher implements Runnable {

    private Chopstick left;

    private Chopstick right;

    private final int id;

    private final int ponderFactor;

    private Random random = new Random(1);

    public Philosopher(Chopstick left, Chopstick right, int id, int pronderFactor){
        this.left = left;
        this.right = right;
        this.id = id;
        this.ponderFactor = pronderFactor;
    }

    private void pauser() throws InterruptedException{
        if (ponderFactor == 0) return;
        TimeUnit.MILLISECONDS.sleep(random.nextInt(ponderFactor * 250));
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()){
                System.out.println(this + " thinking");
                pauser();
                System.out.println(this + " grabbing right");
                right.take();
                System.out.println(this + " grabbing left");
                left.take();
                System.out.println(this + " eating");
                pauser();
                right.drop();
                left.drop();
            }
        } catch (InterruptedException e) {
            System.out.println(e);
        }
    }

    public String toString(){
        return "Philosopher " + id;
    }
}

定义哲学家

package com.Philosophers;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @Author redarm
 * @Date 2020/4/17 6:47 下午
 **/
public class DeadLockingDiningPhilosophers {

    public static void main(String[] args) throws InterruptedException {
        int ponder = 0;
        int size = 5;

        ExecutorService executorService = Executors.newCachedThreadPool();

        Chopstick[] chopsticks = new Chopstick[size];

        for(int i=0;i<size;i++){
            chopsticks[i] = new Chopstick();
        }

        for(int i=0;i<size;i++){
            executorService.execute(new Philosopher(chopsticks[i],chopsticks[(i+1)%size],i,ponder));
        }

        TimeUnit.SECONDS.sleep(5);

        executorService.shutdownNow();
    }
}

五个哲学家五个筷子观察死锁情况
参数ponder为哲学家们思考的时间,ponder越大,哲学家们思考时间越长,ponder为0,哲学家们不思考
ponder越大,发生死锁的概率越小,但是仍然有概率发生
ponder为0时,很快就会发生死锁

死锁四个条件(同时满足,产生死锁):

  • 互斥条件:资源不能共享
  • 一个任务持有一个资源,并且等待另一个被别的任务持有的资源
  • 资源不能被任务抢占
  • 必须循环等待:一个任务在等待别的任务,这个任务又被另一个任务等待

预防死锁:破坏四个中的一个条件就行

第四个容易破坏

每个Philosopher都是先拿右边的,然后拿左边的,这样可能每个人都手里拿着左边的在等右边的

如果最后一个人先拿左边的,那么他就不会阻止第一个人拿他左边的筷子

package com.Philosophers;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @Author redarm
 * @Date 2020/4/17 6:47 下午
 **/
public class DeadLockingDiningPhilosophers {

    public static void main(String[] args) throws InterruptedException {
        int ponder = 0;
        int size = 5;

        ExecutorService executorService = Executors.newCachedThreadPool();

        Chopstick[] chopsticks = new Chopstick[size];

        for(int i=0;i<size;i++){
            chopsticks[i] = new Chopstick();
        }

        for(int i=0;i<size;i++){
            if (i< (size - 1)){
                executorService.execute(new Philosopher(chopsticks[i],chopsticks[i+1],i,ponder));
            } else {
                executorService.execute(new Philosopher(chopsticks[0],chopsticks[i],i,ponder));
            }
        }

        TimeUnit.SECONDS.sleep(5);

        executorService.shutdownNow();
    }
}

让最后一个人先拿左边的筷子,就不会产生死锁


新类库中的构件

CountDownLatch(倒数计时)

同步一个或多个任务,使他们等待别的任务完成

可以向CountDownLatch对象设置一个初始值,调用countDown减小这个计数值

在CountDownLatch对象上调用wait的方法都会阻塞,直到对象的初始值降为零,调用countDown的方法不会阻塞

package com.countDownLatchTest;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @Author redarm
 * @Date 2020/4/18 6:01 下午
 **/
public class TaskPortion implements Runnable{

    private static int counter = 0;
    private final int id = counter ++;
    private static Random random = new Random(1);
    private final CountDownLatch latch;

    public TaskPortion(CountDownLatch latch){
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            doWork();
            latch.countDown();
        } catch (InterruptedException e) {
            System.out.println(e);
        }
    }

    public void doWork() throws InterruptedException{
        TimeUnit.MILLISECONDS.sleep(random.nextInt(2000));
        System.out.println(this + " completed");
    }

    public String toString(){
        return String.format("%1$-3d",id);
    }
}

package com.countDownLatchTest;

import java.util.concurrent.CountDownLatch;

/**
 * @Author redarm
 * @Date 2020/4/18 6:07 下午
 **/
public class WaitingTask implements Runnable {

    private static int counter = 0;
    private final int id = counter++;
    private final CountDownLatch latch;

    public WaitingTask(CountDownLatch latch){
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            latch.await();
            System.out.println("Latch barrier passed for " + this);
        } catch (InterruptedException e) {
            System.out.println(e);
        }
    }

    public String toString(){
        return String.format("WaitingTask %1$ - 3d",id);
    }
}

package com.countDownLatchTest;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @Author redarm
 * @Date 2020/4/18 6:11 下午
 **/
public class CountDownLatchDemo {

    private static final int SIZE = 100;

    public static void main(String[] args) {

        ExecutorService service = Executors.newCachedThreadPool();
        CountDownLatch latch = new CountDownLatch(SIZE);

        for(int i=0;i<10;i++){
            service.execute(new WaitingTask(latch));
        }

        for(int i=0;i<SIZE;i++){
            service.execute(new TaskPortion(latch));
        }

        System.out.println("Launched all tasks");

        service.shutdown();
    }
}

CyclicBarrier(循环壁垒)

创建一组并行执行的任务,在进行下一个步骤之前,所有的任务都要完成


赛马

package com.cyclicBarrierTest;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * @Author redarm
 * @Date 2020/4/18 6:20 下午
 **/
public class Horse implements Runnable {

    private static int counter = 0;
    private final int id = counter++;
    private int strides = 0;

    private static Random random = new Random(1);
    private static CyclicBarrier barrier;

    public Horse(CyclicBarrier barrier){
        this.barrier = barrier;
    }

    public synchronized int getStrides(){
        return strides;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()){
                synchronized (this){
                    strides += random.nextInt(3);
                }
                barrier.await();
            }
        } catch (InterruptedException e) {
            System.out.println("InterruptException");
        } catch (BrokenBarrierException e) {
            System.out.println("BrokenBarrierException");
        }
    }

    public String toString(){
        return "Horse "+ id  + " ";
    }

    public String tracks(){
        StringBuilder sb = new StringBuilder();
        for(int i=0;i<getStrides();i++){
            sb.append("*");
        }
        sb.append(id);
        return sb.toString();
    }
}

定义马

package com.cyclicBarrierTest;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @Author redarm
 * @Date 2020/4/18 6:27 下午
 **/
public class HorseRace {

    static final int FINISH_LINE=75;
    private List<Horse> horses = new ArrayList<>();
    private ExecutorService service = Executors.newCachedThreadPool();
    private CyclicBarrier barrier;

    public HorseRace(int nHorses, final int pause){
        barrier = new CyclicBarrier(nHorses, new Runnable() {
            @Override
            public void run() {
                StringBuilder sb = new StringBuilder();
                for(int i=0;i<FINISH_LINE;i++){
                    sb.append("=");
                }
                System.out.println(sb.toString());

                for (Horse horse : horses){
                    System.out.println(horse.tracks());
                }

                for (Horse horse : horses){
                    if (horse.getStrides() >= FINISH_LINE){
                        System.out.println(horse + "WON !!");
                        service.shutdownNow();
                        return;
                    }
                }

                try {
                    TimeUnit.MILLISECONDS.sleep(pause);
                } catch (InterruptedException e) {
                    System.out.println("InterruptException");
                }
            }
        });

        for (int i=0;i< nHorses;i++){
            Horse horse = new Horse(barrier);
            horses.add(horse);
            service.execute(horse);
        }
    }

    public static void main(String[] args) {
        int nHorses = 7;
        int pause = 200;

        new HorseRace(nHorses,pause);
    }
}

跑道以及赛马
每匹马的步子大小是随机的,0-3步,但是每一匹马的频率是一样的
每一个马跑一步就是一个任务,把所有的马放到同一个CyclicBarrier中,这样每个马跑完一步之后就会等CyclicBarrier中其他没跑这一步的马,等所有的马都跑完了一步之后,再开始一起跑
这就是CyclicBarrier的限制:每一个任务执行完了之后等其他的任务,所有的任务都是同步前进的
CyclicBarrier参数:计数值以及一个匿名内部类实现的栅栏动作
栅栏动作:计数值为0的时候自动执行
计数值为7,当七匹马都走了一步,都调用了await后,计数值减少到了0,执行一次栅栏动作,然后循环这个过程,最后中断所有的线程

与CountDownLatch区别:

  • 可以循环反复执行
  • 提供栅栏动作

DelayQueue

无界的BlockingQueue,用于放置实现了Delay接口的对象,其中的对象只能在到期时才能从队列中取走
队列时有序的,延迟到期时间最长的在队头,如果没有任何的延迟到期,poll返回null,队列中不能放null

package com.delayQueueTest;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

/**
 * @Author redarm
 * @Date 2020/4/18 7:23 下午
 **/
public class DelayedTask implements Delayed , Runnable {

    public static int counter = 0;

    private final int id = counter++;
    private final int delta;
    private final long trigger;

    protected static List<DelayedTask> sequence = new ArrayList<>();

    public DelayedTask(int delayInMillisecondes){
        this.delta = delayInMillisecondes;
        trigger = System.nanoTime() + NANOSECONDS.convert(delta,MILLISECONDS);
        sequence.add(this);
    }

    public long getDelay(TimeUnit unit){
        return unit.convert(trigger - System.nanoTime(),NANOSECONDS);
    }

    public int compareTo(Delayed arg){
        DelayedTask that = (DelayedTask)arg;

        if(trigger < that.trigger) return -1;
        if(trigger > that.trigger) return 1;
        return 0;
    }

    @Override
    public void run() {
        System.out.println(this + " ");
    }

    public String toString(){
        return String.format("[%1$-4d]" , delta) + " Task " + id;
    }

    public String summary(){
        return "(" + id + ":" + delta + ")";
    }

    public static class EndSentinel extends DelayedTask{
        private ExecutorService exec;
        public EndSentinel(int delay, ExecutorService e){
            super(delay);
            this.exec = e;
        }

        public void run(){
            for(DelayedTask pt : sequence){
                System.out.println(pt.summary() + " ");
            }
            System.out.println();
            System.out.println(this + " Calling shutdownNow()");
            exec.shutdownNow();
        }
    }
}


package com.delayQueueTest;

import java.util.concurrent.DelayQueue;

/**
 * @Author redarm
 * @Date 2020/4/18 7:37 下午
 **/
public class DelayedTaskConsumer implements Runnable {

    private DelayQueue<DelayedTask> q;

    public DelayedTaskConsumer(DelayQueue<DelayedTask> e){
        q=e;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()){
                q.take().run();
            }
        } catch (InterruptedException e) {
            System.out.println("InterruptedException");
        }
        System.out.println("Finished");
    }
}

package com.delayQueueTest;

import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @Author redarm
 * @Date 2020/4/18 7:39 下午
 **/
public class DelayQueueDemo {

    public static void main(String[] args) {
        Random random = new Random(1);

        ExecutorService service = Executors.newCachedThreadPool();

        DelayQueue<DelayedTask> delayedTasks = new DelayQueue<>();

        for (int i=0;i<20;i++){
            delayedTasks.put(new DelayedTask(random.nextInt(5000)));
        }

        delayedTasks.add(new DelayedTask.EndSentinel(5000,service));

        service.execute(new DelayedTaskConsumer(delayedTasks));
    }
}