博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java分布式锁之数据库实现
阅读量:6509 次
发布时间:2019-06-24

本文共 7327 字,大约阅读时间需要 24 分钟。

之前的文章中列举了分布式锁的3种实现方式,分别是基于数据库实现,基于缓存实现和基于zookeeper实现。三种实现方式各有可取之处,本篇文章就详细讲解一下Java分布式锁之基于数据库的实现方式,也是最简单最易理解的实现方式。

首先,先来阐述下“锁”的概念,锁作为一种安全防御工具,既能上锁防止别人打开,又能让持有钥匙的人打开锁,这是锁的基本功能。那再来说一下“分布式锁”,分布式锁是在分布式系统(多个独立运行系统)内的锁,相对来说,这把锁的安全级别以及作用范围更大,所以从设计上就要考虑更多东西。

现在来说,怎么基于数据库实现这把分布式锁。其实说白了就是,把锁作为数据资源存入数据库,当持有这把锁的访问者来决定是否开锁。以下详细讲解了数据库的交易同步锁和交易重试补偿锁的实现。

一、数据库的设计

数据库锁表的表结构如下:

 
field type comment
ID bigint 主键
OUTER_SERIAL_NO varchar 流水号
CUST_NO char 客户号
SOURCE_CODE varchar 锁操作
THREAD_NO varchar 线程号
STATUS char 锁状态
REMARK varchar 备注
CREATED_AT timestamp 创建时间
UPDATED_AT timestamp 更新时间

作为锁的必要属性有5个:系统流水号,客户号,锁操作,线程号和锁状态,下面来解释一下每种属性

流水号:锁的具体指向,比如可以是产品,可以是交易流水号(后面会说到交易同步锁、交易补偿锁的使用方式)

客户号:客户的唯一标识

锁操作:客户的某种操作,比如客户取现操作,取现补偿重试操作

线程号:当前操作线程的线程号,比如取当前线程的uuid

锁状态:P处理中,F失败,Y成功

二、代码设计

代码的目录结构如下: 

主要贴一下锁操作的核心代码实现:

锁接口定义:DbLockManager.java

/** * 锁接口 
* * @Author fugaoyang * */public interface DbLockManager { /** * 加锁 */ boolean lock(String outerSerialNo, String custNo, LockSource source); /** * 解锁 */ void unLock(String outerSerialNo, String custNo, LockSource source, LockStatus targetStatus);}
View Code

锁接口实现类:DbLockManagerImpl.java

