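Configuring Retry on Failure in Spring Batch
1. Introduction
By default, a Spring Batch job fails as soon as any error is raised during its execution. Sometimes, however, we want to handle intermittent failures in order to make the application more resilient. In this short article, we look at how to configure retry logic in the Spring Batch framework.
If you are new to Spring Batch, see this earlier article:
Let's Go! A Beginner-Level Spring Batch Tutorial with Examples
2. A Simple Example
Suppose we have a batch job that reads a CSV file as input: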
- username, userid, transaction_date, transaction_amount
- sammy, 1234, 31/10/2015, 10000
- john, 9999, 3/12/2015, 12321
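To make the input format concrete, here is a minimal sketch of how one of the rows above could be turned into an object. The nested Transaction class, its field names, and the parse helper are assumptions made for illustration only; the article does not show its own Transaction class or reader configuration, and a real job would normally let a Spring Batch reader do this mapping.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class CsvRowSketch {

    // Hypothetical stand-in for the article's Transaction type (the real class is not shown).
    static final class Transaction {
        final String username;
        final int userId;
        final LocalDate transactionDate;
        final double amount;

        Transaction(String username, int userId, LocalDate transactionDate, double amount) {
            this.username = username;
            this.userId = userId;
            this.transactionDate = transactionDate;
            this.amount = amount;
        }
    }

    // "d/M/yyyy" accepts both two-digit and one-digit days, e.g. 31/10/2015 and 3/12/2015.
    private static final DateTimeFormatter DATE = DateTimeFormatter.ofPattern("d/M/yyyy");

    // Splits one data row on commas (ignoring surrounding spaces) and converts each field.
    static Transaction parse(String line) {
        String[] fields = line.trim().split("\\s*,\\s*");
        if (fields.length != 4) {
            throw new IllegalArgumentException("Expected 4 fields but got " + fields.length);
        }
        return new Transaction(
                fields[0],
                Integer.parseInt(fields[1]),
                LocalDate.parse(fields[2], DATE),
                Double.parseDouble(fields[3]));
    }

    public static void main(String[] args) {
        Transaction t = parse("sammy, 1234, 31/10/2015, 10000");
        // prints "sammy 1234 2015-10-31 10000.0"
        System.out.println(t.username + " " + t.userId + " " + t.transactionDate + " " + t.amount);
    }
}
```

The header row (username, userid, ...) is not handled here and would have to be skipped before calling parse.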
It then processes each record by calling a REST endpoint to fetch the user's age and postCode attributes:
- public class RetryItemProcessor implements ItemProcessor<Transaction, Transaction> {
- @Override
- public Transaction process(Transaction transaction) throws IOException {
- log.info("RetryItemProcessor, attempting to process: {}", transaction);
- HttpResponse response = fetchMoreUserDetails(transaction.getUserId());
- //parse user's age and postCode from response and update transaction
- ...
- return transaction;
- }
- ...
- }
Finally, it generates a consolidated XML output:
- <transactionRecord>
- <transactionRecord>
- <amount>10000.0</amount>
- <transactionDate>2015-10-31 00:00:00</transactionDate>
- <userId>1234</userId>
- <username>sammy</username>
- <age>10</age>
- <postCode>430222</postCode>
- </transactionRecord>
- ...
- </transactionRecord>
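3. Adding Retry to the ItemProcessor
Now suppose the connection to the REST endpoint times out because of a slow network. If that happens, our batch job fails.
In that case, we would like the processing of the failed item to be attempted again a few times. So next we configure the batch job to attempt a failing item up to three times: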
- @Bean
- public Step retryStep(
- ItemProcessor<Transaction, Transaction> processor,
- ItemWriter<Transaction> writer) throws ParseException {
- return stepBuilderFactory
- .get("retryStep")
- .<Transaction, Transaction>chunk(10)
- .reader(itemReader(inputCsv))
- .processor(processor)
- .writer(writer)
- .faultTolerant()
- .retryLimit(3)
- .retry(ConnectTimeoutException.class)
- .retry(DeadlockLoserDataAccessException.class)
- .build();
- }
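Here, the call to faultTolerant() enables the retry functionality. In addition, retry declares the exception types that qualify for a retry, and retryLimit sets the maximum number of attempts for an item.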
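The behaviour that this configuration asks for can be illustrated with a small plain-Java sketch. This is not how Spring Batch implements retry internally; callWithRetry and its parameters are hypothetical names, and java.net.SocketTimeoutException is used only as a standard-library stand-in for ConnectTimeoutException. The sketch assumes the limit counts every attempt, the first one included, which matches what the tests in the next section show.

```java
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.concurrent.Callable;

public class RetrySketch {

    /**
     * Calls the action up to maxAttempts times in total (first attempt included).
     * Only exceptions that are instances of a listed type trigger another attempt;
     * any other exception is rethrown immediately.
     */
    static <T> T callWithRetry(Callable<T> action, int maxAttempts,
                               List<Class<? extends Exception>> retryable) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                boolean canRetry = retryable.stream().anyMatch(type -> type.isInstance(e));
                if (!canRetry) {
                    throw e; // not a retryable type: fail right away
                }
                last = e;    // retryable: remember it and try again if attempts remain
            }
        }
        throw last;          // all attempts used up: surface the last failure
    }

    public static void main(String[] args) throws Exception {
        List<Class<? extends Exception>> retryable = List.of(SocketTimeoutException.class);
        int[] calls = {0};

        // Simulated endpoint: times out on the first two calls, answers on the third.
        String result = callWithRetry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new SocketTimeoutException("Timeout count " + calls[0]);
            }
            return "ok";
        }, 3, retryable);

        // prints "ok after 3 attempts"
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Listing the retryable types explicitly mirrors the retry(...) calls in the step definition: a timeout is worth another attempt, while an unexpected exception still fails fast.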
4. Testing the Retries
Suppose we have a test scenario in which the REST endpoint that returns age and postCode is down for a short while. In this scenario, only the first two API calls throw a ConnectTimeoutException, and the third call succeeds:
- @Test
- public void whenEndpointFailsTwicePasses3rdTime_thenSuccess() throws Exception {
- FileSystemResource expectedResult = new FileSystemResource(EXPECTED_OUTPUT);
- FileSystemResource actualResult = new FileSystemResource(TEST_OUTPUT);
- when(httpResponse.getEntity())
- .thenReturn(new StringEntity("{ \"age\":10, \"postCode\":\"430222\" }"));
- //fails for first two calls and passes third time onwards
- when(httpClient.execute(any()))
- .thenThrow(new ConnectTimeoutException("Timeout count 1"))
- .thenThrow(new ConnectTimeoutException("Timeout count 2"))
- .thenReturn(httpResponse);
- JobExecution jobExecution = jobLauncherTestUtils
- .launchJob(defaultJobParameters());
- JobInstance actualJobInstance = jobExecution.getJobInstance();
- ExitStatus actualJobExitStatus = jobExecution.getExitStatus();
- assertThat(actualJobInstance.getJobName(), is("retryBatchJob"));
- assertThat(actualJobExitStatus.getExitCode(), is("COMPLETED"));
- AssertFile.assertFileEquals(expectedResult, actualResult);
- }
Here, our job completed successfully. The log also shows that the first record, with id=1234, failed twice and finally succeeded on the third attempt:
- 19:06:57.742 [main] INFO o.s.batch.core.job.SimpleStepHandler - Executing step: [retryStep]
- 19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
- 19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
- 19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
- 19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=9999
- 19:06:57.773 [main] INFO o.s.batch.core.step.AbstractStep - Step: [retryStep] executed in 31ms
Similarly, let's look at another test case to see what happens when all the retries are exhausted:
- @Test
- public void whenEndpointAlwaysFail_thenJobFails() throws Exception {
- when(httpClient.execute(any()))
- .thenThrow(new ConnectTimeoutException("Endpoint is down"));
- JobExecution jobExecution = jobLauncherTestUtils
- .launchJob(defaultJobParameters());
- JobInstance actualJobInstance = jobExecution.getJobInstance();
- ExitStatus actualJobExitStatus = jobExecution.getExitStatus();
- assertThat(actualJobInstance.getJobName(), is("retryBatchJob"));
- assertThat(actualJobExitStatus.getExitCode(), is("FAILED"));
- assertThat(actualJobExitStatus.getExitDescription(),
- containsString("org.apache.http.conn.ConnectTimeoutException"));
- }
In this test case, the job attempts to process the first record three times before failing with a ConnectTimeoutException.
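The exhaustion case can be modelled the same way in plain Java. The sketch below is a toy analogy, not Spring Batch code: Outcome and runWithAttempts are hypothetical names, every exception type is treated as retryable for brevity, and java.net.SocketTimeoutException again stands in for ConnectTimeoutException. It shows an always-failing call being invoked exactly three times before the failure is reported together with the exception's class name, which is roughly what the test above asserts on the exit description.

```java
import java.net.SocketTimeoutException;
import java.util.concurrent.Callable;

public class RetryExhaustionSketch {

    // Toy result type, loosely modelled on the exit code and description the test inspects.
    static final class Outcome {
        final String exitCode;
        final String exitDescription;
        final int attempts;

        Outcome(String exitCode, String exitDescription, int attempts) {
            this.exitCode = exitCode;
            this.exitDescription = exitDescription;
            this.attempts = attempts;
        }
    }

    // Tries the action up to maxAttempts times and reports how the run ended.
    static Outcome runWithAttempts(Callable<?> action, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.call();
                return new Outcome("COMPLETED", "", attempt);
            } catch (Exception e) {
                last = e; // keep the most recent failure for the final report
            }
        }
        String description = last.getClass().getName() + ": " + last.getMessage();
        return new Outcome("FAILED", description, maxAttempts);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        Outcome outcome = runWithAttempts(() -> {
            calls[0]++;
            throw new SocketTimeoutException("Endpoint is down");
        }, 3);

        // prints "FAILED after 3 calls: java.net.SocketTimeoutException: Endpoint is down"
        System.out.println(outcome.exitCode + " after " + calls[0] + " calls: "
                + outcome.exitDescription);
    }
}
```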
5. Configuring Retry in XML
Finally, let's look at the XML equivalent of the configuration above:
- <batch:job id="retryBatchJob">
- <batch:step id="retryStep">
- <batch:tasklet>
- <batch:chunk reader="itemReader" writer="itemWriter"
- processor="retryItemProcessor" commit-interval="10"
- retry-limit="3">
- <batch:retryable-exception-classes>
- <batch:include class="org.apache.http.conn.ConnectTimeoutException"/>
- <batch:include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
- </batch:retryable-exception-classes>
- </batch:chunk>
- </batch:tasklet>
- </batch:step>
- </batch:job>
6. Summary
In this article, we learned how to configure retry logic in Spring Batch, using both Java and XML configuration. We also used unit tests to observe how retries work in practice.
This article is reposted from the WeChat official account 「鍋外的大佬」. To republish it, please contact that account.