這重試器寫的真地地地地地地道,你覺得地道嗎?
服務(wù)總是不穩(wěn)定的,有的時候需要編寫重試邏輯,比如,HTTP的重試;定時任務(wù)的重試等。
一、簡單的實現(xiàn)
我們可以使用while
或for
循環(huán),配置try-catch
和break
組合,完成循環(huán)邏輯。
final Random random = new Random();
final int maxRetryCount = 10;
int times = 0;
while (true) {
times++;
if (times > maxRetryCount) {
break;
}
try {
// 業(yè)務(wù)邏輯
System.out.println("最大重試" + maxRetryCount + "次,當前是第" + times + "次");
if (random.nextInt(10) > 5) {
throw new RuntimeException("隨機數(shù)失敗");
}
if (random.nextInt(10) / 2 == 0) {
System.out.println("邏輯執(zhí)行成功");
break;
}
Thread.sleep(1000);
// 業(yè)務(wù)邏輯
} catch (Exception e) {
System.out.println("進入異常捕獲");
}
}
System.out.println("業(yè)務(wù)邏輯執(zhí)行完畢");
其中一次執(zhí)行結(jié)果:
最大重試10次,當前是第1次 進入異常捕獲 最大重試10次,當前是第2次 進入異常捕獲 最大重試10次,當前是第3次 最大重試10次,當前是第4次 進入異常捕獲 最大重試10次,當前是第5次 最大重試10次,當前是第6次 進入異常捕獲 最大重試10次,當前是第7次 最大重試10次,當前是第8次 邏輯執(zhí)行成功 業(yè)務(wù)邏輯執(zhí)行完畢
上面這種實現(xiàn)算是重試邏輯的模板化代碼,大差不差的都是這種寫法。
我們再看看其他的寫法。
二、重試裝飾器的實現(xiàn)
本節(jié)我們使用裝飾器模式實現(xiàn),借助經(jīng)典的面向?qū)ο缶幊田L格(通過類和接口)。同時,我們選擇更簡潔的函數(shù)式方法。
首先,我們將聲明一個函數(shù),接收Supplier<T>
和最大調(diào)用次數(shù)作為參數(shù)。
然后,還是使用while
循環(huán)和try-catch
塊多次調(diào)用該函數(shù)。
最后,我們將通過返回另一個Supplier<T>
來保留原始數(shù)據(jù)類型。
static <T> Supplier<T> retryFunction(Supplier<T> supplier, int maxRetries) {
return () -> {
int retries = 0;
while (retries < maxRetries) {
try {
return supplier.get();
} catch (Exception e) {
retries++;
}
}
throw new IllegalStateException(String.format("任務(wù)在 %s 次嘗試后失敗", maxRetries));
};
}
有了上面的函數(shù),我們可以基于這個函數(shù)裝飾器繼續(xù)創(chuàng)建CompletableFuture
:
static <T> CompletableFuture<T> retryTask(Supplier<T> supplier, int maxRetries) {
Supplier<T> retryableSupplier = retryFunction(supplier, maxRetries);
return CompletableFuture.supplyAsync(retryableSupplier);
}
模擬下業(yè)務(wù)場景,有個標志位數(shù)字,我們需要檢查這個標志位是否大于4,如果大于就結(jié)束任務(wù),如果小于4,重試。
final AtomicInteger retriesCounter = new AtomicInteger(0);
Supplier<Integer> codeToRun = () -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
int retryNr = retriesCounter.get();
System.out.println("Retrying: " + retryNr + "; thread:" + Thread.currentThread().getName());
if (retryNr < 4) {
retriesCounter.incrementAndGet();
throw new RuntimeException();
}
return 100;
};
然后我們借助retryTask()
函數(shù)完成重試邏輯,假設(shè)最大重試10次,根據(jù)上面的定義,正常會在第四次跳出:
CompletableFuture<Integer> result = retryTask(codeToRun, 10);
結(jié)果會打?。?/span>
Retrying: 0 Retrying: 1 Retrying: 2 Retrying: 3 Retrying: 4 100 4
如果重試次數(shù)小于4,就會觸發(fā)IllegalStateException
異常:
try {
result = retryTask(codeToRun, 3);
System.out.println(result.get());
} catch (Exception e) {
System.out.println("超過最大重試次數(shù)");
}
結(jié)果會打?。?/span>
Retrying: 0 Retrying: 1 Retrying: 2 超過最大重試次數(shù)
三、重試CompletableFuture
CompletableFuture
提供了內(nèi)部邏輯出現(xiàn)異常時處理的方法,比如exceptionally()
等方法,我們可以直接使用這些方法,不需要自定義裝飾器。
CompletableFuture
的使用可以參考由淺入深掌握CompletableFuture的七種用法。
(一)不安全重試
exceptionally()
方法允許指定一個替代函數(shù),當主要邏輯出現(xiàn)異常時,會調(diào)用指定的替代函數(shù)。
如果我們打算重試兩次,我們可以這樣寫:
static <T> CompletableFuture<T> retryTwice(Supplier<T> supplier) {
return CompletableFuture.supplyAsync(supplier)
.exceptionally(__ -> supplier.get())
.exceptionally(__ -> supplier.get());
}
如果重試次數(shù)可變,我們可以使用for
循環(huán):
static <T> CompletableFuture<T> retryUnsafe(Supplier<T> supplier, int maxRetries) {
CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
for (int i = 0; i < maxRetries; i++) {
cf = cf.exceptionally(__ -> supplier.get());
}
return cf;
}
上面的寫法可以滿足需求,但是有一點需要注意,當Supplier
運行比較快,在CompletableFuture
和exceptionally()
回退創(chuàng)建之前執(zhí)行完畢,那exceptionally()
的方法就會在主線程執(zhí)行。比如,我們設(shè)定retryUnsafe
休眠1000ms,指定codeToRun
不做sleep:
static <T> CompletableFuture<T> retryUnsafe(Supplier<T> supplier, int maxRetries) throws InterruptedException {
CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
Thread.sleep(1000);
for (int i = 0; i < maxRetries; i++) {
cf = cf.exceptionally(__ -> supplier.get());
}
return cf;
}
codeToRun = () -> {
int retryNr = retriesCounter.get();
System.out.println("Retrying: " + retryNr + "; thread:" + Thread.currentThread().getName());
if (retryNr < 4) {
retriesCounter.incrementAndGet();
throw new RuntimeException();
}
return 100;
};
retryUnsafe(codeToRun, 3);
運行結(jié)果將是:
Retrying: 0; thread:ForkJoinPool.commonPool-worker-1 Retrying: 1; thread:main Retrying: 2; thread:main Retrying: 3; thread:main
符合預(yù)期,后續(xù)調(diào)用是由主線程執(zhí)行的。如果初始調(diào)用很快,但后續(xù)調(diào)用預(yù)計會更慢,這可能會成為問題。
(二)異步重試
如果是在Java12之后,我們可以通過exceptionallyAsync()
方法實現(xiàn),將所有重試都異步執(zhí)行。
static <T> CompletableFuture<T> retryExceptionallyAsync(Supplier<T> supplier, int maxRetries) {
CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
for (int i = 0; i < maxRetries; i++) {
cf = cf.exceptionallyAsync(__ -> supplier.get());
}
return cf;
}
這個時候運行結(jié)果將是:
Retrying: 0; thread:ForkJoinPool.commonPool-worker-1
Retrying: 1; thread:ForkJoinPool.commonPool-worker-1
Retrying: 2; thread:ForkJoinPool.commonPool-worker-1
Retrying: 3; thread:ForkJoinPool.commonPool-worker-1
(三)嵌套CompletableFutures
如果是在Java12之前,我們需要實現(xiàn)兼容方案,那我們可以寫一個增加邏輯,實現(xiàn)異步化,我們可以這樣寫:
static <T> CompletableFuture<T> retryNesting(Supplier<T> supplier, int maxRetries)
throws InterruptedException {
CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
Thread.sleep(1000);
for (int i = 0; i < maxRetries; i++) {
cf = cf.thenApply(CompletableFuture::completedFuture)
.exceptionally(__ -> CompletableFuture.supplyAsync(supplier))
.thenCompose(Function.identity());
}
return cf;
}
其實簡單或就是有創(chuàng)建了一個CompletableFuture
,運行結(jié)果也是符合預(yù)期的。
四、總結(jié)
在本文中,我們探討了在CompletableFuture中重試函數(shù)調(diào)用的概念。我們首先深入研究了以函數(shù)式風格實現(xiàn)裝飾器模式,使我們能夠重試函數(shù)本身。
隨后,我們利用CompletableFuture API完成相同的任務(wù),同時保持異步流程。我們發(fā)現(xiàn)了Java 12中引入的exceptionallyAsync()
方法,它非常適合這個目的。最后,我們提出了一種替代方法,僅依賴于原始Java 8 API中的方法。
文末總結(jié)
本文介紹了使用實現(xiàn)重試邏輯的三種方式:簡單模式、裝飾器模式、和利用CompletableFuture
原生API的實現(xiàn)。