java并发——构建高效且可伸缩的结果缓存

2017-04-10 08:15:27来源:CSDN作者:ly908979139人点击

几乎所有的服务器应用都会使用某种形式的缓存。重用之前的计算结果能降低延迟,提高吞吐量,但却要消耗更多内存。看上去简单的缓存,可能会将性能瓶颈转变成伸缩性瓶颈,即使缓存是用来提高单线程性能的。本文将开发一个高效且可伸缩的缓存,用于改进一个高计算开销的计算,我们会从HashMap开始,逐步完善功能,分析它们的并发问题,并讨论如何修改它们。

下面基于一个计算任务开始缓存的设计

public interface Computable <A, R>{    R compute(A a) throws InterruptedException;}public class Function implements Computable <String, BigInteger>{    @Override    public BigInteger compute(String a) throws InterruptedException {        return new BigInteger(a);    }}

第一阶段 HashMap

public class Memorizer1<A, V> implements Computable<A, V>{    private final Computable<A, V> compute;    private final Map<A, V> cache;    public Memorizer1(Computable<A, V> compute){        this.compute = compute;        cache = new HashMap<A, V>();    }    @Override    public synchronized V compute(A a) throws InterruptedException {        V result = cache.get(a);        if(result == null){            result = compute.compute(a);            cache.put(a, result);        }        return result;    }}

如上所示,Memorizer1将Computable实现类的计算结果缓存在Map<A, V> cache。因为HashMap不是线程安全的,为了保证并发性,Memorizer1用了个很保守的方法,对整个compute方法进行同步。这导致了Memorizer1会有很明显的可伸缩性问题,当有很多线程调用compute方法,将排一列很长的队,考虑到这么多线程的阻塞,线程状态切换,内存占用,这种方式甚至不如不使用缓存。

第二阶段 ConcurrentHashMap

public class Memorizer2<A, V> implements Computable<A, V>{    private final Computable<A, V> compute;    private final Map<A, V> cache;    public Memorizer2(Computable<A, V> compute){        this.compute = compute;        cache = new ConcurrentHashMap<A, V>();    }    @Override    public V compute(A a) throws InterruptedException {        V result = cache.get(a);        if(result == null){            result = compute.compute(a);            cache.put(a, result);        }        return result;    }}

Memorizer2比Memorizer1拥有更好的并发性,并且具有良好的伸缩性。但它仍然有一些不足——当两个线程同时计算同一个值,它们并不知道有其它线程在做同一的事,存在着资源被浪费的可能。这个不足,对于缓存的对象只提供单次初始化,会带来安全性问题。

第三阶段 ConcurrentHashMap+FutureTask
事实上,第二阶段的功能已经符合大部分情况的功能,但是当计算时间很长导致很多线程进行同一个运算,或者缓存的对象只提供单次初始化,问题就会很棘手,在这里,我们引入FutureTask来让进行运算的线程获知是否已经有其它正在,或已经进行该运算的线程。

public class Memorizer3<A, V> implements Computable<A, V>{    private final Computable<A, V> compute;    private final Map<A, FutureTask<V>> cache;    public Memorizer3(Computable<A, V> compute){        this.compute = compute;        cache = new ConcurrentHashMap<A, FutureTask<V>>();    }    @Override    public V compute(A a) throws InterruptedException {        V f = cache.get(a);        if(f == null){            Callable<V> eval = new Callable<V>(){                public V call() throw InterruptedException{                    return c.compute(arg);                }            }            FutureTask<V> ft = new FutureTask<V>(eval);            f = ft;            cache.put(a, ft);            ft.run();        }        try{            return f.get();        }cache(ExecutionException e){            throw launderThrowable(e.getCause());        }    }}

Memorizer3缓存的不是计算的结果,而是进行运算的FutureTask。因此Memorizer3首先检查有没有执行该任务的FutureTask。如果有,则直接获得FutureTask,如果计算已经完成,FutureTask.get()方法可以立刻获得结果,如果计算未完成,后进入的线程阻塞直到get()返回结果;如果没有,则创建一个FutureTask进行运算,后续进了的同样的运算可以直接拿到结果或者等待运算完成获得结果。
Memorizer3的实现近乎完美,但是仍然存在一个问题,当A线程判断没有缓存是,进入到cache.put(a, ft);这一步前,B线程恰好判断缓存为空,B线程创建的FutureTask会把A创建的FutureTask覆盖掉。虽然这相比Memorizer2已经是小概率事件,但是问题还是没根本解决。

第四阶段 ConcurrentHashMap + FutureTask + Map原子操作

第三阶段的ConcurrentHashMap + FutureTask由于存在“先检查再执行“的操作,会有并发问题,我们给cache使用复合操作(“若没有则添加“),避免该问题。public class Memorizer4<A, V> implements Computable<A, V>{    private final Computable<A, V> compute;    private final Map<A, FutureTask<V>> cache;    public Memorizer4(Computable<A, V> compute){        this.compute = compute;        cache = new ConcurrentHashMap<A, FutureTask<V>>();    }    @Override    public V compute(A a) throws InterruptedException {        while(true){            V f = cache.get(a);            if(f == null){                Callable<V> eval = new Callable<V>(){                    public V call() throw InterruptedException{                        return c.compute(arg);                    }                }                FutureTask<V> ft = new FutureTask<V>(eval);                f = cache.putIfAbsent(a, ft);                if(f == null){                    f = ft;                    ft.run();                }            }            try{                return f.get();            }catch(CancellationException e){                cache.remove(arg, f);            }catch(ExecutionException e){                throw launderThrowable(e.getCause());            }        }    }}

Memorizer4做了两点改进:
1. 插入时会再次检查是否有缓存,并且这是个复合操作

f = cache.putIfAbsent(a, ft);if(f == null){    f = ft;    ft.run();}
  1. 这里考虑到了一种情况,如果正在运行的FutureTask被终止,那进行该运算的所有请求都会出问题,始料未及的遭遇CancellationException异常。Memorizer4的compute操作是一个循环,当在get()阻塞的线程catch到CancellationException异常,则会再一次申请一个创建FutureTask的机会。

至此,整个设计过程就结束了。我们得到了一个在极端环境下依然能够保证高效且可伸缩运行的结果缓存。

微信扫一扫

第七城市微信公众平台