死锁
死锁:比如有三个任务,任务1正在等待任务2完成,任务2正在等待任务3完成,结果任务正在等待任务1完成,然后大家就这样一直等着,哪个任务都动不了,称为死锁
哲学家就餐问题
假设:五个哲学家一起吃饭,五根筷子,圆桌子,每根筷子放在两名哲学家的中间,哲学家两个行为:思考和就餐,就餐需要使用一双(两根)筷子
每个哲学家都可能出现这种情况:拿到一根筷子,但是另一根筷子正在被使用,需要等待另一个
死锁:所有的哲学家手里都有一根筷子,都在等另一根筷子
package com.Philosophers;
/**
* @Author redarm
* @Date 2020/4/17 6:38 下午
**/
public class Chopstick {
private boolean taken = false;
public synchronized void take() throws InterruptedException{
while (taken){
wait();
}
taken = true;
}
public synchronized void drop() {
taken = false;
notifyAll();
}
}
定义筷子
package com.Philosophers;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* @Author redarm
* @Date 2020/4/17 6:39 下午
**/
public class Philosopher implements Runnable {
private Chopstick left;
private Chopstick right;
private final int id;
private final int ponderFactor;
private Random random = new Random(1);
public Philosopher(Chopstick left, Chopstick right, int id, int pronderFactor){
this.left = left;
this.right = right;
this.id = id;
this.ponderFactor = pronderFactor;
}
private void pauser() throws InterruptedException{
if (ponderFactor == 0) return;
TimeUnit.MILLISECONDS.sleep(random.nextInt(ponderFactor * 250));
}
@Override
public void run() {
try {
while (!Thread.interrupted()){
System.out.println(this + " thinking");
pauser();
System.out.println(this + " grabbing right");
right.take();
System.out.println(this + " grabbing left");
left.take();
System.out.println(this + " eating");
pauser();
right.drop();
left.drop();
}
} catch (InterruptedException e) {
System.out.println(e);
}
}
public String toString(){
return "Philosopher " + id;
}
}
定义哲学家
package com.Philosophers;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @Author redarm
* @Date 2020/4/17 6:47 下午
**/
public class DeadLockingDiningPhilosophers {
public static void main(String[] args) throws InterruptedException {
int ponder = 0;
int size = 5;
ExecutorService executorService = Executors.newCachedThreadPool();
Chopstick[] chopsticks = new Chopstick[size];
for(int i=0;i<size;i++){
chopsticks[i] = new Chopstick();
}
for(int i=0;i<size;i++){
executorService.execute(new Philosopher(chopsticks[i],chopsticks[(i+1)%size],i,ponder));
}
TimeUnit.SECONDS.sleep(5);
executorService.shutdownNow();
}
}
五个哲学家五个筷子观察死锁情况
参数ponder为哲学家们思考的时间,ponder越大,哲学家们思考时间越长,ponder为0,哲学家们不思考
ponder越大,发生死锁的概率越小,但是仍然有概率发生
ponder为0时,很快就会发生死锁
死锁四个条件(同时满足,产生死锁):
- 互斥条件:资源不能共享
- 一个任务持有一个资源,并且等待另一个被别的任务持有的资源
- 资源不能被任务抢占
- 必须循环等待:一个任务在等待别的任务,这个任务又被另一个任务等待
预防死锁:破坏四个中的一个条件就行
第四个容易破坏
每个Philosopher都是先拿右边的,然后拿左边的,这样可能每个人都手里拿着左边的在等右边的
如果最后一个人先拿左边的,那么他就不会阻止第一个人拿他左边的筷子
package com.Philosophers;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @Author redarm
* @Date 2020/4/17 6:47 下午
**/
public class DeadLockingDiningPhilosophers {
public static void main(String[] args) throws InterruptedException {
int ponder = 0;
int size = 5;
ExecutorService executorService = Executors.newCachedThreadPool();
Chopstick[] chopsticks = new Chopstick[size];
for(int i=0;i<size;i++){
chopsticks[i] = new Chopstick();
}
for(int i=0;i<size;i++){
if (i< (size - 1)){
executorService.execute(new Philosopher(chopsticks[i],chopsticks[i+1],i,ponder));
} else {
executorService.execute(new Philosopher(chopsticks[0],chopsticks[i],i,ponder));
}
}
TimeUnit.SECONDS.sleep(5);
executorService.shutdownNow();
}
}
让最后一个人先拿左边的筷子,就不会产生死锁
新类库中的构件
CountDownLatch(倒数计时)
同步一个或多个任务,使他们等待别的任务完成
可以向CountDownLatch对象设置一个初始值,调用countDown减小这个计数值
在CountDownLatch对象上调用wait的方法都会阻塞,直到对象的初始值降为零,调用countDown的方法不会阻塞
package com.countDownLatchTest;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @Author redarm
* @Date 2020/4/18 6:01 下午
**/
public class TaskPortion implements Runnable{
private static int counter = 0;
private final int id = counter ++;
private static Random random = new Random(1);
private final CountDownLatch latch;
public TaskPortion(CountDownLatch latch){
this.latch = latch;
}
@Override
public void run() {
try {
doWork();
latch.countDown();
} catch (InterruptedException e) {
System.out.println(e);
}
}
public void doWork() throws InterruptedException{
TimeUnit.MILLISECONDS.sleep(random.nextInt(2000));
System.out.println(this + " completed");
}
public String toString(){
return String.format("%1$-3d",id);
}
}
package com.countDownLatchTest;
import java.util.concurrent.CountDownLatch;
/**
* @Author redarm
* @Date 2020/4/18 6:07 下午
**/
public class WaitingTask implements Runnable {
private static int counter = 0;
private final int id = counter++;
private final CountDownLatch latch;
public WaitingTask(CountDownLatch latch){
this.latch = latch;
}
@Override
public void run() {
try {
latch.await();
System.out.println("Latch barrier passed for " + this);
} catch (InterruptedException e) {
System.out.println(e);
}
}
public String toString(){
return String.format("WaitingTask %1$ - 3d",id);
}
}
package com.countDownLatchTest;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @Author redarm
* @Date 2020/4/18 6:11 下午
**/
public class CountDownLatchDemo {
private static final int SIZE = 100;
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(SIZE);
for(int i=0;i<10;i++){
service.execute(new WaitingTask(latch));
}
for(int i=0;i<SIZE;i++){
service.execute(new TaskPortion(latch));
}
System.out.println("Launched all tasks");
service.shutdown();
}
}
CyclicBarrier(循环壁垒)
创建一组并行执行的任务,在进行下一个步骤之前,所有的任务都要完成
赛马
package com.cyclicBarrierTest;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* @Author redarm
* @Date 2020/4/18 6:20 下午
**/
public class Horse implements Runnable {
private static int counter = 0;
private final int id = counter++;
private int strides = 0;
private static Random random = new Random(1);
private static CyclicBarrier barrier;
public Horse(CyclicBarrier barrier){
this.barrier = barrier;
}
public synchronized int getStrides(){
return strides;
}
@Override
public void run() {
try {
while (!Thread.interrupted()){
synchronized (this){
strides += random.nextInt(3);
}
barrier.await();
}
} catch (InterruptedException e) {
System.out.println("InterruptException");
} catch (BrokenBarrierException e) {
System.out.println("BrokenBarrierException");
}
}
public String toString(){
return "Horse "+ id + " ";
}
public String tracks(){
StringBuilder sb = new StringBuilder();
for(int i=0;i<getStrides();i++){
sb.append("*");
}
sb.append(id);
return sb.toString();
}
}
定义马
package com.cyclicBarrierTest;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @Author redarm
* @Date 2020/4/18 6:27 下午
**/
public class HorseRace {
static final int FINISH_LINE=75;
private List<Horse> horses = new ArrayList<>();
private ExecutorService service = Executors.newCachedThreadPool();
private CyclicBarrier barrier;
public HorseRace(int nHorses, final int pause){
barrier = new CyclicBarrier(nHorses, new Runnable() {
@Override
public void run() {
StringBuilder sb = new StringBuilder();
for(int i=0;i<FINISH_LINE;i++){
sb.append("=");
}
System.out.println(sb.toString());
for (Horse horse : horses){
System.out.println(horse.tracks());
}
for (Horse horse : horses){
if (horse.getStrides() >= FINISH_LINE){
System.out.println(horse + "WON !!");
service.shutdownNow();
return;
}
}
try {
TimeUnit.MILLISECONDS.sleep(pause);
} catch (InterruptedException e) {
System.out.println("InterruptException");
}
}
});
for (int i=0;i< nHorses;i++){
Horse horse = new Horse(barrier);
horses.add(horse);
service.execute(horse);
}
}
public static void main(String[] args) {
int nHorses = 7;
int pause = 200;
new HorseRace(nHorses,pause);
}
}
跑道以及赛马
每匹马的步子大小是随机的,0-3步,但是每一匹马的频率是一样的
每一个马跑一步就是一个任务,把所有的马放到同一个CyclicBarrier中,这样每个马跑完一步之后就会等CyclicBarrier中其他没跑这一步的马,等所有的马都跑完了一步之后,再开始一起跑
这就是CyclicBarrier的限制:每一个任务执行完了之后等其他的任务,所有的任务都是同步前进的
CyclicBarrier参数:计数值以及一个匿名内部类实现的栅栏动作
栅栏动作:计数值为0的时候自动执行
计数值为7,当七匹马都走了一步,都调用了await后,计数值减少到了0,执行一次栅栏动作,然后循环这个过程,最后中断所有的线程
与CountDownLatch区别:
- 可以循环反复执行
- 提供栅栏动作
DelayQueue
无界的BlockingQueue,用于放置实现了Delay接口的对象,其中的对象只能在到期时才能从队列中取走
队列时有序的,延迟到期时间最长的在队头,如果没有任何的延迟到期,poll返回null,队列中不能放null
package com.delayQueueTest;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
/**
* @Author redarm
* @Date 2020/4/18 7:23 下午
**/
public class DelayedTask implements Delayed , Runnable {
public static int counter = 0;
private final int id = counter++;
private final int delta;
private final long trigger;
protected static List<DelayedTask> sequence = new ArrayList<>();
public DelayedTask(int delayInMillisecondes){
this.delta = delayInMillisecondes;
trigger = System.nanoTime() + NANOSECONDS.convert(delta,MILLISECONDS);
sequence.add(this);
}
public long getDelay(TimeUnit unit){
return unit.convert(trigger - System.nanoTime(),NANOSECONDS);
}
public int compareTo(Delayed arg){
DelayedTask that = (DelayedTask)arg;
if(trigger < that.trigger) return -1;
if(trigger > that.trigger) return 1;
return 0;
}
@Override
public void run() {
System.out.println(this + " ");
}
public String toString(){
return String.format("[%1$-4d]" , delta) + " Task " + id;
}
public String summary(){
return "(" + id + ":" + delta + ")";
}
public static class EndSentinel extends DelayedTask{
private ExecutorService exec;
public EndSentinel(int delay, ExecutorService e){
super(delay);
this.exec = e;
}
public void run(){
for(DelayedTask pt : sequence){
System.out.println(pt.summary() + " ");
}
System.out.println();
System.out.println(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
package com.delayQueueTest;
import java.util.concurrent.DelayQueue;
/**
* @Author redarm
* @Date 2020/4/18 7:37 下午
**/
public class DelayedTaskConsumer implements Runnable {
private DelayQueue<DelayedTask> q;
public DelayedTaskConsumer(DelayQueue<DelayedTask> e){
q=e;
}
@Override
public void run() {
try {
while (!Thread.interrupted()){
q.take().run();
}
} catch (InterruptedException e) {
System.out.println("InterruptedException");
}
System.out.println("Finished");
}
}
package com.delayQueueTest;
import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @Author redarm
* @Date 2020/4/18 7:39 下午
**/
public class DelayQueueDemo {
public static void main(String[] args) {
Random random = new Random(1);
ExecutorService service = Executors.newCachedThreadPool();
DelayQueue<DelayedTask> delayedTasks = new DelayQueue<>();
for (int i=0;i<20;i++){
delayedTasks.put(new DelayedTask(random.nextInt(5000)));
}
delayedTasks.add(new DelayedTask.EndSentinel(5000,service));
service.execute(new DelayedTaskConsumer(delayedTasks));
}
}