Redis高级应用:分布式锁与消息队列实现


Redis高级应用:分布式锁与消息队列实现

Redis不仅仅是一个缓存工具,它丰富的数据结构和原子操作特性使其成为构建分布式系统的利器。本文将深入讲解Redis在分布式锁和消息队列两个高级场景中的实现原理和最佳实践。

一、分布式锁

在分布式系统中,多个服务实例可能同时访问共享资源,分布式锁是协调并发访问的核心机制。分布式锁需要满足:互斥性、防死锁、可重入、高可用。

1. 基本原理

Redis通过SET命令的NX(不存在才设置)和EX(设置过期时间)选项实现分布式锁。加锁时设置一个唯一标识值,解锁时通过Lua脚本原子性地检查值并删除,避免误删别人的锁。

2. PHP实现分布式锁

class RedisLock
{
    private $redis;
    private $lockKey;
    private $lockValue;
    private $expireTime = 30;
    
    public function lock($retryCount = 3, $retryDelay = 200): bool
    {
        $this->lockValue = uniqid(getmypid() . ":", true);
        
        for ($i = 0; $i < $retryCount; $i++) {
            $result = $this->redis->set(
                $this->lockKey,
                $this->lockValue,
                ["NX", "EX" => $this->expireTime]
            );
            if ($result) return true;
            usleep($retryDelay * 1000);
        }
        return false;
    }
    
    public function unlock(): bool
    {
        // Lua脚本保证原子性:先检查值再删除
        $script = LuaScripts::COMPARE_AND_DELETE;
        return (bool) $this->redis->eval($script, [$this->lockKey, $this->lockValue], 1);
    }
    
    public function renew(): bool
    {
        $script = LuaScripts::COMPARE_AND_EXPIRE;
        return (bool) $this->redis->eval($script, [$this->lockKey, $this->lockValue, $this->expireTime], 1);
    }
}

3. 防止库存超卖

function deductStock($productId, $quantity)
{
    $lock = new RedisLock($redis, "stock:" . $productId);
    try {
        if (!$lock->lock()) {
            throw new Exception("系统繁忙,请稍后重试");
        }
        $stock = Stock::find($productId);
        if ($stock->quantity < $quantity) {
            throw new Exception("库存不足");
        }
        $stock->quantity -= $quantity;
        $stock->save();
        return true;
    } finally {
        $lock->unlock();
    }
}

二、消息队列实现

1. 基于List的简单队列

// 生产者入队
$redis->lpush("queue:emails", json_encode([
    "id" => uniqid("msg_"),
    "to" => "user@example.com",
    "subject" => "欢迎注册",
    "created_at" => time()
]));

// 消费者阻塞等待
while (true) {
    $result = $redis->brpop(["queue:emails"], 30);
    if ($result) {
        $message = json_decode($result[1], true);
        try {
            sendEmail($message);
        } catch (Exception $e) {
            // 失败放入重试队列
            $redis->lpush("queue:emails:retry", $result[1]);
        }
    }
}

2. 基于Stream的可靠队列(Redis 5.0+)

// 创建消费者组
$redis->xgroup("CREATE", "stream:orders", "processors", "$", true);

// 生产消息
$redis->xadd("stream:orders", "*", [
    "order_id" => $orderId,
    "action" => "create",
    "data" => json_encode($orderData)
]);

// 消费消息(支持确认机制)
$messages = $redis->xreadgroup(
    "GROUP", "processors", "consumer-1",
    "COUNT", 10, "BLOCK", 5000,
    "STREAMS", "stream:orders", ">"
);

foreach ($messages as $entries) {
    foreach ($entries as $id => $data) {
        try {
            processOrder($data);
            $redis->xack("stream:orders", "processors", [$id]);
        } catch (Exception $e) {
            // 未确认的消息会被自动重新投递
        }
    }
}

三、发布订阅模式

// 发布通知
$redis->publish("notifications", json_encode([
    "type" => "order_paid",
    "message" => "订单已支付成功"
]));

// 订阅频道
$redis->subscribe(["notifications"], function ($redis, $channel, $msg) {
    $data = json_decode($msg, true);
    handleNotification($data);
});

四、生产环境注意事项

  • 分布式锁要设置合理的过期时间,避免死锁
  • 解锁必须通过Lua脚本保证原子性
  • Redis主从切换时锁可能丢失,强一致性场景建议使用RedLock算法
  • 消息队列要处理重复消费问题,实现幂等性
  • 生产环境建议使用Redis Sentinel或Cluster保证高可用
  • Stream队列要定期清理已处理消息(XTRIM命令)

Redis在分布式场景中的应用非常广泛。分布式锁解决了并发问题,消息队列实现了服务解耦和异步处理。简单场景用Redis即可,复杂场景可考虑RabbitMQ或Kafka等专业中间件。


0.070076s