好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Java按时间梯度实现异步回调接口的方法

1. 背景

  在业务处理完之后,需要调用其他系统的接口,将相应的处理结果通知给对方,若是同步请求,假如调用的系统出现异常或是宕机等事件,会导致自身业务受到影响,事务会一直阻塞,数据库连接不够用等异常现象,可以通过异步回调来防止阻塞,但异步的情况还存在一个问题,若调用一次不成功的话接下来怎么处理?这个地方就需要按时间梯度回调,比如前期按10s间隔回调,回调3次,若不成功按30s回调,回调2次,再不成功按分钟回调,依次类推……相当于给了对方系统恢复的时间,不可能一直处于异常或宕机等异常状态,若是再不成功可以再通过人工干预的手段去处理了,具体业务具体实现。

2. 技术实现

  大体实现思路如下图,此过程用到两个队列,当前队列和next队列,当前队列用来存放第一次需要回调的数据对象,如果调用不成功则放入next队列,按照制定的时间策略再继续回调,直到成功或最终持久化后人工接入处理。

  用到的技术如下:

•http请求库,retrofit2
•队列,linkedblockingqueue
•调度线程池,scheduledexecutorservice

3. 主要代码说明

3.1 回调时间梯度的策略设计

采用枚举来对策略规则进行处理,便于代码上的维护,该枚举设计三个参数,级别、回调间隔、回调次数;

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

/**

  * 回调策略

  */

public enum callbacktype {

   //等级1,10s执行3次

   seconds_10( 1 , 10 , 3 ),

   //等级2,30s执行2次

   seconds_30( 2 , 30 , 2 ),

   //等级3,60s执行2次

   minute_1( 3 , 60 , 2 ),

   //等级4,5min执行1次

   minute_5( 4 , 300 , 1 ),

   //等级5,30min执行1次

   minute_30( 5 , 30 * 60 , 1 ),

   //等级6,1h执行2次

   hour_1( 6 , 60 * 60 , 1 ),

   //等级7,3h执行2次

   hour_3( 7 , 60 * 60 * 3 , 1 ),

   //等级8,6h执行2次

   hour_6( 8 , 60 * 60 * 6 , 1 );

 

   //级别

   private int level;

   //回调间隔时间 秒

   private int intervaltime;

   //回调次数

   private int count;

}

3.2 数据传输对象设计

声明抽象父类,便于其他对象调用传输继承。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

/**

  * 消息对象父类

  */

public abstract class messageinfo {

   //开始时间

   private long starttime;

   //更新时间

   private long updatetime;

   //是否回调成功

   private boolean issuccess= false ;

   //回调次数

   private int count= 0 ;

   //回调策略

   private callbacktype callbacktype;

}

要传输的对象,继承消息父类;

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

/**

  * 工单回调信息

  */

public class workordermessage extends messageinfo {

   //车架号

   private string vin;

   //工单号

   private string workorderno;

   //工单状态

   private integer status;

   //工单原因

   private string reason;

   //操作用户

   private integer userid;

}

3.3 调度线程池的使用

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

//声明线程池,大小为16

private scheduledexecutorservice pool = executors.newscheduledthreadpool( 16 );

 

...略

 

while ( true ){

       //从队列获取数据,交给定时器执行

       try {

         workordermessage message = messagequeue.getmessagefromnext();

         long excuetime = message.getupdatetime()+message.getcallbacktype().getintervaltime()* 1000 ;

         long t = excuetime - system.currenttimemillis();

         if (t/ 1000 < 5 ) { //5s之内将要执行的数据提交给调度线程池

           system.out.println( "messagehandlenext-满足定时器执行条件" +jsonobject.tojsonstring(message));

           pool.schedule( new callable< boolean >() {

             @override

             public boolean call() throws exception {

               remotecallback(message);

               return true ;

             }

           }, t, timeunit.milliseconds);

         } else {

           messagequeue.putmessagetonext(message);

         }

       } catch (interruptedexception e) {

         system.out.println(e);

       }

     }

3.4 retrofit2的使用,方便好用。

具体可查看官网相关文档进行了解,用起来还是比较方便的。

retrofit初始化:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

import retrofit2.retrofit;

import retrofit2.converter.gson.gsonconverterfactory;

public class retrofithelper {

