从零到千万级并发:用C#+Redis实现分布式锁的9种姿势与血腥踩坑史

艺帆风顺 发布于 2025-04-02 32 次阅读


在当今互联网应用面临海量用户请求的背景下,实现高并发处理成为了关键挑战。分布式锁作为一种重要的同步机制,在保证数据一致性和避免竞态条件方面发挥着至关重要的作用。本文将深入探讨如何使用C#结合Redis实现分布式锁,为应对千万级并发场景提供多种解决方案,并分享在实践过程中那些令人难忘的踩坑经历。

为什么需要分布式锁 

随着业务规模的不断扩大,应用程序往往采用分布式架构来提高系统的可扩展性和性能。在分布式环境中,多个进程或服务可能同时访问共享资源,如数据库、文件系统等。如果没有有效的同步机制,就会出现竞态条件,导致数据不一致等问题。分布式锁正是为了解决这类问题而诞生的,它通过在分布式系统中创建一个全局唯一的锁,确保同一时间只有一个进程能够访问共享资源。

Redis作为分布式锁的优势 

Redis是一个高性能的内存数据库,它具备以下特点,使其成为实现分布式锁的理想选择:

  1. 原子操作:Redis支持一系列原子操作,如SETNX(即SET if Not eXists),可以在不依赖其他事务机制的情况下实现锁的原子性操作。这意味着在多个客户端同时尝试获取锁时,只有一个客户端能够成功执行SETNX操作,从而获得锁。
  2. 高可用性:Redis可以通过主从复制和集群模式实现高可用性,确保在部分节点故障的情况下,分布式锁服务依然能够正常运行。
  3. 性能卓越:由于Redis数据存储在内存中,读写速度极快,能够满足高并发场景下对锁操作的性能要求。

用C#实现分布式锁的9种姿势 

姿势一:基本的SETNX实现

using StackExchange.Redis;

public class BasicDistributedLock
{
    private readonly ConnectionMultiplexer _redis;
    private readonly string _lockKey;
    private readonly string _lockValue;
    private const int LockTimeout = 10000; // 10秒

    public BasicDistributedLock(ConnectionMultiplexer redis, string lockKey)
    {
        _redis = redis;
        _lockKey = lockKey;
        _lockValue = Guid.NewGuid().ToString();
    }

    public bool TryAcquireLock()
    {
        var db = _redis.GetDatabase();
        return db.StringSet(_lockKey, _lockValue, TimeSpan.FromSeconds(LockTimeout), When.NotExists);
    }

    public void ReleaseLock()
    {
        var db = _redis.GetDatabase();
        db.KeyDelete(_lockKey);
    }
}

在这种实现中,我们使用StringSet方法并结合When.NotExists参数来模拟SETNX操作获取锁。获取锁时,为锁设置一个唯一的值(通常是一个GUID)和过期时间。释放锁时,直接删除锁对应的键。然而,这种方法存在一个明显的问题,即如果持有锁的进程在锁过期前崩溃,而其他进程在锁过期后获取到了锁,此时原进程恢复并尝试释放锁,就会误删其他进程的锁。

姿势二:基于Lua脚本的原子释放

为了解决上述误删锁的问题,我们可以利用Lua脚本在Redis中实现原子操作。Lua脚本可以确保一系列Redis命令作为一个原子操作执行,避免在执行过程中被其他命令打断。

using StackExchange.Redis;

public class LuaBasedDistributedLock
{
    private readonly ConnectionMultiplexer _redis;
    private readonly string _lockKey;
    private readonly string _lockValue;
    private const int LockTimeout = 10000; // 10秒
    private static readonly LuaScript _releaseScript;

