基于jgroups分布式缓存实现redis服务的fallback处理

2018-02-07 10:15:32来源:https://www.iflym.com/index.php/code/201802060001.html作者:i flym人点击

分享
第七城市

现在的系统当中,实现缓存均是采用redis来完成,一个典型的场景即是将用户的会话信息存放于redis当中,以实现分布式缓存以及服务的高可用。当nginx在进行请求分发时,不需要采用ip hash,随机分发请求,后端直接读取中央缓存即可完成相应的业务处理。但这种场景同样存在一个问题,即是缓存的高可用问题。特别是相应的redis服务由外部提供时,这个问题更加明显。初期redis服务为一个单点时,当服务挂掉时,会导致整个服务将不再可用。


解决此问题有2个方案,一个是再使用同样的策略保证redis的高可用,如部署redis集群或者是sentinel模式,另外一种是直接放弃对redis的高依赖性,在应用层直接进行缓存的fallback支持。在本场景中,我们仅将redis作为一个缓存层使用,它的一些其它特性如发布订阅暂不考虑。


在本文中,我们使用了hystrix+caffeine+jgroups的方案,来完成整个redis的fallback化处理,通过hystrix实现对redis调用的断路及fallback处理,使用caffine提供高效本地缓存,使用jgroups完成多系统间的数据复制分发。 主要实现的目的在于,当redis挂掉之后,能够切换到本地分布式缓存,保证系统后续可用(不过之前存放在redis中的数据完全不再可见,会导致一部分的问题,如会话信息会丢失。可考虑仅将会话信息直接复制在本地缓存当中)


本方案适用于redis短时间由于网络或其它原因连接不上,但在相对短的时间内(如几分钟,不超过1个小时等)即会恢复的情况,这样本地缓存不会存储很多东西的情况.


hystrix失败重试及短路

这里要处理的问题包括如何检测访问redis失败以及什么时候触发本地fallback处理的情况,并且当确定redis已经挂掉的情况下,将不会再尝试连接redis以减少在调用上所花费的时间。 当然,也需要在某些情况下尝试访问redis以检测redis是否可正常工作.


这里使用了一个在微服务中常用的组件hystrix,以检测相应的调用和处理相应的短路问题。在本配置中,由于整个方案是基于对redis的fallback处理,并不局限于微服务之间的调用隔离以及频率限制,因此没有使用线程池作为隔离级别,而是采用信号量来实现调用频率的限制。相应的隔离设置如下参考所示:


//允许失败回调
.withFallbackEnabled(true)
//隔离策略
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
//最大并发数 500
.withExecutionIsolationSemaphoreMaxConcurrentRequests(500)
//失败回调 最大并发数 500
.withFallbackIsolationSemaphoreMaxConcurrentRequests(500)
//不需要调用超时
.withExecutionTimeoutEnabled(false);

可以理解为基本不限制调用,同时,因为使用信号量作为隔离策略,因此在相应的线程池上不需要为每个调用配置单独的线程池,这里将整个对redis的调用均设置为同一个命令组。


同时,因为是redis的访问可以认为是相对稳定的,当访问出现问题时,基本认为是redis不可用。因此在设置短路策略时,设置了一个相对较快的短路策略,以快速切换到本地缓存上来。这样避免因为在系统调用过程中,因为redis不可用时创建连接花费太多时间。短路的设置参考如下:


//开启短路
.withCircuitBreakerEnabled(true)
//10%就算有问题
.withCircuitBreakerErrorThresholdPercentage(10)
//过5秒重试一次
.withCircuitBreakerSleepWindowInMilliseconds(5_000)
//30秒计数,以30秒为1个统计单位
.withMetricsRollingStatisticalWindowInMilliseconds(30_000)
//最少5个请求(5个请求都连不上redis...),触发短路,否则认为请求数太小,不需要触发短路,这样保证能够有足够的尝试调用次数
.withCircuitBreakerRequestVolumeThreshold(5)

在上面的设置中,设置的触发短路的频率为10%,可以解决为当访问6次时,只要有1次访问不了,就会触发相应的短路。这样可以快速地进行fallback切换。


当触发了短路之后,后续的调用将直接失败或进行fallback,因此为了支持缓存切换,我们在command中实现了相应的fallback处理,这样,当调用redis失败或者触发短路时,即会直接调用fallback处理。并且这里的fallback调用是不会失败的。如一个参考的fallback即为如下代码:


localCache.get(realKey)

这样的调用是不会出错的。(正常情况下本地cache均不会出错…)


支持单对象ttl的缓存caffeine

当触发redis command的fallback之后,相应的处理即会落到本地cache中,这里的一个本地cache即相当于要支持redis缓存的大部分操作,如kv,list,map等。这里我们可以理解为实现一个基本的map结构即可,相应的数据结构为Map<String,Any>,后面的value类型根据业务中使用的类型进行调整。如kv结构就直接是简单的object类型,而如果是list结构,则v就可以是一个list结构。


同时,我们需要支持针对单个key进行ttl的设置和处理,因此redis中对于缓存数据是会设置ttl的,以避免数据长时间占用内存。在这里,我们需要寻找一个可以单独为每个key设置ttl的缓存。有几个参考的cache可以选择。如guava cache, concurrenthashmap,ehcache,以及新出的caffeine。


