Redis高级应用:分布式锁与消息队列实现
Redis不仅仅是一个缓存工具,它丰富的数据结构和原子操作特性使其成为构建分布式系统的利器。本文将深入讲解Redis在分布式锁和消息队列两个高级场景中的实现原理和最佳实践。
一、分布式锁
在分布式系统中,多个服务实例可能同时访问共享资源,分布式锁是协调并发访问的核心机制。分布式锁需要满足:互斥性、防死锁、可重入、高可用。
1. 基本原理
Redis通过SET命令的NX(不存在才设置)和EX(设置过期时间)选项实现分布式锁。加锁时设置一个唯一标识值,解锁时通过Lua脚本原子性地检查值并删除,避免误删别人的锁。
2. PHP实现分布式锁
class RedisLock
{
private $redis;
private $lockKey;
private $lockValue;
private $expireTime = 30;
public function lock($retryCount = 3, $retryDelay = 200): bool
{
$this->lockValue = uniqid(getmypid() . ":", true);
for ($i = 0; $i < $retryCount; $i++) {
$result = $this->redis->set(
$this->lockKey,
$this->lockValue,
["NX", "EX" => $this->expireTime]
);
if ($result) return true;
usleep($retryDelay * 1000);
}
return false;
}
public function unlock(): bool
{
// Lua脚本保证原子性:先检查值再删除
$script = LuaScripts::COMPARE_AND_DELETE;
return (bool) $this->redis->eval($script, [$this->lockKey, $this->lockValue], 1);
}
public function renew(): bool
{
$script = LuaScripts::COMPARE_AND_EXPIRE;
return (bool) $this->redis->eval($script, [$this->lockKey, $this->lockValue, $this->expireTime], 1);
}
}
3. 防止库存超卖
function deductStock($productId, $quantity)
{
$lock = new RedisLock($redis, "stock:" . $productId);
try {
if (!$lock->lock()) {
throw new Exception("系统繁忙,请稍后重试");
}
$stock = Stock::find($productId);
if ($stock->quantity < $quantity) {
throw new Exception("库存不足");
}
$stock->quantity -= $quantity;
$stock->save();
return true;
} finally {
$lock->unlock();
}
}
二、消息队列实现
1. 基于List的简单队列
// 生产者入队
$redis->lpush("queue:emails", json_encode([
"id" => uniqid("msg_"),
"to" => "user@example.com",
"subject" => "欢迎注册",
"created_at" => time()
]));
// 消费者阻塞等待
while (true) {
$result = $redis->brpop(["queue:emails"], 30);
if ($result) {
$message = json_decode($result[1], true);
try {
sendEmail($message);
} catch (Exception $e) {
// 失败放入重试队列
$redis->lpush("queue:emails:retry", $result[1]);
}
}
}
2. 基于Stream的可靠队列(Redis 5.0+)
// 创建消费者组
$redis->xgroup("CREATE", "stream:orders", "processors", "$", true);
// 生产消息
$redis->xadd("stream:orders", "*", [
"order_id" => $orderId,
"action" => "create",
"data" => json_encode($orderData)
]);
// 消费消息(支持确认机制)
$messages = $redis->xreadgroup(
"GROUP", "processors", "consumer-1",
"COUNT", 10, "BLOCK", 5000,
"STREAMS", "stream:orders", ">"
);
foreach ($messages as $entries) {
foreach ($entries as $id => $data) {
try {
processOrder($data);
$redis->xack("stream:orders", "processors", [$id]);
} catch (Exception $e) {
// 未确认的消息会被自动重新投递
}
}
}
三、发布订阅模式
// 发布通知
$redis->publish("notifications", json_encode([
"type" => "order_paid",
"message" => "订单已支付成功"
]));
// 订阅频道
$redis->subscribe(["notifications"], function ($redis, $channel, $msg) {
$data = json_decode($msg, true);
handleNotification($data);
});
四、生产环境注意事项
- 分布式锁要设置合理的过期时间,避免死锁
- 解锁必须通过Lua脚本保证原子性
- Redis主从切换时锁可能丢失,强一致性场景建议使用RedLock算法
- 消息队列要处理重复消费问题,实现幂等性
- 生产环境建议使用Redis Sentinel或Cluster保证高可用
- Stream队列要定期清理已处理消息(XTRIM命令)
Redis在分布式场景中的应用非常广泛。分布式锁解决了并发问题,消息队列实现了服务解耦和异步处理。简单场景用Redis即可,复杂场景可考虑RabbitMQ或Kafka等专业中间件。