Java学习笔记--并行API

2017-01-14 08:43:37来源:CSDN作者:championhengyi人点击

第七城市

使用Thread建立多线程程序,必须亲自处理synchronized,对象锁定,wait方法,notify方法,notifyAll方法等细节。如果需要的是线程池,读写锁等高级操作,从JDK5之后提供了java.util.concurrent包,可基于其中的API建立更稳固的并行应用程序。

对于线程池的总结,在后面我会补上,如今我就先总结一下读写锁。


Lock,ReadWriteLock与Condition

synchronized要求线程必须取得对象锁定,才执行所标示的区块范围,然而使用synchronized有许多的限制,未取得锁定的线程会直接被阻断,如果我们希望线程能够尝试取得锁定,无法取得锁定时就先做其他事情,直接使用synchronized就会很麻烦。此时java.util.concurrent.locks包中提供的Lock,ReadWriteLcok,condition接口以及操作类就会派上用场。


使用Lock

Lock接口的主要操作类是ReentrantLock,可以达到synchronized的作用,我们来看看怎么使用它改写ArrayList为具线程安全的类。

import java.util.Arrays;import java.util.concurrent.locks.*;public class ArrayList <E>{    private Lock lock = new ReentrantLock();       //使用ReentrantLock    private Object[] elems;    private int next = 0;    public ArrayList(int capacity){        elems = new Object[capacity];    }    public ArrayList(){        this(16);    }    public void add(E elem){        lock.lock();    //进行锁定        try{            if(next == elems.length){                elems = Arrays.copyOf(elems, elems.length* 2);            }            elems[next++] = elem;        }finally {            {                lock.unlock();       //解除锁定            }        }    }    public E get(int index){        lock.lock();        try {            return (E) elems[index];        }finally {            lock.unlock();        }    }    public int size(){        lock.lock();        try{            return next;        }finally {            lock.unlock();        }    }}

ReentrantLock表示如果已经有线程取得Lock对象锁定,尝试再次锁定同一个Lock对象是可以的。想要锁定Lock对象,可以调用其lock方法,只有取得Lock对象锁定的线程,才可以继续往后执行程序代码,要解除其锁定,可以调用Lock对象的unlock方法。

为了避免Lock对象的lock方法后,在后续执行流程中抛出异常而无法解除锁定,一定要在finally中调用Lock对象的unlock方法。

Lock接口还定义了tryLock方法,如果线程调用tryLock方法成功的取得了锁定会返回true,若无法取得锁定并不会发生阻断,而是返回false,我们可以使用这个方法来解决一下上篇博客出现的线程“死结”问题。

import java.util.concurrent.locks.*;class Resource{    private ReentrantLock lock = new ReentrantLock();  //操作ReentrantLock    private String name;    Resource(String name){        this.name = name;    }    void cooperate(Resource res){        while(true){            try{                if(lockMeAnd(res)){      //获得目前与传入的Resource的Lock锁定                    System.out.printf("%s 整合 %s 的资源/n", this.name, res.name);                    break;                }            }finally {                unLockMeAnd(res);            }        }    }    private boolean lockMeAnd(Resource res){       //当线程同时获得两个锁定时才可以执行下面的程序        return this.lock.tryLock() && res.lock().tryLock();    }    private void unLockMeAnd(Resource res){        //同时解除所有的锁        if(this.lock.isHeldByCurrentThread()){            this.lock.unlock();        }        if(res.lock.isHeldByCurrentThread()){            res.lock.unlock();        }    }}public class NoDeadLockDemo {    public static void main(String[] args){        Resource res1 = new Resource("Resource1");        Resource res2 = new Resource("Resource2");        Thread thread1 = new Thread(() -> {            for(int i = 0; i < 10; i++){                res1.cooperate(res2);            }        });        Thread thread2 = new Thread(() -> {            for(int i = 0; i < 10; i++){                res2.cooperate(res1);            }        });        thread1.start();        thread2.start();    }}

对于前面的死结,由于不能同时取得两个锁定而阻断。既然如此,那么如果不能同时取得两个类的锁定,干脆释放已取得的锁定,就可以解决问题。

改写后的cooperate方法会在while循环中,执行lockMeAnd(res),在该方法中使用目前Resource的Lock的tryLock()尝试取得锁定,以及被传入Resource的Lock的tryLock方法尝试取得锁定,只有两次tryLock方法返回值都是true,也就是两个Resource都取得锁定之后,才进行资源的整合并离开while循环,无论哪个tryLock方法成功,都要在finally中调用unLockMeAnd(res),在该方法中测试并解除锁定。


使用ReadWriteLock

前面设计了线程安全的ArrayList,如果由两个线程都想调用get方法和size方法,由于锁定的关系,其中一个线程只能等待另一个线程解除锁定,无法让两个线程同时调用get方法和size方法。但是这两个方法都只是读取对象状态,并没有变更对象状态,如果只是读取操作,可允许线程同时并行的话,那对读取效率将会有所改善。

ReadWriteLock接口定义了读取锁和写入锁行为,可以使用readLock方法,writeLock方法返回Lock操作对象。ReentrantReadWriteLock是ReadWriteLock的主要操作类,readLock方法会返回ReentrantReadWriteLock.ReadLock实例,writeLock方法会返回ReentrantReadWriteLock.WriteLock实例。

ReentrantReadWriteLock.ReadLock操作了Lock接口,调用其Lock方法,若没有任何ReentrantReadWriteLock.WriteLock调用过Lock方法,也就是没有任何写入锁定时,就可以获得读取锁定

ReentrantReadWriteLock.WriteLock操作了Lock接口,调用其lock方法时,若没有任何ReentrantReadWriteLock.ReadLock或ReentrantReadWriteLock.WriteLock调用过lock方法,也就是没有任何读取和写入锁定时,就可以获得写入锁定

这些概念都和Linux中的读锁和写锁是非常相近的,只是调用的API不一样。

我们可以使用ReadWriteLock改写前面的ArrayList,改进他的读取效率。

import java.util.Arrays;import java.util.concurrent.locks.*;public class ArrayList2<E> {    private ReadWriteLock lock = new ReentrantReadWriteLock();       //使用ReentrantLock    private Object[] elems;    private int next = 0;    public ArrayList2(int capacity){        elems = new Object[capacity];    }    public ArrayList2(){        this(16);    }    public void add(E elem){        lock.writeLock().lock();    //进行锁定        try{            if(next == elems.length){                elems = Arrays.copyOf(elems, elems.length* 2);            }            elems[next++] = elem;        }finally {            {                lock.writeLock().unlock();       //解除锁定            }        }    }    public E get(int index){        lock.readLock().lock();        try {            return (E) elems[index];        }finally {            lock.readLock().unlock();        }    }    public int size(){        lock.readLock().lock();        try{            return next;        }finally {            lock.readLock().unlock();        }    }}

对于这个程序我不在赘述。


使用StampedLock

ReadWriteLock在没有任何读取和写入锁定时,才取得了写入锁定,这是悲观读取

当读取线程很多,写入线程很少的时候,使用ReadWriteLock可能会使线程遭受饥饿问题,也就是写入线程可能迟迟无法竞争到锁定而一直处于等待状态。

JDK8新增了StampedLock类,可以支持乐观读取,也就是若读取线程很多,写入线程很少的情况下,你可以乐观的认为,写入与读取同时发生的几率很少,因此不悲观的使用完全的读取锁定,程序可以查看数据读取之后,是否遭到写入线程的变更,在采取后续的措施(重新读取变更后的数据或抛出例外)。

我们使用StampedLock类来实现ArrayList:

import java.util.Arrays;import java.util.concurrent.locks.*;public class ArrayList3<E> {    private StampedLock lock = new StampedLock();    private Object[] elems;    private int next = 0;    public ArrayList3(int capacity){        elems = new Object[capacity];    }    public ArrayList3(){        this(16);    }    public void add(E elem){        long stamp = lock.writeLock();    //取得写入锁定        try{            if(next == elems.length){                elems = Arrays.copyOf(elems, elems.length* 2);            }            elems[next++] = elem;        }finally {            {                lock.unlockWrite(stamp);       //解除锁定            }        }    }    public E get(int index) {        long stamp = lock.tryOptimisticRead();     //尝试乐观读取锁定        Object elem = elems[index];        if (!lock.validate(stamp))       //查询是否有排他的锁        {            stamp = lock.readLock();     //真正的读取锁定            try {                return (E) elems[index];            } finally {                lock.unlockRead(stamp);            }        }        return (E) elem;    }    public int size(){        long stamp = lock.tryOptimisticRead();        int size = next;        if (!lock.validate(stamp))               {            stamp = lock.readLock();            try {                size = next;            } finally {                lock.unlockRead(stamp);            }        }        return size;    }}

使用writeLock方法取得写入锁定会返回一个long整数代表锁定戳记(stamp),可用于解除锁定或转换为其他锁定。

lock.tryOptimisticRead()不会真正读取锁定,而是返回锁定戳记,如果有其他排他性锁定的话,戳记会是0,validate方法用来验证戳记是不是被其他排他性锁定取得了,如果是的话就返回false,如果戳记是0的话也返回false。然后if验证如果戳记真的被其他排他性锁定取得,则重新使用readLock方法做真正的读取锁定,并在锁定时更新变量,然后解除锁定。如果if不成立,直接返回变量的值。

在validate方法之后发生写入而结果不一致是可能发生的,如果你在意它,应采用完全的锁定。


使用Condition

Condition接口搭配Lock,最基本的用法就是达到Object的wait(),notify(),notifyAll()方法的作用,在之前的生产者,消费者,店员的例子中,店员对象调用wait方法,会造成无论是消费者还是生产者线程都会至店员对象的等待集合。在多个生产者,消费者的情况下,等待集合中会有消费者,生产者线程,调用notify()时,有可能通知到生产者线程,也可能通知到消费者线程,如果在消费者取走产品后,又通知消费者线程,实际上只是让消费者线程再次执行到wait方法而重复进出等待集合罢了。

现在我们有Condition对象,一个Condition对象代表一个等待集合,可以重复调用Lock的newCondition方法,取得多个Condition实例,就代表了有多个等待集合。

所以我们如果有两个等待集合:一个消费者集合,一个生产者集合,消费者只通知生产者等待集合,生产者只通知消费者等待集合,就会比较有效率。

来看一下实际代码:

import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class Clerk2 {    private int product = -1;    private Lock lock = new ReentrantLock();    private Condition producerCond = lock.newCondition();    private Condition consumerCond = lock.newCondition();    public void setProduct(int product) throws InterruptedException{        lock.lock();        try{            waitIfFull();       //看看店员有没有空间收产品,没有的话就稍后            this.product = product;            System.out.printf("生产者设定 %d /n", this.product);            consumerCond.signal();         //通知等待中的线程(消费者)        }finally {            lock.unlock();        }    }    private void waitIfFull() throws InterruptedException{        while(this.product != -1){         //店员由产品,没有空间            producerCond.await();        }    }    public int getProduct() throws InterruptedException{        lock.lock();        try{            waitIfEmpty();              //看看店员有没有货,没有的话就稍后            int p = this.product;            this.product = -1;           //表示货物被取走            System.out.printf("消费者取走 %d /n", p);            producerCond.signal();                  //通知等待集合中的线程(生产者)            return p;        }finally {            lock.unlock();        }    }    private void waitIfEmpty() throws InterruptedException{        while(this.product == -1){            consumerCond.await();        }    }}
第七城市

最新文章

123

最新摄影

微信扫一扫

第七城市微信公众平台