liujijiang

java学习笔记 --21-- 并发(2)

2020.04.12

原子性和易变性

错误的知识:原子性操作不需要同步机制
原子性操作:不能被线程调度机制中断,一旦执行就会执行完毕
依赖原子性很危险,你在玩火
除了long和double之外的基本类型的操作是原子性操作
Jvm将long和double操作分为了两个32位操作来执行
使用volatile关键字让long和double获得原子性
高手:可以使用原子性从而取代同步
volatile:对一个域产生写操作,其他的读操作都可以看见这个域
volatile域会立即写入主存
synchronized是最安全的,使用其他的任何方式都是有风险的
原子操作:赋值和返回都是原子性的

原子类

javaSE5加入了原子类:

  • AtomicInteger
  • AtomicLong
  • AtomicReference

提供的原子性的更新操作:compareAndSet

Atomic类的设计是用来构建java.util.concurrent中的类

临界区

为了实现防止多线程访问方法内的部分代码而不是整个方法的全部代码

线程本地存储

为使用相同变量的不同线程创建不同的存储

创建和管理本地线程存储使用java.lang.ThreadLocal类来实现


创建ThreadLocal对象后,只能通过set和get方法访问对象的内容

package com.company;

/**
 * @Author redarm
 * @Date 2020/4/14 3:11 下午
 **/
public class Accessor implements Runnable {

    private final int ID ;

    public Accessor(int ID){
        this.ID = ID;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()){
            ThreadLocalVariableHolder.increment();
            System.out.println(this);
            Thread.yield();
        }
    }

    public String toString(){
        return "#" + ID + ": " + ThreadLocalVariableHolder.get();
    }
}

package com.company;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @Author redarm
 * @Date 2020/4/14 3:13 下午
 **/
public class ThreadLocalVariableHolder {

    private static ThreadLocal<Integer> value = new ThreadLocal<Integer>() {
        private Random random = new Random(1);

        protected synchronized Integer initialValue(){
            return random.nextInt(10000);
        }
    };

    public static void increment(){
        value.set(value.get() + 1);
    }

    public static int get(){
        return value.get();
    }

    public static void main(String [] args){
        ExecutorService service = Executors.newCachedThreadPool();

        for(int i=0;i<10;i++){
            service.execute(new Accessor(i));
        }

        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        service.shutdown();
    }
}


终结任务

装饰性花园


package com.company;

import java.util.Random;

/**
 * @Author redarm
 * @Date 2020/4/14 4:00 下午
 **/
public class Count  {

    private int count = 0;

    private Random random = new Random(1);

    public synchronized int increment(){
        int num = count;
        while (random.nextBoolean()){
            Thread.yield();
        }
        return (count = ++num);
    }

    public synchronized int value(){
        return count;
    }
}

package com.company;

import java.util.ArrayList;
import java.util.List;

/**
 * @Author redarm
 * @Date 2020/4/14 4:02 下午
 **/
public class Entrance implements Runnable {

    private static Count count = new Count();

    private static List<Entrance> entrances = new ArrayList<>();

    private int number = 0;

    private final int id;

    private static volatile boolean canceled = false;

    public static void cancel(){
        canceled = true;
    }

    public Entrance(int id){
        this.id = id;

        entrances.add(this);
    }

    @Override
    public void run() {

        while (!canceled){
            synchronized (this){
                ++number;
            }

            System.out.println(this + " Total: " + count.increment());

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println("Stopint " + this);
    }

    public synchronized int getValue(){
        return number;
    }

    public String toString(){
        return "Entrance " + id + " : " + getValue();
    }

    public static int getTotalCount(){
        return count.value();
    }

    public static int sumEntrances(){
        int num = 0;

        for(Entrance entrance : entrances){
            num += entrance.getValue();
        }

        return num;
    }
}

package com.company;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @Author redarm
 * @Date 2020/4/14 4:17 下午
 **/
public class OrnamentalGarden {

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newCachedThreadPool();

        for(int i=0;i<5;i++){
            executorService.execute(new Entrance(i));
        }

        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        Entrance.cancel();

        executorService.shutdown();

        if (!executorService.awaitTermination(250,TimeUnit.MILLISECONDS)){
            System.out.println("Some tasks were not terminated!");
        }

        System.out.println("Total: " + Entrance.getTotalCount());
        System.out.println("Sum of Entrances: " + Entrance.sumEntrances());
    }
}

五个通道维持一个计数器,对计数器的增加操作加锁
通道的run方法部分加锁
awaitTermination方法:在固定时间内完成返回true,否则返回false
如果对Count的increment方法不加锁,结果的total数量和每个通道统计的数量就不一样了


sleep使线程进入阻塞状态

线程的四种状态:

  • 新建(new):当线程被创建的时候短暂的处于这个状态,已经有资格获取cpu的时间了,之后线程调度器会把这个线程转变为可运行状态或阻塞状态
  • 就绪(Runnable):在任意时间,线程可以运行也可以不运行,只要调度器把时间片给他,他就会运行
  • 阻塞(Blocked):线程可以运行,但是某个条件阻止了线程的运行,调度器不会把时间片分给他,直到他进入就绪状态,才有可能执行
  • 死亡(Dead):任务已经结束,是不可以再次运行的,调度器也不会把时间片分给他

进入阻塞状态的原因:

  • sleep
  • wait:挂起线程,直到线程得到notify或者notifyAll或者signal或者signalAll,线程才会进入就绪状态
  • 任务在等待某个输入/输出完成
  • 任务试图调用一个同步控制的方法,但是这个方法已经被别的任务调用了,对象已经被锁住了

中断

两种中断方式:

  • Executor对象的shutdown
  • Future对象的cancel

