检查中断
interrupt发生的时刻:任务要进入到阻塞队列中或者任务已经在阻塞队列中了
package com.company;
/**
* @Author redarm
* @Date 2020/4/15 5:39 下午
**/
public class NeedsCleanup {
private final int id;
public NeedsCleanup(int id){
this.id = id;
System.out.println("NeedsCleanuo " + id);
}
public void cleanup(){
System.out.println("Cleaning up " + id);
}
}
package com.company;
import java.util.concurrent.TimeUnit;
/**
* @Author redarm
* @Date 2020/4/15 5:40 下午
**/
public class Blocked3 implements Runnable {
private volatile double d = 0.0;
@Override
public void run() {
while (!Thread.interrupted()){
NeedsCleanup n1 = new NeedsCleanup(1);
try {
System.out.println("Sleeping");
TimeUnit.SECONDS.sleep(1);
NeedsCleanup n2 = new NeedsCleanup(2);
try {
System.out.println("Calculating");
for (int i=1;i<2500000;i++){
d = d+ (Math.PI + Math.E) / d;
}
System.out.println("Finished time-consuming operation");
} finally {
n2.cleanup();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
n1.cleanup();
System.out.println("Exiting test");
}
}
}
}
package com.company;
import java.util.concurrent.TimeUnit;
/**
* @Author redarm
* @Date 2020/4/15 5:46 下午
**/
public class InterruptingIdiom {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(new Blocked3());
t.start();
TimeUnit.MILLISECONDS.sleep(30);
t.interrupt();
}
}
使用Thread.interrupted来判断是否中断过
线程之间协作
之前讲了资源互斥保证任务之间可以独立的互不干涉的进行工作
下面讲如何让多个任务协作一起去解决某个问题
某些任务需要并行执行,某些任务需要一些任务完成之后才可以开始执行
关键:任务之间的握手
互斥:只有一个任务可以相应某个信号
方法:
- wait,notify
- Condition对象的await,signal
wait,notify
wait:线程的执行被挂起,线程的锁被释放,等待notify或者notifyAll
表示我所能做的已经做完了,能其他的线程对数据进行改变后然后我再继续工作
sleep和yield:线程的锁不会被释放
wait两种工作方式:
- 毫秒为参数,效果与sleep相同,不同的是对象锁是被释放的,而且可以使用notify和notifyAll唤醒中断睡眠
- 无参数,一直睡眠下去,直到等到notify或者notifyAll唤醒
wait和notify和notifyAll是属于Object中的一部分,而不是属于Thread的
只能用在同步控制块中或者同步方法中,因为这些方法都需要先获取对象的锁
sleep只能用在同步方法之外
比如,如果想对x发送notifyAll,就需要先获取x的锁,可以把x放到同步控制块中
汽车镀膜:
一个汽车,两道工序,镀膜和抛光,镀膜完了就抛光,抛光完了就镀膜
对一辆车反复执行这个两个工序
package com.company;
/**
* @Author redarm
* @Date 2020/4/15 8:31 下午
**/
public class Car {
private boolean waxOn = false;
public synchronized void waxed(){
waxOn = true;
notifyAll();
}
public synchronized void buffed(){
waxOn = false;
notifyAll();
}
public synchronized void waitForWaxing() throws InterruptedException{
while (waxOn == false){
wait();
}
}
public synchronized void waitForBuffing() throws InterruptedException {
while (waxOn == true){
wait();
}
}
}
定义一辆车,wait:线程被挂起,notifyAll:唤醒所有挂起的线程
package com.company;
import java.util.concurrent.TimeUnit;
/**
* @Author redarm
* @Date 2020/4/15 8:39 下午
**/
public class WaxOn implements Runnable {
private Car car;
public WaxOn(Car car) {
this.car = car;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
System.out.println("Wax on!");
TimeUnit.MILLISECONDS.sleep(200);
car.waxed();
car.waitForBuffing();
}
} catch (InterruptedException e) {
System.out.println("WaxOn Interrupt");
}
System.out.println("Ending Wax on task");
}
}
package com.company;
import java.util.concurrent.TimeUnit;
/**
* @Author redarm
* @Date 2020/4/15 8:42 下午
**/
public class WaxOff implements Runnable {
private Car car;
public WaxOff(Car car){
this.car = car;
}
@Override
public void run() {
try {
while (!Thread.interrupted()){
car.waitForWaxing();
System.out.println("Wax Off!");
TimeUnit.MILLISECONDS.sleep(200);
car.buffed();
}
} catch (InterruptedException e) {
System.out.println("Wax Off interrupt");
}
System.out.println("Ending Wax Off task");
}
}
镀膜和抛光两个工序
package com.company;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @Author redarm
* @Date 2020/4/15 8:47 下午
**/
public class WaxOMatic {
public static void main(String[] args) throws InterruptedException {
Car car = new Car();
ExecutorService service = Executors.newCachedThreadPool();
service.execute(new WaxOff(car));
service.execute(new WaxOn(car));
TimeUnit.SECONDS.sleep(3);
service.shutdownNow();
}
}
三秒的镀膜和抛光,然后中断所有的线程
两个线程,一个镀膜,一个抛光,一辆车
抛光线程:检查车是否已经镀膜,如果还没镀膜,就把抛光这个线程挂起,直到镀膜后进行抛光,然后唤醒镀膜的线程,告诉他你可以镀膜了
镀膜:看车是否已经抛光,如果没抛光就把镀膜这个线程挂起来,直到抛光后唤醒镀膜线程,告诉他你可以镀膜了
通常while与wait一起使用,用来判断wait挂起的条件
错失信号
防止错失信号,把wait的条件放到锁之内判断
错:
while (someCondition){
synchronized (monitor){
monitor.wait();
}
}
对:
synchronized (monitor){
while (someCondition){
monitor.wait();
}
}
notify和notifyAll
notifyAll:所有等待特定锁的任务都会被唤醒
notify:所有等待同一个锁的任务只有一个会被唤醒
生产者-消费者队列
wait和notifyAll是一种非常低级的方式
同步队列:任何时刻只允许一个元素插入或者移除元素
BlockingQueue接口有很多实现:
- LinkedBlockingQueue:无界队列
- ArrayBlockingQueue:尺寸固定的队列
吐司BlockingQueue
制作吐司:
三个步骤,制作,抹黄油,涂果酱
package com.company;
/**
* @Author redarm
* @Date 2020/4/16 7:47 下午
**/
public class Toast {
public enum Status{ DRY, BUTTERED, JAMMED }
private Status status = Status.DRY;
private int id;
public Toast(int id){
this.id = id;
}
public void butter(){
status = Status.BUTTERED;
}
public void jam(){
status = Status.JAMMED;
}
public Status getStatus(){
return this.status;
}
public int getId(){
return id;
}
public String toString(){
return "Toast " + id + ": " + status;
}
}
定义一个吐司类,用enum定义吐司的三种状态
package com.company;
import java.util.concurrent.LinkedBlockingDeque;
/**
* @Author redarm
* @Date 2020/4/16 7:53 下午
**/
public class ToastQueue extends LinkedBlockingDeque<Toast> {
}
定义吐司同步队列
package com.company;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* @Author redarm
* @Date 2020/4/16 7:53 下午
**/
public class Toaster implements Runnable {
private ToastQueue toastQueue;
private int count = 0;
private Random random = new Random(1);
public Toaster(ToastQueue toastQueue) {
this.toastQueue = toastQueue;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(100 + random.nextInt(500));
Toast t = new Toast(count++);
System.out.println(t);
toastQueue.put(t);
}
} catch (InterruptedException e) {
System.out.println(e);
}
System.out.println("Toaster Off");
}
}
制作吐司的任务:制作一个吐司并放到制待涂黄油的同步队列中
package com.company;
/**
* @Author redarm
* @Date 2020/4/16 7:59 下午
**/
public class Butter implements Runnable {
private ToastQueue dryQueue, bufferedQueue;
public Butter(ToastQueue dryQueue, ToastQueue bufferedQueue){
this.dryQueue = dryQueue;
this.bufferedQueue = bufferedQueue;
}
@Override
public void run() {
try {
while (!Thread.interrupted()){
Toast t = dryQueue.take();
t.butter();
System.out.println(t);
bufferedQueue.put(t);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Butter Off");
}
}
从等待涂黄油的同步队列中取出吐司,然后涂黄油,然后放到等待涂果酱的同步队列中
package com.company;
/**
* @Author redarm
* @Date 2020/4/16 8:04 下午
**/
public class Jammer implements Runnable {
private ToastQueue butteredQueue, finashedQueue;
public Jammer(ToastQueue butteredQueue , ToastQueue finashedQueue){
this.butteredQueue = butteredQueue;
this.finashedQueue = finashedQueue;
}
@Override
public void run() {
try {
while (!Thread.interrupted()){
Toast t = butteredQueue.take();
t.jam();
System.out.println(t);
finashedQueue.put(t);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Jammer Off");
}
}
从等待涂果酱的同步队列中取出吐司,涂果酱,放到完成同步队列中
package com.company;
/**
* @Author redarm
* @Date 2020/4/16 8:06 下午
**/
public class Eater implements Runnable {
private ToastQueue finashedQueue;
private int counter = 0;
public Eater(ToastQueue finashedQueue){
this.finashedQueue = finashedQueue;
}
@Override
public void run() {
try {
while (!Thread.interrupted()){
Toast t = finashedQueue.take();
if(t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED){
System.out.println(">>>> Error: " + t);
System.exit(1);
} else {
System.out.println("Chomp !" + t);
}
}
} catch (InterruptedException e) {
System.out.println("Eater Interrupted");
}
System.out.println("Eater Off");
}
}
吃吐司任务:从完成的吐司同步队列中取出吐司,吃掉
package com.company;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @Author redarm
* @Date 2020/4/16 8:11 下午
**/
public class ToastOMatic {
public static void main(String[] args) throws InterruptedException {
ToastQueue dryQueue = new ToastQueue(),
bufferedQueue =new ToastQueue(),
finashedQueue = new ToastQueue();
ExecutorService service = Executors.newCachedThreadPool();
service.execute(new Toaster(dryQueue));
service.execute(new Butter(dryQueue,bufferedQueue));
service.execute(new Jammer(bufferedQueue,finashedQueue));
service.execute(new Eater(finashedQueue));
TimeUnit.SECONDS.sleep(5);
service.shutdownNow();
}
}
三个线程同时工作5秒,然后中断所有线程
没有使用synchronized和Lock,单单使用BlockingQueue完成了进程之间的协作还有资源的同步
在任何时刻同一个Toast只能有一个任务操作
任务间使用管道进行输入输出
- PipedWriter:向任务管道中写
- PipedReader:从任务管道中读
package com.PipedReader;
import java.io.IOException;
import java.io.PipedWriter;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* @Author redarm
* @Date 2020/4/17 3:42 下午
**/
public class Sender implements Runnable {
private PipedWriter pipedWriter;
private Random random = new Random(1);
public Sender(){
pipedWriter = new PipedWriter();
}
public PipedWriter getPipedWriter(){
return pipedWriter;
}
@Override
public void run() {
try {
while (true){
for(char c = 'a'; c<'z' ; c++){
pipedWriter.write(c);
TimeUnit.MILLISECONDS.sleep(random.nextInt(500));
}
}
} catch (IOException e) {
System.out.println("IOExeception");
} catch (InterruptedException e) {
System.out.println("Interrupted");
}
}
}
package com.PipedReader;
import java.io.IOException;
import java.io.PipedReader;
/**
* @Author redarm
* @Date 2020/4/17 3:47 下午
**/
public class Receiver implements Runnable {
private PipedReader pipedReader;
public Receiver(Sender sender) throws IOException {
this.pipedReader = new PipedReader(sender.getPipedWriter());
}
@Override
public void run() {
try {
while (true){
System.out.println("Read " + (char)pipedReader.read() + ", ");
}
} catch (IOException e) {
System.out.println(e);
}
}
}
package com.PipedReader;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @Author redarm
* @Date 2020/4/17 3:50 下午
**/
public class PipedIO {
public static void main(String[] args) throws IOException, InterruptedException {
Sender sender = new Sender();
Receiver receiver = new Receiver(sender);
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(sender);
executorService.execute(receiver);
TimeUnit.SECONDS.sleep(5);
executorService.shutdownNow();
}
}
无论是BlockingQueue还是PipedReader还是PipedWriter,都是任务把要操作的需要同步的对象放到这些管道中,从这些队列或者管道中取出对象,加工后再放到一个队列或者管道中,对象一直是在队列或者管道中传输的,实现资源的同步