一、问题背景:分布式锁失效的隐患
在分布式系统中,分布式锁是保证数据一致性的重要手段。最常见的实现方式是基于 Redis 的分布式锁,通常设置一个过期时间来防止死锁。
然而,在实际生产环境中,我们遇到了这样的问题:
2024-01-15 10:30:15 ERROR - 订单处理异常:库存扣减成功,但订单状态更新失败
2024-01-15 10:30:15 WARN - 分布式锁已过期释放,其他线程获取了锁
2024-01-15 10:30:16 INFO - 另一个线程开始处理同一订单...
问题分析:
- 锁的过期时间设置为 30 秒
- 订单处理任务实际执行了 45 秒
- 锁在第 30 秒自动过期释放
- 其他线程获取锁并开始处理
- 前一个线程继续执行,导致数据不一致
这就是典型的分布式锁续约问题:长任务执行中途锁失效,引发并发安全问题。
二、核心概念:分布式锁与 WatchDog 机制
2.1 分布式锁的基本原理
┌────────────────────────────────────────────────────────────────┐
│ Redis 分布式锁工作流程 │
├────────────────────────────────────────────────────────────────┤
│ │
│ 客户端 A │
│ │ │
│ │ SET lock:key value NX EX 30 │
│ │ (加锁,30秒过期) │
│ ▼ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Redis │ │
│ │ lock:key = "client-A-value" │ │
│ │ TTL = 30s │ │
│ └────────────────────────────────────────────────────────┘ │
│ │ │
│ │ 执行业务逻辑(耗时45秒) │
│ │ │
│ 30秒后 ────────────────────────────────────────────────────▶ │
│ │ │
│ │ 锁自动过期释放 │
│ │ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Redis │ │
│ │ lock:key = null (已删除) │ │
│ └────────────────────────────────────────────────────────┘ │
│ │ │
│ │ 客户端 B 获取锁 │
│ │ │
│ ▼ │
│ 问题:客户端 A 仍在执行,客户端 B 同时执行 │
│ 结果:数据不一致 │
│ │
└────────────────────────────────────────────────────────────────┘
2.2 WatchDog 自动续期机制
┌────────────────────────────────────────────────────────────────┐
│ WatchDog 自动续期机制 │
├────────────────────────────────────────────────────────────────┤
│ │
│ 主线程 │
│ │ │
│ │ 获取锁(lockTimeout=30s) │
│ │ │
│ ├─────────────────────────────────────────────┐ │
│ │ │ │
│ │ 启动 WatchDog 看门狗线程 │ │
│ │ │ │
│ │ ┌─────────────────────────────────────┐ │ │
│ │ │ WatchDog Thread │ │ │
│ │ │ │ │ │
│ │ │ 每 10秒检查一次: │ │ │
│ │ │ - 锁是否仍由当前线程持有? │ │ │
│ │ │ - 主线程是否仍在执行? │ │ │
│ │ │ │ │ │
│ │ │ 如果满足条件: │ │ │
│ │ │ → 续期:EXPIRE lock:key 30 │ │ │
│ │ │ │ │ │
│ │ │ 每 10 秒续期一次 │ │ │
│ │ │ - 10s: EXPIRE lock:key 30 │ │ │
│ │ │ - 20s: EXPIRE lock:key 30 │ │ │
│ │ │ - 30s: EXPIRE lock:key 30 │ │ │
│ │ │ - 40s: EXPIRE lock:key 30 │ │ │
│ │ │ │ │ │
│ │ │ 主线程完成后: │ │ │
│ │ │ → 停止续期 │ │ │
│ │ │ → 释放锁 │ │ │
│ │ └─────────────────────────────────────┘ │ │
│ │ │ │
│ │ 执行业务逻辑(45秒) │ │
│ │ │ │
│ │ ────────▶ 锁持续被续期,不会过期 │ │
│ │ │ │
│ │ 任务完成 │ │
│ │ │ │
│ │ 释放锁 │ │
│ │ 停止 WatchDog │ │
│ │ │ │
│ ▼ ▼ │
│ 安全完成 停止续期 │
│ │
└────────────────────────────────────────────────────────────────┘
2.3 WatchDog 续期时序图
时间轴 主线程 WatchDog Redis
│
│ 获取锁 ──────────────────────────────────────▶ lock:key=value
│ (TTL=30s)
│
│ 启动 WatchDog
│ │
│ │
0s │ 开始定时检查
│ │
│ 开始执行业务
│ │
10s │ 检查锁状态 ──────────────────────────▶ GET lock:key
│ │ 锁仍持有? ──────────────────────────▶ value
│ │ 续期 ───────────────────────────────▶ EXPIRE 30s
│ │ (TTL 重置为 30s)
│ │
20s │ 检查锁状态 ──────────────────────────▶ GET lock:key
│ │ 续期 ───────────────────────────────▶ EXPIRE 30s
│ │
│ 继续执行...
│ │
30s │ 检查锁状态 ──────────────────────────▶ GET lock:key
│ │ 续期 ───────────────────────────────▶ EXPIRE 30s
│ │
│ 继续执行...
│ │
40s │ 检查锁状态 ──────────────────────────▶ GET lock:key
│ │ 续期 ───────────────────────────────▶ EXPIRE 30s
│ │
45s 任务完成
│ 释放锁 ───────────────────────────────────────▶ DEL lock:key
│ 停止 WatchDog
│ │ 停止续期
│
▼
2.4 续期策略参数
| 参数 | 默认值 | 说明 |
|---|---|---|
lockTimeout | 30秒 | 锁的初始过期时间 |
watchdogInterval | 10秒 | WatchDog 检查间隔(通常为 lockTimeout/3) |
renewExtension | 30秒 | 每次续期延长时间 |
maxRenewCount | 无限制 | 最大续期次数(可选配置) |
三、实现方案:基于 Redis 的 WatchDog 自动续期
3.1 方案架构设计
┌────────────────────────────────────────────────────────────────┐
│ 分布式锁续期架构设计 │
├────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ DistributedLockService │ │
│ │ │ │
│ │ ┌────────────────────────────────────────────────────┐ │ │
│ │ │ lockWithWatchdog(key, timeout, task) │ │ │
│ │ │ │ │ │
│ │ │ 1. 获取锁 │ │ │
│ │ │ 2. 启动 WatchDog │ │ │
│ │ │ 3. 执行任务 │ │ │
│ │ │ 4. 停止 WatchDog │ │ │
│ │ │ 5. 释放锁 │ │ │
│ │ └────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌────────────────────────────────────────────────────┐ │ │
│ │ │ WatchDogRenewalThread │ │ │
│ │ │ │ │ │
│ │ │ - 定时检查锁状态 │ │ │
│ │ │ - 自动续期 │ │ │
│ │ │ - 任务完成后停止 │ │ │
│ │ │ - 异常情况处理 │ │ │
│ │ └────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Redis │ │
│ │ │ │
│ │ - 分布式锁存储 │ │
│ │ - 锁续期操作 │ │
│ │ - 锁状态查询 │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────┘
3.2 分布式锁核心实现
@Service
@Slf4j
public class DistributedLockService {
@Autowired
private StringRedisTemplate redisTemplate;
private final ScheduledExecutorService watchdogExecutor =
Executors.newScheduledThreadPool(5);
/**
* 带WatchDog自动续期的分布式锁
*
* @param lockKey 锁的key
* @param lockValue 锁的value(用于标识持有者)
* @param lockTime 锁的初始过期时间(秒)
* @param task 要执行的任务
* @return 任务执行结果
*/
public <T> T executeWithLock(String lockKey, String lockValue,
long lockTime, Supplier<T> task) {
// 1. 尝试获取锁
Boolean acquired = tryLock(lockKey, lockValue, lockTime);
if (!acquired) {
throw new LockAcquisitionException("Failed to acquire lock: " + lockKey);
}
// 2. 启动WatchDog续期线程
ScheduledFuture<?> watchdogFuture = startWatchdog(lockKey, lockValue, lockTime);
try {
// 3. 执行业务任务
log.info("Lock acquired, starting task execution: {}", lockKey);
T result = task.get();
log.info("Task completed successfully: {}", lockKey);
return result;
} catch (Exception e) {
log.error("Task execution failed: {}", lockKey, e);
throw e;
} finally {
// 4. 停止WatchDog
stopWatchdog(watchdogFuture);
// 5. 释放锁
releaseLock(lockKey, lockValue);
}
}
/**
* 尝试获取锁
*/
private Boolean tryLock(String lockKey, String lockValue, long lockTime) {
return redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, lockTime, TimeUnit.SECONDS);
}
/**
* 启动WatchDog自动续期
*/
private ScheduledFuture<?> startWatchdog(String lockKey, String lockValue,
long lockTime) {
// 续期间隔 = 锁过期时间 / 3
long renewalInterval = lockTime / 3;
return watchdogExecutor.scheduleAtFixedRate(() -> {
try {
renewLock(lockKey, lockValue, lockTime);
} catch (Exception e) {
log.error("WatchDog renewal failed for lock: {}", lockKey, e);
}
}, renewalInterval, renewalInterval, TimeUnit.SECONDS);
}
/**
* 续期锁
*/
private void renewLock(String lockKey, String lockValue, long lockTime) {
// 检查锁是否仍由当前线程持有
String currentValue = redisTemplate.opsForValue().get(lockKey);
if (lockValue.equals(currentValue)) {
// 锁仍由当前线程持有,续期
redisTemplate.expire(lockKey, lockTime, TimeUnit.SECONDS);
log.debug("Lock renewed: {}, new TTL: {}s", lockKey, lockTime);
} else {
// 锁已被其他线程持有或已过期,停止续期
log.warn("Lock no longer held by current thread: {}", lockKey);
}
}
/**
* 停止WatchDog
*/
private void stopWatchdog(ScheduledFuture<?> watchdogFuture) {
if (watchdogFuture != null) {
watchdogFuture.cancel(false);
log.debug("WatchDog stopped");
}
}
/**
* 释放锁(Lua脚本保证原子性)
*/
private void releaseLock(String lockKey, String lockValue) {
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
redisTemplate.execute(
new DefaultRedisScript<>(luaScript, Long.class),
Collections.singletonList(lockKey),
lockValue
);
log.info("Lock released: {}", lockKey);
}
}
3.3 WatchDog 续期优化实现
@Component
@Slf4j
public class WatchDogManager {
@Autowired
private StringRedisTemplate redisTemplate;
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(10);
// 存储每个锁对应的续期任务
private final ConcurrentHashMap<String, ScheduledFuture<?>> renewalTasks =
new ConcurrentHashMap<>();
// 存储每个锁的持有信息
private final ConcurrentHashMap<String, LockContext> lockContexts =
new ConcurrentHashMap<>();
/**
* 启动WatchDog续期
*/
public void startRenewal(String lockKey, String lockValue, long lockTimeSec) {
LockContext context = LockContext.builder()
.lockKey(lockKey)
.lockValue(lockValue)
.lockTimeSec(lockTimeSec)
.renewalIntervalSec(lockTimeSec / 3)
.startTime(System.currentTimeMillis())
.renewalCount(0)
.build();
lockContexts.put(lockKey, context);
// 启动定时续期任务
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(
() -> performRenewal(lockKey),
context.getRenewalIntervalSec(),
context.getRenewalIntervalSec(),
TimeUnit.SECONDS
);
renewalTasks.put(lockKey, future);
log.info("WatchDog started for lock: {}, interval: {}s",
lockKey, context.getRenewalIntervalSec());
}
/**
* 执行续期
*/
private void performRenewal(String lockKey) {
LockContext context = lockContexts.get(lockKey);
if (context == null) {
log.warn("Lock context not found: {}", lockKey);
stopRenewal(lockKey);
return;
}
try {
// 使用Lua脚本保证原子性
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" redis.call('expire', KEYS[1], ARGV[2]) " +
" return 1 " +
"else " +
" return 0 " +
"end";
Long result = redisTemplate.execute(
new DefaultRedisScript<>(luaScript, Long.class),
Collections.singletonList(lockKey),
context.getLockValue(),
String.valueOf(context.getLockTimeSec())
);
if (result != null && result == 1) {
context.setRenewalCount(context.getRenewalCount() + 1);
log.debug("Lock renewed: {}, count: {}, remaining TTL: {}s",
lockKey, context.getRenewalCount(), context.getLockTimeSec());
} else {
log.warn("Lock renewal failed, lock may have been released: {}", lockKey);
stopRenewal(lockKey);
}
} catch (Exception e) {
log.error("Exception during lock renewal: {}", lockKey, e);
// 续期失败不停止WatchDog,继续尝试
}
}
/**
* 停止续期
*/
public void stopRenewal(String lockKey) {
ScheduledFuture<?> future = renewalTasks.remove(lockKey);
if (future != null) {
future.cancel(false);
log.info("WatchDog stopped for lock: {}", lockKey);
}
lockContexts.remove(lockKey);
}
/**
* 获取续期统计信息
*/
public LockStats getLockStats(String lockKey) {
LockContext context = lockContexts.get(lockKey);
if (context == null) {
return null;
}
return LockStats.builder()
.lockKey(lockKey)
.renewalCount(context.getRenewalCount())
.durationSec((System.currentTimeMillis() - context.getStartTime()) / 1000)
.build();
}
@PreDestroy
public void shutdown() {
scheduler.shutdown();
log.info("WatchDog scheduler shutdown");
}
}
3.4 增强版分布式锁服务
@Service
@Slf4j
public class EnhancedDistributedLockService {
@Autowired
private WatchDogManager watchDogManager;
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 带WatchDog自动续期的分布式锁执行
*/
public <T> T lockAndExecute(String lockKey, long lockTimeSec,
Supplier<T> task) {
String lockValue = generateLockValue();
return lockAndExecute(lockKey, lockValue, lockTimeSec,
Long.MAX_VALUE, task);
}
/**
* 增强版:支持最大续期次数限制
*/
public <T> T lockAndExecute(String lockKey, String lockValue,
long lockTimeSec, long maxRenewalTimeMs,
Supplier<T> task) {
long startTime = System.currentTimeMillis();
// 1. 尝试获取锁
boolean acquired = tryAcquireLock(lockKey, lockValue, lockTimeSec);
if (!acquired) {
throw new DistributedLockException(
"Failed to acquire lock: " + lockKey);
}
log.info("Lock acquired: key={}, value={}, timeout={}s",
lockKey, lockValue, lockTimeSec);
// 2. 启动WatchDog
watchDogManager.startRenewal(lockKey, lockValue, lockTimeSec);
try {
// 3. 执行任务(带超时检查)
T result = executeWithTimeout(task, maxRenewalTimeMs, startTime);
log.info("Task completed successfully: key={}, duration={}ms",
lockKey, System.currentTimeMillis() - startTime);
return result;
} catch (TimeoutException e) {
log.error("Task timeout exceeded max renewal time: key={}, max={}ms",
lockKey, maxRenewalTimeMs);
throw new DistributedLockException("Task execution timeout", e);
} catch (Exception e) {
log.error("Task execution failed: key={}", lockKey, e);
throw e;
} finally {
// 4. 停止WatchDog
watchDogManager.stopRenewal(lockKey);
// 5. 释放锁
releaseLock(lockKey, lockValue);
}
}
/**
* 带超时检查的任务执行
*/
private <T> T executeWithTimeout(Supplier<T> task,
long maxRenewalTimeMs,
long startTime) throws TimeoutException {
// 检查是否超过最大续期时间
if (maxRenewalTimeMs != Long.MAX_VALUE) {
long elapsed = System.currentTimeMillis() - startTime;
if (elapsed > maxRenewalTimeMs) {
throw new TimeoutException("Task exceeded max renewal time");
}
}
return task.get();
}
/**
* 尝试获取锁
*/
private boolean tryAcquireLock(String lockKey, String lockValue,
long lockTimeSec) {
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, lockTimeSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(result);
}
/**
* 释放锁(Lua脚本保证原子性)
*/
private void releaseLock(String lockKey, String lockValue) {
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
redisTemplate.execute(
new DefaultRedisScript<>(luaScript, Long.class),
Collections.singletonList(lockKey),
lockValue
);
log.info("Lock released: key={}", lockKey);
}
/**
* 生成锁的唯一标识
*/
private String generateLockValue() {
return UUID.randomUUID().toString() + ":" + Thread.currentThread().getId();
}
}
四、实战应用:订单处理场景
4.1 订单处理服务
@Service
@Slf4j
public class OrderProcessingService {
@Autowired
private EnhancedDistributedLockService lockService;
@Autowired
private InventoryService inventoryService;
@Autowired
private PaymentService paymentService;
@Autowired
private OrderRepository orderRepository;
/**
* 处理订单(带分布式锁和WatchDog续期)
*/
public OrderResult processOrder(Long orderId) {
String lockKey = "order:process:" + orderId;
long lockTimeSec = 30; // 锁初始过期时间30秒
long maxRenewalTimeMs = 120000; // 最大续期时间120秒
return lockService.lockAndExecute(
lockKey,
lockTimeSec,
maxRenewalTimeMs,
() -> doProcessOrder(orderId)
);
}
/**
* 实际订单处理逻辑
*/
private OrderResult doProcessOrder(Long orderId) {
log.info("Starting order processing: orderId={}", orderId);
try {
// 1. 查询订单信息
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException(orderId));
// 2. 扣减库存(耗时操作)
inventoryService.deductStock(order.getProductId(), order.getQuantity());
// 3. 处理支付(耗时操作)
PaymentResult payment = paymentService.processPayment(order.getPaymentId());
// 4. 更新订单状态
order.setStatus(OrderStatus.PROCESSING);
order.setPaymentTransactionId(payment.getTransactionId());
orderRepository.save(order);
// 5. 发送通知
notificationService.sendOrderConfirmation(order.getUserId(), orderId);
log.info("Order processed successfully: orderId={}", orderId);
return OrderResult.success(orderId, payment.getTransactionId());
} catch (Exception e) {
log.error("Order processing failed: orderId={}", orderId, e);
// 补偿处理
compensateOrder(orderId);
return OrderResult.failed(orderId, e.getMessage());
}
}
/**
* 订单补偿处理
*/
private void compensateOrder(Long orderId) {
// 恢复库存
// 取消支付
// 更新订单状态为失败
}
}
4.2 监控续期状态
@RestController
@RequestMapping("/api/lock")
@Slf4j
public class LockMonitorController {
@Autowired
private WatchDogManager watchDogManager;
@GetMapping("/stats/{lockKey}")
public ResponseEntity<LockStats> getLockStats(@PathVariable String lockKey) {
LockStats stats = watchDogManager.getLockStats(lockKey);
if (stats == null) {
return ResponseEntity.notFound().build();
}
return ResponseEntity.ok(stats);
}
@GetMapping("/active")
public ResponseEntity<List<LockStats>> getActiveLocks() {
// 返回当前活跃的锁列表
return ResponseEntity.ok(Collections.emptyList());
}
}
五、配置文件示例
server:
port: 8080
spring:
application:
name: distributed-lock-demo
redis:
host: localhost
port: 6379
password:
database: 0
timeout: 3000
lettuce:
pool:
max-active: 20
max-idle: 10
min-idle: 5
max-wait: 3000
# 分布式锁配置
distributed-lock:
default-lock-time: 30s
watchdog-interval: 10s
max-renewal-time: 120s
retry-times: 3
retry-interval: 100ms
management:
endpoints:
web:
exposure:
include: health, info, prometheus, metrics
logging:
level:
com.example.lock: DEBUG
org.springframework.data.redis: DEBUG
六、最佳实践建议
6.1 WatchDog 配置规范
| 配置项 | 推荐值 | 说明 |
|---|---|---|
lockTime | 30秒 | 锁初始过期时间 |
watchdogInterval | lockTime/3 | 续期检查间隔 |
maxRenewalTime | 120秒 | 最大续期时间上限 |
renewalThreads | 5-10 | WatchDog线程池大小 |
6.2 使用规范
- 合理设置锁过期时间:根据任务预估时长设置
- 设置最大续期时间:防止任务无限续期
- 任务异常处理:任务失败时正确释放锁
- 监控续期状态:记录续期次数和时长
- Lua脚本原子操作:续期和释放使用Lua保证原子性
6.3 注意事项
┌────────────────────────────────────────────────────────────────┐
│ WatchDog 使用注意事项 │
├────────────────────────────────────────────────────────────────┤
│ │
│ 1. 续期间隔设置 │
│ - 续期间隔 < 锁过期时间 │
│ - 推荐:续期间隔 = 锁过期时间 / 3 │
│ - 预留足够时间完成续期 │
│ │
│ 2. 最大续期时间 │
│ - 设置最大续期上限 │
│ - 超过上限强制停止 │
│ - 防止无限续期占用资源 │
│ │
│ 3. 续期失败处理 │
│ - 续期失败不立即停止 │
│ - 继续尝试续期 │
│ - 多次失败后停止 │
│ │
│ 4. 锁释放时机 │
│ - 任务完成后立即释放 │
│ - 异常情况也要释放 │
│ - 使用Lua脚本保证原子性 │
│ │
│ 5. 线程池配置 │
│ - WatchDog使用独立线程池 │
│ - 避免与业务线程池混用 │
│ - 合理设置线程数量 │
│ │
└────────────────────────────────────────────────────────────────┘
七、总结
WatchDog 自动续期机制解决了分布式锁在长任务执行过程中的过期失效问题,核心要点:
- 自动续期:WatchDog 定时检查并续期锁
- 续期间隔:推荐设置为锁过期时间的 1/3
- 原子操作:使用 Lua 脚本保证续期原子性
- 最大续期限制:防止无限续期
- 异常处理:确保锁正确释放
通过 WatchDog 机制,可以有效防止分布式锁中途失效,保障分布式系统的数据一致性。
互动话题
您在项目中是否遇到过分布式锁过期失效的问题?是如何解决的?欢迎在评论区分享您的经验!
