多线程
线程 & 进程
进程:是程序的一次执行过程,是系统进行资源分配和处理机调度的基本单位
线程:线程是进程中的一个执行路径,线程只拥有必不可少的资源如程序计数器、一组寄存器和栈。共享同属进程的全部资源。
Java 程序运行原理:Java命令启动JVM(相当于启动了一个进程),由该进程创建启动多个线程,至少两个线程可以分析出来。执行main()函数的主线程,负责垃圾回收的线程
多线程执行时,其实每一个执行线程都有一片自己所属的栈内存空间。进行方法的压栈和弹栈
创建线程的方式
start本质是先启动线程,再由JVM取调用该线程的run方法(直接调用run方法调用的是普通方法)
继承 Thread 并重写run方法,创建Thread子类对象并调用start方法(多次调用start会抛异常 IllegalThreadStateException)
实现 Runnable 接口并重写run方法,创建Thread对象并将该接口和线程名传入,调用Thread对象的start方法
好处是避免单继承的局限性;适合多个相同线程处理同一资源;解耦,代码和线程独立;
实现 Callable 接口并重写call方法,其实现类用 FutureTask 类来封装,并将FutureTask传入Thread的构造方法中,调用Thread对象start方法。可以调用FutureTask的get方法,阻塞等待任务代码执行完毕,获取返回值。
利用Executors工厂类来创建线程池。
如何实现线程间的通讯
synchronized+wait+notifyAll
wait:等待并释放锁,线程被阻塞,被唤醒后若获得锁那么从这里执行后续代码
notify/All:唤醒该锁上的所有线程,被通知线程不能立即恢复执行线程,重新请求同步锁。但是notifyAll不会释放锁
notify有可能会再次唤醒生产者/消费者,导致最终都在wait
ReentrantLock+Condition+await+signalAll
- 可以实现更细粒度的锁控制,且可以精确控制唤醒的线程
sleep() 和 wait() 有什么区别
- sleep是 Thread 的方法,会使调用的线程休眠指定时间,且不会释放锁,休眠结束回到就绪状态
- wait 是 Object 的方法,等待并立即释放锁,线程被阻塞。被唤醒后若获得锁那么从这里执行后续代码。在同步中调用
Java 中能创建 volatile 数组吗?
可以,这个volatile变量只是指向数组的引用。若改变引用则受到volatile的保护,但是若是改变数组中的元素,volatile则没作用
volatile 能使得一个非原子操作变成原子操作吗?
一般来说volatile只能保证可见性和有序性。
在32位的操作系统、JVM中,单次操作最长只能处理32位,对于long和double的读写会分为两条指令才能完成。但是在Java Language Specification中规定 Writes and reads of volatile long and double values are always atomic.
64位操作系统、JVM的long和double操作都是原子性的
三个线程依次打印ABC
public class Test2 {
Lock lock = new ReentrantLock();
Condition A = lock.newCondition();
Condition B = lock.newCondition();
Condition C = lock.newCondition();
int num = 0;
public void soutA() {
lock.lock();
try {
while (true) {
while (num % 3 != 0) {
A.await();
}
System.out.println("A");
num++;
B.signal();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void soutB() {
lock.lock();
try {
while (true) {
while (num % 3 != 1) {
B.await();
}
System.out.println("B");
num++;
C.signal();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void soutC() {
lock.lock();
try {
while (true) {
while (num % 3 != 2) {
C.await();
}
System.out.println("C");
num++;
A.signal();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
Test2 t = new Test2();
new Thread(t::soutA).start();
new Thread(t::soutB).start();
new Thread(t::soutC).start();
}
}
十个线程依次输出0123456789
public class Test {
private static int num = 0;
public static synchronized void printNum() {
System.out.println(Thread.currentThread().getName() + ":" + num++);
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(Test::printNum);
thread.start();
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Test {
static int orderNum = 0;
static Object lockObj = new Object();
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(new Th(i)).start();
}
}
}
class Th implements Runnable {
int owrNum;
public Th(int owrNum) {
this.owrNum = owrNum;
}
@Override
public void run() {
synchronized (Test.lockObj) {
if (Test.orderNum < 10) {
try {
while (owrNum != Test.orderNum) {
Test.lockObj.wait();
}
System.out.println(Thread.currentThread().getName() + ":" + owrNum);
Test.orderNum++;
Test.lockObj.notifyAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
= = = = = = = = = = = = = = = = =
Synchronized和Lock的区别?Lock好处?举例
- Syn是Java的关键字属于JVM层面(在同步中块中才能调用wait/notify);Lock是接口/类,属于Api层面
- Syn和Lock默认都是非公平锁,但是Lock可以设置为公平锁
- Syn执行完代码会自动释放锁(抛异常也会释放);Lock需手动释放锁(try...finally)
- Syn不可中断;Lock可中断(tryLock设置超时方法,lockInterruptibly()方代码块中,用interrupt()方法中断)
- 使用Lock的Condition可以精确唤醒
实现多线程间顺序调用
实现多线程间顺序调用,如A、B、C。要求A打印5次,B打印10次,C打印15次;来10轮
使用Synchronized太麻烦,使用Condition来精确唤醒实现
class ShareResource {
private int flag = 1;//1A,2B,3C
private Lock lock = new ReentrantLock();
private Condition A = lock.newCondition();
private Condition B = lock.newCondition();
private Condition C = lock.newCondition();
public void print5() {
lock.lock();
try {
while (flag != 1) {
A.await();
}
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + i);
}
flag++;
B.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print10() {
lock.lock();
try {
while (flag != 2) {
B.await();
}
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + i);
}
flag++;
C.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print15() {
lock.lock();
try {
while (flag != 3) {
C.await();
}
for (int i = 0; i < 15; i++) {
System.out.println(Thread.currentThread().getName() + i);
}
flag = 1;
A.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class LockTest {
public static void main(String[] args) {
ShareResource shareResource = new ShareResource();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
shareResource.print5();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
shareResource.print10();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
shareResource.print15();
}
}, "C").start();
}
}
使用阻塞队列实现
JMM理解
谈谈对 volatile 理解?哪些地方用到?
- 是JVM提供的轻量级的同步机制。只保证可见性但不保证原子性,禁止指令重排(有序性)
- 单例模式DCL代码
CAS(Compare-And-Swap)
从AtomicInteger引出 CAS——Unsafe——CAS底层思想——ABA问题——原子引用更新——如何规避ABA问题
如
atomicInteger.getAndIncrement()
,类似i++
,反过来就是++i
;还有其他如修改数据compareAndSet(expect,update)
,原理都是若期望值(工作内存中的值)和主内存中的值相同,才修改。返回类型为boolean。
- CAS(CAS是CPU并发原语,原子性)底层原理:
rt.jar
下的sun.misc.Unsafe
类是CAS核心类,通过该类的native方法直接访问并操作内存,判断内存中偏移地址的值(主内存)是否是期望值(工作内存),是则修改为新值,否则继续循环获取最新值并修改。底层还有volatile修饰value,修改后其他线程也可以看到。- 自旋(CAS思想,值不同则do...while一直循环)
- CAS缺点:
- 循环时间长则CPU开销大(do...while)
- 只能保证一个共享变量的原子操作
- ABA问题
- ABA问题?
- ABA问题:提取主内存中数据到工作内存中,直到和主内存中数据相比时间段内,可能主内存中数据由A改为B又改回A。看似结果没有问题,但是主内存中数据改动过了。
- 原子引用
AtomicReference<T>
,任意类型即可
- 时间戳原子引用,即乐观锁来解决ABA问题
- 利用添加version版本号解决,每次修改添加1。使用JUC中
AtomicStampedReference<T>
解决
- 利用添加version版本号解决,每次修改添加1。使用JUC中
1、谈谈你对volatile理解
2、CAS理解
3、原子类AtomicInteger的ABA问题谈谈?原子更新引用?
4、ArrayList是线程不安全的,编写一个不安全的案例并解决
Set、Map也是这样
List<String> list = new ArrayList();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(Thread.currentThread().getName() + "\t" + list);
}).start();
}
可能会出现java.util.ConcurrentModificationException
。可以使用Vector
或Collections.synchronizedList/Set/Map
来解决
JUC包下的的
CopyOnWriteArrayList
写时复制(读写分离)CopyOnWriteArraySet
,底层是CopyOnWriteArrayList
ConcurrentHashMap
5、公平锁/非公平锁、可重入锁/递归锁、自旋锁、读写锁?手写自旋锁
ReentrantLock(默认)和Synchronized都是非公平的可重入锁
公平锁和非公平锁(
ReentrantLock(boolean b)
,默认不填为false不公平锁,填true为公平锁。)- 公平锁:多个线程按照申请锁的顺序来获取锁,类似队列FIFO
- 非公平锁:先尝试获取锁,尝试失败则采用公平锁的方式等待,高并发下可能出现优先级反转或饥饿现象。吞吐量大。
可重入锁(又称递归锁)
**线程可以进入任何一个它已经拥有的锁 所同步的代码块。**同步方法中包括同步锁。防止死锁。
public static void main(String[] args) { ReenDemo reenDemo = new ReenDemo(); new Thread(reenDemo::get,"t1").start(); new Thread(reenDemo::get,"t2").start(); } public synchronized void get(){ System.out.println(Thread.currentThread().getName()+":get"); set(); } public synchronized void set(){ System.out.println(Thread.currentThread().getName()+":set"); }
Lock lock = new ReentrantLock(); public static void main(String[] args) { ReenDemo reenDemo = new ReenDemo(); new Thread(reenDemo::get,"t1").start(); new Thread(reenDemo::get,"t2").start(); } public void get(){ lock.lock(); lock.lock();//写多个,只要一一对应,执行成功;少写一个也不会编译失败,只是运行失败 try { System.out.println(Thread.currentThread().getName()+":get"); set(); } finally { lock.unlock(); lock.unlock(); } } public void set(){ lock.lock(); try { System.out.println(Thread.currentThread().getName()+":set"); } finally { lock.unlock(); } }
自旋锁(spinlock)
指尝试获取锁失败的线程不会阻塞,而是采用循环的方式获取锁。减少线程上下文切换的消耗,但是会消耗CPU。CAS中
public class SpinLockDemo { AtomicReference<Thread> atomicReference = new AtomicReference<>(); public void myLock() { Thread thread = Thread.currentThread(); System.out.println(thread.getName() + " come in"); while (!atomicReference.compareAndSet(null, thread)) { } } public void myUnLock() { Thread thread = Thread.currentThread(); atomicReference.compareAndSet(thread, null); System.out.println(Thread.currentThread().getName() + " invoked myUnLock"); } public static void main(String[] args) { SpinLockDemo spinLock = new SpinLockDemo(); new Thread(() -> { spinLock.myLock(); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } spinLock.myUnLock(); },"T1").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { spinLock.myLock(); spinLock.myUnLock(); },"T2").start(); } }
独占锁(写锁)、共享锁(读锁)、互斥锁
- 独占锁:该锁只能被一个线程所持有。Reen和Syn都是
- 共享锁:该锁可以被多个线程所持有
ReentrantReadWriteLock
:多个线程同时读一个资源可以并发执行,有一个线程需要写该资源时,其他线程不能读写//模拟一个缓存 public class MyCache { //一般主内存中资源需要volatile修饰,底层很多都是这样写的 private volatile Map<String, Object> map = new HashMap<>(); private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); public void put(String key, Object value) { lock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + ":正在写入" + key); try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } map.put(key, value); System.out.println(Thread.currentThread().getName() + ":写入完成"); } finally { lock.writeLock().unlock(); } } public void get(String key) { lock.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + "正在读取"); try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } Object result = map.get(key); System.out.println(Thread.currentThread().getName() + "读取完成:" + result); } finally { lock.readLock().unlock(); } } public static void main(String[] args) { MyCache cacheDemo = new MyCache(); for (int i = 0; i < 5; i++) { int temp = i; new Thread(() -> { cacheDemo.put(temp + "", temp + ""); }, String.valueOf(i)).start(); } for (int i = 0; i < 5; i++) { int temp = i; new Thread(() -> { cacheDemo.get(temp + ""); }, String.valueOf(i)).start(); } } }
6、CountDownLatch、CyclicBarrier、Semaphore 谈谈
CountDownLatch:计数为0阻塞解除,开始执行。类似倒计时
public enum CountryEnum { ONE(1, "韩"), TWO(2, "赵"), THREE(3, "魏"), FOUR(4, "楚"), FIVE(5, "燕"), SIX(6, "齐"); public static CountryEnum getInstance(int index){ for (CountryEnum countryEnum : CountryEnum.values()) { if (index == countryEnum.getCode()){ return countryEnum; } } return null; } private Integer code; private String country; public Integer getCode() { return code; } public String getCountry() { return country; } CountryEnum(Integer code, String country) { //访问修饰符默认且必须为private this.code = code; this.country = country; } }
public class CountDownLatchDemo { private static final int LATCH_COUNT = 6; public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(LATCH_COUNT); for (int i = 1; i <= 6; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName() + "国被灭"); countDownLatch.countDown(); }, CountryEnum.getInstance(i).getCountry()).start(); } countDownLatch.await(); System.out.println(Thread.currentThread().getName() + ":秦一统天下"); } }
CyclicBarrier(循环/屏障):计数器到指定数字,被阻塞的线程才开始执行
public class CyclicBarrierDemo { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> System.out.println("我是神龙,请说出你的愿望!")); for (int i = 1; i <= 7; i++) { int finalI = i; new Thread(() -> { System.out.println(Thread.currentThread().getName()+"收集到第"+ finalI +"颗龙珠"); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } },String.valueOf(i)).start(); } } }
Semaphore(信号量,可以替代Syn和Lock):用于多个共享资源的互斥使用;用于并发线程数的控制
public class SemaphoreDemo { public static void main(String[] args) { Semaphore semaphore = new Semaphore(3);//三个车位,为1则可以替代Syn和Lock for (int i = 1; i <= 6; i++) { //6辆车抢车位 new Thread(() -> { try { semaphore.acquire();//抢到车位,可以带参,抢多个 System.out.println(Thread.currentThread().getName()+"抢到车位"); TimeUnit.SECONDS.sleep(3); //停车三秒 System.out.println(Thread.currentThread().getName()+"停车3秒后离开"); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release();//释放车位,可以带参,释放多个 } },String.valueOf(i)).start(); } } }
7、阻塞队列知道吗
阻塞队列
是一个队列,当队列为空时,从队列中获取元素将被阻塞;当队列为满时,往队列中添加元素将被阻塞。类似生产者消费者,但是不需要程序员来写wait、notify来控制!
阻塞队列有好的一面吗?
不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程
在JUC发布之前,程序员需要手动控制这些细节,还要兼顾效率和线程安全,复杂度上升,但使用这个后。。。不用管了
BlockingQueue的方法总结
方法类型 抛出异常 特殊值 阻塞(强大,慎用) 超时 插入 boolean add(e) boolean offer(e) put(e) offer(e.time,unit) 移除(队首元素) E remove() E poll() take() poll(time,unit) 检查(队首元素) Eelement() E peek() 不可用 不可用 抛出异常 当阻塞队列满时,再往队列里add插入元素会抛出IllegalStateException: Queue full
当阻塞队列空时,再往队列里remove移除元素会抛出NoSuchElementException特殊值 插入成功true,失败false;移除时,成功则返回出队列元素,失败则返回null 阻塞 当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产线程直到put数据or响应中断退出
当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费线程直到队列可用超时 带等待时间的阻塞 BlockingQueue有如下实现类:
- ArrayBlockingQueue:由数组结构组成的有界阻塞队列
- LinkedBlockingQueue:由链表结构组成的有界(默认值为Integer.MAX_VALUE)阻塞队列,一般不用
- PriorityBlockingQueue:支持优先级排序的无界阻塞队列
- DelayQueue:使用优先级队列实现的延迟无界阻塞队列
- SynchronousQueue:不存储元素的阻塞队列,即单个元素的队列。有元素必须消费。。。同步队列
- LinkedTransferQueue:由链表结构组成的无界阻塞队列
- LinkedBlockingDeque:由链表结构组成的双向阻塞队列
public class SynchronousQueueDemo { public static void main(String[] args) { SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>(); new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " put 1"); synchronousQueue.put("1"); System.out.println(Thread.currentThread().getName() + " put 2"); synchronousQueue.put("2"); System.out.println(Thread.currentThread().getName() + " put 3"); synchronousQueue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } }, "A").start(); new Thread(() -> { try { //只要不取值,则A线程的下一个put执行不了 TimeUnit.SECONDS.sleep(5); System.out.println(Thread.currentThread().getName()+synchronousQueue.take()); TimeUnit.SECONDS.sleep(5); System.out.println(Thread.currentThread().getName()+synchronousQueue.take()); TimeUnit.SECONDS.sleep(5); System.out.println(Thread.currentThread().getName()+synchronousQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } },"B").start(); } }
生产者消费者模式
线程——操作——资源类;判断——干活——通知唤醒;防止虚假唤醒while
class ShareData { private int number = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment() { lock.lock(); try { while (number != 0) { condition.await(); } number++; System.out.println(Thread.currentThread().getName() + number); condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void decrement() { lock.lock(); try { while (number == 0) { condition.await(); } number--; System.out.println(Thread.currentThread().getName() + number); condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } public class ProducerConsumerDemo1 { public static void main(String[] args) { ShareData shareData = new ShareData(); //生产线程,生产5个 new Thread(() -> { for (int i = 0; i < 5; i++) { shareData.increment(); } }, "A").start(); //消费线程,消费5个 new Thread(() -> { for (int i = 0; i < 5; i++) { shareData.decrement(); } }, "B").start(); } }
阻塞队列版
class MyResource { private volatile boolean FLAG = true; //默认开启,进行生产、消费 private AtomicInteger atomicInteger = new AtomicInteger(); BlockingQueue<String> blockingQueue; public MyResource(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; System.out.println(blockingQueue.getClass().getName()); //查看传入的具体类 } public void myProducer() throws InterruptedException { String data; boolean result; while (FLAG) { data = atomicInteger.incrementAndGet() + ""; result = blockingQueue.offer(data, 2L, TimeUnit.SECONDS); if (result) { System.out.println(Thread.currentThread().getName() + "插入数据" + data + "成功"); } else { System.out.println(Thread.currentThread().getName() + "插入数据" + data + "失败"); } TimeUnit.SECONDS.sleep(1L); } System.out.println(Thread.currentThread().getName() + "停了,flag=false,生产结束"); } public void myConsumer() throws InterruptedException { String result; while (FLAG) { result = blockingQueue.poll(2L, TimeUnit.SECONDS); if (result == null || "".equalsIgnoreCase(result)) { FLAG = false; System.out.println(Thread.currentThread().getName() + "超过2秒没取到数据,消费退出"); return; } System.out.println(Thread.currentThread().getName() + "消费数据" + result + "成功"); } } public void stop() { this.FLAG = false; } } public class ProducerConsumerDemo1 { public static void main(String[] args) { MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10)); new Thread(() -> { System.out.println(Thread.currentThread().getName() + "生产线程启动"); try { myResource.myProducer(); } catch (InterruptedException e) { e.printStackTrace(); } }, "producer").start(); new Thread(() -> { System.out.println(Thread.currentThread().getName() + "消费线程启动"); try { myResource.myConsumer(); } catch (InterruptedException e) { e.printStackTrace(); } }, "consumer").start(); try { TimeUnit.SECONDS.sleep(5L); } catch (InterruptedException e) { e.printStackTrace(); } myResource.stop(); System.out.println(Thread.currentThread().getName() + "BOSS叫停"); } }
JUC、集合总结:
ArrayList
LinkedList
Vector
CopyOnWriteArrayList
ArrayBlockingQueue
LinkedBlockingQueue