這篇文章將為大家詳細講解有關如何在Java中使用redisson實現(xiàn)一個分布式鎖,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
成都創(chuàng)新互聯(lián)專注于企業(yè)營銷型網(wǎng)站建設、網(wǎng)站重做改版、雷山網(wǎng)站定制設計、自適應品牌網(wǎng)站建設、H5技術、商城網(wǎng)站建設、集團公司官網(wǎng)建設、外貿(mào)營銷網(wǎng)站建設、高端網(wǎng)站制作、響應式網(wǎng)頁設計等建站業(yè)務,價格優(yōu)惠性價比高,為雷山等各大城市提供網(wǎng)站開發(fā)制作服務。
1. 基本用法
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.8.2</version> </dependency>
Config config = new Config();
config.useClusterServers()
.setScanInterval(2000) // cluster state scan interval in milliseconds
.addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001")
.addNodeAddress("redis://127.0.0.1:7002");
RedissonClient redisson = Redisson.create(config);
RLock lock = redisson.getLock("anyLock");
lock.lock();
try {
...
} finally {
lock.unlock();
}針對上面這段代碼,重點看一下Redisson是如何基于Redis實現(xiàn)分布式鎖的
Redisson中提供的加鎖的方法有很多,但大致類似,此處只看lock()方法
更多請參見https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers
2. 加鎖


可以看到,調(diào)用getLock()方法后實際返回一個RedissonLock對象,在RedissonLock對象的lock()方法主要調(diào)用tryAcquire()方法

由于leaseTime == -1,于是走tryLockInnerAsync()方法,這個方法才是關鍵
首先,看一下evalWriteAsync方法的定義
復制代碼 代碼如下:
<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
最后兩個參數(shù)分別是keys和params
實際調(diào)用是這樣的:

單獨將調(diào)用的那一段摘出來看
commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));結(jié)合上面的參數(shù)聲明,我們可以知道,這里KEYS[1]就是getName(),ARGV[2]是getLockName(threadId)
假設前面獲取鎖時傳的name是“abc”,假設調(diào)用的線程ID是Thread-1,假設成員變量UUID類型的id是6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c
那么KEYS[1]=abc,ARGV[2]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1
因此,這段腳本的意思是
1、判斷有沒有一個叫“abc”的key
2、如果沒有,則在其下設置一個字段為“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”,值為“1”的鍵值對 ,并設置它的過期時間
3、如果存在,則進一步判斷“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”是否存在,若存在,則其值加1,并重新設置過期時間
4、返回“abc”的生存時間(毫秒)
這里用的數(shù)據(jù)結(jié)構是hash,hash的結(jié)構是: key 字段1 值1 字段2 值2 。。。
用在鎖這個場景下,key就表示鎖的名稱,也可以理解為臨界資源,字段就表示當前獲得鎖的線程
所有競爭這把鎖的線程都要判斷在這個key下有沒有自己線程的字段,如果沒有則不能獲得鎖,如果有,則相當于重入,字段值加1(次數(shù))
3. 解鎖
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}我們還是假設name=abc,假設線程ID是Thread-1
同理,我們可以知道
KEYS[1]是getName(),即KEYS[1]=abc
KEYS[2]是getChannelName(),即KEYS[2]=redisson_lock__channel:{abc}
ARGV[1]是LockPubSub.unlockMessage,即ARGV[1]=0
ARGV[2]是生存時間
ARGV[3]是getLockName(threadId),即ARGV[3]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1
因此,上面腳本的意思是:
1、判斷是否存在一個叫“abc”的key
2、如果不存在,向Channel中廣播一條消息,廣播的內(nèi)容是0,并返回1
3、如果存在,進一步判斷字段6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1是否存在
4、若字段不存在,返回空,若字段存在,則字段值減1
5、若減完以后,字段值仍大于0,則返回0
6、減完后,若字段值小于或等于0,則廣播一條消息,廣播內(nèi)容是0,并返回1;
可以猜測,廣播0表示資源可用,即通知那些等待獲取鎖的線程現(xiàn)在可以獲得鎖了
4. 等待
以上是正常情況下獲取到鎖的情況,那么當無法立即獲取到鎖的時候怎么辦呢?
再回到前面獲取鎖的位置
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
// 訂閱
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
protected static final LockPubSub PUBSUB = new LockPubSub();
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}
protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}這里會訂閱Channel,當資源可用時可以及時知道,并搶占,防止無效的輪詢而浪費資源
當資源可用用的時候,循環(huán)去嘗試獲取鎖,由于多個線程同時去競爭資源,所以這里用了信號量,對于同一個資源只允許一個線程獲得鎖,其它的線程阻塞
5. 小結(jié)


關于如何在Java中使用Redisson實現(xiàn)一個分布式鎖就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
本文名稱:如何在Java中使用Redisson實現(xiàn)一個分布式鎖
URL地址:http://www.chinadenli.net/article14/ihogde.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供響應式網(wǎng)站、網(wǎng)站制作、移動網(wǎng)站建設、軟件開發(fā)、網(wǎng)站營銷、標簽優(yōu)化
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)