/** *  * 数据库锁实现
* * @author fugaoyang * */@Servicepublic class DbLockManagerImpl implements DbLockManager { private final Logger LOG = LoggerFactory.getLogger(this.getClass()); @Autowired private DbSyncLockMapper lockMapper; @Transactional public boolean lock(String outerSerialNo, String custNo, LockSource source) { boolean isLock = false; TradeSyncLock lock = null; try { lock = lockMapper.find(outerSerialNo, custNo, source.getCode()); if (null == lock) { lock = new TradeSyncLock(); createLock(lock, outerSerialNo, custNo, source); int num = lockMapper.insert(lock); if (num == 1) { isLock = true; } LOG.info(ThreadLogUtils.getLogPrefix() + "加入锁,客户号[{}],锁类型[{}]", custNo, source.getCode()); return isLock; } // 根据交易类型进行加锁 isLock = switchSynsLock(lock, source); LOG.info(ThreadLogUtils.getLogPrefix() + "更新锁,客户号[{}],锁类型[{}]", custNo, source.getCode()); } catch (Exception e) { LOG.error(ThreadLogUtils.getLogPrefix() + "交易加锁异常, 客户号:" + custNo, e); } return isLock; } @Transactional public void unLock(String outerSerialNo, String custNo, LockSource source, LockStatus targetStatus) { try { TradeSyncLock lock = lockMapper.find(outerSerialNo, custNo, source.getCode()); if (null != lock) { lockMapper.update(lock.getId(), targetStatus.getName(), LockStatus.P.getName(), ThreadLogUtils.getCurrThreadUuid(), ThreadLogUtils.getCurrThreadUuid()); } LOG.info(ThreadLogUtils.getLogPrefix() + "释放锁,客户号[{}],锁类型[{}]", custNo, source.getCode()); } catch (Exception e) { LOG.error(ThreadLogUtils.getLogPrefix() + "释放锁异常, 客户号:{}", custNo, e); } } /** * 匹配加锁 */ private boolean switchSynsLock(TradeSyncLock lock, LockSource source) { boolean isLock = false; switch (source) { case WITHDRAW: ; isLock = tradeSynsLock(lock); break; case WITHDRAW_RETRY: ; isLock = retrySynsLock(lock); break; default: ; } return isLock; } /** * 交易同步锁 */ private boolean tradeSynsLock(TradeSyncLock lock) { // 处理中的不加锁,即不执行交易操作 if (LockStatus.P.getName().equals(lock.getStatus())) { return false; } int num = lockMapper.update(lock.getId(), LockStatus.P.getName(), LockStatus.S.getName(), ThreadLogUtils.getCurrThreadUuid(), null); if (num == 1) { return true; } return false; } /** * 补偿同步锁 */ private boolean retrySynsLock(TradeSyncLock lock) { // 处理中或处理完成的不加锁,即不执行补偿操作 if (LockStatus.P.getName().equals(lock.getStatus()) || LockStatus.S.getName().equals(lock.getStatus())) { return false; } int num = lockMapper.update(lock.getId(), LockStatus.P.getName(), LockStatus.F.getName(), ThreadLogUtils.getCurrThreadUuid(), null); if (num == 1) { return true; } return false; } private void createLock(TradeSyncLock lock, String outerSerialNo, String custNo, LockSource source) { lock.setOuterSerialNo(outerSerialNo); lock.setCustNo(custNo); lock.setSourceCode(source.getCode()); lock.setThreadNo(ThreadLogUtils.getCurrThreadUuid()); lock.setStatus(LockStatus.P.getName()); lock.setRemark(source.getDesc()); }}
View Code

获取当前线程号以及打印uuid工具类ThreadLogUtils.Java

/** *  * 线程处理
* * @author fugaoyang * */public class ThreadLogUtils { private static ThreadLogUtils instance = null; private ThreadLogUtils() { setInstance(this); } // 初始化标志 private static final Object __noop = new Object(); private static ThreadLocal __flag = new InheritableThreadLocal() { @Override protected Object initialValue() { return null; } }; // 当前线程的UUID信息,主要用于打印日志; private static ThreadLocal
currLogUuid = new InheritableThreadLocal
() { @Override protected String initialValue() { return UUID.randomUUID().toString()/* .toUpperCase() */; } }; private static ThreadLocal
currThreadUuid = new ThreadLocal
() { @Override protected String initialValue() { return UUIDGenerator.getUuid(); } }; public static void clear(Boolean isNew) { if (isNew) { currLogUuid.remove(); __flag.remove(); currThreadUuid.remove(); } } public static String getCurrLogUuid() { if (!isInitialized()) { throw new IllegalStateException("TLS未初始化"); } return currLogUuid.get(); } public static String getCurrThreadUuid() { return currThreadUuid.get(); } public static void clearCurrThreadUuid() { currThreadUuid.remove(); } public static String getLogPrefix() { if (!isInitialized()) { return ""; } return "
"; } private static boolean isInitialized() { return __flag.get() != null; } /** * 初始化上下文,如果已经初始化则返回false,否则返回true
* * @return */ public static boolean initialize() { if (isInitialized()) { return false; } __flag.set(__noop); return true; } private static void setInstance(ThreadLogUtils instance) { ThreadLogUtils.instance = instance; } public static ThreadLogUtils getInstance() { return instance; }}
View Code

两种锁的实现的大致思路如下:

1.交易同步锁

当一个客户来取现,第一次进入时,会插入一条当前线程,状态是P,操作是取现的锁,取现成功后根据当前线程号会更新成功;

当一个客户同时多个取现操作时,只有一个取现操作会加锁成功,其它会加锁失败;

当一个客户已经在取现中,这时数据库已经有一条状态P的锁,该客户同时又做了取现,这个取现动作会尝试加锁而退出;

2.交易重试补偿锁

1.当一个客户取现加锁成功,因调用第三方支付接口超时时,后台会对该笔交易重新发起重试打款操作,这时会新加一条当前交易流水号,当前线程号,状态是P,操作是取现重试的锁,重试的支付结果是成功的话,更新该条锁数据为Y状态,否则更新该条数据为F状态;

2.当重试支付失败后,再去重试打款时,发现锁的状态是F,这时把F更新为P,继续重试,根据重试结果更新锁状态。

上面实现的是一个最基本的数据库分布式锁,满足的并发量也是基于数据库所能扛得住的,性能基本可以满足普通的交易量。

后续可以优化的部分:

1.当一个用户同时多次获取lock时,因为目前是用的乐观锁,只会有一个加锁成功,可以优化成加入while(true)循环获取lock,当失败次数到达指定次数时退出,当前的操作结束。

2.当锁表数据量随着时间增大时,可以考虑按用户对锁表进行分表分库,以减小数据库方面的压力。

3.对锁的操作可以抽象出来,作为抽象实现,比如具体的取现操作只关心取现这个业务实现。

 

因为时间有限,写的比较仓促,希望大家有问题可以提出,相互探讨~~

完整示例代码后续会更新到github。

 

转载地址:http://dnbfo.baihongyu.com/

你可能感兴趣的文章
C语言位操作控件属性
查看>>
nginx的安装及基本配置,及多个域名服务
查看>>
Servlet访问postgresql数据库并提取数据显示在前端jsp页面
查看>>
不改一行代码定位线上性能问题
查看>>
定义运算符
查看>>
git管理
查看>>
idea演示
查看>>
Android第三十天
查看>>
告别暗黄皮肤变水嫩皮肤的8个小习惯
查看>>
加强Eclipse代码自动提示的方法
查看>>
【HM】第4课:MySQL入门
查看>>
GNS3-地址重叠环境中部署IPsec
查看>>
exchange online 用户疑问之许可证和用户数据归档
查看>>
QImage Mat IplImage 之间的相互转换
查看>>
lsof命令详解
查看>>
使用eclipse与android studio 在开发自定义控件时的区别
查看>>
我的友情链接
查看>>
mysql学习笔记
查看>>
django 问题解决
查看>>
年年有鱼游戏Android源码项目
查看>>