小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。
NacosConfigService
NacosConfigService构造方法:
- 初始化一个HttpAgent,用到装饰器模式,实际工作类是ServerHttpAgent,MetricsHttpAgent内部也调用了ServerHttpAgent方法,增加了监控统计信息。
- ClientWorker是客户端工作类,agent参数传入ClientWorker,用agent做一些与远程相关的工作。
public NacosConfigService(Properties properties) throws NacosException{
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if(StringUtils.isBlank(encodeTmp)){
encode=Constants.ENCODE;
}else{
encode = encodeTmp.trim();
}
initNamespace(properties);
agent = new MeticsHttpAgent(new ServerHttpAgent(properties));
agent.start();
worker = new ClientWorker(agent,configFilterChainManager,properties);
}
复制代码
ClientWorker
ClientWorker构造方法:
- 构建两个定时调度的线程池,并启动一个定时任务。
- 第一个线程池executor只拥有一个核心线程,每隔10ms会执行一次checkConfigInfo()方法,从方法名上可知道每个10ms检查一次配置信息。
第二个线程池executorService只完成了初始化,用于实现客户端的定时长轮询功能。
public ClientWorker(final HttpAgent aagent,final ConfigFilterChainManager configFilterChainManager,final Properties properties){
this.agent=agent;
this.configFilterChainManager=configFilterChainManager;
init(properties);
executor=Executors.newScheduledThreadPool(1,new ThreadFactory(){
@Override
public Thread newThread(Runnable r){
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker."+agent.getName());
t.setDaemon(true);
return t;
}
});
excutorService= Excutors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(),new ThreadFactory(){
@Override
public Thread newThread(Runnable r){
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.longPilling."+agent.getName());
t.setDaemon(true);
return t;
}
});
executor.scheduleWithFixedDelay(new Runnable(){
@Override
public void run(){
try{
checkConfigInfo();
}catch(Throwable e){
LOGGER.error("["+agent.getName()+"][sub-check] rotate check error",e);
}
}
});
}
复制代码
ClientWorker.checkConfigInfo
接着深入到ClientWorker 构造方法中,通过executor.scheduleWithFixedDelay启动一个每隔10s执行一次的定时任务,调用checkConfigInfo方法,这个方法主要检查配置是否发生修改,用executorService定时调度的线程池。
public void checkConfigInfo(){
int listenerSize = cacheMap.get().size();
int longingTasjCount=(int)Math.ceil(listenerSize/ParamUtil.getPerTaskConfigSize());
if(longTaskCount> currentLongingTaskCount){
for(int i=(int)currentLongingTaskCount;i<longingTaskCount;i++){
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
复制代码
- cacheMap:
AtomicReference<Map<String,CacheData>> cacheMap用来存储监听变更的缓存集合,key是根据dataId
/group/tenant(租户)拼接的值。Value是对应的存储在Nacos服务器的配置文件内容。
近期评论