配置中心
目录
TIP
客户端的配置有两种方式来维持,一是客户端主动获取,二是客户端长轮询更新
关于配置文件有几个类型要说明一下:
1.本地配置文件:本地就已经存在的配置文件
2.本地缓存文件:从服务端获取的保存在了本地(本地生成了文件)
3.cacheData缓存数据:内存中缓存的配置文件数据
客户端主动获取
客户端
从ConfigExample中我们可以得知,获取配置的方法为NacosConfigService.getConfig:
1.优先从本地配置中获取
2.本地配置中没有则会去服务端获取
3.服务端异常且异常不是因为鉴权失败,则从本地缓存文件中获取
我们来到ClientWorker.getServerConfig方法里面: 1.发起HTTP请求,URL:/v1/cs/configs 2.请求后不管怎样都会先创建一个本地缓存文件,只不过成功的会带有数据以及文件类型
服务端
ConfigController.getConfig
除去前面的参数校验以及处理,直接看ConfigServletInner.doGetConfig方法:
(里面判断分支太多就不截全部了,文件不是读数据库就是读本地文件)
1.获取读锁,获取不到就自旋重复获取10次
2.根据beta、tag、autoTag来判断读什么配置
3.PropertyUtil.isDirectRead()判断是读mysql还是读文件
4.使用jdk的零拷贝传输直接将文件输入流转response输出流
5.释放读锁
网上找的doGetConfig方法流程图
客户端长轮询更新
客户端
1.入口
在NacosConfigService初始化的时候,会初始化两个组件
- 一是网络组件,也就是http数据处理的(起作用的是ServerHttpAgent)
- 二是客户端的长轮询ClientWorker
ClientWorker在初始化的时候就会初始化两个定时调度线程池,以及启动一个定时任务,该定时任务会执行ClientWorker.checkConfigInfo() 方法(10ms执行一次):
2.配置文件分片处理
ClientWorker.checkConfigInfo()
该任务就是用来分配任务的,设定每3000个配置文件为一个分片,每个分片都会开启一个异步任务放到线程池中处理该分片的配置文件,异步任务为LongPollingRunnable
- 假设有5000个配置文件,就分为两个分片,开启两个线程处理
- 假设有8000个配置文件,就分为三个分片,开启三个线程处理
3.配置文件处理
LongPollingRunnable.run()
- 遍历所有配置,只有该异步任务监听的分片配置才会处理
- 对比本地配置文件和缓存的cacheData数据,判断数据是否出现变化
- 出现变化则针对监听器发布变更通知
- 将该分片监听的配置与服务端的配置对比,找到需要更新的配置Key(dataId+group+namespace)
- 遍历这些需要更新的key,然后请求服务端(主动获取的方式),得到最新的cacheData并更新本地缓存文件
- 针对监听器发布变更通知
- 之后继续执行该任务(轮询)
向服务端对比更新过程就类似于:我先拿着我这个分片所有配置文件的key和内容的MD5去和服务端的配置对比,服务端对比这些文件的MD5后返回需要更新的配置文件key,然后客户端遍历这些需要更新的配置文件key去主动请求服务端获取最新的,然后更新本地缓存及本地缓存的文件
public void run() {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
// 校验本地配置同时获取同分片配置
for (CacheData cacheData : cacheMap.values()) {
// 同分片的配置才处理
if (cacheData.getTaskId() == taskId) {
// 将同分片的配置加入集合
cacheDatas.add(cacheData);
try {
//通过本地配置文件和cacheData集合中的数据进行比对,判断是否出现数据变化
checkLocalConfig(cacheData);
//这里表示数据有变化,需要通知监听器
if (cacheData.isUseLocalConfigInfo()) {
//通知所有针对当前配置设置了监听的监听器
cacheData.checkListenerMd5();
}
} catch (Exception e) {
LOGGER.error("get local config info error", e);
}
}
}
// 与服务端对比 找到需要更新的配置key
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
if (!CollectionUtils.isEmpty(changedGroupKeys)) {
LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
}
// 遍历发生了变化的key,并根据key去服务端请求最新配置,并更新到内存缓存中
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
// 从远程服务端获取最新的配置,并缓存到内存中
ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setEncryptedDataKey(response.getEncryptedDataKey());
cache.setContent(response.getContent());
if (null != response.getConfigType()) {
cache.setType(response.getConfigType());
}
} catch (NacosException ioe) {
String message = String
.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
agent.getName(), dataId, group, tenant);
LOGGER.error(message, ioe);
}
}
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
//通知所有针对当前配置设置了监听的监听器
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
// 继续执行该任务
executorService.execute(this);
} catch (Throwable e) {
// If the rotation training task is abnormal, the next execution time of the task will be punished
LOGGER.error("longPolling error : ", e);
// 如果任务出现异常,那么下次的执行时间就要加长,类似衰减重试
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
4.本地配置文件与缓存数据的对比
注意:该配置文件是项目启动就存在的,不是通过拉取服务端缓存下来的缓存文件
ClientWorker.checkLocalConfig
这个就好比,我本地已经有该配置文件了然后又在服务端找到了该配置文件,要怎么处理?
1.如果缓存配置为不使用本地配置,但是本地配置又存在,则设置该缓存配置为使用本地配置(本地配置文件的内存写到缓存配置中更新),需要发布变更通知
2.如果缓存配置为使用本地配置,但是本地配置又不存在,则设置该缓存配置为不使用本地配置,不需要通知
3.如果缓存配置为使用本地配置,本地配置也存在,但是缓存配置和本地配置的版本不同,则也需要将本地配置文件的内存写到缓存配置中更新
5.开启长轮询与服务端对比
ClientWorker.checkUpdateDataIds
就是把需要对比的配置关键信息拼接成字符串发送到服务端对比
ClientWorker.checkUpdateConfigStr
开启长轮询,处理对比逻辑,长轮询请求URL: v1/ns/configs/listener
6.通知监听器
CacheData.checkListenerMd5
在该方法中会对比listener和cacheData的MD5,如果不一样则代表cacheData发生了变化,则会触发监听器的回调处理
那么CacheData的MD5又是什么时候变得呢?在更新content内容的时候就变了,方法内同时更新了MD5
7.监听回调处理
CacheData.safeNotifyListener
在该方法内,cacheData会遍历所有listener,只要两者MD5不同就会在该方法内触发调用listener.receiveConfigInfo方法
拓展:我们也可以自定义listener加入进去,在变更的时候做自己的处理NacosConfigService.addListener调用该方法加入自定义监听器
服务端
1.入口
直接看长轮询的接口
ConfigController.listener
前面没啥说的就处理字符串数据
ConfigServletInner.doPollingConfig
这个方法里面其实就是短轮询和长轮询的判断,根据请求头Long-Pulling-Timeout,我们主要看长轮询,短轮询就不看了
2.长轮询机制
LongPollingService.addLongPollingClient
这个方法主要做几件事:
1.获取客户端请求的超时时间,减去500ms后赋值给timeout变量。
2.判断isFixedPolling,如果为true,定时任务将会在30s后开始执行,否则在29.5s后开始执行
3.和服务端的数据进行MD5对比,如果发送变化则直接返回
4.如果没有变化则将请求转化为异步请求挂起,然后延迟执行ClientLongPolling线程
3.长轮询的延迟任务
ClientLongPolling.run
内部就一个延迟任务,延迟执行,执行时通过比较MD5判断客户端请求的groupKeys是否发生变更,并将变更结果通过response返回给客户端
所以整个过程来看,服务端就做了两件事,先判断是否配置有变化,有则立刻返回,没有则延迟返回,并在返回时再进行一次对比,这样使得配置在30s内没变化的情况下一直处理连接状态,这就是长轮询机制
4.数据变更事件
上述有个延迟30s的周期,如果这时间段内配置发生变化了,那配置岂不是得不到及时的更新?
当然不是,注意到上述框出来的allSubs了吗,这是个ClientLongPolling对象的队列,这是干嘛的?
而且从上述逻辑上不难看出,在延迟任务执行前的这30s内ClientLongPolling是一直存在于这个队列中的,因为执行完后就被添加进了这个队列,执行时才会移除,所以我们需要看看这个是干嘛的
顺着思路找我们不难发现,在LongPollingService生成的时候,订阅了一个LocalDataChangeEvent事件,触发这个事件的时候会执行一个DataChangeTask异步任务
DataChangeTask
1.会遍历队列中所有的ClientLongPolling对象
2.判断请求过来的需要对比的key里面是否包含当前变更的配置key
3.包含则移除出队列,并直接返回响应客户端信息
这里说明在长轮询的延迟执行的时间内,服务端也没闲着,一直在监听配置的变更,一旦有配置变更则发布LocalDataChangeEvent事件,触发事件后则提前响应客户端
总结
- 配置分三种形态,本地配置文件,本地缓存文件,本地缓存数据
- 客户端通过主动拉取和长轮询的方式来获取配置以及更新配置
- 主动拉取的顺序是本地配置文件→服务端→本地缓存文件
- 客户端长轮询中对比配置不同的方式是对比本地文件与本地缓存数据的MD5
- 长轮询是在客户端与服务端对比配置不同中发起的,存在不同配置服务端则立刻返回,没有则服务端会保持长连接延迟执行任务(30s左右),这中间服务端一旦有配置变更(LocalDataChangeEvent事件)则会提前响应返回
- 长轮询在获取到不同的配置后还会遍历这些配置主动拉取一次获取具体配置内容并写入本地缓存文件中