一、背景
最近,在版本封板的前夕,测试小姐姐突然找来,说有个功能设置了发送结果推送,设置的总共推送次数是99次,但是推送记录却有150多条,很显然数据出现了多发。
二、问题初修复
发送结果推送,在Web中由单独一条线程进行处理的。消息下发的时候,会先将推送数据记录到数据库的一个表中,同时投递到一个消息到延迟队列进行消费,通知线程会判断消息是否下发成功,成功则会进行通知,如果是还没下发或者待审核状态则重新投递回队列。另外模块初始化的时候也会去数据库里加载任务投递到队列里执行。
阅读代码发现,通知线程是没有加节点锁,意味着如果当前数据库中存在需要执行的任务,此时模块多节点重启,就会同时去加载任务并执行,从而造成数据下发。问题看起来不难解决,Web模块本身就有基于Redisson实现的节点锁,给任务加上一个就可以解决(恩,我真是个机灵鬼)。
基于Redisson实现的节点锁:
public class RedisDistributedLock {
private static final Logger logger = LoggerFactory.getLogger(RedisDistributedLock.class);
@Resource(name = "redissonClient")
private RedissonClient redisson;
// 判断是否拿到节点锁
private volatile boolean locked = false;
private RLock lock = null;
@PostConstruct
public void init() {
lock = redisson.getLock(clusterId + "-web");
}
public void tryLock() {
try {
locked = lock.tryLock(1, 30, TimeUnit.SECONDS);
if (locked) {
logger.info("Current node get the redis lock at:{}", new Date());
} else {
logger.warn("The redis lock is already held for another node.");
}
} catch (InterruptedException e) {
logger.error("Get lock from redis fail,caused by", e);
}
}
public boolean isLocked() {
return locked;
}
}
给通知线程加上节点锁:
三、新问题及原因
一切看起来很完美也符合逻辑,但是一启动,输出日志"Redis lock is held by other node, abandon to inform",很明显并没有拿到锁,一开始以为是被其他人抢占了,修改Redisson的配置文件换了新的db,还是获取不到锁,这有点颠覆自己对于节点锁的理解。
继续查看节点锁中的tryLock方法的引用,可以看到除了节点锁维护线程外,还有文件导入导出也尝试去加锁。
再看下Redisson对于tryLock的处理方式,会先获取当前请求的线程ID,调用tryAcquire去尝试进行加锁。
跟踪到最后发现,Redisson对于分布式锁的处理是调用了RedissonLock#tryLockInnerAsync,通过执行Lua脚本去实现加锁,数据结构上采用hash存储分布式锁的数据。对于这段脚本,getName是设置的锁名称,对应的是key,也就是KEYS[1],而ARGV[2]是field,对应的是getLockName方法返回的结果,value则是1。处理逻辑也比较直观,判断对应的key是否存在,如果存在,则返回nil,如果不存在,则添加对应的key和filed,同时设置相应的过期时间。
查看RedissonLock#getLockName可以看到,对于分布式锁的field,主要是由连接管理器的id和当前请求的线程id组成,而连接管理器的id则是UUID。可见,Redisson对于分布式锁加锁的对象其实是当前线程。
回到一开始的问题,这时候问题原因就很明了。文件导入导出在@PostConstruct的时候获取节点锁,也就是在项目一开始启动的时候先获取锁,此时是能获取成功,同时将节点锁状态RedisDistributedLock.locked 设置为true,而等到分布式锁线程GetRedisLockThread执行加锁,因为启动的是另外一条线程,此时加锁失败,同时将RedisDistributedLock.locked又设置成false(对原有值进行了覆盖),导致通知线程获取到的是节点锁加锁失败的状态。
四、最终处理方案
1:导入导出中获取节点锁,主要是为了项目重启的时候,保证只有一个节点将未完成的导入导出任务设置为失败。这里主要做的是数据修正的操作,所以取消导入导出的加锁操作,在项目启动的时候延时再去执行。
2:节点锁维护线程RedisDistributedLock.tryLock 方法是public,意味着随时随地都可以修改到节点锁状态,这里应该控制好访问权限。