服务注册

目录

1.客户端

入口

NamingExample来看

反射初始化NacosNamingService

服务注册方法

NacosNamingService.registerInstance

1.心跳参数校验 2.最终的服务名格式:serviceName@@groupName 3.如果是临时实例则会开启心跳包 4.服务注册 (参数组装调用API请求注册)

调用API请求注册NamingProxy.reqApi

1.单机注册中心,失败重试(默认3次,前提是nacos异常) 2.集群注册中心,随机挑选一个注册,失败则轮询其他注册中心 3.最终调用callServer方法 (API_URL: IP:PORT/nacos/v1/ns/instance

callServer方法如下:

2.服务端

1.注册表

先了解一下服务端保存实例信息的结构(下面我们简称叫注册表):

难理解的就是为什么一个服务会有多个集群,不应该一个服务就一个集群吗?

这里可以理解为按机房划分集群,不管有多少个集群都属于你该服务的。比如上海机房有一个SH 集群,深圳机房有一个SZ集群,请求的时候你可以按地区请求最近的集群实例,如果整个地区集群都不可用那么可以请求其他地区的集群实例。

2.注册接口信息

注册接口:/nacos/v1/ns/instance

请求参数:

3.注册方法

InstanceController.register

4.注册流程(以临时实例为例)

ServiceManager.registerInstance

1.创建一个空的service放入注册表,为其 开启一个心跳检测,并将这个service加入监听列表

2.拿到创建好的service

3.完成实例的注册表更新,并完成nacos集群同步

5.创建空的service加入注册表

让我们看看是怎么创建空的service的

ServiceManager.createEmptyService

ServiceManager.putServiceAndInit

6.添加实例

ServiceManager.addInstance

里面最重要的就是consistencyService.put(key,instances) 方法

consistencyService有很多种实现,根据实例的类型来判断具体走哪种实现方式,这里我们以临时实例为例,主要看看DistroConsistencyServiceImpl

DistroConsistencyServiceImpl.put 临时实例的注册方法

7.临时实例的添加

onPut方法:

1.会将任务放入Notifier内部的阻塞队列中,Notifier是个Runnable(异步执行任务)

2.最后会回到Service.onChange方法更新实例,内部调用updateIPs方法,这里面需要注意更新后会触发一个服务变更事件(后面有用)

Service.updateIPs

public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
        // 准备一个Map,key是cluster,值是集群下的Instance集合
        Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
        // 获取服务的所有cluster名称
        for (String clusterName : clusterMap.keySet()) {
            ipMap.put(clusterName, new ArrayList<>());
        }
        
        for (Instance instance : instances) {
            try {
                if (instance == null) {
                    Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                    continue;
                }
                // 判断实例是否包含clusterName,没有的话用默认cluster
                if (StringUtils.isEmpty(instance.getClusterName())) {
                    instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
                }
                // 判断cluster是否存在,不存在则创建新的cluster
                if (!clusterMap.containsKey(instance.getClusterName())) {
                    Loggers.SRV_LOG
                            .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                    instance.getClusterName(), instance.toJson());
                    Cluster cluster = new Cluster(instance.getClusterName(), this);
                    cluster.init();
                    getClusterMap().put(instance.getClusterName(), cluster);
                }
                // 获取当前cluster实例的集合,不存在则创建新的
                List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
                if (clusterIPs == null) {
                    clusterIPs = new LinkedList<>();
                    ipMap.put(instance.getClusterName(), clusterIPs);
                }
                // 添加新的实例到 Instance 集合
                clusterIPs.add(instance);
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
            }
        }
        
        for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
            //make every ip mine
            List<Instance> entryIPs = entry.getValue();
            // 将实例集合更新到 clusterMap(注册表)
            clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
        }
        
        setLastModifiedMillis(System.currentTimeMillis());
        //触发服务变更事件
        getPushService().serviceChanged(this);
        StringBuilder stringBuilder = new StringBuilder();
        
        for (Instance instance : allIPs()) {
            stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
        }
        
        Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
                stringBuilder.toString());
        
    }

8.临时实例的集群同步

distroProtocol.sync()临时实例集群同步

  1. 遍历集群中其他节点

  2. 定义一个DistroDelayTask异步任务放入一个ConcurrentHashMap中,会有一个ScheduledExecutorService线程池定时从这个map中取任务执行

线程池的定义在NacosDelayTaskExecuteEngine中:

上诉线程池执行的任务就是NacosDelayTaskExecuteEngine.processTasks()如下:

protected void processTasks() {
        // 获取任务map中所有的key
        Collection<Object> keys = getAllTaskKeys();
        //遍历key 并执行任务
        for (Object taskKey : keys) {
             // 取一个任务便从map中移除一个任务
            AbstractDelayTask task = removeTask(taskKey);
            if (null == task) {
                continue;
            }
            NacosTaskProcessor processor = getProcessor(taskKey);
            if (null == processor) {
                getEngineLog().error("processor not found for task, so discarded. " + task);
                continue;
            }
            try {
                // ReAdd task if process failed
                // 尝试执行同步任务,如果失败会重试
                if (!processor.process(task)) {
                    retryFailedTask(taskKey, task);
                }
            } catch (Throwable e) {
                getEngineLog().error("Nacos task execute error : " + e.toString(), e);
                // 如果失败会重试
                retryFailedTask(taskKey, task);
            }
        }
    }

DistroDelayTaskProcessor.process:

任务的执行被放入到process方法中,并被封装成DistroSyncChangeTask异步任务,又被塞到一个不知名封装好的地方(是一个阻塞队列,同样有地方取出来执行,我们直接看这个任务的执行)

DistroSyncChangeTask.run

1.syncData方法最终会到NamingProxy.syncData方法,执行HTTP请求,同步数据

2.如果失败了,则又会调用NacosDelayTaskExecuteEngine.addTask()方法重新将DistroDelayTask任务放进ConcurrentHashMap中,重复上述的processTasks方法

3.总结

  • 客户端:启动则获取自身配置信息,发起http请求注册,临时实例同时会开启心跳机制(下面会说),服务端是单机的情况下请求失败会重试三次,服务端是单机的集群的情况下请求失败会轮询请求
  • 服务端
    1. 本地通过一个Map保存所有服务信息,注册的实质就是往map里面添加信息
    2. 会先创建空的服务,后更新服务中的实例信息
    3. 服务创建后会初始化服务,启动心跳检测
    4. 往服务中添加实例的时候会判断实例是永久实例还是临时实例,不同类型的实例有不同的处理方式
    5. 注册后同时会发布服务变更事件(后面说,先记着这个事件)

问题一:为什么客户端注册会先开启心跳后发起注册请求?

因为心跳是异步定时执行,就算后续的注册发生某意外注册失败,心跳机制还可以弥补注册(因为心跳也可以注册),如果是先发起注册后开启心跳,有可能注册发生某意外就直接终止了,心跳还没开启

问题二:服务端注册怎么保证线程安全?

服务器注册会先创建一个空的服务,后对该服务填充信息初始化,保存服务的map用ConcurrentHashMap修饰的,所以此过程是线程安全的,后续再对服务内实例更新的时候,采用synchronized对该服务做了加锁操作

问题三:服务端注册怎么保证性能?(临时实例)

前置操作时采用ConcurrentHashMap和synchronized锁服务,前者是最优的线程安全map,后者锁的服务颗粒度一定程度的保证了性能,后续均采用了异步更新,如本地注册表更新采用了阻塞队列异步执行,临时实例集群同步过程中同样采用了阻塞队列异步执行机制,因为为阻塞的异步执行,所以保值性能的同时也保证了资源不会占用异常

Last Updated: