[并发实践]基于synchronized的并发机制

2017-01-13 14:59:28来源:csdn作者:qq_24759299人点击

基于synchronized的并发机制

当想要梳理Java并发机制的时候,我首先想到的是synchronized关键字,其次才是线程、锁以及并发、竞争、活跃性等更深层次的理论,这也正是我们学习任一编程语言的常规路径,先实践而后探索原理。

synchronized是Java最基础的并发关键字,在jdk1.5提供concurrent框架之前唯一实现并发控制的途径。以至于每个Javaer都会潜心学习,网上针对synchronized实例、原理解析的posts数不胜数。本文也是其中之一,并尽量写下我对synchronized并发机制的所有了解,希望new javaer读到后能有收获。

1. synchronized基本概念

作为最基础的并发原语,synchronized有两种形式,即并发方法和并发代码块,示例如下。

/**
* 唯一ID生成器
*/
public class AtomID {private long id = 0;public synchronized long gen() {
return ++id;
}public long gen(long incr) {
if (incr <= 0) {
throw new IllegalArgumentException("increment should be gt 0");
}
synchronized (this) {
this.id += incr;
return this.id;
}
}
}

针对同一个AtomID实例,我们会得到如下同步特性和保证:


任意时刻只有一个线程能够进入同步的gen方法,多线程并发时不会出现id重复,即强制的原子性。
两个gen方法能够保证彼此同步, synchronized的非静态方法即是从当前实例(this)上获得锁,与synchronized (this)语义一致。
同步块或方法块退出时,不管中间是否出现异常,都会自动释放当前锁。同时,当前实例状态的变更对后续的线程都是可见的,即heppens-before的可见性。
执行synchronized块或方法时,如果其他线程已经进入,当前线程只能等待,无法终止,即不可中断。
不同的AtomID实例间,不具有上述同步特性。
并发的活跃度,在多线程编程中永远需要细致考量,力求在代码复杂度和性能间寻求平衡,也就是说synchronized块要尽可能小,但也不要过小而破坏并发安全性。示例中,增量gen方法对于入参校验没必要放在同步块中,对于非法请求可以最快返回,从而提高并发性能。

此外,synchronized static方法是在对象的Class实例上获得锁,语义上与synchronized (AtomID.class)一致。我们知道Class信息位于JVM的方法区中(HotSpot的pemgen),唯一而线程共享,因此相对来说是更全局的安全控制。

1.1 监视器

Java中每个对象都可以作为一把锁来用,这个锁就是所谓的监视器1。监视器在JVM内部实现,是synchronized的基础,只有持有监视器的线程才能执行同步块或方法,执行完成后自动释放监视器。 监视器的自动释放通过同步块的字节码可以清楚地看到,以gen(long incr)方法为例:

// Method descriptor #20 (J)J
// Stack: 5, Locals: 4
public long gen(long incr);
0lload_1 [incr]
// 1-15,入参校验,省略
16aload_0 [this] // 从局部变量0中加载当前实例引用到栈顶
17dup // 从栈顶复制当前实例引用
18astore_3 // 存储到局部变量3中
19monitorenter // **进入监视器,必要的话等待**
20aload_0 [this]
21dup
22getfield AtomID.id : long [12]
25lload_1 [incr]
26ladd
27putfield AtomID.id : long [12]
30aload_0 [this]
31getfield AtomID.id : long [12]
34aload_3 // 从局部变量3中加载当前实例引用
35monitorexit // **退出监视器**
36lreturn // 正常返回
37aload_3 // 从局部变量3中加载当前实例引用
38monitorexit // **退出监视器**
39athrow // 抛出异常
Exception Table: // 异常表
[pc: 20, pc: 36] -> 37 when : any // 20-36行发生任何异常,执行37行
[pc: 37, pc: 39] -> 37 when : any
Line numbers:
......

获得监视器的唯一途径,就是进入synchronized的方法或代码块。而释放监视器主要有下面三种情况:


同步块或方法执行完成,正常退出。
同步块或方法执行失败,抛出异常。
执行了一个等待命令,Object.wait(long timeout)。