java自带的concurrentHashMap,不支持ttl.相对应的weakMap等也同样不支持.
ehcache的时间太长,并没有针对新版的java进行优化和调整,在性能测试中也不尽人意。
guava cache支持统一设置读写ttl,但针对单个对象分别设置ttl是不支持的,并且在github中,guava cache也没有进一步的支持计划。
caffeine提供的主要api也是统一设置读写ttl,但是可以参考定义一个Expiry来手动指定每个key的ttl信息。同时,根据相应的性能测试,好像caffeine更胜一层.(包括最新的spring cache也使用caffine来完成)

那么,这里针对于缓存的value类型,我们需要包括3个部分的内容,实际的值,对象类型,以及相应的ttl值。这里定义了一个简单的triple结构来表示(实际这里使用了一个tuple3来表示),整个结构如下参考所示:


public class Tuple3Holder<Object, CacheValueType, Long> {
public T1 t1; //实际内容
public T2 t2; //缓存类型
public T3 t3; //ttl值
}

同时,相应的缓存构建如下实现参考:


Caffeine.newBuilder()
.expireAfter(new Expiry<String, Tuple3Holder<Object, CacheValueType, Long>>() {
@Override
public long expireAfterCreate(@NotNull String key, @Nonnull Tuple3Holder<Object, CacheValueType, Long> tuple3, long currentTime) {
return tuple3.t3;
}
@Override
public long expireAfterUpdate(...) {
return currentDuration;
}
@Override
public long expireAfterRead(...) {
return currentDuration;
}
})
.build();

在对象创建时指定过期时间,同时在更新以及读时,相应的时间变化并不作调整.


网络集群及数据复制jgroups

当数据落到本地之后,对于集群部署的系统,必须将数据同步给其它系统,以完成相应的数据同步处理。当前能用的数据复制及网络同步组件并不多,包括像Hazelcast这种组件均是独立进行部署的,相当于另一个redis,对于当前的fallback处理来说太重。而如netty这种网络通讯组件又太原始,还需要自行进行开发。同时,还要实现类似于网络发现,自动组集群的功能,数据间对等传输等功能。经过一番google,当前可用的选择即jgroups.


具体jgroups的使用可自行google,考虑到实际的网络情况下,在当前情况下,这里选择使用tcp协议进行数据传输,tcpping进行节点发现,socket进行节点可用性检测。相应的整个协议栈如下参考所示:


JChannel channel = new JChannel(tcp, tcpping, fd_sock, nakack2, gms);
//丢掉自己的消息,即不需要自己发给自己
channel.setDiscardOwnMessages(true);
channel.connect("redis");

然后,参考jgroups自带的 ReplicatedHashMap 实例,将之前的cache和当前的jgroups一起,形成一个简单的map,以完成最终的本地缓存容器。相应的构建及访问如下参考所示:


public DistributedMap(Cache<K, V> cache, JChannel channel) {
super(cache.asMap(), channel);
this.cache = cache;
this.disp.setMarshaller(new FastjsonMarshaller());
}
@Override
public V put(K key, V value) {
V v = cache.asMap().put(key, value);
//可修改为异步处理
//catch相应的异常,避免数据复制影响业务
ExceptionUtils.doActionLogE(() -> {
MethodCall call = new MethodCall(PUT, key, value);
disp.callRemoteMethods(null, call, call_options);
});
return v;
}
redis恢复之后数据回填

在经过以上整个环节之后,一个fallback的缓存即可正常工作。但当redis恢复之后,之前存放于本地缓存的数据需要进行回填,因为本地cache始终是一个临时态,当redis可用时,将进行相应的恢复式处理,以避免出现缓存的丢失。


一个可以解决的方案,即是当redis探测式调用OK之后,即开启一个数据回填线程,将本地cache复制并清空之后,再获取锁之后,重新回填至redis。具体如何回填,则利用起之前的数据类型,根据不同的数据类型调用redis的不同数据结构,直接采用merge式处理即可。


上述需要考虑的问题,主要是以下几个:


回填线程只能由1个在执行,避免在集群环境下多个回填线程同时进行。这里可以引入一个分布式锁(如zk锁)
在回填过程中的数据访问,可以设置一个状态,当redis访问失败的情况下,再调用一次本地cache获取
回填之后的数据处理,当回填结束之后,需要一次性的清除本地cache,以避免出现数据污染问题。在过程中,可以采取cas操作,直接清除即可.
总结:

整个方案,略显粗糙,但可以解决当redis挂掉的一小段时间内,系统仍可工作,并且仍能提供相应的缓存能力。避免因缓存不可用导致系统不可用(无fallback),或直接无缓存能力的问题。可以理解为是一个处于中间态的一个方案。


不过,在整个过程中,仍有一些问题需要处理,如key的路由问题,避免同一个key两个节点同时处理带来的数据同步不一致问题;jgroups框架太旧以至于不能使用像netty这样的优秀网络框架;数据对等复制带来的同步浪费以及连接问题; 解决此问题的方案,是需要一个完全一致性的分布式缓存,如chord.


考虑到当前还没有一个完整的chord算法实现,因此下一个计划即是完成一个基本的dht,以提供这样的能力(基于开源chord简版实现,并完全支持最新的语言特性以及网络访问)。


第七城市

最新文章

123

最新摄影

闪念基因

微信扫一扫

第七城市微信公众平台