    static LuaBasedDistributedLock()
    {
        _releaseScript = LuaScript.Prepare(@"
            if redis.call('GET', KEYS[1]) == ARGV[1] then
                return redis.call('DEL', KEYS[1])
            else
                return 0
            end");
    }

    public LuaBasedDistributedLock(ConnectionMultiplexer redis, string lockKey)
    {
        _redis = redis;
        _lockKey = lockKey;
        _lockValue = Guid.NewGuid().ToString();
    }

    public bool TryAcquireLock()
    {
        var db = _redis.GetDatabase();
        return db.StringSet(_lockKey, _lockValue, TimeSpan.FromSeconds(LockTimeout), When.NotExists);
    }

    public void ReleaseLock()
    {
        var db = _redis.GetDatabase();
        db.ScriptEvaluate(_releaseScript, new RedisKey[] { _lockKey }, new RedisValue[] { _lockValue });
    }
}

在这个实现中,我们定义了一个Lua脚本_releaseScript,在释放锁时,只有当锁的值与当前持有锁的进程设置的值相同时,才会执行删除操作,从而避免了误删其他进程锁的情况。

姿势三:RedLock算法

RedLock算法是一种更高级的分布式锁实现方式,它通过在多个Redis实例上获取锁来提高锁的可靠性。假设我们有N个Redis实例(通常N为奇数),客户端需要在超过N/2个实例上成功获取锁,才能认为获取到了分布式锁。

using StackExchange.Redis;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public class RedLock
{
    private readonly List _redisInstances;
    private readonly string _lockKey;
    private readonly string _lockValue;
    private const int LockTimeout = 10000; // 10秒
    private const int Quorum = 3; // 假设使用3个Redis实例,需获取超过半数的锁

    public RedLock(List redisInstances, string lockKey)
    {
        _redisInstances = redisInstances;
        _lockKey = lockKey;
        _lockValue = Guid.NewGuid().ToString();
    }

    public async Taskbool> TryAcquireLock()
    {
        var tasks = _redisInstances.Select(redis =>
        {
            var db = redis.GetDatabase();
            return db.StringSetAsync(_lockKey, _lockValue, TimeSpan.FromSeconds(LockTimeout), When.NotExists);
        });
        var results = await Task.WhenAll(tasks);
        return results.Count(result => result) > Quorum / 2;
    }

    public async Task ReleaseLock()
    {
        var tasks = _redisInstances.Select(redis =>
        {
            var db = redis.GetDatabase();
            return db.KeyDeleteAsync(_lockKey);
        });
        await Task.WhenAll(tasks);
    }
}

RedLock算法虽然提高了锁的可靠性,但实现相对复杂,且在性能上可能会有所损失,因为需要与多个Redis实例进行交互。

姿势四:带续租功能的锁

在高并发场景下,锁的持有时间可能需要动态调整。如果锁在持有过程中过期,而业务逻辑尚未完成,就会导致并发问题。为了解决这个问题,我们可以实现一个带续租功能的分布式锁。

using StackExchange.Redis;
using System;
using System.Threading;
using System.Threading.Tasks;

public class RenewalDistributedLock
{
    private readonly ConnectionMultiplexer _redis;
    private readonly string _lockKey;
    private readonly string _lockValue;
    private const int LockTimeout = 10000; // 10秒
    private const int RenewalInterval = 3000; // 3秒续租一次
    private CancellationTokenSource _cancellationTokenSource;

    public RenewalDistributedLock(ConnectionMultiplexer redis, string lockKey)
    {
        _redis = redis;
        _lockKey = lockKey;
        _lockValue = Guid.NewGuid().ToString();
    }

    public async Taskbool> TryAcquireLock()
    {
        var db = _redis.GetDatabase();
        var acquired = await db.StringSetAsync(_lockKey, _lockValue, TimeSpan.FromSeconds(LockTimeout), When.NotExists);
        if (acquired)
        {
            _cancellationTokenSource = new CancellationTokenSource();
            Task.Run(() => RenewLockAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
        }
        return acquired;
    }

    private async Task RenewLockAsync(CancellationToken cancellationToken)
    {
        var db = _redis.GetDatabase();
        while (!cancellationToken.IsCancellationRequested)
        {
            await Task.Delay(RenewalInterval, cancellationToken);
            var currentValue = await db.StringGetAsync(_lockKey);
            if (currentValue == _lockValue)
            {
                await db.KeyExpireAsync(_lockKey, TimeSpan.FromSeconds(LockTimeout));
            }
        }
    }

    public void ReleaseLock()
    {
        _cancellationTokenSource?.Cancel();
        var db = _redis.GetDatabase();
        db.KeyDelete(_lockKey);
    }
}

在这个实现中,当获取到锁后,启动一个后台任务,每隔一段时间(RenewalInterval)检查锁是否仍然由当前进程持有,如果是,则延长锁的过期时间。

姿势五:公平锁实现

在一些场景下,需要保证锁的获取顺序,即先请求锁的进程先获取到锁,这就需要实现公平锁。在Redis中,可以通过使用有序集合(Sorted Set)来实现公平锁。

using StackExchange.Redis;
using System;
using System.Threading.Tasks;

public class FairDistributedLock
{
    private readonly ConnectionMultiplexer _redis;
    private readonly string _lockKey;
    private readonly string _clientId;
    private const int LockTimeout = 10000; // 10秒

    public FairDistributedLock(ConnectionMultiplexer redis, string lockKey)
    {
        _redis = redis;
        _lockKey = lockKey;
        _clientId = Guid.NewGuid().ToString();
    }

    public async Taskbool> TryAcquireLock()
    {
        var db = _redis.GetDatabase();
        var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        var added = await db.SortedSetAddAsync(_lockKey, _clientId, timestamp);
        if (added)
        {
            var rank = await db.SortedSetRankAsync(_lockKey, _clientId);
            return rank == 0;
        }
        return false;
    }

    public void ReleaseLock()
    {
        var db = _redis.GetDatabase();
        db.SortedSetRemove(_lockKey, _clientId);
    }
}

在公平锁的实现中,每个客户端在尝试获取锁时,将自己的唯一标识(_clientId)和当前时间戳作为有序集合的成员添加到Redis中。只有当客户端添加的成员在有序集合中的排名为0时,才表示该客户端获取到了锁,从而保证了锁获取的公平性。

姿势六:可重入锁实现

可重入锁允许同一个线程多次获取同一把锁,而不会造成死锁。在Redis中实现可重入锁,需要在锁的实现中记录锁的持有线程信息。

using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Threading;

public class ReentrantDistributedLock
{
    private readonly ConnectionMultiplexer _redis;
    private readonly string _lockKey;
    private readonly string _lockValue;
    private const int LockTimeout = 10000; // 10秒
    private static readonly Dictionarystring, int> _reentrancyCount = new Dictionarystring, int>();
    private static readonly AsyncLocalstring> _currentLockHolder = new AsyncLocalstring>();