模拟三种阻塞类型:
sleep造成的线程阻塞

package com.company;

import java.util.concurrent.TimeUnit;

/**
 * @Author redarm
 * @Date 2020/4/14 7:54 下午
 **/
public class SleepBlocked implements Runnable {

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Exiting SleepBlocked run");
    }
}

IO导致的线程阻塞

package com.company;

import java.io.IOException;
import java.io.InputStream;

/**
 * @Author redarm
 * @Date 2020/4/14 7:55 下午
 **/
public class IOBlocked implements Runnable {

    private InputStream in;

    public IOBlocked(InputStream in){
        this.in = in;
    }

    @Override
    public void run() {
        try {
            System.out.println("Waiting for read");
            in.read();
        } catch (IOException e) {
            if (Thread.currentThread().isInterrupted()){
                System.out.println("Interrupted from IOBlocked");
            } else {
                e.printStackTrace();
            }
        }

        System.out.println("Exiting IOBlocked run");
    }
}

锁(synchronized)导致的线程阻塞
先获取锁,然后永远不释放锁,这样SynchronizedBlocked的run就会造成锁的阻塞

package com.company;

import java.util.concurrent.TimeUnit;

/**
 * @Author redarm
 * @Date 2020/4/14 8:02 下午
 **/
public class SynchronizedBlocked implements Runnable {

    public synchronized void f(){
        while (true){
            Thread.yield();
        }
    }

    public SynchronizedBlocked(){
        new Thread(){
            public void run(){
                f();
            }
        }.start();
    }

    @Override
    public void run() {
        System.out.println("Trying to call f");
        f();
        System.out.println("Exiting SynchronizedBlocked run");
    }
}

使用Executor的submit返回的Future的方法cancel可以执行中断(调用线程Thread的interrupt)

package com.company;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * @Author redarm
 * @Date 2020/4/14 8:05 下午
 **/
public class Interrupting {

    private static ExecutorService service = Executors.newCachedThreadPool();

    public static void test(Runnable runnable){
        Future<?> future = service.submit(runnable);
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Interrupting " + runnable.getClass().getName());
        future.cancel(true);
        System.out.println("Interrupt sent ot " + runnable.getClass().getName());
    }

    public static void main(String[] args) throws Exception{
        test(new SleepBlocked());
        test(new IOBlocked(System.in));
        test(new SynchronizedBlocked());
        TimeUnit.SECONDS.sleep(3);
        System.exit(0);
    }
}

测试中断对三种阻塞的影响

  • sleep是可中断的阻塞类型
  • IO和synchronized是不可中断的阻塞类型

IO中断

被阻塞的IO通道会自动响应中断

一个IO类

package com.company;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
 * @Author redarm
 * @Date 2020/4/14 8:53 下午
 **/
public class NIOBlocked implements Runnable {

    private final SocketChannel sc;

    public NIOBlocked(SocketChannel sc){
        this.sc = sc;
    }

    @Override
    public void run() {
        try {
            System.out.println("Waiting for read");
            sc.read(ByteBuffer.allocate(1));
        } catch (IOException e) {
            System.out.println(e);
        }

        System.out.println("Exiting NIOBlocked run");
    }
}

使用cancel中断和关闭资源自动响应中断

package com.company;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * @Author redarm
 * @Date 2020/4/14 8:56 下午
 **/
public class NIOInterrupt {

    public static void main(String[] args){
        ExecutorService executorService = Executors.newCachedThreadPool();

        try {
            ServerSocket serverSocket = new ServerSocket(8080);
        } catch (IOException e) {
            e.printStackTrace();
        }

        InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost",8080);
        try {
            SocketChannel sc1 = SocketChannel.open(inetSocketAddress);
            SocketChannel sc2 = SocketChannel.open(inetSocketAddress);

            Future<?> future = executorService.submit(new NIOBlocked(sc1));

            executorService.execute(new NIOBlocked(sc2));

            executorService.shutdown();

            TimeUnit.SECONDS.sleep(1);

            future.cancel(true);

            TimeUnit.SECONDS.sleep(1);

            sc2.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

也可以使用shutdownNow中断所有的线程

被互斥所阻塞

一个任务尝试获得已经被锁住的方法,这个任务就会被挂起(阻塞),直到可以获得这个方法的锁

一个锁可以被同一个任务多次获得

javaSE5加入了ReentrantLock,是的在TeentrantLock上阻塞的任务可以被中断

BlockedMutx 是一个一直被锁住的类

package com.company;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Author redarm
 * @Date 2020/4/14 9:53 下午
 **/
public class BlockedMutex {

    private Lock lock = new ReentrantLock();

    public BlockedMutex(){
        lock.lock();
    }

    public void f(){
        try {
            lock.lockInterruptibly();
            System.out.println("Lock acquired in f");
        } catch (InterruptedException e) {
            System.out.println("Interrupt from lock acquisition in f");
        }
    }
}

Blocked2是一个给任务

package com.company;

/**
 * @Author redarm
 * @Date 2020/4/14 9:56 下午
 **/
public class Blocked2 implements Runnable {

    private BlockedMutex blockedMutex = new BlockedMutex();

    @Override
    public void run() {
        System.out.println("Waiting for f in BlockedMutex");
        blockedMutex.f();
        System.out.println("Broken out of blocked call");
    }
}

使用Interrupt中断这个线程

package com.company;

import java.util.concurrent.TimeUnit;

/**
 * @Author redarm
 * @Date 2020/4/14 9:58 下午
 **/
public class Interrupting {

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new Blocked2());
        thread.start();
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Interrupting thread");
        thread.interrupt();
    }
}