死锁
- DeadLockSample
package deadLock;
public class DeadLockSample extends Thread {
private String first;
private String second;
public DeadLockSample(String name, String first, String second) {
super(name);
this.first = first;
this.second = second;
}
public void run() {
synchronized (first) {
System.out.println(this.getName()+this.getId() + " obtained: " + first);
try {
Thread.sleep(1000L);
synchronized (second) {
System.out.println(this.getName() + " obtained: " + second);
}
} catch (InterruptedException e) {
}
}
}
public static void main(String[] args) throws InterruptedException {
long pid = ProcessHandle.current().pid();
System.out.println("pid:"+pid);
String lockA = "lockA";
String lockB = "lockB";
DeadLockSample t1 = new DeadLockSample("Thread1", lockA, lockB);
DeadLockSample t2 = new DeadLockSample("Thread2", lockB, lockA);
t1.start();
t2.start();
t1.join();
t2.join();
}
}
复制代码
- DeadLockSampleV2
package deadLock;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class DeadLockSampleV2 extends Thread {
private String first;
private String second;
public DeadLockSampleV2(String name, String first, String second) {
super(name);
this.first = first;
this.second = second;
}
public void run() {
synchronized (first) {
System.out.println(this.getName() + this.getId() + " obtained: " + first);
try {
Thread.sleep(1000L);
synchronized (second) {
System.out.println(this.getName() + " obtained: " + second);
}
} catch (InterruptedException e) {
}
}
}
public static void main(String[] args) throws InterruptedException {
long pid = ProcessHandle.current().pid();
System.out.println("pid:" + pid);
ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
Runnable dlCheck = new Runnable() {
@Override
public void run() {
long[] threadIds = mbean.findDeadlockedThreads();
if (threadIds != null) {
ThreadInfo[] threadInfos = mbean.getThreadInfo(threadIds);
System.out.println("Detected deadlock threads:");
for (ThreadInfo threadInfo : threadInfos) {
System.out.println(threadInfo.getThreadName());
}
}
}
};
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(dlCheck, 5L, 10L, TimeUnit.SECONDS);
String lockA = "lockA";
String lockB = "lockB";
DeadLockSampleV2 t1 = new DeadLockSampleV2("Thread1", lockA, lockB);
DeadLockSampleV2 t2 = new DeadLockSampleV2("Thread2", lockB, lockA);
t1.start();
t2.start();
t1.join();
t2.join();
}
}
复制代码
如何预防死锁?
- 尽量避免使用多个锁,并且只有需要时才持有锁
- 如果必须使用多个锁,尽量设计好锁的获取顺序
- 辅助手法
- 使用图的方式表达
- 对象之间组合、调用的关系对比和组合,考虑可能调用时序。
- 按照可能时序合并,发现可能死锁的场景。
- 辅助手法
- 使用带超时的方法
if (lock.tryLock() || lock.tryLock(timeout, unit)) {
}
复制代码
- 通过静态代码分析
并发工具
Semaphore
package conCurrentTool;
import java.util.concurrent.Semaphore;
public class UsualSemaphoreSample {
public static void main(String[] args) throws InterruptedException {
System.out.println("Action...GO!");
Semaphore semaphore = new Semaphore(5);
for (int i = 0; i < 10; i++) {
Thread t = new Thread(new SemaphoreWorker(semaphore));
t.start();
}
}
}
复制代码
AbnormalSemaphore
package conCurrentTool;
import java.util.concurrent.Semaphore;
public class AbnormalSemaphoreSample {
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(0);
for (int i = 0; i < 10; i++) {
Thread t = new Thread(new MyWorker(semaphore));
t.start();
}
System.out.println("Action...GO!");
semaphore.release(5);
System.out.println("Wait for permits off");
while (semaphore.availablePermits() != 0) {
Thread.sleep(100L);
}
System.out.println("Action...GO again!");
semaphore.release(5);
}
}
class MyWorker implements Runnable {
private Semaphore semaphore;
public MyWorker(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("Executed!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
复制代码
SemaphoreWorker
package conCurrentTool;
import java.util.concurrent.Semaphore;
public class SemaphoreWorker implements Runnable{
private String name;
private Semaphore semaphore;
public SemaphoreWorker(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public void run() {
try {
log("is waiting for a permit!");
semaphore.acquire();
log("acquired a permit!");
log("executed!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log("released a permit!");
semaphore.release();
}
}
private void log(String msg) {
if (name == null) {
name = Thread.currentThread().getName();
}
System.out.println(name + " " + msg);
}
}
复制代码
LatchSample
package conCurrentTool;
import java.util.concurrent.CountDownLatch;
public class LatchSample {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(6);
for (int i = 0; i < 5; i++) {
Thread t = new Thread(new FirstBatchWorker(latch));
t.start();
}
for (int i = 0; i < 5; i++) {
Thread t = new Thread(new SecondBatchWorker(latch));
t.start();
}
while ( latch.getCount() != 1 ){
Thread.sleep(100L);
}
System.out.println("Wait for first batch finish");
latch.countDown();
}
}
class FirstBatchWorker implements Runnable {
private CountDownLatch latch;
public FirstBatchWorker(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
System.out.println("First batch executed!");
latch.countDown();
}
}
class SecondBatchWorker implements Runnable {
private CountDownLatch latch;
public SecondBatchWorker(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
latch.await();
System.out.println("Second batch executed!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
复制代码
队列
package queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class LinkedBlockingQueue {
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
public static void main(String[] args) {
}
}
复制代码
队列使用场景与典型用例
package queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ConsumerProducer {
public static final String EXIT_MSG = "Good bye!";
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
}
static class Producer implements Runnable {
private BlockingQueue<String> queue;
public Producer(BlockingQueue<String> q) {
this.queue = q;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
try{
Thread.sleep(5L);
String msg = "Message" + i;
System.out.println("Produced new item: " + msg);
queue.put(msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
System.out.println("Time to say good bye!");
queue.put(EXIT_MSG);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Consumer implements Runnable{
private BlockingQueue<String> queue;
public Consumer(BlockingQueue<String> q){
this.queue=q;
}
@Override
public void run() {
try{
String msg;
while(!EXIT_MSG.equalsIgnoreCase( (msg = queue.take()))){
System.out.println("Consumed item: " + msg);
Thread.sleep(10L);
}
System.out.println("Got exit message, bye!");
}catch(InterruptedException e) {
e.printStackTrace();
}
}
}
}
复制代码
Java
并发类库提供的线程池
-
newCachedThreadPool
- 处理大量短时间工作任务的线程池
-
newFixedThreadPool
- 其背后使用的是无界的工作队列,任何时候最多有
nThreads
个工作线程是活动的
- 其背后使用的是无界的工作队列,任何时候最多有
-
newSingleThreadExecutor
- 它的特点在于工作线程数目被限制为 1
-
newSingleThreadScheduledExecutor
- 进行定时或周期性的工作调度,区别在于单一工作线程还是多个工作线程
-
newWorkStealingPool
- 并行地处理任务,不保证处理顺序。
线程池的几个基本组成部分
-
corePoolSize
,所谓的核心线程数,可以大致理解为长期驻留的线程数目(除非设置了 allowCoreThreadTimeOut) -
maximumPoolSize
,顾名思义,就是线程不够时能够创建的最大线程数 -
keepAliveTime
和TimeUnit
,这两个参数指定了额外的线程能够闲置多久,显然有些线程池不需要它。 -
workQueue
,工作队列,必须是BlockingQueue
构造函数的配置:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
复制代码
状态如何表征:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
…
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
private static int workerCountOf(int c) { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
复制代码
完整代码请见 ExecuteMethod.java
,仅供参考
实践
- 避免任务堆积
- 排查工具 :
jmap
- 排查工具 :
- 避免过度扩展线程
- 在处理大量短时任务时,使用缓存的线程池
- 线程泄漏
- 任务逻辑有问题,导致工作线程迟迟不能被释放。
- 避免死锁等同步问题
- 尽量避免在使用线程池时操作
ThreadLocal
线程池大小选择
- 通常建议按照
CPU
核的数目N
或者N+1
。 - 较多等待的任务
Brain Goetz
推荐的计算方法:- 根据采样或者概要分析等方式进行计算,然后在实际中验证和调整。```
线程数 = CPU核数 × 目标CPU利用率 ×(1 + 平均等待时间/平均工作时间)复制代码
- 根据采样或者概要分析等方式进行计算,然后在实际中验证和调整。```
- 实际还可能受各种系统资源限制影响
- 很多时候架构上的改变更能解决问题
AtomicInteger
底层实现原理
CAS
表征的是一些列操作的集合,获取当前数值,进行一些运算,利用 CAS
指令试图进行更新 否则,可能出现不同的选择,要么进行重试,要么就返回一个成功或者失败的结果。
场景
如何在数据库抽象层面实现,只有一个线程能够排他性地修改一个索引分区?
- 可以考虑为索引分区对象添加一个逻辑上的锁:
public class AtomicBTreePartition {
private volatile long lock;
public void acquireLock(){}
public void releaseeLock(){}
}
复制代码
JAVA
提供的公共API
- AtomicLongFieldUpdater
private static final AtomicLongFieldUpdater<AtomicBTreePartition> lockFieldUpdater =
AtomicLongFieldUpdater.newUpdater(AtomicBTreePartition.class, "lock");
private void acquireLock(){
long t = Thread.currentThread().getId();
while (!lockFieldUpdater.compareAndSet(this, 0L, t)){
…
}
}
复制代码
- Variable Handle API
private static final VarHandle HANDLE = MethodHandles.lookup().findStaticVarHandle
(AtomicBTreePartition.class, "lock");
private void acquireLock(){
long t = Thread.currentThread().getId();
while (!HANDLE.compareAndSet(this, 0L, t)){
…
}
}
复制代码
理解 AQS
的原理与应用
-
原理 一种同步结构往往是可以利用其他的结构实现的
- 可以使用 Semaphore 实现互斥锁
- 对某种同步结构的倾向,会导致复杂、晦涩的实现逻辑
Doug Lea
将基础的同步相关操作抽象在AbstractQueuedSynchronizer
中
-
AQS
内部数据和方法- 一个
volatile
的整数成员表征状态 FIFO
等待线程队列- 基于
CAS
的基础操作方法 - 两个基本类型的方法
acquire
操作,获取资源的独占权release
操作,释放对某个资源的独占
- 一个
-
示例
ReentrantLock
package conCurrentTool; 复制代码
/**
- @javaVersion 14
- public final void acquire(int arg) {
-
if (!tryAcquire(arg)) 复制代码
-
acquire(null, arg, false, false, false, 0L); 复制代码
-
} 复制代码
- /
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class ReentrantLockCase1 { private final Sync sync;
public ReentrantLockCase1(Sync sync) {
this.sync = sync;
}
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
abstract static class Sync extends AbstractQueuedSynchronizer { }
复制代码
}
`ReentrantLock` 中的 `tryAcquire` 实现:
- `NonfairSync` 和 `FairSync`
`AQS` 内部 `tryAcquire` 仅仅是个接近未实现的方法(直接抛异常)
```java
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
复制代码
公平性在 ReentrantLock
构建时:
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
复制代码
里体现了非公平的语义:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
复制代码
当前线程会被包装成为一个排他模式的节点(EXCLUSIVE
),通过 addWaiter
方法添加到队列中。
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
复制代码
近期评论