并发编程最基本的就是线程互斥,而监视器是Java实现互斥的基础,在Java Language层面通过synchronized关键字体现。前面示例AtomID,即是通过互斥实现了唯一ID的生成。

1.2 可重入

在字符串操作中,StringBuffer是我们常用的工具,并且是线程安全的,其中substring方法摘录如下:

@Override
public synchronized String substring(int start) {
return substring(start, count);
}@Override
public synchronized String substring(int start, int end) {
return super.substring(start, end);
}

两个方法都是同步的,substring(int start)调用了第二个方法,前后两次获取了当前实例的监视器,并不会因此造成阻塞,程序支持运行。这就是synchronized的可重入性。 可重入——线程可以再次获得自身已经持有的锁,意味着对锁持有状态的更新是根据线程来的,而不是根据调用。JVM会维护锁的当前持有者及其持有计数,当锁的释放次数等于持有计数时即会彻底释放锁。


1、在substring这个示例中,稍作修改(return super.substring(start, count);),即可减少不必要的锁重入操作,并发编程中可以关注。 2、StringBuffer的线程安全特性,实际项目中你是否用对了场景呢?


1.3 互斥、自旋与公平

监视器,在同一时刻只能有一个线程获得,实际上就是一种互斥锁(Mutual exclusion lock)。 当线程尝试进入监视器时,若监视器正被其他线程持有,则当前线程必须等待并一直处于暂停状态,只到持有线程释放了它。当前线程暂停而不是一直循环以尝试获取,因此不属于自旋锁(Spin lock)。 相对而言,concurrent框架中的ReadWriteLock及StampedLock,读锁不互斥,多线程可以并发读,并发效率更优。而concurrent也广泛应用了自旋机制。

此外,我们知道ReentrantLock有公平、非公平之分,所谓公平锁即是等待时间最长的线程优先获得锁,也即FIFO策略。对于synchronized来说,JVM规范中并未明确要求,依赖于JVM的具体实现。 针对HotSpot虚拟机的测试,证明其synchronized的非公平性。测试代码如下:

