新聞中心
環(huán)境:SpringBoot2.7.12

本篇文章將會為大家介紹有關(guān)spring integration提供的分布式鎖功能。
1. 簡介
Spring Integration 是一個框架,用于構(gòu)建事件驅(qū)動的應用程序。在 Spring Integration 中,LockRegistry 是一個接口,用于管理分布式鎖。分布式鎖是一種同步機制,用于確保在分布式系統(tǒng)中的多個節(jié)點之間對共享資源的互斥訪問。
LockRegistry及相關(guān)子接口(如:RenewableLockRegistry) 接口的主要功能:
- 獲取鎖:當應用程序需要訪問共享資源時,它可以通過 LockRegistry 獲取一個鎖。
- 釋放鎖:當應用程序完成對共享資源的訪問后,它應該釋放鎖,以便其他應用程序可以獲取它(第一點中提到,并沒有提供直接釋放鎖的操作,而是內(nèi)部自動完成)。
- 續(xù)期:提供續(xù)期機制,以便在需要時延長鎖的持有時間。
常見的 LockRegistry 實現(xiàn)包括基于數(shù)據(jù)庫、ZooKeeper 和 Redis 的實現(xiàn)。
公共依賴
org.springframework.boot
spring-boot-starter-integration
2. 基于數(shù)據(jù)庫分布式鎖
引入依賴
org.springframework.integration
spring-integration-jdbc
com.zaxxer
HikariCP
compile
mysql
mysql-connector-java
8.0.30
配置
spring:
datasource:
driverClassName: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/spring_lock?serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&useSSL=false
username: root
password: xxxooo
type: com.zaxxer.hikari.HikariDataSource
hikari:
minimumIdle: 10
maximumPoolSize: 200
---
spring:
integration:
jdbc:
initialize-schema: always
# 基于數(shù)據(jù)庫需要執(zhí)行初始化腳本
schema: classpath:schema-mysql.sql
注冊核心Bean對象
@Bean
public DefaultLockRepository defaultLockRepository(DataSource dataSource) {
DefaultLockRepository lockRepository = new DefaultLockRepository(dataSource);
// 這里根據(jù)你的業(yè)務需要,配置表前綴,默認:IN_
lockRepository.setPrefix("T_") ;
return lockRepository ;
}
// 注冊基于數(shù)據(jù)庫的分布式鎖
@Bean
public JdbcLockRegistry jdbcLockRegistry(DefaultLockRepository lockRepository) {
return new JdbcLockRegistry(lockRepository) ;
}
測試用例
@Test
public void testLock() throws Exception
int len = 10 ;
CountDownLatch cdl = new CountDownLatch(len) ;
CountDownLatch waiter = new CountDownLatch(len) ;
Thread[] ts = new Thread[len] ;
for (int i = 0; i < len; i++) {
ts[i] = new Thread(() -> {
waiter.countDown() ;
System.out.println(Thread.currentThread().getName() + " - 準備獲取鎖") ;
try {
waiter.await() ;
} catch (InterruptedException e1) {
e1.printStackTrace();
}
// 獲取鎖
Lock lock = registry.obtain("drug_store_key_001") ;
lock.lock() ;
System.out.println(Thread.currentThread().getName() + " - 獲取鎖成功") ;
try {
try {
TimeUnit.SECONDS.sleep(2) ;
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
// 釋放鎖
lock.unlock() ;
cdl.countDown() ;
System.out.println(Thread.currentThread().getName() + " - 鎖釋放成功") ;
}
}, "T - " + i) ;
}
for (int i = 0; i < len; i++) {
ts[i].start() ;
}
cdl.await() ;
}
數(shù)據(jù)庫
圖片
鎖的實現(xiàn)JdbcLock,該對象實現(xiàn)了java.util.concurrent.locks.Lock,所以該鎖是支持重入等操作的。
配置鎖獲取失敗后的重試間隔,默認值100ms
JdbcLockRegistry jdbcLockRegistry = new JdbcLockRegistry(lockRepository);
// 定義鎖對象時設置當獲取鎖失敗后重試間隔時間。
jdbcLockRegistry.setIdleBetweenTries(Duration.ofMillis(200)) ;
鎖續(xù)期
jdbcLockRegistry.renewLock("drug_store_key_001");
3. 基于Redis分布式鎖
引入依賴
org.springframework.integration
spring-integration-jdbc
org.springframework.boot
spring-boot-starter-data-redis
配置
spring:
redis:
host: localhost
port: 6379
password: xxxooo
database: 8
lettuce:
pool:
maxActive: 8
maxIdle: 100
minIdle: 10
maxWait: -1
測試用例
測試代碼與上面基于JDBC的一樣,只需要修改調(diào)用加鎖的代碼即可
Lock lock = redisLockRegistry.obtain("001") ;設置鎖的有效期,默認是60s
// 第三個參數(shù)設置了key的有效期,這里改成10s
RedisLockRegistry redisLockRegistry = new RedisLockRegistry(connectionFactory, registryKey, 10000) ;注意:redis key的有效期設置為10s,如果你的業(yè)務執(zhí)行超過了10s,那么程序?qū)箦e。并沒有redission watch dog機制。
Exception in thread "T - 0" java.lang.IllegalStateException: Lock was released in the store due to expiration. The integrity of data protected by this lock may have been compromised.
at org.springframework.integration.redis.util.RedisLockRegistry$RedisLock.unlock(RedisLockRegistry.java:450)
at com.pack.SpringIntegrationDemoApplicationTests.lambda$1(SpringIntegrationDemoApplicationTests.java:83)
at java.lang.Thread.run(Thread.java:748)如果10s過期后key自動刪除后,其它線程是否能立馬獲取到鎖呢?如果是單節(jié)點中其它現(xiàn)在也不能獲取鎖,必須等上一個線程結(jié)束后才可以,這是因為在內(nèi)部還維護了一個ReentrantLock鎖,在獲取分布式鎖前要先獲取本地的一個鎖。
private abstract class RedisLock implements Lock {
private final ReentrantLock localLock = new ReentrantLock();
public final void lock() {
this.localLock.lock();
while (true) {
try {
if (tryRedisLock(-1L)) {
return;
}
} catch (InterruptedException e) {
} catch (Exception e) {
this.localLock.unlock();
rethrowAsLockException(e);
}
}
}
}注意:不管是基于數(shù)據(jù)庫還是Redis都要先獲取本地的鎖
Spring Cloud Task就使用到了Spring Integration中的鎖基于數(shù)據(jù)庫的。
總結(jié):Spring Integration 的分布式鎖為開發(fā)者提供了一種在分布式系統(tǒng)中實現(xiàn)可靠同步的有效方法。通過合理選擇和使用這些鎖實現(xiàn),可以確保對共享資源的訪問在多個節(jié)點之間保持協(xié)調(diào)一致,從而提高系統(tǒng)的整體可靠性和性能。
完畢?。。?/p>
分享名稱:Spring自帶分布式鎖你用過嗎?
本文來源:http://fisionsoft.com.cn/article/djcides.html


咨詢
建站咨詢
