蘇寧如何解決事務(wù)與非事務(wù)的數(shù)據(jù)一致性問題
原創(chuàng)【51CTO.com原創(chuàng)稿件】
1、業(yè)務(wù)場(chǎng)景
作為擁有線上線下大數(shù)據(jù)的智慧零售平臺(tái),蘇寧的系統(tǒng)對(duì)于并發(fā)和高效要求非常高。針對(duì)各種苛刻的場(chǎng)景,蘇寧都有相應(yīng)的解決方案。
蘇寧的售后訂單系統(tǒng)每天要處理大量訂單的創(chuàng)建,修改以及數(shù)據(jù)分發(fā)的操作。
為了保證高效,我們的數(shù)據(jù)經(jīng)過分庫(kù)分表存儲(chǔ)于數(shù)據(jù)庫(kù)集群中,同時(shí)根據(jù)一定的算法將部分活躍訂單緩存在Redis,保證訂單處理的效率。分發(fā)數(shù)據(jù)時(shí),我們通過蘇寧自研的MQ消息平臺(tái) (WindQ)向下游系統(tǒng)分發(fā)消息,處理效率上可以做到準(zhǔn)實(shí)時(shí),即消息能夠及時(shí)被下游接收,并立即通過反饋接口反饋回來,避免實(shí)時(shí)接口調(diào)用時(shí)可能發(fā)生因網(wǎng)絡(luò),下游處理效率問題帶來的阻塞。
基于以上的背景,我們遇到了這樣的場(chǎng)景:
1. 在創(chuàng)建訂單的時(shí)候,我們要保存訂單到數(shù)據(jù)庫(kù)和緩存。
2. 同時(shí)要將下發(fā)下游的消息保存到數(shù)據(jù)庫(kù),并通過WindQ發(fā)往下游系統(tǒng)。
3. 下游系統(tǒng)返回接收數(shù)據(jù)的結(jié)果后,需要根據(jù)返回的結(jié)果,對(duì)保存到數(shù)據(jù)庫(kù)的數(shù)據(jù)進(jìn)行刪除操作。
圖1:業(yè)務(wù)模型
雖然邏輯簡(jiǎn)單,但由于數(shù)據(jù)庫(kù)操作是在一個(gè)事務(wù)中,而Redis和發(fā)消息隊(duì)列的操作則并非能靠事務(wù)控制。
- 如果緩存成功,但是事務(wù)失敗,則可能導(dǎo)致我們?cè)谙到y(tǒng)有了一份異常的訂單緩存。而實(shí)際上這個(gè)訂單并不存在。
- 另外當(dāng)消息發(fā)送到下游了以后,如果下游處理速度非常快,處理結(jié)果立刻返回,處理返回結(jié)果的程序要去刪除一條已經(jīng)發(fā)送成功數(shù)據(jù)。此時(shí),有可能本地的事務(wù)尚未提交,那么,刪除操作就做了無用功。***當(dāng)事務(wù)提交以后,那條應(yīng)該刪掉的數(shù)據(jù),就會(huì)一直在待處理表中,變成異常數(shù)據(jù)。
圖2:異常場(chǎng)景
說明:數(shù)據(jù)通過消息隊(duì)列發(fā)送下游系統(tǒng),同時(shí)保存一份到數(shù)據(jù)庫(kù)是為了保證發(fā)送隊(duì)列異?;蛘呦掠卧诮邮?、處理時(shí)發(fā)生異常的情況,可以通過數(shù)據(jù)庫(kù)保存的數(shù)據(jù)進(jìn)行補(bǔ)償處理的一種機(jī)制,當(dāng)下游系統(tǒng)反饋數(shù)據(jù)接收正常后,將該數(shù)據(jù)刪除。最終保證上下游數(shù)據(jù)一致。
業(yè)務(wù)場(chǎng)景模擬
系統(tǒng)由Business對(duì)外提供服務(wù),在此過程中通過OrdeSaver和MessageSender執(zhí)行具體數(shù)據(jù)處理功能。
- Business為業(yè)務(wù)的入口,所有的業(yè)務(wù)邏輯由此開始。
- OrderSaver執(zhí)行的業(yè)務(wù)是保存訂單以及緩存訂單到Redis。
- MessageSender執(zhí)行的業(yè)務(wù)是保存下發(fā)數(shù)據(jù)到下發(fā)表中,并將數(shù)據(jù)發(fā)送到WindQ消息隊(duì)列中。
Business
- /**
- * 完整的業(yè)務(wù),有多個(gè)數(shù)據(jù)庫(kù)操作,以及數(shù)據(jù)庫(kù)以外的需要延遲執(zhí)行的業(yè)務(wù)邏輯
- */
- public class Business {
- public void saveInfo(Map<String, Object> map){
- System.out.println("業(yè)務(wù)開始 事務(wù)開啟 保存數(shù)據(jù)操作開始" );
- new OrderSaver().saveOrderInfo(map);
- new MessageSender().saveMessageForSend(map);
- System.out.println("業(yè)務(wù)結(jié)束 事務(wù)提交 保存數(shù)據(jù)操作結(jié)束");
- }
- }
OrderSaver
- /**
- * 保存服務(wù)單的業(yè)務(wù)邏輯
- */
- public class OrderSaver {
- public void saveOrderInfo(Map<String, Object> map,ExecutorHandler executorHandler){
- System.out.println("Save order info to datebase");
- System.out.println("Cache order info into redis");
- }
- }
MessageSender
- /**
- * 下發(fā)數(shù)據(jù)的業(yè)務(wù)邏輯
- */
- public class MessageSender {
- public void saveMessageForSend(Map<String, Object> map,ExecutorHandler executorHandler){
- //保存數(shù)據(jù)到數(shù)據(jù)庫(kù)
- System.out.println("Save create message to datebase.");
- System.out.println("Send message to windq");
- }
- }
業(yè)務(wù)接口調(diào)用
- public class Sample {
- //此處模擬業(yè)務(wù)接口被調(diào)用的情況
- public static void main(String[] args) {
- Business business = new Business();
- Map<String,Object> param = new HashMap<>();
- business.saveInfo(param);
- }
- }
輸出結(jié)果
業(yè)務(wù)開始 事務(wù)開啟 保存數(shù)據(jù)操作開始
Save order info to datebase
Cache order info into redis
Save create message to datebase.
Send message to windq
業(yè)務(wù)結(jié)束 事務(wù)提交 保存數(shù)據(jù)操作結(jié)束
圖3:常規(guī)輸出
以上場(chǎng)景模擬了常規(guī)情況下,我們的業(yè)務(wù)處理流程,此時(shí)也就會(huì)相應(yīng)地出現(xiàn)我們上面所描述地異常。而我們所期望的處理結(jié)果應(yīng)該是:
業(yè)務(wù)開始 事務(wù)開啟 保存數(shù)據(jù)操作開始
Save order info to datebase
Save create message to datebase.
業(yè)務(wù)結(jié)束 事務(wù)提交 保存數(shù)據(jù)操作結(jié)束
Cache order info into redis
Send message to windq
圖4:期望的結(jié)果
2、解決方案
該問題如何處理呢,這個(gè)時(shí)候我們就該將緩存到Redis以及下發(fā)WindQ的操作延遲到事務(wù)提交后處理。這樣在事務(wù)沒有提交前Resdis中不會(huì)有數(shù)據(jù),WindQ也不會(huì)將數(shù)據(jù)下發(fā)。假如事務(wù)失敗,后續(xù)也可以根據(jù)異常進(jìn)行后續(xù)處理。即便事務(wù)成功緩存Redis或者發(fā)送WindQ出錯(cuò),也可以根據(jù)存入數(shù)據(jù)庫(kù)的數(shù)據(jù)進(jìn)行后續(xù)的補(bǔ)償處理。
2.1 處理方案一:代碼轉(zhuǎn)移
我們首先會(huì)想到通過代碼轉(zhuǎn)移的方式,將邏輯代碼移動(dòng)到事務(wù)外面。但這個(gè)時(shí)候問題來了......
2.1.1 問題一:邏輯割裂
我們?yōu)榱嗣枋鰳I(yè)務(wù)模型,將現(xiàn)實(shí)場(chǎng)景盡量簡(jiǎn)化,從模型上看,是可以將兩段非事務(wù)方法挪到事務(wù)外面來操作。但是,由于保存訂單和緩存Redis是一套操作,其使用的數(shù)據(jù)是一致的,保存下發(fā)的消息和發(fā)送WindQ也是一對(duì)呼應(yīng)的操作。代碼是寫在一起的,符合邏輯和業(yè)務(wù)要求有改動(dòng)時(shí)也能夠一并處理,拆開的話,對(duì)統(tǒng)一數(shù)據(jù)的處理上給人的感覺就不是一氣呵成了。
某些業(yè)務(wù)中并非只有一兩個(gè)成對(duì)的操作,將多個(gè)成對(duì)操作的事務(wù)-非事務(wù)關(guān)聯(lián)型邏輯強(qiáng)行拆開,顯得規(guī)模浩大,這種方式變得不可取。
另外原先的處理方案中,保存數(shù)據(jù)庫(kù)、緩存以及發(fā)送WindQ處理的數(shù)據(jù)是一致的,而拆開寫的話,就會(huì)導(dǎo)致數(shù)據(jù)要從前傳到后。要保證數(shù)據(jù)能從里面?zhèn)鞯酵饷嬉矊⒊蔀橐粋€(gè)問題。
2.2 處理方案二:延遲執(zhí)行的模型
為了解決***方案的問題,我們做了以下的設(shè)計(jì)。
數(shù)據(jù)要往數(shù)據(jù)庫(kù)中存的時(shí)候,我們可以先把要處理的數(shù)據(jù)和要做的動(dòng)作先定義好,放到一個(gè)容器中去,在事務(wù)提交后,我們?cè)倌玫竭@個(gè)容器,統(tǒng)一將之前定義好的操作和數(shù)據(jù)取出來,按要求執(zhí)行。
具體怎么做呢?
經(jīng)過一番思考,我們構(gòu)建出了一個(gè)模型
ExecutorHandler - Executor
- Executor 可執(zhí)行對(duì)象,用于定義一個(gè)需要執(zhí)行的邏輯。比如將數(shù)據(jù)通過WindQ發(fā)送,或者將訂單刷入緩存。
- ExecutorHandler容器類,內(nèi)部保存了一個(gè)Executor的列表。
代碼邏輯
- 在業(yè)務(wù)代碼中,我們將需要執(zhí)行的業(yè)務(wù)操作封裝到Executor中。
- 定義好以后,通過ExecutorHandler的add方法,添加到容器中去。
- 在業(yè)務(wù)邏輯執(zhí)行的過程中,先進(jìn)行數(shù)據(jù)庫(kù)操作,而非數(shù)據(jù)庫(kù)操作只是在對(duì)應(yīng)的位置進(jìn)行定義,在整個(gè)事務(wù)完成以后,通過ExecutorHandler的handle方法,遍歷所有的Executor對(duì)象,執(zhí)行需要延遲的非事務(wù)操作。
圖5:業(yè)務(wù)模型
Executor
- public interface Executor {
- void execute();
- }
ExecutorHandler
- public class ExecutorHandler {
- //需要執(zhí)行的業(yè)務(wù)處理對(duì)象列表
- private List<Executor> executors;
- public void handle(){
- if(!(null == executors || executors.isEmpty())){
- for(Executor executor : executors){
- executor.execute();
- }
- }
- }
- public void add(Executor executor){
- if(null == executors){
- executors = new ArrayList<>();
- }
- executors.add(executor);
- }
- }
業(yè)務(wù)接口調(diào)用
- public class Sample {
- //此處模擬業(yè)務(wù)接口被調(diào)用的情況
- public static void main(String[] args) {
- Business business = new Business();
- ExecutorHandler handler = new ExecutorHandler();
- Map<String, Object> param = new HashMap<String, Object>();
- //執(zhí)行業(yè)務(wù)方法,開啟事務(wù),保存數(shù)據(jù)
- business.saveInfo(param, handler);
- //執(zhí)行延遲執(zhí)行的方法
- handler.handle();
- }
- }
輸出結(jié)果
業(yè)務(wù)開始 事務(wù)開啟 保存數(shù)據(jù)操作開始
Save order info to datebase
Save create message to datebase.
業(yè)務(wù)結(jié)束 事務(wù)提交 保存數(shù)據(jù)操作結(jié)束
Cache order info into redis
Send message to windq
Business
- /**
- * 完整的業(yè)務(wù),有多個(gè)數(shù)據(jù)庫(kù)操作,以及數(shù)據(jù)庫(kù)以外的需要延遲執(zhí)行的業(yè)務(wù)邏輯
- */
- public class Business {
- public void saveInfo(Map<String, Object> map,ExecutorHandler executorHandler){
- System.out.println("業(yè)務(wù)開始 事務(wù)開啟 保存數(shù)據(jù)操作開始" );
- new OrderSaver().saveOrderInfo(map,executorHandler);
- new MessageSender().saveMessageForSend(map,executorHandler);
- System.out.println("業(yè)務(wù)結(jié)束 事務(wù)提交 保存數(shù)據(jù)操作結(jié)束");
- }
- }
MessageSender
- /**
- * 下發(fā)數(shù)據(jù)的業(yè)務(wù)邏輯
- */
- public class MessageSender {
- public void saveMessageForSend(Map<String, Object> map,ExecutorHandler executorHandler){
- //保存數(shù)據(jù)到數(shù)據(jù)庫(kù)
- System.out.println("Save create message to datebase.");
- //將要延遲執(zhí)行的業(yè)務(wù)邏輯定義好,注冊(cè)到容器中去
- executorHandler.add(new Executor() {
- @Override
- public void execute() {
- System.out.println("Send message to windq");
- }
- });
- }
- }
OrderSaver
- /**
- * 保存服務(wù)單的業(yè)務(wù)邏輯
- */
- public class OrderSaver {
- public void saveOrderInfo(Map<String, Object> map,ExecutorHandler executorHandler){
- System.out.println("Save order info to datebase");
- //這就是所謂的回調(diào)函數(shù)
- executorHandler.add(new Executor() {
- @Override
- public void execute() {
- System.out.println("Cache order info into redis");
- }
- });
- }
- }
2.2.1 問題二:參數(shù)傳遞
以上的方案,解決了延遲執(zhí)行的問題,但是,此刻我們發(fā)現(xiàn),由于要使用ExecutorHandler,這個(gè)時(shí)候就需要隨時(shí)隨地將該對(duì)象傳遞下去,要考慮如何降低該對(duì)象的侵略性。
- 靜態(tài)變量:在使用中需要考慮同步和清理的問題,很容易在多線程的情況下使得邏輯變得混亂,不采用。
- 成員變量:同樣也存在著數(shù)據(jù)清理的問題,不推薦使用,不采用。
2.2.2 問題二處理方案:使用ThreadLocal參數(shù)傳遞
有沒有生命周期是整個(gè)線程內(nèi)的呢?這時(shí)我們就需要用到ThreadLocal了。
通過ThreadLocal來獲取ExecutorHandler 可以作為有效的解決方案。
由于ThreadLocal對(duì)象最終在使用完的時(shí)候需要remove掉,因此,該方法需要集中統(tǒng)一調(diào)用。
實(shí)現(xiàn)時(shí),我們定義了HandlerThreadLocal類。
HandlerThreadLocal對(duì)象負(fù)責(zé)通過 ThreadLocal的get方法來獲取線程本地的ExecutorHandler對(duì)象,并執(zhí)行其 handle方法(具體實(shí)現(xiàn)可以參照后面的代碼)。
執(zhí)行完業(yè)務(wù)操作以后,通過調(diào)用remove方法將其銷毀。
2.2.3 異常的捕捉和處理DelayedCallHandler
由于ThreadLocal的remove方法是一定需要被執(zhí)行,因此該方法應(yīng)該放在一個(gè)try- catch - finally 塊的finally段中,保證其不被遺漏。
- DelayedCallHandler通過handle()方法調(diào)用業(yè)務(wù)邏輯。
- 在調(diào)用完業(yè)務(wù)邏輯后,調(diào)用ExecutorHandler的handle()方法,執(zhí)行已經(jīng)注冊(cè)到延遲調(diào)用容器中的業(yè)務(wù)方法。
- ***在finally中將ThreadLocal 對(duì)象remove掉。
整個(gè)DelayedCallHandler的handle方法就是一個(gè)完整的try- catch - finally 塊。
2.2.4 標(biāo)準(zhǔn)化定義:DelayablelService需要延遲調(diào)用的業(yè)務(wù)類
由于DelayedCallHandler已經(jīng)模塊化,業(yè)務(wù)方法***也定義成一個(gè)具體的方法名(doBusiness),所有的業(yè)務(wù)處理類,實(shí)現(xiàn)DelayedCallHandler接口,在doBusiness方法中調(diào)用有事務(wù)的業(yè)務(wù)邏輯。
3、最終實(shí)現(xiàn)方案
基于處理方案二的分析,***我們使用ThreadLocal來傳遞業(yè)務(wù)數(shù)據(jù)。
我們通過ThreadLocal
在業(yè)務(wù)邏輯MessageSender 、OrderSaver中通過executorHandler將需要延遲執(zhí)行的業(yè)務(wù)定義好。
在HandlerThreadLocal中,使用 executorHandler處理之前定義好的邏輯。
這樣做將事務(wù)和非事務(wù)分開,不再以方法參數(shù)的方式向下游傳遞數(shù)據(jù),使得數(shù)據(jù)傳遞得以結(jié)構(gòu),處理起來更加優(yōu)雅。
示例代碼如下。
業(yè)務(wù)接口調(diào)用
- public static void main(String[] args) {
- Business b = new Business();
- Map<String, Object> map = new HashMap<String, Object>();
- DelayedCallHandler<Map<String, Object>> bu = new DelayedCallHandler<Map<String, Object>>();
- bu.handle(b,map);
- }
輸出結(jié)果:
業(yè)務(wù)開始 事務(wù)開啟 保存數(shù)據(jù)操作開始
Save order info to datebase
Save create message to datebase.
業(yè)務(wù)結(jié)束 事務(wù)提交 保存數(shù)據(jù)操作結(jié)束
Cache order info into redis
Send message to windq
HandlerThreadLocal
- public class HandlerThreadLocal {
- public static final ThreadLocal<ExecutorHandler> executorHandler = new ThreadLocal<ExecutorHandler>(){
- protected ExecutorHandler initialValue(){
- return new ExecutorHandler();
- }
- };
- public static final void handle(){
- executorHandler.get().handle();
- }
- public static final void remove(){
- executorHandler.remove();
- }
- }
DelayedCallHandler
- public class DelayedCallHandler<T> {
- public void handle(DelayablelService<T> buisnes,T param){
- try {
- //先執(zhí)行業(yè)務(wù)操作
- buisnes.doBusiness(param);
- //執(zhí)行延遲執(zhí)行的業(yè)務(wù)
- HandlerThreadLocal.handle();
- } catch (Exception e) {
- //處理異常
- }finally {
- HandlerThreadLocal.remove();
- }
- }
- }
DelayablelService1.
- public interface DelayablelService {
- public void doBusiness(T param);
- }
Business
- /**
- * 完整的業(yè)務(wù),有多個(gè)數(shù)據(jù)庫(kù)操作,以及數(shù)據(jù)庫(kù)以外的需要延遲執(zhí)行的業(yè)務(wù)邏輯
- */
- public class Business implements DelayablelService< Map<String, Object> > {
- @Override
- public void doBusiness(Map<String, Object> map){
- saveInfo(map);
- }
- public void saveInfo(Map<String, Object> map){
- System.out.println("業(yè)務(wù)開始 事務(wù)開啟 保存數(shù)據(jù)操作開始" );
- new OrderSaver().saveOrderInfo();
- new MessageSender().saveMessageForSend();
- System.out.println("業(yè)務(wù)結(jié)束 事務(wù)提交 保存數(shù)據(jù)操作結(jié)束");
- }
- }
MessageSender
- /**
- * 下發(fā)數(shù)據(jù)的業(yè)務(wù)邏輯
- */
- public class MessageSender {
- public void saveMessageForSend(){
- ExecutorHandler executorHandler = HandlerThreadLocal.executorHandler.get();
- System.out.println("Save create message to datebase.");
- executorHandler.add(new Executor() {
- @Override
- public void execute() {
- System.out.println("Send message to windq");
- }
- });
- }
- }
OrderSaver
- /**
- * 保存服務(wù)單的業(yè)務(wù)邏輯
- */
- public class OrderSaver {
- public void saveOrderInfo(){
- System.out.println("Save order info to datebase");
- ExecutorHandler executorHandler = HandlerThreadLocal.executorHandler.get();
- //這就是所謂的回調(diào)函數(shù)
- executorHandler.add(new Executor() {
- @Override
- public void execute() {
- System.out.println("Cache order info into redis");
- }
- });
- }
- }
4、總結(jié)
使用延遲執(zhí)行的模型,解決了在一個(gè)業(yè)務(wù)邏輯中既有數(shù)據(jù)庫(kù)事務(wù)的操作,又有相關(guān)的非事務(wù)操作時(shí),事務(wù)失敗或未提交而非事務(wù)操作成功導(dǎo)致的數(shù)據(jù)不一致問題。
文中提到的邏輯割裂和參數(shù)傳遞的問題,都是在比較復(fù)雜的場(chǎng)景下才有的。蘇寧售后訂單業(yè)務(wù)中此類邏輯常有出現(xiàn),因此我們就這些問題進(jìn)行了分析、討論,得出這樣的解決方案。并非所有的系統(tǒng)和業(yè)務(wù)都需要這樣。任何解決方案都要因情況而定,避免畫蛇添足。
在使用該模型時(shí),有使用到匿名內(nèi)部類和線程局部變量(ThreadLocal),在使用時(shí),有一定的注意事項(xiàng),ThreadLocal在使用結(jié)束后要通過其remove()方法移除,使用時(shí)需要留意。
作者:
王海勇,蘇寧科技集團(tuán)蘇寧云軟件公司售后研發(fā)中心技術(shù)經(jīng)理。從事Java開發(fā)多年,擅長(zhǎng)業(yè)務(wù)抽象及業(yè)務(wù)架構(gòu)設(shè)計(jì),2016年9月加入蘇寧,參與售后服務(wù)域訂單平臺(tái)、時(shí)效平臺(tái)等系統(tǒng)平臺(tái)的研發(fā)工作。在蘇寧巨大業(yè)務(wù)量的場(chǎng)景下,保證系統(tǒng)穩(wěn)定、安全、高效地提供服務(wù)。
【51CTO原創(chuàng)稿件,合作站點(diǎn)轉(zhuǎn)載請(qǐng)注明原文作者和出處為51CTO.com】