网易首页 > 网易号 > 正文 申请入驻

如何使用 SpringBoot + 事务钩子函数来打造高效支付系统

0
分享至

点击进入IT资料库

前言

经过前面对Spring AOP、事务的总结,我们已经对它们有了一个比较感性的认知了。今天,我继续安利一个独门绝技:Spring 事务的钩子函数。单纯的讲技术可能比较枯燥乏味。

接下来,我将以一个实际的案例来描述Spring事务钩子函数的正确使用姿势。

一、案例背景

拿支付系统相关的业务来举例。在支付系统中,我们需要记录每个账户的资金流水(记录用户A因为哪个操作扣了钱,因为哪个操作加了钱),这样我们才能对每个账户的账做到心中有数,对于支付系统而言,资金流水的数据可谓是最重要的。

因此,为了防止支付系统的老大徇私舞弊,CTO提了一个流水存档的需求:要求支付系统对每个账户的资金流水做一份存档,要求支付系统在写流水的时候,把流水相关的信息以消息的形式推送到kafka,由存档系统消费这个消息并落地到库里(这个库只有存档系统拥有写权限)。

整个需求的流程如下所示:


图片

整个需求的流程还是比较简单的,考虑到后续会有其他事业部也要进行数据存档操作,CTO建议支付系统团队内部开发一个二方库,这个二方库的主要功能就是发送消息到kafka中去。

二、确定方案

既然要求开发一个二方库,因此,我们需要考虑如下几件事情:

1、技术栈使用的springboot,因此,这里最好以starter的方式提供

2、二方库需要发送消息给kafka,最好是二方库内部基于kafka生产者的api创建生产者,不要使用Spring自带的kafkaTemplate,因为集成方有可能已经使用了kafkaTemplate。不能与集成方造成冲突。

3、减少对接方的集成难度、学习成本,最好是提供一个简单实用的api,业务侧能简单上手。

4、发送消息这个操作需要支持事务,尽量不影响主业务

在上述的几件事情中,最需要注意的应该就是第4点:发送消息这个操作需要支持事务,尽量不影响主业务。这是什么意思呢?首先,尽量不影响主业务,这个最简单的方式就是使用异步机制。其次,需要支持事务是指:假设我们的api是在事务方法内部调用的,那么我们需要保证事务提交后再执行这个api。那么,我们的流水落地api应该要有这样的功能:


图片

内部可以判断当前是否存在事务,如果存在事务,则需要等事务提交后再异步发送消息给kafka。如果不存在事务则直接异步发送消息给kafka。而且这样的判断逻辑得放在二方库内部才行。那现在摆在我们面前的问题就是:我要如何判断当前是否存在事务,以及如何在事务提交后再触发我们自定义的逻辑呢?

三、TransactionSynchronizationManager显神威

这个类内部所有的变量、方法都是static修饰的,也就是说它其实是一个工具类。是一个事务同步器。下述是流水落地API的伪代码,这段代码就解决了我们上述提到的疑问:

private final ExecutorService executor = Executors.newSingleThreadExecutor();

public void sendLog() {
// 判断当前是否存在事务
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
// 无事务,异步发送消息给kafka
executor.submit(() -> {
// 发送消息给kafka
try {
// 发送消息给kafka
} catch (Exception e) {
// 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常
}
});
return;
}

// 有事务,则添加一个事务同步器,并重写afterCompletion方法(此方法在事务提交后会做回调)
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {

@Override
public void afterCompletion(int status) {
if (status == TransactionSynchronization.STATUS_COMMITTED) {
// 事务提交后,再异步发送消息给kafka
executor.submit(() -> {
try {
// 发送消息给kafka
} catch (Exception e) {
// 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常
}
});
}
}

});

}

代码比较简单,其主要是TransactionSynchronizationManager的使用。

3.1、判断是否存在事务?TransactionSynchronizationManager.isSynchronizationActive() 方法显神威

我们先看下这个方法的源码:

// TransactionSynchronizationManager.java类内部的部分代码

private static final ThreadLocal > synchronizations =
new NamedThreadLocal<>( "Transaction synchronizations");

public static boolean isSynchronizationActive() {
return (synchronizations.get() != null);
}

很明显,synchronizations是一个线程变量(ThreadLocal)。那它是在什么时候set进去的呢?

这里的话,可以参考下这个方法:org.springframework.transaction.support.TransactionSynchronizationManager#initSynchronization,其源码如下所示:

/**
* Activate transaction synchronization for the current thread.
* Called by a transaction manager on transaction begin.
* @throws IllegalStateException if synchronization is already active
*/
public static void initSynchronization() throws IllegalStateException {
if (isSynchronizationActive()) {
throw new IllegalStateException("Cannot activate transaction synchronization - already active");
}
logger.trace("Initializing transaction synchronization");
synchronizations.set(new LinkedHashSet<>());
}