public class NofairSync implements Runnable {private static Object LOCK = new Object();@Override
public void run() {
Thread th = Thread.currentThread();
System.out.println(th.toString() + " starts...");
synchronized (LOCK) {
try {
LOCK.wait();
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println(th.toString() + " done");
}public static void main(String[] args) throws Exception {
Thread[] threads = new Thread[3];
for (int i = 0; i < threads.length; i++) {
Thread th = new Thread(new NofairSync(), "T-" + i);
th.setPriority(10 - i);
th.start();
threads[i] = th;
}
Thread.sleep(1000);synchronized (LOCK) {
Thread th = Thread.currentThread();
System.out.println(th.toString() + " get lock, and will notify all");
LOCK.notifyAll();
}for (Thread th : threads) {
th.join();
}
System.out.println("all threads done");
}}// 单次执行结果
Thread[T-0,10,main] starts...
Thread[T-2,8,main] starts...
Thread[T-1,9,main] starts...
Thread[main,5,main] get lock, and will notify all
Thread[T-1,9,main] done
Thread[T-0,10,main] done
Thread[T-2,8,main] done
all threads done

从执行结果可知,等待线程并非按FIFO的顺序被唤醒。

1.4 死锁与活锁

当两个线程,彼此等待对方持有的锁时,即会出现死锁。当有多个线程时,这种相互等待形成一个环时,也会出现死锁。synchronized的死锁极易模拟。

活锁,是指两个或更多的线程,彼此之前过于积极响应,以至于总是忙着彼此响应而不能推进工作。比如两个相向而行的过路人,A为了避开B而向右靠时,B为了避开A向左靠,然后A又向左靠B又向右靠,以至于彼此不能通行。活锁的例子在实际项目中并不多见。

此外,还有一种饿死现象,即长耗时的synchronized块或方法一直被频繁调用,以至于其他小方法不能正常获得监视器,一直处于等待状态,如同饿死。

2 基于synchronized的并发机制

互斥和协作,是并发机制的两面。理解好这两点也就掌握了并发编程的核心所在。 基于synchronized的并发机制中,使用synchronized来实现线程互斥,使用wait和notify来促成线程协作。从而使得多线程间,不仅可以安全无误的共享数据,而且可以协同工作以实现统一目标。

2.1 线程互斥——synchronized

在JAVA并发编程中,通过synchronized关键字来实现线程互斥,是最常见的方法。在JDK的API中也随处可见,例如Thread中用于生产默认线程编号的一段代码:

/* For autonumbering anonymous threads. */
private static int threadInitNumber;
private static synchronized int nextThreadNum() {
return threadInitNumber++;
}

而设计模式中的单例模式(相信是我们最为熟悉的模式了),DCL版的实现也是通过synchronized保证线程互斥,示例如下:

// final类,以免继承破坏
public final class Singleton {// 私有构造子,以免外部破坏
private Singleton() {
}private static Singleton INSTANCE;// double-check locking
// 外层校验意在减少对同步块的调用,提升性能;
// 内层校验意在保证并发正确性,当多个线程通过外层校验先后进入同步块后,避免重复创建实例。
public static Singleton getInstance() {
if (INSTANCE == null) {
synchronized (Singleton.class) {
if (INSTANCE == null) {
INSTANCE = new Singleton();
}
}
}
return INSTANCE;
}
……
}2.1.1 互斥的优化

通过synchronized实现互斥可谓简单直接,但并发编程中我们更应该考虑系统整体的吞吐量,也即上面提到的并发活跃度。 为了提升互斥的并发性能,所有的优化方案目标均在于减少互斥。看似矛盾,实则自然。列举几个可行的方案:


粒度细化,如果互斥不可避免,那么精简同步块大小,通常能够取得不错的性能提升,也很容易做到。比如分拆参数校验,让非法调用者可以更快返回,而不用多余的同步,就如前面的gen(long incr)方法所示。同样,对于独立的子业务逻辑及结果处理等,也不用一定包在同步块之中。细化互斥粒度,是互斥优化首先要考虑的。


多重校验, 通过添加多重校验,使得大部分调用者在尚未进入同步块之前就完成对并发状态或结果的外层校验,只有小部分线程需要真正竞争锁,并在通过内层校验后完成实际处理操作。从而降低减少线程之间的互斥消耗,一如前面的DCL的单例所示。


线程私有, 在某些情况下,可考虑对共享数据(或部分)进行私有化,一旦私有化即可开放访问,极大地减少互斥,可谓捷径。线程私有化有两种方式,一是通过使用局部变量,我们知道局部变量是存放在JVM栈中的,而栈是线程私有的,一是使用ThreadLocal工具,通过ThreadLocal存取数据都是针对当前线程的,不会出现冲突。


乐观更新,对涉及共享数据的整个处理过程加锁,最为安全但消耗也最大。而乐观地处理并发,只要定好约束条件并做好failover,也能适用于多数业务场景。所谓乐观更新主要有三点,最初通过互斥方式访问共享数据,中间无需加锁,进行各种业务逻辑处理,最后在满足约束条件下更新结果(通常也是互斥的),如果不能满足则进行失败处理。


架构分拆,在一般性的项目中,其实较少涉及并发控制,如果有也是比较粗浅的应用,因为常见的架构中已有考虑。比如MVC,SpringMVC或Struts,Controller将我们从Servlet的非线程安全中解救了出来,Service通常都是无状态的不可变的,更多是通过局部变量来传递状态也就线程安全,而对于DAO,更多是依赖底层数据库的事务来解决互斥的问题。


此外,为了提升互斥性能,对于锁本身的选择和应用也需要考虑。JDK1.5之后concurrent中所提供的显示锁、读写锁、原子类型等,都具有更好的活跃度。

2.2 线程协作——wait与notify/notifyAll

在并发编程中,出于单一职责的设计考虑,不同的线程应只承担一个明确的职责,并相互协作以完成整体工作。因而会出现责任链式的向前(或相互)依赖,依赖线程需要在约束条件满足时才继续执行,不满是就需要等待,而被依赖线程的工作会促成约束条件的成立,执行完成后需要唤醒等待中的线程。 与synchronized配套的wait、notify函数,分别处理上述的线程等待与唤醒。具体功能如下:


Object.wait,促使当前线程等待,释放Object监视器,将自身置于其等待区,直到被唤醒或者等待时间到。释放CPU资源,并休眠不再参与线程调度。当前线程持有的其他监视器将不会被释放。
Object.notify,随机唤醒一个等待中的线程,被唤醒的线程将与其他活跃线程一同竞争该监视器。notify执行后,当前线程并不会立即释放该监视器,除非同步快或方法执行完成(或异常)。
Object.notifyAll,唤醒所有等待线程,与其他活跃线程一同竞争该监视器。同样notifyAll后,直到同步快或方法执行完后才会释放监视器。

此外,对比Thread.sleep也会使当前线程休眠,但不会释放任何监视器,这是两者的最大不同。

线程协作的经典案例当属生产者-消费者模式了,一个有总限额且支持多生产者多消费者的示例如下:

public class MessageBox {private static int bufferSize = 3;
private static Queue<String> msgQueue = new LinkedList<String>();
private static int totalSize = 8;
private static int producedCount = 0, consumeCount = 0;static class Producer implements Runnable {@Override
public void run() {
while (true) {
synchronized (msgQueue) {
boolean done = false;
while (msgQueue.size() >= bufferSize) {
if ((done = isDone())) {
break;
}
try {
System.out.println(Thread.currentThread() + " 消息队列满,生产者等待");
// 等待,释放当前监视器msgQueue
msgQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (done || isDone()) {
// 唤醒其他等待线程
msgQueue.notify();
break;
}
String message = "msg-" + System.currentTimeMillis();
msgQueue.add(message);
producedCount++;
System.out.println(Thread.currentThread() + " 生产消息:" + message);
// 唤醒其他等待线程
msgQueue.notify();
}
}
}private boolean isDone() {
if (producedCount == totalSize) {
System.out.println(Thread.currentThread() + " 生产完成");
return true;
}
return false;
}}static class Consumer implements Runnable {@Override
public void run() {
while (true) {
synchronized (msgQueue) {
boolean done = false;
while (msgQueue.isEmpty()) {
if ((done = isDone())) {
break;
}
try {
System.out.println(Thread.currentThread() + " 消息队列空,消费者等待");
// 等待,释放当前监视器msgQueue
msgQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (done || isDone()) {
// 唤醒其他等待线程
msgQueue.notify();
break;
}
String message = msgQueue.poll();
consumeCount++;
System.out.println(Thread.currentThread() + " 消费消息:" + message);
// 唤醒其他等待线程
msgQueue.notify();
}
}
}private boolean isDone() {
if (consumeCount == totalSize) {
System.out.println(Thread.currentThread() + " 消费完成");
return true;
}
return false;
}}public static void main(String[] args) {
Thread producer = new Thread(new Producer(), "P1");
Thread consumer = new Thread(new Consumer(), "C1");Thread producer2 = new Thread(new Producer(), "P2");
Thread consumer2 = new Thread(new Consumer(), "C2");producer.start();
producer2.start();
consumer.start();
consumer2.start();
}}

某次运行结果如下:

Thread[P1,5,main] 生产消息:msg-1475771363376
Thread[P1,5,main] 生产消息:msg-1475771363376
Thread[P1,5,main] 生产消息:msg-1475771363376
Thread[P1,5,main] 消息队列满,生产者等待
Thread[C1,5,main] 消费消息:msg-1475771363376
Thread[C1,5,main] 消费消息:msg-1475771363376
Thread[C1,5,main] 消费消息:msg-1475771363376
Thread[C1,5,main] 消息队列空,消费者等待
Thread[P2,5,main] 生产消息:msg-1475771363376
Thread[P2,5,main] 生产消息:msg-1475771363376
Thread[P2,5,main] 生产消息:msg-1475771363376
Thread[P2,5,main] 消息队列满,生产者等待
Thread[C2,5,main] 消费消息:msg-1475771363376
Thread[C2,5,main] 消费消息:msg-1475771363376
Thread[C2,5,main] 消费消息:msg-1475771363376
Thread[C2,5,main] 消息队列空,消费者等待
Thread[P2,5,main] 生产消息:msg-1475771363376
Thread[P2,5,main] 生产消息:msg-1475771363376
Thread[P2,5,main] 生产完成
Thread[C2,5,main] 消费消息:msg-1475771363376
Thread[C2,5,main] 消费消息:msg-1475771363376
Thread[C2,5,main] 消费完成
Thread[C1,5,main] 消费完成
Thread[P1,5,main] 生产完成

再者,在线程协作时,死锁、活锁、饿死等问题是我们需要重点考虑而加以避免的,在下面的示例中会有更深入的体会。

3 基于synchronized的并发实践

为了更好地理解互斥与协作,让我们再多看两个经典示例。这两个例子网上也随处可见,相信大多读者都不陌生,希望这里给出的解答更加准确精细。

3.1 三线程打印ABC

原题:有三个线程,A、B和C,循环打印各自的名字,如ABCABCABC……,各打印十遍。

解题一:由于是循环打印,同一时刻只会有一个线程在工作,因此使用一把锁即可实现。同时设置一个打印状态,状态为A即A线程工作,否则A线程等待,为B时B线程工作否则等待,为C时C线程工作否则等待。逻辑简洁也最容易实现,代码如下:

public class ABC implements Runnable {private static final int COUNT = 10;
private static final Object LOCK = new Object();
private static String flag = "A";private String name;
private String next;public ABC(String name, String next) {
this.name = name;
this.next = next;
}@Override
public void run() {
int i = 0;
while (i < COUNT) {
synchronized (LOCK) {
// 检查是否轮到当前线程工作,否则等待
while (!name.equals(flag)) {
// System.out.print("[" + name + "]");
try {
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 打印当前线程名称,并设置打印状态
System.out.print(name + (name.equals("C") ? '/n' : ' '));
flag = next;
// 唤醒其他线程,有可能唤醒的线程并非我们所想的
LOCK.notifyAll();
}
i++;
}
}public static void main(String[] args) {
// 设置打印状态,从'A'开始
ABC.flag = "A";
Thread a = new Thread(new ABC("A", "B"));
Thread b = new Thread(new ABC("B", "C"));
Thread c = new Thread(new ABC("C", "A"));a.start();
b.start();
c.start();
}}

执行结果如下:

A B C
A B C
A B C
A B C
A B C
A B C
A B C
A B C
A B C
A B C

代码实现简洁明了,也能输出正确结果,但却并不理想,只要打开第22行的注释再执行一遍,就会看到无效线程调度非常频繁,因为线程唤醒是无序的。

解题二:保证线程按A->B->C->A的顺序循环唤醒并打印各自名称,保证线程调度的有效性。因此需要每个线程一把锁,打印完成后唤醒指定的下一个线程。代码如下:

public class ABC2 implements Runnable {private static final int COUNT = 10;
private static String flag = "A";private String name;private final Object LOCK = new Object();
private ABC2 next;public ABC2(String name) {
this.name = name;
}@Override
public void run() {
int i = 0;
while (i < COUNT) {
i++;
// 获得当前线程的锁
synchronized (LOCK) {
// 检查是否轮到当前线程工作,否则等待
while (!name.equals(flag)) {
try {
// System.out.print("[" + name + "]");
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 获得下一个线程的锁,并进行打印
synchronized (next.LOCK) {
System.out.print(name + (name.equals("C") ? '/n' : ' '));
// 设置下一个线程的打印状态
flag = next.name;
// 唤醒下一个线程
next.LOCK.notify();
}
// 打印完后,让当前线程直接等待,等待上一个线程唤醒
// 如果是最后一轮打印,不用再等待,否则线程无法正常结束
if (i < COUNT) {
try {
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}public static void main(String[] args) {
// 设置打印状态,从'A'开始
ABC2.flag = "A";
ABC2 a = new ABC2("A");
ABC2 b = new ABC2("B");
ABC2 c = new ABC2("C");
a.next = b;
b.next = c;
c.next = a;new Thread(a).start();
new Thread(b).start();
new Thread(c).start();
}}3.2 哲学家就餐问题

哲学家就餐问题2是讨论线程同步的经典示例,大意是n个哲学家同桌吃饭,每两个哲学家之间有一把餐叉,设定哲学家只能拿到左右两把叉子才能吃饭,否则就思考。

此题与前面三线程打印ABC的不同之处在于并不限定就餐顺序,且可能有多个哲学家同时就餐。可以让哲学家自由竞争左右的餐叉,拿到的就开始就餐,没拿到就等待,就餐之后就思考。 这里需要重点考虑的是死锁,如果所有哲学家都先拿左侧餐叉再拿右侧的,就会构成一个获取锁的,即会死锁。可以让偶数位哲学家先拿左侧的,奇数位哲学家先拿右侧的,也就将锁按照奇偶分拆两份,必不会死锁。代码如下:

public class Philosopher implements Runnable {private static final long MILLIS = 60 * 1000 + 1;private int number;
private Object leftFork;
private Object rightFork;public Philosopher(int number, Object leftFork, Object rightFork) {
super();
this.number = number;
this.leftFork = leftFork;
this.rightFork = rightFork;
}@Override
public void run() {
long start = System.currentTimeMillis();
long eatTimes = 0;
long thinkTimes = 0;
// 执行一段时间后退出,以便统计就餐、思考的次数
while (System.currentTimeMillis() - start < MILLIS) {
// 就餐
// 偶数位哲学家先拿左侧餐叉
if (number % 2 == 0) {
synchronized (leftFork) {
synchronized (rightFork) {
this.eat();
}
}}
// 奇数位哲学家先拿右侧餐叉
else {
synchronized (rightFork) {
synchronized (leftFork) {
this.eat();
}
}
}
eatTimes++;
// 思考
this.think();
thinkTimes++;
}
System.out.println(Thread.currentThread().getName() + ": eat " + eatTimes + ", think " + thinkTimes);
}// 模拟思考
private void think() {
String name = Thread.currentThread().getName();
System.out.println(name + ": start thinking");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + ": stop thinking");
}// 模拟就餐
private void eat() {
String name = Thread.currentThread().getName();
System.out.println(name + ": start eating");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + ": stop eating");
}public static void main(String[] args) {
int count = 5;
Object[] forks = new Object[count];
for (int i = 0; i < count; i++) {
forks[i] = new Object();
}
for (int i = 0; i < count; i++) {
Thread phil = new Thread(new Philosopher(i, forks[i], forks[(i + 1) % count]), "p" + i);
phil.start();
}
}}

某次运行结果如下所示:

……
p2: start thinking
p3: stop thinking
p4: stop eating
p4: start thinking
p3: start eating
p0: stop thinking
p0: start eating
p0: stop eating
p3: stop eating
p0: start thinking
p2: stop thinking
p2: eat 218, think 218
p3: start thinking
p1: start eating
p4: stop thinking
p4: eat 211, think 211
p3: stop thinking
p3: eat 212, think 212
p0: stop thinking
p0: eat 248, think 248
p1: stop eating
p1: start thinking
p1: stop thinking
p1: eat 218, think 218

此外,还有其他多种解法,这里不再累述。

参考: 1. Intrinsic Locks and Synchronization 2. 深入Java虚拟机,Bill Venners 3. JAVA并发编程.设计原则与模式,Doug Lea 4. JAVA并发编程实践,Brian Goetz, Tim Peierls等

大多翻译为内置锁或内部锁,这里就用监视器,毕竟JDK API中monitor的叫法随处可见 ↩哲学家就餐问题是用来描述并发编程、线程同步的经典问题,原题描述参见百科词条 ↩

最新文章

123

最新摄影

微信扫一扫

第七城市微信公众平台