liujijiang

java学习笔记 --21-- 并发(3)

2020.04.15

检查中断

interrupt发生的时刻:任务要进入到阻塞队列中或者任务已经在阻塞队列中了


package com.company;

/**
 * @Author redarm
 * @Date 2020/4/15 5:39 下午
 **/
public class NeedsCleanup {

    private final int id;

    public NeedsCleanup(int id){
        this.id = id;
        System.out.println("NeedsCleanuo " + id);
    }

    public void cleanup(){
        System.out.println("Cleaning up " + id);
    }
}

package com.company;

import java.util.concurrent.TimeUnit;

/**
 * @Author redarm
 * @Date 2020/4/15 5:40 下午
 **/
public class Blocked3 implements Runnable {

    private volatile double d = 0.0;

    @Override
    public void run() {
        while (!Thread.interrupted()){
            NeedsCleanup n1 = new NeedsCleanup(1);

            try {
                System.out.println("Sleeping");
                TimeUnit.SECONDS.sleep(1);

                NeedsCleanup n2 = new NeedsCleanup(2);

                try {
                    System.out.println("Calculating");

                    for (int i=1;i<2500000;i++){
                        d = d+ (Math.PI + Math.E) / d;
                    }

                    System.out.println("Finished time-consuming operation");
                } finally {
                    n2.cleanup();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                n1.cleanup();

                System.out.println("Exiting test");
            }
        }
    }
}

package com.company;

import java.util.concurrent.TimeUnit;

/**
 * @Author redarm
 * @Date 2020/4/15 5:46 下午
 **/
public class InterruptingIdiom {

    public static void main(String[] args) throws InterruptedException {

        Thread t = new Thread(new Blocked3());
        t.start();
        TimeUnit.MILLISECONDS.sleep(30);
        t.interrupt();
    }
}

使用Thread.interrupted来判断是否中断过


线程之间协作

之前讲了资源互斥保证任务之间可以独立的互不干涉的进行工作
下面讲如何让多个任务协作一起去解决某个问题

某些任务需要并行执行,某些任务需要一些任务完成之后才可以开始执行

关键:任务之间的握手
互斥:只有一个任务可以相应某个信号

方法:

  • wait,notify
  • Condition对象的await,signal

wait,notify

wait:线程的执行被挂起,线程的锁被释放,等待notify或者notifyAll
表示我所能做的已经做完了,能其他的线程对数据进行改变后然后我再继续工作

sleep和yield:线程的锁不会被释放

wait两种工作方式:

  • 毫秒为参数,效果与sleep相同,不同的是对象锁是被释放的,而且可以使用notify和notifyAll唤醒中断睡眠
  • 无参数,一直睡眠下去,直到等到notify或者notifyAll唤醒

wait和notify和notifyAll是属于Object中的一部分,而不是属于Thread的

只能用在同步控制块中或者同步方法中,因为这些方法都需要先获取对象的锁

sleep只能用在同步方法之外

比如,如果想对x发送notifyAll,就需要先获取x的锁,可以把x放到同步控制块中


汽车镀膜:
一个汽车,两道工序,镀膜和抛光,镀膜完了就抛光,抛光完了就镀膜
对一辆车反复执行这个两个工序

package com.company;

/**
 * @Author redarm
 * @Date 2020/4/15 8:31 下午
 **/
public class Car {

    private boolean waxOn = false;

    public synchronized void waxed(){
        waxOn = true;
        notifyAll();
    }

    public synchronized void buffed(){
        waxOn = false;
        notifyAll();
    }

    public synchronized void waitForWaxing() throws InterruptedException{
        while (waxOn == false){
            wait();
        }
    }

    public synchronized void waitForBuffing() throws InterruptedException {
        while (waxOn == true){
            wait();
        }
    }


}


定义一辆车,wait:线程被挂起,notifyAll:唤醒所有挂起的线程

package com.company;

import java.util.concurrent.TimeUnit;

/**
 * @Author redarm
 * @Date 2020/4/15 8:39 下午
 **/
public class WaxOn implements Runnable {

    private Car car;

    public WaxOn(Car car) {
        this.car = car;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                System.out.println("Wax on!");
                TimeUnit.MILLISECONDS.sleep(200);
                car.waxed();
                car.waitForBuffing();
            }

        } catch (InterruptedException e) {
            System.out.println("WaxOn Interrupt");
        }

        System.out.println("Ending Wax on task");
    }
}

package com.company;

import java.util.concurrent.TimeUnit;

/**
 * @Author redarm
 * @Date 2020/4/15 8:42 下午
 **/