由源码中的注释也可以知道,它是在事务管理器开启事务时调用的。换句话说,只要我们的程序执行到带有事务特性的方法时,就会在线程变量中放入一个LinkedHashSet,用来标识当前存在事务。只要isSynchronizationActive返回true,则代表当前有事务。

因此,结合这两个方法我们是指能解决我们最开始提出的疑问:要如何判断当前是否存在事务

3.2、如何在事务提交后触发自定义逻辑?TransactionSynchronizationManager.registerSynchronization()方法显神威

我们来看下这个方法的源代码:

/**
* Register a new transaction synchronization for the current thread.
* Typically called by resource management code.
*

Note that synchronizations can implement the
* {@link org.springframework.core.Ordered} interface.
* They will be executed in an order according to their order value (if any).
* @param synchronization the synchronization object to register
* @throws IllegalStateException if transaction synchronization is not active
* @see org.springframework.core.Ordered
*/
public static void registerSynchronization(TransactionSynchronization synchronization)
throws IllegalStateException {

Assert.notNull(synchronization, "TransactionSynchronization must not be null");
if (!isSynchronizationActive()) {
throw new IllegalStateException("Transaction synchronization is not active");
}
synchronizations.get().add(synchronization);
}

这里又使用到了synchronizations线程变量,我们在判断是否存在事务时,就是判断这个线程变量内部是否有值。那我们现在想在事务提交后触发自定义逻辑和这个有什么关系呢?

我们在上面构建流水落地api的伪代码中有向synchronizations内部添加了一个TransactionSynchronizationAdapter,内部并重写了afterCompletion方法,其代码如下所示:

TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {

@Override
public void afterCompletion(int status) {
if (status == TransactionSynchronization.STATUS_COMMITTED) {
// 事务提交后,再异步发送消息给kafka
executor.submit(() -> {
try {
// 发送消息给kafka
} catch (Exception e) {
// 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常
}
});
}
}

});

我们结合registerSynchronization的源码来看,其实这段代码主要就是向线程变量内部的LinkedHashSet添加了一个对象而已,但就是这么一个操作,让Spring在事务执行的过程中变得“有事情可做”。这是什么意思呢?

是因为Spring在执行事务方法时,对于操作事务的每一个阶段都有一个回调操作,比如:trigger系列的回调


图片

invoke系列的回调


图片

而我们现在的需求就是在事务提交后触发自定义的函数,那就是在invokeAfterCommitinvokeAfterCompletion这两个方法来选了。首先,这两个方法都会拿到所有TransactionSynchronization的集合(其中会包括我们上述添加的TransactionSynchronizationAdapter)。

但是要注意一点:invokeAfterCommit只能拿到集合,invokeAfterCompletion除了集合还有一个int类型的参数,而这个int类型的参数其实是当前事务的一种状态。也就是说,如果我们重写了invokeAfterCompletion方法,我们除了能拿到集合外,还能拿到当前事务的状态。

因此,此时我们可以根据这个状态来做不同的事情,比如:可以在事务提交时做自定义处理,也可以在事务回滚时做自定义处理等等。

四、总结

上面有说到,我们判断当前是否存在事务、添加钩子函数都是依赖线程变量的。因此,我们在使用过程中,一定要避免切换线程。否则会出现不生效的情况。

IT架构师/技术大咖的交流圈子,为您提供架构体系知识、技术文章、流行实践案例、解决方案等,行业大咖分享交流/同行经验分享互动,期待你的加入!扫码即可加入哦,随着材料不断增多社群会不定期涨价早加入更优惠

免责声明:

本公众号部分分享的资料来自网络收集和整理,所有文字和图片版权归属于原作者所有,且仅代表作者个人观点,与本公众号无关,文章仅供读者学习交流使用,并请自行核实相关内容,如文章内容涉及侵权,请联系后台管理员删除。

特别声明:以上内容(如有图片或视频亦包括在内)为自媒体平台“网易号”用户上传并发布,本平台仅提供信息存储服务。

Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.

相关推荐
热点推荐
5150万越南盾重罚背后:中国游客在越南挥舞国旗事件,引发越南网友热烈讨论(附现场视频)

5150万越南盾重罚背后:中国游客在越南挥舞国旗事件,引发越南网友热烈讨论(附现场视频)

缅甸中文网
2024-07-01 21:04:14
火葬场烧尸人:见惯了生离死别,最喜欢年轻漂亮的女尸

火葬场烧尸人:见惯了生离死别,最喜欢年轻漂亮的女尸

吴学华看天下
2024-06-26 12:43:21
直播间号称9999元可买圆明园马首,老年人被收割?

直播间号称9999元可买圆明园马首,老年人被收割?

金羊网
2024-07-03 15:15:33
英国上将揭露当年“香港”回归真相:谁敢抗衡中国解放军?

