
An analysis of how a Dubbo service registers with Nacos

Preface

Earlier we covered migrating our Dubbo services from a Redis registry to a Nacos registry. After the migration we noticed an exception being thrown from time to time: ERROR com.alibaba.nacos.client.naming - [CLIENT-BEAT] failed to send beat:. That prompted this analysis. In the end the exception turned out to be caused by our SLB network mapping and had nothing to do with Nacos.

Dubbo version: 2.7.4.1; Nacos client version: 1.0.0; Nacos server version: 1.1.3

Process overview

Dubbo side: through its Nacos registry implementation, Dubbo registers the service with Nacos and adds a heartbeat task that sends a health heartbeat every 5s. It also polls the Nacos service list every 1s; if the list has changed, an instance-change notification fires and Dubbo's local service list is updated.

Nacos side: when Nacos receives a heartbeat, it creates the service instance if it does not exist yet; if the instance is currently marked unhealthy, it marks it healthy and pushes the new state to clients. Nacos also runs an internal health-check task: if no heartbeat arrives for 15s the instance is marked unhealthy, and if none arrives for 30s the instance is taken offline, with the state change pushed to clients.
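The 15s/30s timeout ladder above can be sketched as a small state function. This is a hypothetical illustration of the described behavior; the class and method names are made up and do not appear in the Nacos source:

```java
import java.util.concurrent.TimeUnit;

// Sketch of the server-side health check described above:
// no beat for 15s -> instance marked unhealthy; no beat for 30s -> instance removed.
public class BeatHealthCheck {
    public enum Status { HEALTHY, UNHEALTHY, REMOVED }

    static final long UNHEALTHY_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(15);
    static final long DELETE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30);

    // millisSinceLastBeat: time elapsed since the last client heartbeat
    public static Status evaluate(long millisSinceLastBeat) {
        if (millisSinceLastBeat > DELETE_TIMEOUT_MS) {
            return Status.REMOVED;      // instance is deregistered; change event pushed to clients
        }
        if (millisSinceLastBeat > UNHEALTHY_TIMEOUT_MS) {
            return Status.UNHEALTHY;    // instance stays registered but is marked unhealthy
        }
        return Status.HEALTHY;
    }
}
```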

Source code walkthrough

Dubbo's registry package defines four interfaces for service registration; every registry implementation (ZooKeeper, Nacos, Redis, etcd, and so on) is built on them:

NotifyListener: the interface for service-change notifications. A registry implementation does not need to implement it; it just passes the instance down to the concrete listener.

RegistryService: the interface for registering, unregistering, subscribing, unsubscribing, and looking up services. This is the most central interface, containing the core registry functionality.

Registry: a wrapper around RegistryService and Node that adds availability checks and a destroy method for taking the service offline; you generally implement the Registry interface directly.

RegistryFactory: the interface for obtaining a registry implementation from a registry URL. Through Dubbo's SPI design, each concrete implementation maps to a registry protocol prefix; for example, the Nacos implementation maps to nacos://.

When integrating a new registry you do not have to implement Registry directly: extend the FailbackRegistry abstract class and implement the relevant do* methods. Dubbo's registration abstraction fits Nacos's very well; most interfaces line up directly, and only the subscription listener definitions differ, requiring a small wrapper, so the implementation is quite simple.

Service registration

org.apache.dubbo.registry.nacos.NacosRegistry:152

```java
@Override
public void doRegister(URL url) {
    final String serviceName = getServiceName(url);
    final Instance instance = createInstance(url);
    execute(namingService -> namingService.registerInstance(serviceName, instance));
}
```

In Dubbo, every service is wrapped in a URL, which corresponds to a service Instance in Nacos. Registering a service is therefore just a matter of converting the URL into an Instance and handing it to Nacos. Let's look at the actual registration behavior inside namingService.
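The URL-to-Instance conversion can be sketched roughly as follows. This is a hypothetical illustration, not Dubbo's actual createInstance; the field names merely mirror Nacos's Instance, and the metadata mapping is an assumption:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the URL -> Instance mapping described above.
public class UrlToInstance {
    public static class SimpleInstance {
        public String ip;
        public int port;
        public Map<String, String> metadata = new HashMap<>();
    }

    // e.g. "dubbo://192.168.1.10:20880/com.foo.DemoService?version=1.0.0"
    public static SimpleInstance convert(String dubboUrl) {
        String rest = dubboUrl.substring(dubboUrl.indexOf("://") + 3);
        String hostPort = rest.substring(0, rest.indexOf('/'));
        SimpleInstance instance = new SimpleInstance();
        instance.ip = hostPort.substring(0, hostPort.indexOf(':'));
        instance.port = Integer.parseInt(hostPort.substring(hostPort.indexOf(':') + 1));
        int q = dubboUrl.indexOf('?');
        if (q >= 0) {                         // URL parameters become instance metadata
            for (String kv : dubboUrl.substring(q + 1).split("&")) {
                String[] parts = kv.split("=", 2);
                instance.metadata.put(parts[0], parts.length > 1 ? parts[1] : "");
            }
        }
        return instance;
    }
}
```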

com.alibaba.nacos.client.naming.NacosNamingService:283

```java
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }
    serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
```

Besides registering the instance, the code above checks whether the Instance is ephemeral; if so, it is added to beatReactor's heartbeat list. This is because Nacos divides services into two categories. Ephemeral services, like Dubbo or Spring Cloud services, are kept alive through heartbeats: if a heartbeat fails to arrive in time, the server automatically takes the instance offline. Persistent services, such as databases or caches, cannot send heartbeats themselves, so the server probes them in reverse, for example via TCP port checks. Next, let's see how heartbeats for ephemeral instances are sent.

com.alibaba.nacos.client.naming.NacosNamingService:104

```java
private int initClientBeatThreadCount(Properties properties) {
    if (properties == null) {
        return UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT;
    }
    return NumberUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_CLIENT_BEAT_THREAD_COUNT),
        UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
}

// The heartbeat thread count can be set with:
// dubbo.registries.nacos.parameters.namingClientBeatThreadCount = 10
```

First, look at the initialization code that determines the size of the beatReactor heartbeat thread pool. The Properties passed in is the parameter list from the Dubbo registry configuration: if namingClientBeatThreadCount is configured, that value is used; otherwise the default pool size is one thread on a single-core machine, or half the CPU cores otherwise. Now back to the heartbeat logic.
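The resolution described above can be condensed into one function. This is an illustration of the behavior, not the actual Nacos code; the second parameter stands in for Runtime.getRuntime().availableProcessors() so the logic is easy to exercise:

```java
import java.util.Properties;

// Sketch of the beat thread-count resolution: a configured value wins,
// otherwise half the CPU cores, with a minimum of one thread on a single core.
public class BeatThreadCount {
    static final String KEY = "namingClientBeatThreadCount";

    public static int resolve(Properties properties, int availableProcessors) {
        int defaultCount = availableProcessors > 1 ? availableProcessors / 2 : 1;
        if (properties == null || properties.getProperty(KEY) == null) {
            return defaultCount;
        }
        try {
            return Integer.parseInt(properties.getProperty(KEY));
        } catch (NumberFormatException e) {
            return defaultCount;   // mirrors NumberUtils.toInt's fallback-on-garbage behavior
        }
    }
}
```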

com.alibaba.nacos.client.naming.beat.BeatReactor:78

```java
class BeatProcessor implements Runnable {

    @Override
    public void run() {
        try {
            for (Map.Entry<String, BeatInfo> entry : dom2Beat.entrySet()) {
                BeatInfo beatInfo = entry.getValue();
                if (beatInfo.isScheduled()) {
                    continue;
                }
                beatInfo.setScheduled(true);
                executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
            }
        } catch (Exception e) {
            NAMING_LOGGER.error("[CLIENT-BEAT] Exception while scheduling beat.", e);
        } finally {
            executorService.schedule(this, clientBeatInterval, TimeUnit.MILLISECONDS);
        }
    }
}

class BeatTask implements Runnable {

    BeatInfo beatInfo;

    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }

    @Override
    public void run() {
        long result = serverProxy.sendBeat(beatInfo);
        beatInfo.setScheduled(false);
        if (result > 0) {
            clientBeatInterval = result;
        }
    }
}
```

dom2Beat is the map holding the ephemeral instances that need heartbeat reporting; the add-to-heartbeat-list logic in NacosNamingService.registerInstance ultimately puts entries into this map. When BeatReactor is initialized it starts the BeatProcessor thread, which continuously reschedules itself: after one round of heartbeat reporting completes, the next round is triggered 5s later. The interval is controlled by the clientBeatInterval variable and can change based on the value the Nacos server returns for a heartbeat: the server looks up the key preserved.heart.beat.interval in the instance metadata and returns 5s if it is absent. In Dubbo 2.7.4.1 this feature is still immature; the value can only be set through annotation parameters, such as @Reference(parameters = "preserved.heart.beat.interval,10000"). It will be mature once it can be configured directly as a registry URL parameter, so for now it is better treated as experimental rather than relied upon.
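The server-side interval lookup just described can be sketched like this. It is an illustration of the behavior, with invented names, not the Nacos server source:

```java
import java.util.Map;

// Sketch of the interval resolution described above: the server reads
// preserved.heart.beat.interval from the instance metadata, defaulting to 5000 ms.
public class BeatIntervalResolver {
    static final long DEFAULT_BEAT_INTERVAL_MS = 5000L;

    public static long resolve(Map<String, String> instanceMetadata) {
        String value = instanceMetadata == null
            ? null
            : instanceMetadata.get("preserved.heart.beat.interval");
        if (value == null || value.isEmpty()) {
            return DEFAULT_BEAT_INTERVAL_MS;   // no override: keep the 5s default
        }
        return Long.parseLong(value);          // the client adopts this as clientBeatInterval
    }
}
```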

Service subscription

org.apache.dubbo.registry.nacos.NacosRegistry:399

```java
private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
        throws NacosException {
    if (!nacosListeners.containsKey(serviceName)) {
        EventListener eventListener = event -> {
            if (event instanceof NamingEvent) {
                NamingEvent e = (NamingEvent) event;
                notifySubscriber(url, listener, e.getInstances());
            }
        };
        namingService.subscribe(serviceName, eventListener);
        nacosListeners.put(serviceName, eventListener);
    }
}
```

Nacos's service listener is EventListener, so Dubbo's subscription only needs to wrap the NotifyListener handling inside onEvent and register the subscription via namingService.subscribe. The EventListener object ends up in the event dispatcher's listener list, as the following code shows:

com.alibaba.nacos.client.naming.core.EventDispatcher:

```java
public class EventDispatcher {

    private ExecutorService executor = null;

    private BlockingQueue<ServiceInfo> changedServices = new LinkedBlockingQueue<ServiceInfo>();

    private ConcurrentMap<String, List<EventListener>> observerMap
        = new ConcurrentHashMap<String, List<EventListener>>();

    public EventDispatcher() {
        executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener");
                thread.setDaemon(true);
                return thread;
            }
        });
        executor.execute(new Notifier());
    }

    public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {
        NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map");
        List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>());
        observers.add(listener);
        observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);
        if (observers != null) {
            observers.add(listener);
        }
        serviceChanged(serviceInfo);
    }

    public void removeListener(String serviceName, String clusters, EventListener listener) {
        NAMING_LOGGER.info("[LISTENER] removing " + serviceName + " with " + clusters + " from listener map");
        List<EventListener> observers = observerMap.get(ServiceInfo.getKey(serviceName, clusters));
        if (observers != null) {
            Iterator<EventListener> iter = observers.iterator();
            while (iter.hasNext()) {
                EventListener oldListener = iter.next();
                if (oldListener.equals(listener)) {
                    iter.remove();
                }
            }
            if (observers.isEmpty()) {
                observerMap.remove(ServiceInfo.getKey(serviceName, clusters));
            }
        }
    }

    public List<ServiceInfo> getSubscribeServices() {
        List<ServiceInfo> serviceInfos = new ArrayList<ServiceInfo>();
        for (String key : observerMap.keySet()) {
            serviceInfos.add(ServiceInfo.fromKey(key));
        }
        return serviceInfos;
    }

    public void serviceChanged(ServiceInfo serviceInfo) {
        if (serviceInfo == null) {
            return;
        }
        changedServices.add(serviceInfo);
    }

    private class Notifier implements Runnable {
        @Override
        public void run() {
            while (true) {
                ServiceInfo serviceInfo = null;
                try {
                    serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
                } catch (Exception ignore) {
                }

                if (serviceInfo == null) {
                    continue;
                }

                try {
                    List<EventListener> listeners = observerMap.get(serviceInfo.getKey());
                    if (!CollectionUtils.isEmpty(listeners)) {
                        for (EventListener listener : listeners) {
                            List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
                            listener.onEvent(new NamingEvent(serviceInfo.getName(), hosts));
                        }
                    }
                } catch (Exception e) {
                    NAMING_LOGGER.error("[NA] notify error for service: "
                        + serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e);
                }
            }
        }
    }

    public void setExecutor(ExecutorService executor) {
        ExecutorService oldExecutor = this.executor;
        this.executor = executor;
        oldExecutor.shutdown();
    }
}
```

EventDispatcher maintains a listener map, observerMap, plus a blocking queue of change events, changedServices. When the dispatcher is initialized it starts a thread that consumes the queue, so whenever a registered service changes, enqueueing the change data wakes the thread and updates Dubbo's in-memory service list. As mentioned above, the Nacos client pulls registered instances every 1s; when the pulled instances differ from the local in-memory copy, an enqueue is triggered, for example:
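The queue-plus-daemon-thread pattern above can be reduced to a minimal generic sketch. This is an illustration of the pattern, not the Nacos class; the names are invented:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Minimal version of the EventDispatcher pattern: producers enqueue changed items,
// a single daemon thread drains the queue and fans each item out to all listeners.
public class MiniDispatcher<T> {
    private final BlockingQueue<T> changed = new LinkedBlockingQueue<>();
    private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();

    public MiniDispatcher() {
        Thread notifier = new Thread(() -> {
            while (true) {
                try {
                    T item = changed.poll(5, TimeUnit.MINUTES);  // same poll timeout as Nacos
                    if (item == null) {
                        continue;
                    }
                    for (Consumer<T> listener : listeners) {
                        listener.accept(item);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "mini-dispatcher-notifier");
        notifier.setDaemon(true);    // a daemon, like the Nacos listener thread
        notifier.start();
    }

    public void addListener(Consumer<T> listener) { listeners.add(listener); }

    public void serviceChanged(T item) { changed.add(item); }
}
```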

com.alibaba.nacos.client.naming.core.HostReactor:296

```java
public class UpdateTask implements Runnable {
    long lastRefTime = Long.MAX_VALUE;
    private String clusters;
    private String serviceName;

    public UpdateTask(String serviceName, String clusters) {
        this.serviceName = serviceName;
        this.clusters = clusters;
    }

    @Override
    public void run() {
        try {
            ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));

            if (serviceObj == null) {
                updateServiceNow(serviceName, clusters);
                executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
                return;
            }

            if (serviceObj.getLastRefTime() <= lastRefTime) {
                updateServiceNow(serviceName, clusters);
                serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
            } else {
                // if serviceName already updated by push, we should not override it
                // since the push data may be different from pull through force push
                refreshOnly(serviceName, clusters);
            }

            executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);

            lastRefTime = serviceObj.getLastRefTime();
        } catch (Throwable e) {
            NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
        }
    }
}
```

DEFAULT_DELAY is 1s. Nacos can also push data-change events proactively; when such a push arrives, the serviceObj in serviceInfoMap is updated and the next client pull is scheduled 10s later. The actual comparison against the local list happens inside updateServiceNow, which we won't expand on here.
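The decision UpdateTask makes each run can be isolated into one function. This is an illustration of the logic above, not the HostReactor source:

```java
// Sketch of UpdateTask's per-run decision: pull-and-compare when the local cache
// is missing or no newer than what this task last saw, refresh-only when a server
// push has already updated it (to avoid overriding the pushed data).
public class UpdateDecision {
    public enum Action { FULL_UPDATE, REFRESH_ONLY }

    // cachedLastRefTime: lastRefTime of the ServiceInfo in the local cache (null if absent)
    // taskLastRefTime:   lastRefTime recorded by this task on its previous run
    public static Action decide(Long cachedLastRefTime, long taskLastRefTime) {
        if (cachedLastRefTime == null || cachedLastRefTime <= taskLastRefTime) {
            return Action.FULL_UPDATE;    // cache missing or stale: pull and compare
        }
        return Action.REFRESH_ONLY;       // a push already refreshed it: don't override
    }
}
```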

Conclusion

Registering and subscribing Dubbo services with Nacos is a fairly involved process, and reading the source with concrete questions in mind pays off: before diving in, I was hunting for the cause of the Nacos heartbeat exception and curious how Nacos implements event listening, and layer by layer the picture became clear. To fully understand the flow you also need the Nacos server side: its two core classes, ClientBeatCheckTask and ClientBeatProcessor, contain the heartbeat handling, health checking, and event-push logic, and are worth a look.


Original link: http://kailing.pub/article/index/arcid/276.html