public class WaxOff implements  Runnable {

    private Car car;

    public WaxOff(Car car){
        this.car = car;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()){

                car.waitForWaxing();
                System.out.println("Wax Off!");

                TimeUnit.MILLISECONDS.sleep(200);

                car.buffed();
            }
        } catch (InterruptedException e) {
            System.out.println("Wax Off interrupt");
        }

        System.out.println("Ending Wax Off task");
    }
}

镀膜和抛光两个工序

package com.company;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @Author redarm
 * @Date 2020/4/15 8:47 下午
 **/
public class WaxOMatic {

    public static void main(String[] args) throws InterruptedException {
        Car car = new Car();

        ExecutorService service = Executors.newCachedThreadPool();

        service.execute(new WaxOff(car));
        service.execute(new WaxOn(car));

        TimeUnit.SECONDS.sleep(3);

        service.shutdownNow();
    }
}

三秒的镀膜和抛光,然后中断所有的线程

两个线程,一个镀膜,一个抛光,一辆车
抛光线程:检查车是否已经镀膜,如果还没镀膜,就把抛光这个线程挂起,直到镀膜后进行抛光,然后唤醒镀膜的线程,告诉他你可以镀膜了
镀膜:看车是否已经抛光,如果没抛光就把镀膜这个线程挂起来,直到抛光后唤醒镀膜线程,告诉他你可以镀膜了


通常while与wait一起使用,用来判断wait挂起的条件

错失信号

防止错失信号,把wait的条件放到锁之内判断
错:

while (someCondition){
            synchronized (monitor){
                monitor.wait();
            }
        }

对:

synchronized (monitor){
            while (someCondition){
                monitor.wait();
            }
        }

notify和notifyAll

notifyAll:所有等待特定锁的任务都会被唤醒
notify:所有等待同一个锁的任务只有一个会被唤醒

生产者-消费者队列

wait和notifyAll是一种非常低级的方式

同步队列:任何时刻只允许一个元素插入或者移除元素

BlockingQueue接口有很多实现:

  • LinkedBlockingQueue:无界队列
  • ArrayBlockingQueue:尺寸固定的队列

吐司BlockingQueue

制作吐司:
三个步骤,制作,抹黄油,涂果酱

package com.company;

/**
 * @Author redarm
 * @Date 2020/4/16 7:47 下午
 **/
public class Toast {

    public enum Status{ DRY, BUTTERED, JAMMED }

    private Status status = Status.DRY;

    private int id;

    public Toast(int id){
        this.id = id;
    }

    public void butter(){
        status = Status.BUTTERED;
    }

    public void jam(){
        status = Status.JAMMED;
    }

    public Status getStatus(){
        return this.status;
    }

    public int getId(){
        return id;
    }

    public String toString(){
        return "Toast " + id + ": " + status;
    }
}

定义一个吐司类,用enum定义吐司的三种状态

package com.company;

import java.util.concurrent.LinkedBlockingDeque;

/**
 * @Author redarm
 * @Date 2020/4/16 7:53 下午
 **/
public class ToastQueue extends LinkedBlockingDeque<Toast> {
}

定义吐司同步队列

package com.company;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @Author redarm
 * @Date 2020/4/16 7:53 下午
 **/
public class Toaster implements Runnable {

    private ToastQueue toastQueue;

    private int count = 0;

    private Random random = new Random(1);

    public Toaster(ToastQueue toastQueue) {
        this.toastQueue = toastQueue;
    }

    @Override
    public void run() {
        try {

            while (!Thread.interrupted()) {
                TimeUnit.MILLISECONDS.sleep(100 + random.nextInt(500));

                Toast t = new Toast(count++);

                System.out.println(t);

                toastQueue.put(t);

            }
        } catch (InterruptedException e) {
            System.out.println(e);
        }
        System.out.println("Toaster Off");
    }

}

制作吐司的任务:制作一个吐司并放到制待涂黄油的同步队列中

package com.company;

/**
 * @Author redarm
 * @Date 2020/4/16 7:59 下午
 **/
public class Butter implements Runnable {

    private ToastQueue dryQueue, bufferedQueue;

    public Butter(ToastQueue dryQueue, ToastQueue bufferedQueue){
        this.dryQueue = dryQueue;
        this.bufferedQueue = bufferedQueue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()){
                Toast t = dryQueue.take();

                t.butter();

                System.out.println(t);

                bufferedQueue.put(t);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Butter Off");
    }
}

从等待涂黄油的同步队列中取出吐司,然后涂黄油,然后放到等待涂果酱的同步队列中