英国上将揭露当年“香港”回归真相:谁敢抗衡中国解放军?

文史旺旺旺
2024-07-02 19:10:27
著名导演讽刺李晨:一张老脸像50岁,看你演17岁中学生想吐!

著名导演讽刺李晨:一张老脸像50岁,看你演17岁中学生想吐!

飞花文史
2024-06-28 10:19:46
库里深情告别克莱:谢谢你所做的一切 我们是一辈子的水花兄弟!

库里深情告别克莱:谢谢你所做的一切 我们是一辈子的水花兄弟!

直播吧
2024-07-03 01:33:26
骗取国家青苗补偿款、在违建上谋利!惠州两人被查

骗取国家青苗补偿款、在违建上谋利!惠州两人被查

南方都市报
2024-07-03 14:06:08
时代的红利,已经结束了

时代的红利,已经结束了

黑噪音
2024-07-02 18:00:13
黄一鸣母亲被吐槽像妖婆,网友找出她年轻时的照片,颜值很高

黄一鸣母亲被吐槽像妖婆,网友找出她年轻时的照片,颜值很高

新游戏大妹子
2024-07-03 12:28:22
陕西一男子带女儿旅游,5个月后丑恶行径曝光,被判7年6个月

陕西一男子带女儿旅游,5个月后丑恶行径曝光,被判7年6个月

一场奇遇日记
2024-06-20 21:45:28
1965年,中央派彭德怀到大西南当第三副主任,他的三位上司都是谁

1965年,中央派彭德怀到大西南当第三副主任,他的三位上司都是谁

祥瑞
2024-06-14 18:30:47
解放军战车集结,厦门街头遍布解放军战车,台军一纸命令引起恐慌

解放军战车集结,厦门街头遍布解放军战车,台军一纸命令引起恐慌

星辰故事屋
2024-07-02 20:27:33
济南机场海关在一入境旅客行李箱中查获品牌鞋60双,预估案值近15万元

济南机场海关在一入境旅客行李箱中查获品牌鞋60双,预估案值近15万元

鲁中晨报
2024-07-03 09:59:03
咸丰死后不久,46岁肃顺随即被杀身亡,慈禧:不杀他,我何以掌权

咸丰死后不久,46岁肃顺随即被杀身亡,慈禧:不杀他,我何以掌权

史笔似尘钩
2024-06-27 21:35:08
罢免于北辰连署行动超乎预期,游智彬:他激发了蓝营基层的怒火

罢免于北辰连署行动超乎预期,游智彬:他激发了蓝营基层的怒火

海峡导报社
2024-07-03 15:34:04
42岁伊万卡将公开支持父亲,但拒绝回政坛,“那是黑暗的世界”

42岁伊万卡将公开支持父亲,但拒绝回政坛,“那是黑暗的世界”

译言
2024-07-03 09:50:41
国青男篮遭遇惨案:狂输美国84分球都难运过半场 名宿执教引争议

国青男篮遭遇惨案:狂输美国84分球都难运过半场 名宿执教引争议

厝边人侃体育
2024-07-02 19:07:46
奥运会落选赛首日战况:东契奇空砍三双,西班牙45分大胜黎巴嫩!

奥运会落选赛首日战况:东契奇空砍三双,西班牙45分大胜黎巴嫩!

你的篮球频道
2024-07-03 07:01:42
证监会最新发声!进一步全面深化资本市场改革

证监会最新发声!进一步全面深化资本市场改革

中国商报
2024-07-03 14:14:31
凌晨3点排长队,10点线上开卖即售罄!年轻人又在抢什么?

凌晨3点排长队,10点线上开卖即售罄!年轻人又在抢什么?

极目新闻
2024-07-03 14:52:45
2024-07-03 16:14:44
IT架构师联盟
IT架构师联盟
IT架构实战分享
695文章数 7653关注度
往期回顾 全部

科技要闻

吴世春:"中国大模型五虎"想跑出来非常难

头条要闻

一周内三"虎"被逮捕 两周前同时被中纪委开除党籍公职

头条要闻

一周内三"虎"被逮捕 两周前同时被中纪委开除党籍公职

体育要闻

世界第二打第三,成了一场英格兰模仿秀

娱乐要闻

刘亦菲唐嫣深夜晒照,美女贴贴好养眼

财经要闻

张军:房地产是经济收缩的受害者而非原因

汽车要闻

巴黎4S店价格对比 同款车型中国售价打对折

态度原创

教育
艺术
时尚
家居
健康

教育要闻

求阴影部分面积,难哭五年级外甥女

艺术要闻

穿越时空的艺术:《马可·波罗》AI沉浸影片探索人类文明

今日热点:NCT127主打曲《WALK》;《长相思2》官宣上星......

家居要闻

温柔简约 浅色基调与明亮空间的协奏

人类为何至今无法攻克渐冻症?

无障碍浏览 进入关怀版