一 背景
我们的工业互联网解析服务,主要是负责解析遥测、遥信等采集数据为平台的物模型,其中每个点号都涉及到四五个redis缓存数据的查询,而我们的单个解析服务并发超过3000每秒左右,即每秒查redis的次数过万次,虽然redis撑得住,但是解析服务的网络IO非常高,导致CPU超高,而且压测时发现,并发基本已快达到瓶颈,即如果突发数据量暴增,会出现解析不过来的情况。
所以,就需要考虑提高解析并发能力,于是考虑到了增加本地缓存。
而本地缓存的方式有很多,可以直接用Java的常量,或者单例中的变量等都可以用来作为本地缓存,但是有个问题,无法自动过期,或者要主动去更新本地缓存。而且还有一个很大的问题,如果本地缓存的数据很多,会占用太多服务的JVM内存,并且不容易更新。
于是,考虑使用guava cache,这是google guava中的一个内存缓存模块,可以把常用的数据放到本地缓存中,并且有缓存时长,到期后会重新加载,可以不用主动去刷新数据,唯一的缺点是,物模型有调整,无法及时生效,需要在本地缓存失效后重新加载后才能生效。
二 进一步封装Guava Cache
依赖如下:1
com.google.guava:guava:16.0(版本可以替换)
由于在spring boot中直接使用Guava Cache不太友好,故进一步封装了Guava Cache,如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.Data;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* @author mafgwo
* @since 2022/01/10
**/
public abstract class CustomLocalCache<K, V> {
private Cache<K, Optional<V>> cache;
public CustomLocalCache(long cacheSize, long cacheDuration) {
cache = CacheBuilder.newBuilder()
.maximumSize(cacheSize)
.expireAfterWrite(cacheDuration, TimeUnit.SECONDS)
.build();
}
/**
* 从数据库加载数据
*
* @param key
* @return
*/
private Optional<V> load(K key) {
return this.load(Collections.singletonList(key)).get(key);
}
/**
* 批量查询必须继承该方法
*
* @param keys
* @return
*/
private Map<K, Optional<V>> load(List<K> keys) {
if (CollectionUtils.isEmpty(keys)) {
return Collections.emptyMap();
}
Map<K, V> kvMap = this.batchLoad(keys);
return keys.stream().collect(Collectors.toMap(Function.identity(), it -> Optional.ofNullable(kvMap.get(it)), (v1, v2) -> v2));
}
/**
* 批量加载
* @param keys
* @return
*/
protected abstract Map<K, V> batchLoad(List<K> keys);
public V findByKey(K key) {
Optional<V> v = cache.getIfPresent(key);
if (v == null) {
v = this.load(key);
cache.put(key, v);
}
return v.orElse(null);
}
/**
* 该方法会返回乱序 如果不能接受乱序 子类覆盖重写
* @param keys
* @return
*/
public List<V> findByKeys(List<K> keys) {
if (CollectionUtils.isEmpty(keys)) {
return new ArrayList<>();
}
List<V> resultList = new ArrayList<>();
List<K> noCacheKeys = new ArrayList<>();
for (K key : keys) {
Optional<V> v = cache.getIfPresent(key);
if (v != null) {
v.ifPresent(resultList::add);
continue;
}
noCacheKeys.add(key);
}
if (!noCacheKeys.isEmpty()) {
Map<K, Optional<V>> loadMap = this.load(noCacheKeys);
cache.putAll(loadMap);
resultList.addAll(loadMap.values().stream().filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList()));
}
return resultList;
}
/**
* @param keys
* @return
*/
public Map<K, V> findMapByKeys(List<K> keys) {
if (CollectionUtils.isEmpty(keys)) {
return Collections.emptyMap();
}
Map<K, V> resultMap = new HashMap<>();
List<K> noCacheKeys = new ArrayList<>();
for (K key : keys) {
Optional<V> v = cache.getIfPresent(key);
if (v != null) {
v.ifPresent(it -> resultMap.put(key, it));
continue;
}
noCacheKeys.add(key);
}
if (!noCacheKeys.isEmpty()) {
Map<K, Optional<V>> loadMap = this.load(noCacheKeys);
cache.putAll(loadMap);
resultMap.putAll(loadMap.entrySet().stream().filter(it -> it.getValue().isPresent()).collect(Collectors.toMap(Map.Entry::getKey, it -> it.getValue().get())));
}
return resultMap;
}
}
使用方式如下(在service实现层增加一个CustomLocalCache变量,并在service实例化的时候,初始化缓存的加载方式):
1 | private CustomLocalCache<String, ProductModelAttrCacheVO> modelLocalCache; |
三 效果
增加四五个本地缓存之后,解析压力骤降,CPU直接下降到了没使用本地缓存的1/4的,效果相当显著。