   private static final string http_url = "http://baidu.com/" ;

   private static retrofit retrofit;

   public static retrofit instance(){

     if (retrofit == null ){

       retrofit = new retrofit.builder()

           .baseurl(http_url)

           .addconverterfactory(gsonconverterfactory.create())

           .build();

     }

     return retrofit;

   }

}

如果需要修改超时时间,连接时间等可以这样初始话,retrofit采用okhttpclient

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

import okhttp3.okhttpclient;

import retrofit2.retrofit;

import retrofit2.converter.gson.gsonconverterfactory;

import java.util.concurrent.timeunit;

public class retrofithelper {

   private static final string http_url = "http://baidu.com/" ;

   private static retrofit retrofit;

   public static retrofit instance(){

     if (retrofit == null ){

       retrofit = new retrofit.builder()

           .baseurl(http_url)

           .client( new okhttpclient.builder()

               .connecttimeout( 30 , timeunit.seconds) //连接时间

               .readtimeout( 30 , timeunit.seconds) //读时间

               .writetimeout( 30 , timeunit.seconds) //写时间

               .build())

           .addconverterfactory(gsonconverterfactory.create())

           .build();

     }

     return retrofit;

   }

}

retrofit使用通过接口调用,要先声明一个接口;

?

1

2

3

4

5

6

7

8

9

import com.alibaba.fastjson.jsonobject;

import com.woasis.callbackdemo.bean.workordermessage;

import retrofit2.call;

import retrofit2.http.body;

import retrofit2.http.post;

public interface workordermessageinterface {

   @post ( "/api" )

   call<jsonobject> updatebatteryinfo( @body workordermessage message);

}

接口和实例对象准备好了,接下来就是调用;

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

private void remotecallback(workordermessage message){

     //实例接口对象

     workordermessageinterface workordermessageinterface = retrofithelper.instance().create(workordermessageinterface. class );

     //调用接口方法

     call<jsonobject> objectcall = workordermessageinterface.updatebatteryinfo(message);

     system.out.println( "远程调用执行:" + new date());

     //异步调用执行

     objectcall.enqueue( new callback<jsonobject>() {

       @override

       public void onresponse(call<jsonobject> call, response<jsonobject> response) {

         system.out.println( "messagehandlenext****调用成功" +thread.currentthread().getid());

         message.setsuccess( true );

         system.out.println( "messagehandlenext-回调成功" +jsonobject.tojsonstring(message));

       }

       @override

       public void onfailure(call<jsonobject> call, throwable throwable) {

         system.out.println( "messagehandlenext++++调用失败" +thread.currentthread().getid());

         //失败后再将数据放入队列

         try {

           //对回调策略初始化

           long currenttime = system.currenttimemillis();

           message.setupdatetime(currenttime);

           message.setsuccess( false );

           callbacktype callbacktype = message.getcallbacktype();

           //获取等级

           int level = callbacktype.getlevel(callbacktype);

           //获取次数

           int count = callbacktype.getcount(callbacktype);

           //如果等级已经最高,则不再回调

           if (callbacktype.hour_6.getlevel() == callbacktype.getlevel() && count == message.getcount()){

             system.out.println( "messagehandlenext-等级最高,不再回调, 线下处理:" +jsonobject.tojsonstring(message));

           } else {

             //看count是否最大,count次数最大则增加level

             if (message.getcount()<callbacktype.getcount()){

               message.setcount(message.getcount()+ 1 );

             } else { //如果不小,则增加level

               message.setcount( 1 );

               level += 1 ;

               message.setcallbacktype(callbacktype.gettypebylevel(level));

             }

             messagequeue.putmessagetonext(message);

           }

         } catch (interruptedexception e) {

           e.printstacktrace();

           system.out.println( "messagehandlenext-放入队列数据失败" );

         }

       }

     });

   }

3.5结果实现

4.总结

本次实现了按照时间梯度去相应其他系统的接口,不再导致本身业务因其他系统的异常而阻塞。

源码: https://github.com/liuzwei/callback-demo

以上所述是小编给大家介绍的java按时间梯度实现异步回调接口,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对网站的支持!

原文链接:https://www.cnblogs.com/soinve/p/9555151.html

查看更多关于Java按时间梯度实现异步回调接口的方法的详细内容...

  阅读:46次