在当今互联网应用面临海量用户请求的背景下,实现高并发处理成为了关键挑战。分布式锁作为一种重要的同步机制,在保证数据一致性和避免竞态条件方面发挥着至关重要的作用。本文将深入探讨如何使用C#结合Redis实现分布式锁,为应对千万级并发场景提供多种解决方案,并分享在实践过程中那些令人难忘的踩坑经历。
为什么需要分布式锁
随着业务规模的不断扩大,应用程序往往采用分布式架构来提高系统的可扩展性和性能。在分布式环境中,多个进程或服务可能同时访问共享资源,如数据库、文件系统等。如果没有有效的同步机制,就会出现竞态条件,导致数据不一致等问题。分布式锁正是为了解决这类问题而诞生的,它通过在分布式系统中创建一个全局唯一的锁,确保同一时间只有一个进程能够访问共享资源。
Redis作为分布式锁的优势
Redis是一个高性能的内存数据库,它具备以下特点,使其成为实现分布式锁的理想选择:
原子操作:Redis支持一系列原子操作,如 SETNX(即SET if Not eXists),可以在不依赖其他事务机制的情况下实现锁的原子性操作。这意味着在多个客户端同时尝试获取锁时,只有一个客户端能够成功执行SETNX操作,从而获得锁。高可用性:Redis可以通过主从复制和集群模式实现高可用性,确保在部分节点故障的情况下,分布式锁服务依然能够正常运行。 性能卓越:由于Redis数据存储在内存中,读写速度极快,能够满足高并发场景下对锁操作的性能要求。
用C#实现分布式锁的9种姿势
姿势一:基本的SETNX实现
using StackExchange.Redis;
public class BasicDistributedLock
{
private readonly ConnectionMultiplexer _redis;
private readonly string _lockKey;
private readonly string _lockValue;
private const int LockTimeout = 10000; // 10秒
public BasicDistributedLock(ConnectionMultiplexer redis, string lockKey)
{
_redis = redis;
_lockKey = lockKey;
_lockValue = Guid.NewGuid().ToString();
}
public bool TryAcquireLock()
{
var db = _redis.GetDatabase();
return db.StringSet(_lockKey, _lockValue, TimeSpan.FromSeconds(LockTimeout), When.NotExists);
}
public void ReleaseLock()
{
var db = _redis.GetDatabase();
db.KeyDelete(_lockKey);
}
}
在这种实现中,我们使用StringSet方法并结合When.NotExists参数来模拟SETNX操作获取锁。获取锁时,为锁设置一个唯一的值(通常是一个GUID)和过期时间。释放锁时,直接删除锁对应的键。然而,这种方法存在一个明显的问题,即如果持有锁的进程在锁过期前崩溃,而其他进程在锁过期后获取到了锁,此时原进程恢复并尝试释放锁,就会误删其他进程的锁。
姿势二:基于Lua脚本的原子释放
为了解决上述误删锁的问题,我们可以利用Lua脚本在Redis中实现原子操作。Lua脚本可以确保一系列Redis命令作为一个原子操作执行,避免在执行过程中被其他命令打断。
using StackExchange.Redis;
public class LuaBasedDistributedLock
{
private readonly ConnectionMultiplexer _redis;
private readonly string _lockKey;
private readonly string _lockValue;
private const int LockTimeout = 10000; // 10秒
private static readonly LuaScript _releaseScript;
static LuaBasedDistributedLock()
{
_releaseScript = LuaScript.Prepare(@"
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end");
}
public LuaBasedDistributedLock(ConnectionMultiplexer redis, string lockKey)
{
_redis = redis;
_lockKey = lockKey;
_lockValue = Guid.NewGuid().ToString();
}
public bool TryAcquireLock()
{
var db = _redis.GetDatabase();
return db.StringSet(_lockKey, _lockValue, TimeSpan.FromSeconds(LockTimeout), When.NotExists);
}
public void ReleaseLock()
{
var db = _redis.GetDatabase();
db.ScriptEvaluate(_releaseScript, new RedisKey[] { _lockKey }, new RedisValue[] { _lockValue });
}
}
在这个实现中,我们定义了一个Lua脚本_releaseScript,在释放锁时,只有当锁的值与当前持有锁的进程设置的值相同时,才会执行删除操作,从而避免了误删其他进程锁的情况。
姿势三:RedLock算法
RedLock算法是一种更高级的分布式锁实现方式,它通过在多个Redis实例上获取锁来提高锁的可靠性。假设我们有N个Redis实例(通常N为奇数),客户端需要在超过N/2个实例上成功获取锁,才能认为获取到了分布式锁。
using StackExchange.Redis;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
public class RedLock
{
private readonly List _redisInstances;
private readonly string _lockKey;
private readonly string _lockValue;
private const int LockTimeout = 10000; // 10秒
private const int Quorum = 3; // 假设使用3个Redis实例,需获取超过半数的锁
public RedLock(List redisInstances, string lockKey)
{
_redisInstances = redisInstances;
_lockKey = lockKey;
_lockValue = Guid.NewGuid().ToString();
}
public async Taskbool> TryAcquireLock()
{
var tasks = _redisInstances.Select(redis =>
{
var db = redis.GetDatabase();
return db.StringSetAsync(_lockKey, _lockValue, TimeSpan.FromSeconds(LockTimeout), When.NotExists);
});
var results = await Task.WhenAll(tasks);
return results.Count(result => result) > Quorum / 2;
}
public async Task ReleaseLock()
{
var tasks = _redisInstances.Select(redis =>
{
var db = redis.GetDatabase();
return db.KeyDeleteAsync(_lockKey);
});
await Task.WhenAll(tasks);
}
}
RedLock算法虽然提高了锁的可靠性,但实现相对复杂,且在性能上可能会有所损失,因为需要与多个Redis实例进行交互。
姿势四:带续租功能的锁
在高并发场景下,锁的持有时间可能需要动态调整。如果锁在持有过程中过期,而业务逻辑尚未完成,就会导致并发问题。为了解决这个问题,我们可以实现一个带续租功能的分布式锁。
using StackExchange.Redis;
using System;
using System.Threading;
using System.Threading.Tasks;
public class RenewalDistributedLock
{
private readonly ConnectionMultiplexer _redis;
private readonly string _lockKey;
private readonly string _lockValue;
private const int LockTimeout = 10000; // 10秒
private const int RenewalInterval = 3000; // 3秒续租一次
private CancellationTokenSource _cancellationTokenSource;
public RenewalDistributedLock(ConnectionMultiplexer redis, string lockKey)
{
_redis = redis;
_lockKey = lockKey;
_lockValue = Guid.NewGuid().ToString();
}
public async Taskbool> TryAcquireLock()
{
var db = _redis.GetDatabase();
var acquired = await db.StringSetAsync(_lockKey, _lockValue, TimeSpan.FromSeconds(LockTimeout), When.NotExists);
if (acquired)
{
_cancellationTokenSource = new CancellationTokenSource();
Task.Run(() => RenewLockAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
}
return acquired;
}
private async Task RenewLockAsync(CancellationToken cancellationToken)
{
var db = _redis.GetDatabase();
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(RenewalInterval, cancellationToken);
var currentValue = await db.StringGetAsync(_lockKey);
if (currentValue == _lockValue)
{
await db.KeyExpireAsync(_lockKey, TimeSpan.FromSeconds(LockTimeout));
}
}
}
public void ReleaseLock()
{
_cancellationTokenSource?.Cancel();
var db = _redis.GetDatabase();
db.KeyDelete(_lockKey);
}
}
在这个实现中,当获取到锁后,启动一个后台任务,每隔一段时间(RenewalInterval)检查锁是否仍然由当前进程持有,如果是,则延长锁的过期时间。
姿势五:公平锁实现
在一些场景下,需要保证锁的获取顺序,即先请求锁的进程先获取到锁,这就需要实现公平锁。在Redis中,可以通过使用有序集合(Sorted Set)来实现公平锁。
using StackExchange.Redis;
using System;
using System.Threading.Tasks;
public class FairDistributedLock
{
private readonly ConnectionMultiplexer _redis;
private readonly string _lockKey;
private readonly string _clientId;
private const int LockTimeout = 10000; // 10秒
public FairDistributedLock(ConnectionMultiplexer redis, string lockKey)
{
_redis = redis;
_lockKey = lockKey;
_clientId = Guid.NewGuid().ToString();
}
public async Taskbool> TryAcquireLock()
{
var db = _redis.GetDatabase();
var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
var added = await db.SortedSetAddAsync(_lockKey, _clientId, timestamp);
if (added)
{
var rank = await db.SortedSetRankAsync(_lockKey, _clientId);
return rank == 0;
}
return false;
}
public void ReleaseLock()
{
var db = _redis.GetDatabase();
db.SortedSetRemove(_lockKey, _clientId);
}
}
在公平锁的实现中,每个客户端在尝试获取锁时,将自己的唯一标识(_clientId)和当前时间戳作为有序集合的成员添加到Redis中。只有当客户端添加的成员在有序集合中的排名为0时,才表示该客户端获取到了锁,从而保证了锁获取的公平性。
姿势六:可重入锁实现
可重入锁允许同一个线程多次获取同一把锁,而不会造成死锁。在Redis中实现可重入锁,需要在锁的实现中记录锁的持有线程信息。
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Threading;
public class ReentrantDistributedLock
{
private readonly ConnectionMultiplexer _redis;
private readonly string _lockKey;
private readonly string _lockValue;
private const int LockTimeout = 10000; // 10秒
private static readonly Dictionarystring, int> _reentrancyCount = new Dictionarystring, int>();
private static readonly AsyncLocalstring> _currentLockHolder = new AsyncLocalstring>();
public ReentrantDistributedLock(ConnectionMultiplexer redis, string lockKey)
{
_redis = redis;
_lockKey = lockKey;
_lockValue = Guid.NewGuid().ToString();
}
public bool TryAcquireLock()
{
var db = _redis.GetDatabase();
var currentHolder = _currentLockHolder.Value;
if (currentHolder == _lockValue)
{
if (!_reentrancyCount.ContainsKey(_lockValue))
{
_reentrancyCount.Add(_lockValue, 1);
}
else
{
_reentrancyCount[_lockValue]++;
}
return true;
}
var acquired = db.StringSet(_lockKey, _lockValue, TimeSpan.FromSeconds(LockTimeout), When.NotExists);
if (acquired)
{
_currentLockHolder.Value = _lockValue;
_reentrancyCount.Add(_lockValue, 1);
}
return acquired;
}
public void ReleaseLock()
{
var db = _redis.GetDatabase();
var currentHolder = _currentLockHolder.Value;
if (currentHolder == _lockValue)
{
_reentrancyCount[_lockValue]--;
if (_reentrancyCount[_lockValue] == 0)
{
_reentrancyCount.Remove(_lockValue);
_currentLockHolder.Value = null;
db.KeyDelete(_lockKey);
}
}
}
}
在可重入锁的实现中,我们使用一个Dictionary来记录每个锁持有者的重入次数,同时使用AsyncLocal来存储当前持有锁的线程对应的锁值。当同一个线程再次获取锁时,增加重入次数而不是重新获取锁,释放锁时则相应减少重入次数,当重入次数为0时才真正删除锁。
姿势七:基于Redisson的分布式锁
Redisson是一个在Redis基础上实现的Java驻内存数据网格(In-Memory Data Grid),它提供了丰富的分布式服务,包括分布式锁。虽然它是基于Java的,但通过一些跨语言的通信方式,C#应用也可以使用Redisson的分布式锁功能。
// 假设通过gRPC等方式与Redisson服务通信
// 这里仅为概念性示例,实际实现需根据具体通信方式调整
public class RedissonBasedDistributedLock
{
private readonly RedissonClient _redissonClient;
private readonly string _lockKey;
public RedissonBasedDistributedLock(RedissonClient redissonClient, string lockKey)
{
_redissonClient = redissonClient;
_lockKey = lockKey;
}
public async Taskbool> TryAcquireLock()
{
var lockObject = _redissonClient.GetLock(_lockKey);
return await lockObject.TryLockAsync(100, 10000, TimeUnit.Milliseconds);
}
public async Task ReleaseLock()
{
var lockObject = _redissonClient.GetLock(_lockKey);
await lockObject.UnlockAsync();
}
}
Redisson的分布式锁实现相对成熟,提供了多种功能,如锁的自动续租、公平锁、可重入锁等,并且在性能和可靠性方面表现出色。通过与Redisson服务进行通信,C#应用可以方便地使用这些强大的分布式锁功能。
姿势八:使用Redis事务实现分布式锁
Redis事务可以将多个命令打包成一个原子操作执行,我们可以利用这一特性来实现分布式锁。
using StackExchange.Redis;
using System;
using System.Threading.Tasks;
public class TransactionBasedDistributedLock
{
private readonly ConnectionMultiplexer _redis;
private readonly string _lockKey;
private readonly string _lockValue;
private const int LockTimeout = 10000; // 10秒
public TransactionBasedDistributedLock(ConnectionMultiplexer redis, string lockKey)
{
_redis = redis;
_lockKey = lockKey;
_lockValue = Guid.NewGuid().ToString();
}
public async Taskbool> TryAcquireLock()
{
var db = _redis.GetDatabase();
var transaction = db.CreateTransaction();
transaction.AddCondition(Condition.KeyNotExists(_lockKey));
transaction.StringSetAsync(_lockKey, _lockValue, TimeSpan.FromSeconds(LockTimeout));
var result = await transaction.ExecuteAsync();
return result;
}
public void ReleaseLock()
{
var db = _redis.GetDatabase();
db.KeyDelete(_lockKey);
}
}
在这种实现中,我们使用Redis事务的条件功能(Condition.KeyNotExists)来确保只有在锁不存在时才执行设置锁的操作,从而实现了原子性的锁获取。然而,Redis事务在处理复杂逻辑时可能会存在一些限制,且性能上相对一些基于Lua脚本的实现可能稍逊一筹。
姿势九:基于Redis Cluster的分布式锁
在Redis Cluster环境下,实现分布式锁需要考虑集群的特性,如数据分片等。
