
Configuring Retry Logic with Spring Batch

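By default, a Spring Batch job fails as soon as an error occurs during processing. To make an application more robust, failures caused by transient exceptions need to be handled. This article looks at how to configure retry logic in Spring Batch.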

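1. Sample Application

The batch application reads a CSV file. Only the data rows are shown here; because the reader configured in section 2 calls setLinesToSkip(1), the actual file is expected to start with a header row: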


sammy, 1234, 31/10/2015, 10000
john, 9999, 3/12/2015, 12321
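The article's RecordFieldSetMapper, which turns each row into a Transaction, is not shown. As a rough illustration of the conversion involved, here is a minimal plain-Java sketch. The class name CsvLineSketch, the Row holder, and the d/M/yyyy date pattern are assumptions inferred from the sample rows, not code from the original project.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class CsvLineSketch {

    // Assumed pattern: day/month/year without zero padding, as in "3/12/2015".
    private static final DateTimeFormatter DATE = DateTimeFormatter.ofPattern("d/M/yyyy");

    /** Hypothetical holder for one parsed row; not the article's Transaction class. */
    static final class Row {
        final String username;
        final int userId;
        final LocalDateTime transactionDate;
        final double amount;

        Row(String username, int userId, LocalDateTime transactionDate, double amount) {
            this.username = username;
            this.userId = userId;
            this.transactionDate = transactionDate;
            this.amount = amount;
        }
    }

    /** Splits one comma-separated line and converts each field to its target type. */
    static Row parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Expected 4 fields but got " + parts.length);
        }
        return new Row(
            parts[0].trim(),
            Integer.parseInt(parts[1].trim()),
            // The file carries only a date, so the time is set to midnight.
            LocalDate.parse(parts[2].trim(), DATE).atStartOfDay(),
            Double.parseDouble(parts[3].trim()));
    }

    public static void main(String[] args) {
        Row row = parse("john, 9999, 3/12/2015, 12321");
        System.out.println(row.username + " " + row.userId + " " + row.transactionDate + " " + row.amount);
    }
}
```

In the real job, the splitting is done by DelimitedLineTokenizer and only the type conversion would live in the field-set mapper.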

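Each record is then processed by calling a REST endpoint to fetch the user's age and postcode. To have the date written correctly in the output, the date property can be annotated with @XmlJavaTypeAdapter(LocalDateTimeAdapter.class):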


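import java.time.LocalDateTime;
import javax.xml.bind.annotation.XmlRootElement;
import lombok.Data;

// Lombok's @Data generates the getters and setters used by JAXB and by the processor.
@XmlRootElement(name = "transactionRecord")
@Data
public class Transaction {
    private String username;
    private int userId;
    private int age;
    private String postCode;
    private LocalDateTime transactionDate;
    private double amount;
}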
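The LocalDateTimeAdapter mentioned above is not shown in the article. The following is only a sketch of the conversion logic such an adapter might contain. The yyyy-MM-dd HH:mm:ss pattern is an assumption, and the class deliberately does not extend JAXB's XmlAdapter so that it compiles with the JDK alone; in a JAXB project the same two methods would override XmlAdapter<String, LocalDateTime>.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical stand-in for the article's LocalDateTimeAdapter. */
public class LocalDateTimeAdapterSketch {

    // Assumed output format; the article does not state the real one.
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Object to XML text. */
    public String marshal(LocalDateTime value) {
        return value.format(FORMAT);
    }

    /** XML text back to object. */
    public LocalDateTime unmarshal(String text) {
        return LocalDateTime.parse(text, FORMAT);
    }

    public static void main(String[] args) {
        LocalDateTimeAdapterSketch adapter = new LocalDateTimeAdapterSketch();
        String text = adapter.marshal(LocalDateTime.of(2015, 10, 31, 0, 0));
        System.out.println(text);
        System.out.println(adapter.unmarshal(text));
    }
}
```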

The processor class looks like this:


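import java.io.IOException;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
// JSONObject and JSONException come from the JSON library on the classpath
// (the article does not say which one).

public class RetryItemProcessor implements ItemProcessor<Transaction, Transaction> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RetryItemProcessor.class);

    @Autowired
    private CloseableHttpClient closeableHttpClient;

    @Override
    public Transaction process(Transaction transaction) throws IOException, JSONException {
        LOGGER.info("Attempting to process user with id={}", transaction.getUserId());
        HttpResponse response = fetchMoreUserDetails(transaction.getUserId());

        // Parse the user's age and postCode from the response and update the transaction.
        String result = EntityUtils.toString(response.getEntity());
        JSONObject userObject = new JSONObject(result);
        // getInt accepts a numeric "age" value, which is what the test stub returns.
        transaction.setAge(userObject.getInt("age"));
        transaction.setPostCode(userObject.getString("postCode"));

        return transaction;
    }

    private HttpResponse fetchMoreUserDetails(int id) throws IOException {
        // Placeholder test endpoint; point this at your own service.
        final HttpGet request = new HttpGet("http://www.testapi.com:81/user/" + id);
        return closeableHttpClient.execute(request);
    }
}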

RestTemplate could be used for the call just as well. The remote service exists only for testing, so readers can set up their own test endpoint.
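For readers who want a local test endpoint, one possible sketch uses the HTTP server bundled with the JDK (com.sun.net.httpserver). The class name StubUserEndpoint, the port 8081, and the fixed payload are assumptions for illustration; the payload simply mirrors the fields the processor reads.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

public class StubUserEndpoint {

    /** Fixed payload containing the two fields the processor reads. */
    static String userJson() {
        return "{ \"age\":10, \"postCode\":\"430222\" }";
    }

    /** Starts a server on the given port (0 picks any free port) that answers /user/* requests. */
    static HttpServer start(int port) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/user/", exchange -> {
            byte[] body = userJson().getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().add("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }

    public static void main(String[] args) throws IOException {
        HttpServer server = start(8081); // arbitrary port chosen for this sketch
        System.out.println("Stub listening on port " + server.getAddress().getPort());
    }
}
```

The stub returns the same body for every user id, which is enough to exercise the processor; the URL in fetchMoreUserDetails would need to point at it.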

The final output looks like this:


<transactionRecord>
    <transactionRecord>
        <amount>10000.0</amount>
        <transactionDate>2015-10-31 0</transactionDate>
        <userId>1234</userId>
        <username>sammy</username>
        <age>10</age>
        <postCode>430222</postCode>
    </transactionRecord>
    ...
</transactionRecord>

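2. Adding Retry to the Processing

If the connection to the REST endpoint times out because of an unstable network, the batch job fails. Such an error is not unrecoverable, though; trying again a few times will often get past it.

So we configure the step to make up to three attempts per item when a retryable exception occurs: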


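import java.text.ParseException;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.dao.DeadlockLoserDataAccessException;
import org.springframework.oxm.Marshaller;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;

@Configuration
@EnableBatchProcessing
public class SpringBatchRetryConfig {

    private static final String[] tokens = { "username", "userid", "transactiondate", "amount" };
    private static final int TWO_SECONDS = 2000;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Value("input/recordRetry.csv")
    private Resource inputCsv;

    @Value("file:xml/retryOutput.xml")
    private Resource outputXml;

    // Reads the CSV file; RecordFieldSetMapper (not shown) maps each row to a Transaction.
    public ItemReader<Transaction> itemReader(Resource inputData) throws ParseException {
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(tokens);
        DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
        FlatFileItemReader<Transaction> reader = new FlatFileItemReader<>();
        reader.setResource(inputData);
        reader.setLinesToSkip(1); // skip the header row
        reader.setLineMapper(lineMapper);
        return reader;
    }

    // HTTP client with a two-second connect timeout.
    @Bean
    public CloseableHttpClient closeableHttpClient() {
        final RequestConfig config = RequestConfig.custom()
          .setConnectTimeout(TWO_SECONDS)
          .build();
        return HttpClientBuilder.create().setDefaultRequestConfig(config).build();
    }

    @Bean
    public ItemProcessor<Transaction, Transaction> retryItemProcessor() {
        return new RetryItemProcessor();
    }

    @Bean
    public ItemWriter<Transaction> itemWriter(Marshaller marshaller) {
        StaxEventItemWriter<Transaction> itemWriter = new StaxEventItemWriter<>();
        itemWriter.setMarshaller(marshaller);
        itemWriter.setRootTagName("transactionRecord");
        itemWriter.setResource(outputXml);
        return itemWriter;
    }

    @Bean
    public Marshaller marshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setClassesToBeBound(Transaction.class);
        return marshaller;
    }

    @Bean
    public Step retryStep(@Qualifier("retryItemProcessor") ItemProcessor<Transaction, Transaction> processor,
      ItemWriter<Transaction> writer) throws ParseException {
        return stepBuilderFactory.get("retryStep")
          .<Transaction, Transaction>chunk(10)
          .reader(itemReader(inputCsv))
          .processor(processor)
          .writer(writer)
          .faultTolerant()
          .retryLimit(3) // at most three attempts per item
          .retry(ConnectTimeoutException.class)
          .retry(DeadlockLoserDataAccessException.class)
          .build();
    }

    @Bean(name = "retryBatchJob")
    public Job retryJob(@Qualifier("retryStep") Step retryStep) {
        return jobBuilderFactory
          .get("retryBatchJob")
          .start(retryStep)
          .build();
    }
}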

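Calling faultTolerant() enables the step's fault-tolerance features; retryLimit() and retry() then set the number of attempts and the exception types that trigger a retry.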
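To make the attempt-counting rule concrete, here is a minimal plain-Java sketch of "try up to N times, but only for listed exception types". It is not Spring Batch's implementation, which also works with the chunk transaction; RetrySketch, TransientFailure, and FlakyEndpoint are hypothetical names introduced only for this illustration.

```java
import java.util.List;
import java.util.function.Supplier;

public class RetrySketch {

    /** Hypothetical stand-in for a transient error such as a connect timeout. */
    static class TransientFailure extends RuntimeException {
        TransientFailure(String message) {
            super(message);
        }
    }

    /**
     * Calls the action until it succeeds, throws a non-retryable exception,
     * or has been called maxAttempts times (the first call counts as attempt 1).
     */
    static <T> T callWithRetry(Supplier<T> action, int maxAttempts,
                               List<Class<? extends RuntimeException>> retryable) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                boolean canRetry = retryable.stream().anyMatch(type -> type.isInstance(e));
                if (!canRetry) {
                    throw e; // not on the retry list: fail immediately
                }
                last = e; // remember the failure in case the attempts run out
            }
        }
        throw last; // attempts exhausted: surface the last failure
    }

    /** Hypothetical endpoint that fails a fixed number of times before answering. */
    static final class FlakyEndpoint implements Supplier<String> {
        private final int failuresBeforeSuccess;
        int calls = 0;

        FlakyEndpoint(int failuresBeforeSuccess) {
            this.failuresBeforeSuccess = failuresBeforeSuccess;
        }

        @Override
        public String get() {
            calls++;
            if (calls <= failuresBeforeSuccess) {
                throw new TransientFailure("Timeout count " + calls);
            }
            return "{ \"age\":10, \"postCode\":\"430222\" }";
        }
    }

    public static void main(String[] args) {
        List<Class<? extends RuntimeException>> retryable = List.of(TransientFailure.class);
        FlakyEndpoint endpoint = new FlakyEndpoint(2);
        String body = callWithRetry(endpoint, 3, retryable);
        System.out.println(body + " after " + endpoint.calls + " calls");
    }
}
```

With a limit of three, an endpoint that fails twice still succeeds on the third call, while one that always fails surfaces its last exception after the third attempt.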

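3. Testing the Retry Logic

In the test scenario we expect the endpoint to eventually return the age and postcode: the first two API calls throw ConnectTimeoutException, and the third call succeeds: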


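import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.test.AssertFile;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.batch.test.context.SpringBatchTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.core.io.FileSystemResource;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBatchTest
@EnableAutoConfiguration
@ContextConfiguration(classes = { SpringBatchRetryConfig.class })
public class SpringBatchRetryIntegrationTest {

    private static final String TEST_OUTPUT = "xml/retryOutput.xml";
    private static final String EXPECTED_OUTPUT = "src/test/resources/output/batchRetry/retryOutput.xml";

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    // Replaces the real HTTP client bean so no network call is made.
    @MockBean
    private CloseableHttpClient closeableHttpClient;

    @Mock
    private CloseableHttpResponse httpResponse;

    @Test
    public void whenEndpointAlwaysFail_thenJobFails() throws Exception {
        when(closeableHttpClient.execute(any()))
          .thenThrow(new ConnectTimeoutException("Endpoint is down"));

        JobExecution jobExecution = jobLauncherTestUtils.launchJob(defaultJobParameters());
        JobInstance actualJobInstance = jobExecution.getJobInstance();
        ExitStatus actualJobExitStatus = jobExecution.getExitStatus();

        assertThat(actualJobInstance.getJobName(), is("retryBatchJob"));
        assertThat(actualJobExitStatus.getExitCode(), is("FAILED"));
        assertThat(actualJobExitStatus.getExitDescription(), containsString("org.apache.http.conn.ConnectTimeoutException"));
    }

    @Test
    public void whenEndpointFailsTwicePasses3rdTime_thenSuccess() throws Exception {
        FileSystemResource expectedResult = new FileSystemResource(EXPECTED_OUTPUT);
        FileSystemResource actualResult = new FileSystemResource(TEST_OUTPUT);

        // The first two calls fail; from the third call on the endpoint answers.
        when(httpResponse.getEntity())
          .thenReturn(new StringEntity("{ \"age\":10, \"postCode\":\"430222\" }"));
        when(closeableHttpClient.execute(any()))
          .thenThrow(new ConnectTimeoutException("Timeout count 1"))
          .thenThrow(new ConnectTimeoutException("Timeout count 2"))
          .thenReturn(httpResponse);

        JobExecution jobExecution = jobLauncherTestUtils.launchJob(defaultJobParameters());
        JobInstance actualJobInstance = jobExecution.getJobInstance();
        ExitStatus actualJobExitStatus = jobExecution.getExitStatus();

        assertThat(actualJobInstance.getJobName(), is("retryBatchJob"));
        assertThat(actualJobExitStatus.getExitCode(), is("COMPLETED"));
        AssertFile.assertFileEquals(expectedResult, actualResult);
    }

    // A unique parameter value so that every launch creates a new job instance.
    private JobParameters defaultJobParameters() {
        JobParametersBuilder paramsBuilder = new JobParametersBuilder();
        paramsBuilder.addString("jobID", String.valueOf(System.currentTimeMillis()));
        return paramsBuilder.toJobParameters();
    }
}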

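The job completes successfully. The log shows three attempts for the first record (two failures followed by a success) before the second record is processed: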

19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=9999
19:06:57.773 [main] INFO o.s.batch.core.step.AbstractStep - Step: [retryStep] executed in 31ms

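The test class also defines a second test in which the endpoint fails on every call; the retries are exhausted and the job fails with ConnectTimeoutException.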

4. Summary

In this article we saw how to configure retry logic in Spring Batch and verified the configuration with tests.

Original article: https://blog.csdn.net/neweastsun/article/details/109264179
