新聞中心
- ?
?JSR107??定義了緩存使用規(guī)范,spring中提供了基于這個規(guī)范的接口,所以我們可以直接使用spring中的接口進行??Caffeine??和??Redis??兩級緩存的整合改造 - 在分布式環(huán)境下,如果一臺主機的本地緩存進行修改,需要通知其他主機修改本地緩存,解決分布式環(huán)境下本地緩存一致性問題
好了,在明確了需要的改進問題后,下面我們開始正式修改。

改造
在上篇文章的v3版本中,我們使用自定義注解的方式實現(xiàn)了兩級緩存通過一個注解管理的功能。本文我們換一種方式,直接通過擴展spring提供的接口來實現(xiàn)這個功能,在進行整合之前,我們需要簡單了解一下JSR107緩存規(guī)范。
JSR107 規(guī)范
在JSR107?緩存規(guī)范中定義了5個核心接口,分別是CachingProvider,CacheManager,Cache, Entry和Expiry?,參考下面這張圖,可以看到除了Entry和Expiry以外,從上到下都是一對多的包含關(guān)系。
從上面這張圖我們可以看出,一個應用可以創(chuàng)建并管理多個CachingProvider,同樣一個CachingProvider也可以管理多個CacheManager,緩存管理器CacheManager中則維護了多個Cache。
Cache是一個類似Map的數(shù)據(jù)結(jié)構(gòu),Entry就是其中存儲的每一個key-value數(shù)據(jù)對,并且每個Entry都有一個過期時間Expiry。而我們在使用spring集成第三方的緩存時,只需要實現(xiàn)Cache和CacheManager這兩個接口就可以了,下面分別具體來看一下。
Cache
spring中的Cache接口規(guī)范了緩存組件的定義,包含了緩存的各種操作,實現(xiàn)具體緩存操作的管理。例如我們熟悉的RedisCache、EhCacheCache等,都實現(xiàn)了這個接口。
在Cache接口中,定義了get、put、evict、clear等方法,分別對應緩存的存入、取出、刪除、清空操作。不過我們這里不直接使用Cache接口,上面這張圖中的AbstractValueAdaptingCache是一個抽象類,它已經(jīng)實現(xiàn)了Cache接口,是spring在Cache接口的基礎(chǔ)上幫助我們進行了一層封裝,所以我們直接繼承這個類就可以。
繼承AbstractValueAdaptingCache抽象類后,除了創(chuàng)建Cache的構(gòu)造方法外,還需要實現(xiàn)下面的幾個方法:
// 在緩存中實際執(zhí)行查找的操作,父類的get()方法會調(diào)用這個方法
protected abstract Object lookup(Object key);
// 通過key獲取緩存值,如果沒有找到,會調(diào)用valueLoader的call()方法
publicT get(Object key, Callable valueLoader);
// 將數(shù)據(jù)放入緩存中
public void put(Object key, Object value);
// 刪除緩存
public void evict(Object key);
// 清空緩存中所有數(shù)據(jù)
public void clear();
// 獲取緩存名稱,一般在CacheManager創(chuàng)建時指定
String getName();
// 獲取實際使用的緩存
Object getNativeCache();
因為要整合RedisTemplate和Caffeine的Cache,所以這些都需要在緩存的構(gòu)造方法中傳入,除此之外構(gòu)造方法中還需要再傳出緩存名稱cacheName,以及在配置文件中實際配置的一些緩存參數(shù)。先看一下構(gòu)造方法的實現(xiàn):
public class DoubleCache extends AbstractValueAdaptingCache {
private String cacheName;
private RedisTemplate
抽象父類的構(gòu)造方法中只有一個boolean類型的參數(shù)allowNullValues,表示是否允許緩存對象為null。除此之外,AbstractValueAdaptingCache中還定義了兩個包裝方法來配合這個參數(shù)進行使用,分別是toStoreValue和fromStoreValue,特殊用途是用于在緩存null對象時進行包裝、以及在獲取時進行解析并返回。
我們之后會在CacheManager中調(diào)用后面這個自己實現(xiàn)的構(gòu)造方法,來實例化Cache對象,參數(shù)中DoubleCacheConfig是使用@ConfigurationProperties讀取的yml配置文件封裝的數(shù)據(jù)對象,會在后面使用。
當一個方法添加了@Cacheable注解時,執(zhí)行時會先調(diào)用父類AbstractValueAdaptingCache中的get(key)方法,它會再調(diào)用我們自己實現(xiàn)的lookup方法。在實際執(zhí)行查找操作的lookup方法中,我們的邏輯仍然是先查找Caffeine、沒有找到時再查找Redis:
@Override
protected Object lookup(Object key) {
// 先從caffeine中查找
Object obj = caffeineCache.getIfPresent(key);
if (Objects.nonNull(obj)){
log.info("get data from caffeine");
return obj;
}
//再從redis中查找
String redisKey=this.name+":"+ key;
obj = redisTemplate.opsForValue().get(redisKey);
if (Objects.nonNull(obj)){
log.info("get data from redis");
caffeineCache.put(key,obj);
}
return obj;
}
如果lookup方法的返回結(jié)果不為null,那么就會直接返回結(jié)果給調(diào)用方。如果返回為null時,就會執(zhí)行原方法,執(zhí)行完成后調(diào)用put方法,將數(shù)據(jù)放入緩存中。接下來我們實現(xiàn)put方法:
@Override
public void put(Object key, Object value) {
if(!isAllowNullValues() && Objects.isNull(value)){
log.error("the value NULL will not be cached");
return;
}
//使用 toStoreValue(value) 包裝,解決caffeine不能存null的問題
caffeineCache.put(key,toStoreValue(value));
// null對象只存在caffeine中一份就夠了,不用存redis了
if (Objects.isNull(value))
return;
String redisKey=this.cacheName +":"+ key;
OptionalexpireOpt = Optional.ofNullable(doubleCacheConfig)
.map(DoubleCacheConfig::getRedisExpire);
if (expireOpt.isPresent()){
redisTemplate.opsForValue().set(redisKey,toStoreValue(value),
expireOpt.get(), TimeUnit.SECONDS);
}else{
redisTemplate.opsForValue().set(redisKey,toStoreValue(value));
}
}
上面我們對于是否允許緩存空對象進行了判斷,能夠緩存空對象的好處之一就是可以避免緩存穿透。需要注意的是,Caffeine中是不能直接緩存null的,因此可以使用父類提供的toStoreValue()方法,將它包裝成一個NullValue類型。在取出對象時,如果是NullValue,也不用我們自己再去調(diào)用fromStoreValue()將這個包裝類型還原,父類的get方法中已經(jīng)幫我們做好了。
另外,上面在put方法中緩存空對象時,只在Caffeine緩存中一份即可,可以不用在Redis中再存一份。
緩存的刪除方法evict()和清空方法clear()的實現(xiàn)就比較簡單了,直接刪除一跳或全部數(shù)據(jù)即可:
@Override
public void evict(Object key) {
redisTemplate.delete(this.cacheName +":"+ key);
caffeineCache.invalidate(key);
}
@Override
public void clear() {
Setkeys = redisTemplate.keys(this.cacheName.concat(":*"));
for (Object key : keys) {
redisTemplate.delete(String.valueOf(key));
}
caffeineCache.invalidateAll();
}
獲取緩存cacheName和實際緩存的方法實現(xiàn):
@Override
public String getName() {
return this.cacheName;
}
@Override
public Object getNativeCache() {
return this;
}
最后,我們再來看一下帶有兩個參數(shù)的get方法,為什么把這個方法放到最后來說呢,因為如果我們只是使用注解來管理緩存的話,那么這個方法不會被調(diào)用到,簡單看一下實現(xiàn):
@Override
publicT get(Object key, Callable valueLoader) {
ReentrantLock lock=new ReentrantLock();
try{
lock.lock();//加鎖
Object obj = lookup(key);
if (Objects.nonNull(obj)){
return (T)obj;
}
//沒有找到
obj = valueLoader.call();
put(key,obj);//放入緩存
return (T)obj;
}catch (Exception e){
log.error(e.getMessage());
}finally {
lock.unlock();
}
return null;
}
方法的實現(xiàn)比較容易理解,還是先調(diào)用lookup方法尋找是否已經(jīng)緩存了對象,如果沒有找到那么就調(diào)用Callable中的call方法進行獲取,并在獲取完成后存入到緩存中去。至于這個方法如何使用,具體代碼我們放在后面使用這一塊再看。
需要注意的是,這個方法的接口注釋中強調(diào)了需要我們自己來保證方法同步,因此這里使用了ReentrantLock進行了加鎖操作。到這里,Cache的實現(xiàn)就完成了,下面我們接著看另一個重要的接口CacheManager。
CacheManager
從名字就可以看出,CacheManager是一個緩存管理器,它可以被用來管理一組Cache。在上一篇文章的v2版本中,我們使用的CaffeineCacheManager就實現(xiàn)了這個接口,除此之外還有RedisCacheManager、EhCacheCacheManager等也都是通過這個接口實現(xiàn)。
下面我們要自定義一個類實現(xiàn)CacheManager接口,管理上面實現(xiàn)的DoubleCache作為spring中的緩存使用。接口中需要實現(xiàn)的方法只有下面兩個:
//根據(jù)cacheName獲取Cache實例,不存在時進行創(chuàng)建
Cache getCache(String name);
//返回管理的所有cacheName
CollectiongetCacheNames();
在自定義的緩存管理器中,我們要使用ConcurrentHashMap維護一組不同的Cache,再定義一個構(gòu)造方法,在參數(shù)中傳入已經(jīng)在spring中配置好的RedisTemplate,以及相關(guān)的緩存配置參數(shù):
public class DoubleCacheManager implements CacheManager {
Map cacheMap = new ConcurrentHashMap<>();
private RedisTemplate redisTemplate;
private DoubleCacheConfig dcConfig;
public DoubleCacheManager(RedisTemplate redisTemplate,
DoubleCacheConfig doubleCacheConfig) {
this.redisTemplate = redisTemplate;
this.dcConfig = doubleCacheConfig;
}
//...
}
然后實現(xiàn)getCache方法,邏輯很簡單,先根據(jù)name從Map中查找對應的Cache,如果找到則直接返回,這個參數(shù)name就是上一篇文章中提到的cacheName,CacheManager根據(jù)它實現(xiàn)不同Cache的隔離。
如果沒有根據(jù)名稱找到緩存的話,那么新建一個DoubleCache對象,并放入Map中。這里使用的ConcurrentHashMap的putIfAbsent()方法放入,避免重復創(chuàng)建Cache以及造成Cache內(nèi)數(shù)據(jù)的丟失。具體代碼如下:
@Override
public Cache getCache(String name) {
Cache cache = cacheMap.get(name);
if (Objects.nonNull(cache)) {
return cache;
}
cache = new DoubleCache(name, redisTemplate, createCaffeineCache(), dcConfig);
Cache oldCache = cacheMap.putIfAbsent(name, cache);
return oldCache == null ? cache : oldCache;
}
在上面創(chuàng)建DoubleCache對象的過程中,需要先創(chuàng)建一個Caffeine的Cache對象作為參數(shù)傳入,這一過程主要是根據(jù)實際項目的配置文件中的具體參數(shù)進行初始化,代碼如下:
private com.github.benmanes.caffeine.cache.Cache createCaffeineCache(){
Caffeine caffeineBuilder = Caffeine.newBuilder();
Optional dcConfigOpt = Optional.ofNullable(this.dcConfig);
dcConfigOpt.map(DoubleCacheConfig::getInit)
.ifPresent(init->caffeineBuilder.initialCapacity(init));
dcConfigOpt.map(DoubleCacheConfig::getMax)
.ifPresent(max->caffeineBuilder.maximumSize(max));
dcConfigOpt.map(DoubleCacheConfig::getExpireAfterWrite)
.ifPresent(eaw->caffeineBuilder.expireAfterWrite(eaw,TimeUnit.SECONDS));
dcConfigOpt.map(DoubleCacheConfig::getExpireAfterAccess)
.ifPresent(eaa->caffeineBuilder.expireAfterAccess(eaa,TimeUnit.SECONDS));
dcConfigOpt.map(DoubleCacheConfig::getRefreshAfterWrite)
.ifPresent(raw->caffeineBuilder.refreshAfterWrite(raw,TimeUnit.SECONDS));
return caffeineBuilder.build();
}
getCacheNames方法很簡單,直接返回Map的keySet就可以了,代碼如下:
@Override
public CollectiongetCacheNames() {
return cacheMap.keySet();
}
配置&使用
在application.yml文件中配置緩存的參數(shù),代碼中使用@ConfigurationProperties接收到DoubleCacheConfig類中:
doublecache:
allowNull: true
init: 128
max: 1024
expireAfterWrite: 30 #Caffeine過期時間
redisExpire: 60 #Redis緩存過期時間
配置自定義的DoubleCacheManager作為默認的緩存管理器:
@Configuration
public class CacheConfig {
@Autowired
DoubleCacheConfig doubleCacheConfig;
@Bean
public DoubleCacheManager cacheManager(RedisTemplateredisTemplate,
DoubleCacheConfig doubleCacheConfig){
return new DoubleCacheManager(redisTemplate,doubleCacheConfig);
}
}
Service中的代碼還是老樣子,不需要在代碼中手動操作緩存,只要直接在方法上使用@Cache相關(guān)注解即可:
@Service @Slf4j
@AllArgsConstructor
public class OrderServiceImpl implements OrderService {
private final OrderMapper orderMapper;
@Cacheable(value = "order",key = "#id")
public Order getOrderById(Long id) {
Order myOrder = orderMapper.selectOne(new LambdaQueryWrapper()
.eq(Order::getId, id));
return myOrder;
}
@CachePut(cacheNames = "order",key = "#order.id")
public Order updateOrder(Order order) {
orderMapper.updateById(order);
return order;
}
@CacheEvict(cacheNames = "order",key = "#id")
public void deleteOrder(Long id) {
orderMapper.deleteById(id);
}
//沒有注解,使用get(key,callable)方法
public Order getOrderById2(Long id) {
DoubleCacheManager cacheManager = SpringContextUtil.getBean(DoubleCacheManager.class);
Cache cache = cacheManager.getCache("order");
Order order =(Order) cache.get(id, (Callable) () -> {
log.info("get data from database");
Order myOrder = orderMapper.selectOne(new LambdaQueryWrapper()
.eq(Order::getId, id));
return myOrder;
});
return order;
}
}
注意最后這個沒有添加任何注解的方法,只有以這種方式調(diào)用時才會執(zhí)行我們在DoubleCache中自己實現(xiàn)的get(key,callable)方法。到這里,基于JSR107規(guī)范和spring接口的兩級緩存改造就完成了,下面我們看一下遺漏的第二個問題。
分布式環(huán)境改造
前面我們說了,在分布式環(huán)境下,可能會存在各個主機上一級緩存不一致的問題。當一臺主機修改了本地緩存后,其他主機是沒有感知的,仍然保持了之前的緩存,那么這種情況下就可能取到臟數(shù)據(jù)。既然我們在項目中已經(jīng)使用了Redis,那么就可以使用它的發(fā)布/訂閱功能來使各個節(jié)點的緩存進行同步。
定義消息體
在使用Redis發(fā)送消息前,需要先定義一個消息對象。其中的數(shù)據(jù)包括消息要作用于的Cache名稱、操作類型、數(shù)據(jù)以及發(fā)出消息的源主機標識:
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CacheMassage implements Serializable {
private static final long serialVersionUID = -3574997636829868400L;
private String cacheName;
private CacheMsgType type; //標識更新或刪除操作
private Object key;
private Object value;
private String msgSource; //源主機標識,用來避免重復操作
}
定義一個枚舉來標識消息的類型,是要進行更新還是刪除操作:
public enum CacheMsgType {
UPDATE,
DELETE;
}
消息體中的msgSource是添加的一個消息源主機的標識,添加這個是為了避免收到當前主機發(fā)送的消息后,再進行重復操作,也就是說收到本機發(fā)出的消息直接丟掉什么都不做就可以了。源主機標識這里使用的是主機ip加項目端口的方式,獲取方法如下:
public static String getMsgSource() throws UnknownHostException {
String host = InetAddress.getLocalHost().getHostAddress();
Environment env = SpringContextUtil.getBean(Environment.class);
String port = env.getProperty("server.port");
return host+":"+port;
}
這樣消息體的定義就完成了,之后只要調(diào)用redisTemplate的convertAndSend方法就可以把這個對象發(fā)布到指定的主題上了。
Redis消息配置
要使用Redis的消息監(jiān)聽功能,需要配置兩項內(nèi)容:
- MessageListenerAdapter:消息監(jiān)聽適配器,可以在其中指定自定義的監(jiān)聽代理類,并且可以自定義使用哪個方法處理監(jiān)聽邏輯
- RedisMessageListenerContainer:一個可以為消息監(jiān)聽器提供異步行為的容器,并且提供消息轉(zhuǎn)換和分派等底層功能
@Configuration
public class MessageConfig {
public static final String TOPIC="cache.msg";
@Bean
RedisMessageListenerContainer container(MessageListenerAdapter listenerAdapter,
RedisConnectionFactory redisConnectionFactory){
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
container.addMessageListener(listenerAdapter, new PatternTop
分享標題:阿里二面:怎么實現(xiàn)兩級緩存
轉(zhuǎn)載來源:http://fisionsoft.com.cn/article/cdipohe.html


咨詢
建站咨詢