    public ReentrantDistributedLock(ConnectionMultiplexer redis, string lockKey)
    {
        _redis = redis;
        _lockKey = lockKey;
        _lockValue = Guid.NewGuid().ToString();
    }

    public bool TryAcquireLock()
    {
        var db = _redis.GetDatabase();
        var currentHolder = _currentLockHolder.Value;
        if (currentHolder == _lockValue)
        {
            if (!_reentrancyCount.ContainsKey(_lockValue))
            {
                _reentrancyCount.Add(_lockValue, 1);
            }
            else
            {
                _reentrancyCount[_lockValue]++;
            }
            return true;
        }
        var acquired = db.StringSet(_lockKey, _lockValue, TimeSpan.FromSeconds(LockTimeout), When.NotExists);
        if (acquired)
        {
            _currentLockHolder.Value = _lockValue;
            _reentrancyCount.Add(_lockValue, 1);
        }
        return acquired;
    }

    public void ReleaseLock()
    {
        var db = _redis.GetDatabase();
        var currentHolder = _currentLockHolder.Value;
        if (currentHolder == _lockValue)
        {
            _reentrancyCount[_lockValue]--;
            if (_reentrancyCount[_lockValue] == 0)
            {
                _reentrancyCount.Remove(_lockValue);
                _currentLockHolder.Value = null;
                db.KeyDelete(_lockKey);
            }
        }
    }
}

在可重入锁的实现中,我们使用一个Dictionary来记录每个锁持有者的重入次数,同时使用AsyncLocal来存储当前持有锁的线程对应的锁值。当同一个线程再次获取锁时,增加重入次数而不是重新获取锁,释放锁时则相应减少重入次数,当重入次数为0时才真正删除锁。

姿势七:基于Redisson的分布式锁

Redisson是一个在Redis基础上实现的Java驻内存数据网格(In-Memory Data Grid),它提供了丰富的分布式服务,包括分布式锁。虽然它是基于Java的,但通过一些跨语言的通信方式,C#应用也可以使用Redisson的分布式锁功能。

// 假设通过gRPC等方式与Redisson服务通信
// 这里仅为概念性示例,实际实现需根据具体通信方式调整
public class RedissonBasedDistributedLock
{
    private readonly RedissonClient _redissonClient;
    private readonly string _lockKey;

    public RedissonBasedDistributedLock(RedissonClient redissonClient, string lockKey)
    {
        _redissonClient = redissonClient;
        _lockKey = lockKey;
    }

    public async Taskbool> TryAcquireLock()
    {
        var lockObject = _redissonClient.GetLock(_lockKey);
        return await lockObject.TryLockAsync(100, 10000, TimeUnit.Milliseconds);
    }

    public async Task ReleaseLock()
    {
        var lockObject = _redissonClient.GetLock(_lockKey);
        await lockObject.UnlockAsync();
    }
}

Redisson的分布式锁实现相对成熟,提供了多种功能,如锁的自动续租、公平锁、可重入锁等,并且在性能和可靠性方面表现出色。通过与Redisson服务进行通信,C#应用可以方便地使用这些强大的分布式锁功能。

姿势八:使用Redis事务实现分布式锁

Redis事务可以将多个命令打包成一个原子操作执行,我们可以利用这一特性来实现分布式锁。

using StackExchange.Redis;
using System;
using System.Threading.Tasks;

public class TransactionBasedDistributedLock
{
    private readonly ConnectionMultiplexer _redis;
    private readonly string _lockKey;
    private readonly string _lockValue;
    private const int LockTimeout = 10000; // 10秒

    public TransactionBasedDistributedLock(ConnectionMultiplexer redis, string lockKey)
    {
        _redis = redis;
        _lockKey = lockKey;
        _lockValue = Guid.NewGuid().ToString();
    }

    public async Taskbool> TryAcquireLock()
    {
        var db = _redis.GetDatabase();
        var transaction = db.CreateTransaction();
        transaction.AddCondition(Condition.KeyNotExists(_lockKey));
        transaction.StringSetAsync(_lockKey, _lockValue, TimeSpan.FromSeconds(LockTimeout));
        var result = await transaction.ExecuteAsync();
        return result;
    }

    public void ReleaseLock()
    {
        var db = _redis.GetDatabase();
        db.KeyDelete(_lockKey);
    }
}

在这种实现中,我们使用Redis事务的条件功能(Condition.KeyNotExists)来确保只有在锁不存在时才执行设置锁的操作,从而实现了原子性的锁获取。然而,Redis事务在处理复杂逻辑时可能会存在一些限制,且性能上相对一些基于Lua脚本的实现可能稍逊一筹。

姿势九:基于Redis Cluster的分布式锁

在Redis Cluster环境下,实现分布式锁需要考虑集群的特性,如数据分片等。