本篇內(nèi)容介紹了“怎么配置Spring Batch批處理失敗重試”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

站在用戶的角度思考問(wèn)題,與客戶深入溝通,找到長(zhǎng)沙縣網(wǎng)站設(shè)計(jì)與長(zhǎng)沙縣網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗(yàn),讓設(shè)計(jì)與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個(gè)性化、用戶體驗(yàn)好的作品,建站類型包括:網(wǎng)站設(shè)計(jì)制作、做網(wǎng)站、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣、域名注冊(cè)、網(wǎng)絡(luò)空間、企業(yè)郵箱。業(yè)務(wù)覆蓋長(zhǎng)沙縣地區(qū)。
1. 引言
默認(rèn)情況下,Spring批處理作業(yè)在執(zhí)行過(guò)程中出現(xiàn)任何錯(cuò)誤都會(huì)失敗。然而有些時(shí)候,為了提高應(yīng)用程序的彈性,我們就需要處理這類間歇性的故障。在這篇短文中,我們就來(lái)一起探討 如何在Spring批處理框架中配置重試邏輯。
2. 簡(jiǎn)單舉例
假設(shè)有一個(gè)批處理作業(yè),它讀取一個(gè)CSV文件作為輸入:
username, userid, transaction_date, transaction_amount sammy, 1234, 31/10/2015, 10000 john, 9999, 3/12/2015, 12321
然后,它通過(guò)訪問(wèn)REST端點(diǎn)來(lái)處理每條記錄,獲取用戶的 age 和 postCode 屬性:
public class RetryItemProcessor implements ItemProcessor<Transaction, Transaction> { @Override public Transaction process(Transaction transaction) throws IOException { log.info("RetryItemProcessor, attempting to process: {}", transaction); HttpResponse response = fetchMoreUserDetails(transaction.getUserId()); //parse user's age and postCode from response and update transaction ... return transaction; } ... }最后,它生成并輸出一個(gè)合并的XML:
<transactionRecord> <transactionRecord> <amount>10000.0</amount> <transactionDate>2015-10-31 00:00:00</transactionDate> <userId>1234</userId> <username>sammy</username> <age>10</age> <postCode>430222</postCode> </transactionRecord> ... </transactionRecord>
3. ItemProcessor 中添加重試
現(xiàn)在假設(shè),如果到REST端點(diǎn)的連接由于某些網(wǎng)絡(luò)速度慢而超時(shí),該怎么辦?如果發(fā)生這種情況,則我們的批處理工作將失敗。
在這種情況下,我們希望失敗的 item 處理重試幾次。因此,接下來(lái)我將批處理作業(yè)配置為:在出現(xiàn)故障時(shí)執(zhí)行最多三次重試:
@Bean public Step retryStep( ItemProcessor<Transaction, Transaction> processor, ItemWriter<Transaction> writer) throws ParseException { return stepBuilderFactory .get("retryStep") .<Transaction, Transaction>chunk(10) .reader(itemReader(inputCsv)) .processor(processor) .writer(writer) .faultTolerant() .retryLimit(3) .retry(ConnectTimeoutException.class) .retry(DeadlockLoserDataAccessException.class) .build(); }這里調(diào)用了 faultTolerant() 來(lái)啟用重試功能。另外,我們使用 retry 和 retryLimit 分別定義符合重試條件的異常和 item 的最大重試次數(shù)。
4. 測(cè)試重試次數(shù)
假設(shè)我們有一個(gè)測(cè)試場(chǎng)景,其中返回 age 和 postCode 的REST端點(diǎn)關(guān)閉了一段時(shí)間。在這個(gè)測(cè)試場(chǎng)景中,我們只對(duì)前兩個(gè) API 調(diào)用獲取一個(gè) ConnectTimeoutException ,而第三個(gè)調(diào)用將成功:
@Test public void whenEndpointFailsTwicePasses3rdTime_thenSuccess() throws Exception { FileSystemResource expectedResult = new FileSystemResource(EXPECTED_OUTPUT); FileSystemResource actualResult = new FileSystemResource(TEST_OUTPUT); when(httpResponse.getEntity()) .thenReturn(new StringEntity("{ \"age\":10, \"postCode\":\"430222\" }")); //fails for first two calls and passes third time onwards when(httpClient.execute(any())) .thenThrow(new ConnectTimeoutException("Timeout count 1")) .thenThrow(new ConnectTimeoutException("Timeout count 2")) .thenReturn(httpResponse); JobExecution jobExecution = jobLauncherTestUtils .launchJob(defaultJobParameters()); JobInstance actualJobInstance = jobExecution.getJobInstance(); ExitStatus actualJobExitStatus = jobExecution.getExitStatus(); assertThat(actualJobInstance.getJobName(), is("retryBatchJob")); assertThat(actualJobExitStatus.getExitCode(), is("COMPLETED")); AssertFile.assertFileEquals(expectedResult, actualResult); }在這里,我們的工作成功地完成了。另外,從日志中可以明顯看出 第一條記錄 id=1234 失敗了兩次,最后在第三次重試時(shí)成功了:
19:06:57.742 [main] INFO o.s.batch.core.job.SimpleStepHandler - Executing step: [retryStep] 19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234 19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234 19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234 19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=9999 19:06:57.773 [main] INFO o.s.batch.core.step.AbstractStep - Step: [retryStep] executed in 31ms
同樣,看下另一個(gè)測(cè)試用例,當(dāng)所有重試次數(shù)都用完時(shí)會(huì)發(fā)生什么:
@Test public void whenEndpointAlwaysFail_thenJobFails() throws Exception { when(httpClient.execute(any())) .thenThrow(new ConnectTimeoutException("Endpoint is down")); JobExecution jobExecution = jobLauncherTestUtils .launchJob(defaultJobParameters()); JobInstance actualJobInstance = jobExecution.getJobInstance(); ExitStatus actualJobExitStatus = jobExecution.getExitStatus(); assertThat(actualJobInstance.getJobName(), is("retryBatchJob")); assertThat(actualJobExitStatus.getExitCode(), is("FAILED")); assertThat(actualJobExitStatus.getExitDescription(), containsString("org.apache.http.conn.ConnectTimeoutException")); }在這個(gè)測(cè)試用例中,在作業(yè)因 ConnectTimeoutException 而失敗之前,會(huì)嘗試對(duì)第一條記錄重試三次。
5. 使用XML配置重試
最后,讓我們看一下與上述配置等價(jià)的XML:
<batch:job id="retryBatchJob"> <batch:step id="retryStep"> <batch:tasklet> <batch:chunk reader="itemReader" writer="itemWriter" processor="retryItemProcessor" commit-interval="10" retry-limit="3"> <batch:retryable-exception-classes> <batch:include class="org.apache.http.conn.ConnectTimeoutException"/> <batch:include class="org.springframework.dao.DeadlockLoserDataAccessException"/> </batch:retryable-exception-classes> </batch:chunk> </batch:tasklet> </batch:step> </batch:job>
“怎么配置Spring Batch批處理失敗重試”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
網(wǎng)頁(yè)標(biāo)題:怎么配置SpringBatch批處理失敗重試
網(wǎng)頁(yè)URL:http://www.chinadenli.net/article20/gidjco.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供微信小程序、定制開發(fā)、標(biāo)簽優(yōu)化、域名注冊(cè)、網(wǎng)站收錄、品牌網(wǎng)站建設(shè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)