服务注册
目录
1.客户端
入口
从NamingExample来看
反射初始化NacosNamingService
服务注册方法
NacosNamingService.registerInstance
1.心跳参数校验 2.最终的服务名格式:serviceName@@groupName 3.如果是临时实例则会开启心跳包 4.服务注册 (参数组装调用API请求注册)
调用API请求注册NamingProxy.reqApi
1.单机注册中心,失败重试(默认3次,前提是nacos异常) 2.集群注册中心,随机挑选一个注册,失败则轮询其他注册中心 3.最终调用callServer方法 (API_URL: IP:PORT/nacos/v1/ns/instance )
callServer方法如下:
2.服务端
1.注册表
先了解一下服务端保存实例信息的结构(下面我们简称叫注册表):
难理解的就是为什么一个服务会有多个集群,不应该一个服务就一个集群吗?
这里可以理解为按机房划分集群,不管有多少个集群都属于你该服务的。比如上海机房有一个SH 集群,深圳机房有一个SZ集群,请求的时候你可以按地区请求最近的集群实例,如果整个地区集群都不可用那么可以请求其他地区的集群实例。
2.注册接口信息
注册接口:/nacos/v1/ns/instance
请求参数:
3.注册方法
InstanceController.register
4.注册流程(以临时实例为例)
ServiceManager.registerInstance
1.创建一个空的service放入注册表,为其 开启一个心跳检测,并将这个service加入监听列表
2.拿到创建好的service
3.完成实例的注册表更新,并完成nacos集群同步
5.创建空的service加入注册表
让我们看看是怎么创建空的service的
ServiceManager.createEmptyService:
ServiceManager.putServiceAndInit:
6.添加实例
ServiceManager.addInstance:
里面最重要的就是consistencyService.put(key,instances) 方法
consistencyService有很多种实现,根据实例的类型来判断具体走哪种实现方式,这里我们以临时实例为例,主要看看DistroConsistencyServiceImpl
DistroConsistencyServiceImpl.put 临时实例的注册方法
7.临时实例的添加
onPut方法:
1.会将任务放入Notifier内部的阻塞队列中,Notifier是个Runnable(异步执行任务)
2.最后会回到Service.onChange方法更新实例,内部调用updateIPs方法,这里面需要注意更新后会触发一个服务变更事件(后面有用)
Service.updateIPs
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
// 准备一个Map,key是cluster,值是集群下的Instance集合
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
// 获取服务的所有cluster名称
for (String clusterName : clusterMap.keySet()) {
ipMap.put(clusterName, new ArrayList<>());
}
for (Instance instance : instances) {
try {
if (instance == null) {
Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
continue;
}
// 判断实例是否包含clusterName,没有的话用默认cluster
if (StringUtils.isEmpty(instance.getClusterName())) {
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
// 判断cluster是否存在,不存在则创建新的cluster
if (!clusterMap.containsKey(instance.getClusterName())) {
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
Cluster cluster = new Cluster(instance.getClusterName(), this);
cluster.init();
getClusterMap().put(instance.getClusterName(), cluster);
}
// 获取当前cluster实例的集合,不存在则创建新的
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
if (clusterIPs == null) {
clusterIPs = new LinkedList<>();
ipMap.put(instance.getClusterName(), clusterIPs);
}
// 添加新的实例到 Instance 集合
clusterIPs.add(instance);
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
}
}
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//make every ip mine
List<Instance> entryIPs = entry.getValue();
// 将实例集合更新到 clusterMap(注册表)
clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}
setLastModifiedMillis(System.currentTimeMillis());
//触发服务变更事件
getPushService().serviceChanged(this);
StringBuilder stringBuilder = new StringBuilder();
for (Instance instance : allIPs()) {
stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
}
Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
stringBuilder.toString());
}
8.临时实例的集群同步
distroProtocol.sync()临时实例集群同步:
遍历集群中其他节点
定义一个DistroDelayTask异步任务放入一个ConcurrentHashMap中,会有一个ScheduledExecutorService线程池定时从这个map中取任务执行
线程池的定义在NacosDelayTaskExecuteEngine中:
上诉线程池执行的任务就是NacosDelayTaskExecuteEngine.processTasks()如下:
protected void processTasks() {
// 获取任务map中所有的key
Collection<Object> keys = getAllTaskKeys();
//遍历key 并执行任务
for (Object taskKey : keys) {
// 取一个任务便从map中移除一个任务
AbstractDelayTask task = removeTask(taskKey);
if (null == task) {
continue;
}
NacosTaskProcessor processor = getProcessor(taskKey);
if (null == processor) {
getEngineLog().error("processor not found for task, so discarded. " + task);
continue;
}
try {
// ReAdd task if process failed
// 尝试执行同步任务,如果失败会重试
if (!processor.process(task)) {
retryFailedTask(taskKey, task);
}
} catch (Throwable e) {
getEngineLog().error("Nacos task execute error : " + e.toString(), e);
// 如果失败会重试
retryFailedTask(taskKey, task);
}
}
}
DistroDelayTaskProcessor.process:
任务的执行被放入到process方法中,并被封装成DistroSyncChangeTask异步任务,又被塞到一个不知名封装好的地方(是一个阻塞队列,同样有地方取出来执行,我们直接看这个任务的执行)
DistroSyncChangeTask.run
1.syncData方法最终会到NamingProxy.syncData方法,执行HTTP请求,同步数据
2.如果失败了,则又会调用NacosDelayTaskExecuteEngine.addTask()方法重新将DistroDelayTask任务放进ConcurrentHashMap中,重复上述的processTasks方法
3.总结
- 客户端:启动则获取自身配置信息,发起http请求注册,临时实例同时会开启心跳机制(下面会说),服务端是单机的情况下请求失败会重试三次,服务端是单机的集群的情况下请求失败会轮询请求
- 服务端:
- 本地通过一个Map保存所有服务信息,注册的实质就是往map里面添加信息
- 会先创建空的服务,后更新服务中的实例信息
- 服务创建后会初始化服务,启动心跳检测
- 往服务中添加实例的时候会判断实例是永久实例还是临时实例,不同类型的实例有不同的处理方式
- 注册后同时会发布服务变更事件(后面说,先记着这个事件)
问题一:为什么客户端注册会先开启心跳后发起注册请求?
因为心跳是异步定时执行,就算后续的注册发生某意外注册失败,心跳机制还可以弥补注册(因为心跳也可以注册),如果是先发起注册后开启心跳,有可能注册发生某意外就直接终止了,心跳还没开启
问题二:服务端注册怎么保证线程安全?
服务器注册会先创建一个空的服务,后对该服务填充信息初始化,保存服务的map用ConcurrentHashMap修饰的,所以此过程是线程安全的,后续再对服务内实例更新的时候,采用synchronized对该服务做了加锁操作
问题三:服务端注册怎么保证性能?(临时实例)
前置操作时采用ConcurrentHashMap和synchronized锁服务,前者是最优的线程安全map,后者锁的服务颗粒度一定程度的保证了性能,后续均采用了异步更新,如本地注册表更新采用了阻塞队列异步执行,临时实例集群同步过程中同样采用了阻塞队列异步执行机制,因为为阻塞的异步执行,所以保值性能的同时也保证了资源不会占用异常