package com.company;

/**
 * @Author redarm
 * @Date 2020/4/16 8:04 下午
 **/
public class Jammer implements Runnable {

    private ToastQueue butteredQueue, finashedQueue;

    public Jammer(ToastQueue butteredQueue , ToastQueue finashedQueue){
        this.butteredQueue = butteredQueue;
        this.finashedQueue = finashedQueue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()){
                Toast t = butteredQueue.take();

                t.jam();

                System.out.println(t);

                finashedQueue.put(t);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Jammer Off");
    }
}

从等待涂果酱的同步队列中取出吐司,涂果酱,放到完成同步队列中

package com.company;

/**
 * @Author redarm
 * @Date 2020/4/16 8:06 下午
 **/
public class Eater implements Runnable {

    private ToastQueue finashedQueue;

    private int counter = 0;

    public Eater(ToastQueue finashedQueue){
        this.finashedQueue = finashedQueue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()){
                Toast t = finashedQueue.take();

                if(t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED){
                    System.out.println(">>>> Error: " + t);
                    System.exit(1);
                } else {
                    System.out.println("Chomp !" + t);
                }
            }
        } catch (InterruptedException e) {
            System.out.println("Eater Interrupted");
        }

        System.out.println("Eater Off");
    }
}

吃吐司任务:从完成的吐司同步队列中取出吐司,吃掉

package com.company;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @Author redarm
 * @Date 2020/4/16 8:11 下午
 **/
public class ToastOMatic {

    public static void main(String[] args) throws InterruptedException {
        ToastQueue dryQueue = new ToastQueue(),
                bufferedQueue =new ToastQueue(),
                finashedQueue = new ToastQueue();

        ExecutorService service = Executors.newCachedThreadPool();

        service.execute(new Toaster(dryQueue));
        service.execute(new Butter(dryQueue,bufferedQueue));
        service.execute(new Jammer(bufferedQueue,finashedQueue));
        service.execute(new Eater(finashedQueue));

        TimeUnit.SECONDS.sleep(5);

        service.shutdownNow();
    }
}

三个线程同时工作5秒,然后中断所有线程

没有使用synchronized和Lock,单单使用BlockingQueue完成了进程之间的协作还有资源的同步

在任何时刻同一个Toast只能有一个任务操作


任务间使用管道进行输入输出

  • PipedWriter:向任务管道中写
  • PipedReader:从任务管道中读
package com.PipedReader;

import java.io.IOException;
import java.io.PipedWriter;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @Author redarm
 * @Date 2020/4/17 3:42 下午
 **/
public class Sender implements Runnable {

    private PipedWriter pipedWriter;

    private Random random = new Random(1);

    public Sender(){
        pipedWriter = new PipedWriter();
    }

    public PipedWriter getPipedWriter(){
        return pipedWriter;
    }

    @Override
    public void run() {
        try {
            while (true){
                for(char c = 'a'; c<'z' ; c++){
                    pipedWriter.write(c);

                    TimeUnit.MILLISECONDS.sleep(random.nextInt(500));
                }
            }
        } catch (IOException e) {
            System.out.println("IOExeception");
        } catch (InterruptedException e) {
            System.out.println("Interrupted");
        }
    }
}

package com.PipedReader;

import java.io.IOException;
import java.io.PipedReader;

/**
 * @Author redarm
 * @Date 2020/4/17 3:47 下午
 **/
public class Receiver implements Runnable {

    private PipedReader pipedReader;

    public Receiver(Sender sender) throws IOException {
        this.pipedReader = new PipedReader(sender.getPipedWriter());
    }

    @Override
    public void run() {
        try {
            while (true){
                System.out.println("Read " + (char)pipedReader.read() + ", ");
            }
        } catch (IOException e) {
            System.out.println(e);
        }
    }
}

package com.PipedReader;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @Author redarm
 * @Date 2020/4/17 3:50 下午
 **/
public class PipedIO {

    public static void main(String[] args) throws IOException, InterruptedException {

        Sender sender = new Sender();

        Receiver receiver = new Receiver(sender);

        ExecutorService executorService = Executors.newCachedThreadPool();

        executorService.execute(sender);
        executorService.execute(receiver);

        TimeUnit.SECONDS.sleep(5);

        executorService.shutdownNow();
    }
}

无论是BlockingQueue还是PipedReader还是PipedWriter,都是任务把要操作的需要同步的对象放到这些管道中,从这些队列或者管道中取出对象,加工后再放到一个队列或者管道中,对象一直是在队列或者管道中传输的,实现资源的同步