Nacos源码分析
1. 配置中心
1. dataId完整格式
${prefix}-${spring.profiles.active}.${file-extension}
复制代码
- prefix 默认项目名称即spring.application.name,也可以通过spring.cloud.nacos.config.prefix配置
- spring.profiles.active 当前环境对应的profile
- file-extension 通过spring.cloud.nacos.config.file-extension 配置,可选有yaml,properties
2.Spring Cloud 集成Nacos配置中心原理
-
SpringApplication.run() ,springboot项目启动
public ConfigurableApplicationContext run(String... args) { try { //处理输入参数 ApplicationArguments applicationArguments = new DefaultApplicationArguments( args); //准备环境 ConfigurableEnvironment environment = prepareEnvironment(listeners, applicationArguments); configureIgnoreBeanInfo(environment); //打印图标 Banner printedBanner = printBanner(environment); //创建上下文 context = createApplicationContext(); //获取spring.factories中配置的类实例,spi机制,自动配置的实现重点 exceptionReporters = getSpringFactoriesInstances( SpringBootExceptionReporter.class, new Class[] { ConfigurableApplicationContext.class }, context); //准备上下文 prepareContext(context, environment, listeners, applicationArguments, printedBanner); //刷新上下文,加载Bean至容器 refreshContext(context); afterRefresh(context, applicationArguments); stopWatch.stop(); if (this.logStartupInfo) { new StartupInfoLogger(this.mainApplicationClass) .logStarted(getApplicationLog(), stopWatch); } listeners.started(context); callRunners(context, applicationArguments); } return context; } 复制代码
-
this.prepareContext(),准备项目上下文
private void prepareContext(ConfigurableApplicationContext context, ConfigurableEnvironment environment, SpringApplicationRunListeners listeners, ApplicationArguments applicationArguments, Banner printedBanner) { context.setEnvironment(environment); postProcessApplicationContext(context); //调用所有实现ApplicationContextInitializer接口的的子类 applyInitializers(context); listeners.contextPrepared(context); } 复制代码
-
PropertySourceBootstrapConfiguration 实现了ApplicationContextInitializer接口,initialize()方法,对所有实现PropertySourceLocator的类进行处理,重点是调用locator.locate()方法,返回PropertySource,,其中就包含NacosPropertySourceLocator,返回的PropertySource中就包含Nacos配置中心的配置,最后将其写入上下文中
public void initialize(ConfigurableApplicationContext applicationContext) { CompositePropertySource composite = new CompositePropertySource( BOOTSTRAP_PROPERTY_SOURCE_NAME); AnnotationAwareOrderComparator.sort(this.propertySourceLocators); boolean empty = true; ConfigurableEnvironment environment = applicationContext.getEnvironment(); //获取额外的配置 for (PropertySourceLocator locator : this.propertySourceLocators) { PropertySource<?> source = null; source = locator.locate(environment); if (source == null) { continue; } logger.info("Located property source: " + source); composite.addPropertySource(source); empty = false; } //加载至context if (!empty) { MutablePropertySources propertySources = environment.getPropertySources(); String logConfig = environment.resolvePlaceholders("${logging.config:}"); LogFile logFile = LogFile.get(environment); if (propertySources.contains(BOOTSTRAP_PROPERTY_SOURCE_NAME)) { propertySources.remove(BOOTSTRAP_PROPERTY_SOURCE_NAME); } insertPropertySources(propertySources, composite); reinitializeLoggingSystem(environment, logConfig, logFile); setLogLevels(applicationContext, environment); handleIncludedProfiles(environment); } } 复制代码
那么NacosPropertySourceLocator是什么时候注入进来的呢?
-
springboot项目启动时会读取spring.factories文件里配置的类名,然后通过反射机制将其加载,spring-cloud-alibaba-nacos-config的spring.factories文件配置Nacos配置类NacosConfigBootstrapConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\ org.springframework.cloud.alibaba.nacos.NacosConfigBootstrapConfiguration org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.cloud.alibaba.nacos.NacosConfigAutoConfiguration,\ org.springframework.cloud.alibaba.nacos.endpoint.NacosConfigEndpointAutoConfiguration org.springframework.boot.diagnostics.FailureAnalyzer=\ org.springframework.cloud.alibaba.nacos.diagnostics.analyzer.NacosConnectionFailureAnalyzer 复制代码
NacosConfigBootstrapConfiguration类如下,可以看见配置类中注入了NacosPropertySourceLocator
@Configuration public class NacosConfigBootstrapConfiguration { @Bean public NacosPropertySourceLocator nacosPropertySourceLocator() { return new NacosPropertySourceLocator(); } @Bean @ConditionalOnMissingBean public NacosConfigProperties nacosConfigProperties() { return new NacosConfigProperties(); } } 复制代码
-
NacosPropertySourceLocator.locate()方法,最终所有的加载配置都是通过loadNacosDataIfPresent(),最终通过NacosConfigService.getConfig()方法加载配置中心配置
@Override public PropertySource<?> locate(Environment env) { //创建NacosConfigService ConfigService configService = nacosConfigProperties.configServiceInstance(); //省略一些基本的读取配置操作 CompositePropertySource composite = new CompositePropertySource( NACOS_PROPERTY_SOURCE_NAME); //加载共享配置 loadSharedConfiguration(composite); //加载扩展配置 loadExtConfiguration(composite); //加载程序配置 loadApplicationConfiguration(composite, nacosGroup, dataIdPrefix, fileExtension); return composite; } 复制代码
3.Nacos原理
-
创建NacosConfigService对象,生成agent,worker
public NacosConfigService(Properties properties) throws NacosException { String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE); if (StringUtils.isBlank(encodeTmp)) { encode = Constants.ENCODE; } else { encode = encodeTmp.trim(); } initNamespace(properties); //HTTP代理 agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); agent.start(); //重要工作类 worker = new ClientWorker(agent, configFilterChainManager, properties); } 复制代码
-
ClientWorker初始化新建两个线程池
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { //创建只有一个线程的线程池并且执行定时任务:检查配置是否发生变化 executor = Executors.newScheduledThreadPool(); //长轮训线程池 executorService = Executors.newScheduledThreadPool(); //以固定间隔10ms执行任务 executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); } 复制代码
-
checkConfigInfo() 分批执行任务,防止数据量过大
public void checkConfigInfo() { // 分任务 int listenerSize = cacheMap.get().size(); // 向上取整为批数 int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { // 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题 executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; } } 复制代码
-
LongPollingRunnable.run()
public void run() { //1.检查本地配置 //code...... //2.检查服务端配置,见5 List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); //3.请求具体变化数据,写入缓存 content = getServerConfig(dataId, group, tenant, 3000L); //4.检查缓存数据是否发生变化,通知监听器,如果变化则发布RefreshEvent事件,见6 cacheData.checkListenerMd5(); //code.... } 复制代码
-
checkUpdateDataIds()调用checkUpdateConfigStr()向nacos发送请求,获取变化配置
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException { //发送请求,获取变化的配置,超时时间默认30秒 HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout); return parseUpdateDataIdResponse(result.content); } 复制代码
-
NacosContextRefresher发布RefreshEvent事件
public void receiveConfigInfo(String configInfo) { loadCount.incrementAndGet(); String md5 = ""; if (!StringUtils.isEmpty(configInfo)) { try { MessageDigest md = MessageDigest.getInstance("MD5"); md5 = new BigInteger(1, md.digest(configInfo.getBytes("UTF-8"))) .toString(16); } catch (NoSuchAlgorithmException | UnsupportedEncodingException e) { LOGGER.warn("[Nacos] unable to get md5 for dataId: " + dataId, e); } } refreshHistory.add(dataId, md5); applicationContext.publishEvent( new RefreshEvent(this, null, "Refresh Nacos config")); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Refresh Nacos config group{},dataId{}", group, dataId); } } 复制代码
-
RefreshEventListener处理配置刷新事件
public synchronized Set<String> refresh() { Map<String, Object> before = extract( this.context.getEnvironment().getPropertySources()); //重新准备环境 addConfigFilesToEnvironment(); Set<String> keys = changes(before, extract(this.context.getEnvironment().getPropertySources())).keySet(); //刷新属性源–属性源相对应的属性bean从旧的换成新的 this.context.publishEvent(new EnvironmentChangeEvent(context, keys)); //触发scope的refreshAll操作,针对RefreshScope来说就是清空了他所管理的缓存bean,待再次调用时重新创建,创建过程就会注入新的属性源 this.scope.refreshAll(); return keys; } 复制代码
2. 服务注册中心
1.服务注册
-
先看spring.factories中的配置,NacosDiscoveryAutoConfiguration即是和服务注册有关的自动配置类
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.cloud.alibaba.nacos.NacosDiscoveryAutoConfiguration,\ org.springframework.cloud.alibaba.nacos.ribbon.RibbonNacosAutoConfiguration,\ org.springframework.cloud.alibaba.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\ org.springframework.cloud.alibaba.nacos.discovery.NacosDiscoveryClientAutoConfiguration 复制代码
-
NacosDiscoveryAutoConfiguration中往容器中注入了NacosServiceRegistry,NacosRegistration,NacosAutoServiceRegistration这三个Bean,服务注册也就是由它们来完成
public class NacosDiscoveryAutoConfiguration { @Bean public NacosServiceRegistry nacosServiceRegistry( NacosDiscoveryProperties nacosDiscoveryProperties) { return new NacosServiceRegistry(nacosDiscoveryProperties); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosRegistration nacosRegistration( NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) { return new NacosRegistration(nacosDiscoveryProperties, context); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosAutoServiceRegistration nacosAutoServiceRegistration( NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) { return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration); } } 复制代码
-
NacosServiceRegistry,该类实现了 spring-cloud-commons 提供的 ServiceRegistry 接口,重写了register、deregister两个方法,在register方法中主要是将配置文件装换成Instance实例,调用了namingService.registerInstance()方法,即可将服务注册到注册中心
public class NacosServiceRegistry implements ServiceRegistry<Registration> { private static final Logger log = LoggerFactory.getLogger(NacosServiceRegistry.class); private final NacosDiscoveryProperties nacosDiscoveryProperties; private final NamingService namingService; public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) { this.nacosDiscoveryProperties = nacosDiscoveryProperties; this.namingService = nacosDiscoveryProperties.namingServiceInstance(); } @Override public void register(Registration registration) { } @Override public void deregister(Registration registration) { } } 复制代码
-
NacosRegistration实现了Registration, ServiceInstance,主要是记录了一些配置信息
public class NacosRegistration implements Registration, ServiceInstance { public static final String MANAGEMENT_PORT = "management.port"; public static final String MANAGEMENT_CONTEXT_PATH = "management.context-path"; public static final String MANAGEMENT_ADDRESS = "management.address"; public static final String MANAGEMENT_ENDPOINT_BASE_PATH = "management.endpoints.web.base-path"; private NacosDiscoveryProperties nacosDiscoveryProperties; private ApplicationContext context; public NacosRegistration(NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) { this.nacosDiscoveryProperties = nacosDiscoveryProperties; this.context = context; } @PostConstruct public void init() { Map<String, String> metadata = nacosDiscoveryProperties.getMetadata(); Environment env = context.getEnvironment(); String endpointBasePath = env.getProperty(MANAGEMENT_ENDPOINT_BASE_PATH); if (!StringUtils.isEmpty(endpointBasePath)) { metadata.put(MANAGEMENT_ENDPOINT_BASE_PATH, endpointBasePath); } Integer managementPort = ManagementServerPortUtils.getPort(context); if (null != managementPort) { metadata.put(MANAGEMENT_PORT, managementPort.toString()); String contextPath = env .getProperty("management.server.servlet.context-path"); String address = env.getProperty("management.server.address"); if (!StringUtils.isEmpty(contextPath)) { metadata.put(MANAGEMENT_CONTEXT_PATH, contextPath); } if (!StringUtils.isEmpty(address)) { metadata.put(MANAGEMENT_ADDRESS, address); } } } @Override public String getServiceId() { return nacosDiscoveryProperties.getService(); } @Override public String getHost() { return nacosDiscoveryProperties.getIp(); } @Override public int getPort() { return nacosDiscoveryProperties.getPort(); } public void setPort(int port) { this.nacosDiscoveryProperties.setPort(port); } @Override public boolean isSecure() { return nacosDiscoveryProperties.isSecure(); } @Override public URI getUri() { return DefaultServiceInstance.getUri(this); } @Override public Map<String, String> getMetadata() { return nacosDiscoveryProperties.getMetadata(); } public boolean isRegisterEnabled() { return nacosDiscoveryProperties.isRegisterEnabled(); } public String getCluster() { return nacosDiscoveryProperties.getClusterName(); } public float getRegisterWeight() { return nacosDiscoveryProperties.getWeight(); } public NacosDiscoveryProperties getNacosDiscoveryProperties() { return nacosDiscoveryProperties; } public NamingService getNacosNamingService() { return nacosDiscoveryProperties.namingServiceInstance(); } @Override public String toString() { return "NacosRegistration{" + "nacosDiscoveryProperties=" + nacosDiscoveryProperties + '}'; } } 复制代码
现在配置信息,注册方法都已经有了,又是在什么时候调用注册方法将服务注册到注册中心的呢?下面就继续看最后一个Bean
-
NacosAutoServiceRegistration实现的方法没有什么特殊,重点看它继承的抽象类AbstractAutoServiceRegistration
public class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration> { } 复制代码
spring boot框架实现了发布订阅模式,项目启动过程中会发布一系列的事件,AbstractAutoServiceRegistration中就监听了WebServerInitializedEvent(在web server启动完成后即发布该事件),register()中获取registration中的信息,调用serviceRegistry完成服务注册操作,继承nacos时即使用上面注入进容器的两个实现,NacosServiceRegistry,NacosRegistration
public abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware { private static final Log logger = LogFactory .getLog(AbstractAutoServiceRegistration.class); @EventListener(WebServerInitializedEvent.class) public void bind(WebServerInitializedEvent event) { ApplicationContext context = event.getApplicationContext(); if (context instanceof ConfigurableWebServerApplicationContext) { if ("management".equals( ((ConfigurableWebServerApplicationContext) context).getServerNamespace())) { return; } } this.port.compareAndSet(0, event.getWebServer().getPort()); this.start(); } public void start() { if (!isEnabled()) { if (logger.isDebugEnabled()) { logger.debug("Discovery Lifecycle disabled. Not starting"); } return; } // only initialize if nonSecurePort is greater than 0 and it isn't already running // because of containerPortInitializer below if (!this.running.get()) { register(); if (shouldRegisterManagement()) { registerManagement(); } this.context.publishEvent( new InstanceRegisteredEvent<>(this, getConfiguration())); this.running.compareAndSet(false, true); } } @PreDestroy public void destroy() { stop(); } protected void register() { this.serviceRegistry.register(getRegistration()); } protected void deregister() { this.serviceRegistry.deregister(getRegistration()); } public void stop() { if (this.getRunning().compareAndSet(true, false) && isEnabled()) { deregister(); if (shouldRegisterManagement()) { deregisterManagement(); } this.serviceRegistry.close(); } } } 复制代码
-
心跳机制检查服务状态。本地服务在nacos注册时同时开启BeatReactor以固定速度发送心跳包
public class BeatReactor { private ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("com.alibaba.nacos.naming.beat.sender"); return thread; } }); private long clientBeatInterval = 10000L; private NamingProxy serverProxy; public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap(); public BeatReactor(NamingProxy serverProxy) { this.serverProxy = serverProxy; this.executorService.scheduleAtFixedRate(new BeatReactor.BeatProcessor(), 0L, this.clientBeatInterval, TimeUnit.MILLISECONDS); } public void addBeatInfo(String dom, BeatInfo beatInfo) { LogUtils.LOG.info("BEAT", "adding service:" + dom + " to beat map."); this.dom2Beat.put(dom, beatInfo); } public void removeBeatInfo(String dom) { LogUtils.LOG.info("BEAT", "removing service:" + dom + " from beat map."); this.dom2Beat.remove(dom); } class BeatTask implements Runnable { BeatInfo beatInfo; public BeatTask(BeatInfo beatInfo) { this.beatInfo = beatInfo; } public void run() { Map<String, String> params = new HashMap(2); params.put("beat", JSON.toJSONString(this.beatInfo)); params.put("dom", this.beatInfo.getDom()); try { String result = BeatReactor.this.serverProxy.callAllServers("/nacos/v1/ns/api/clientBeat", params); JSONObject jsonObject = JSON.parseObject(result); if (jsonObject != null) { BeatReactor.this.clientBeatInterval = jsonObject.getLong("clientBeatInterval"); } } catch (Exception var4) { LogUtils.LOG.error("CLIENT-BEAT", "failed to send beat: " + JSON.toJSONString(this.beatInfo), var4); } } } class BeatProcessor implements Runnable { BeatProcessor() { } public void run() { try { Iterator var1 = BeatReactor.this.dom2Beat.entrySet().iterator(); while(var1.hasNext()) { Entry<String, BeatInfo> entry = (Entry)var1.next(); BeatInfo beatInfo = (BeatInfo)entry.getValue(); BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(beatInfo), 0L, TimeUnit.MILLISECONDS); LogUtils.LOG.debug("BEAT", "send beat to server: ", new Object[]{beatInfo.toString()}); } } catch (Exception var4) { LogUtils.LOG.error("CLIENT-BEAT", "Exception while scheduling beat.", var4); } } } } 复制代码
2.服务发现
- NacosNamingService在初始化时创建hostReactor用于获取,保存,更新service实例信息
public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir,
boolean loadCacheAtStart, int pollingThreadCount) {
executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.client.naming.updater");
return thread;
}
});
this.eventDispatcher = eventDispatcher;
this.serverProxy = serverProxy;
this.cacheDir = cacheDir;
if (loadCacheAtStart) {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
} else {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
}
this.updatingMap = new ConcurrentHashMap<String, Object>();
//failoverReactor用于故障转移
this.failoverReactor = new FailoverReactor(this, cacheDir);
//pushReceiver用于接收Nacos服务端的推送,以UDP方式传递消息
this.pushReceiver = new PushReceiver(this);
}
复制代码
- PushReceiver启动一个线程接受服务端的UDP推送消息
public class PushReceiver implements Runnable {
private ScheduledExecutorService executorService;
private static final int UDP_MSS = 64 * 1024;
private DatagramSocket udpSocket;
private HostReactor hostReactor;
public PushReceiver(HostReactor hostReactor) {
try {
this.hostReactor = hostReactor;
udpSocket = new DatagramSocket();
executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.push.receiver");
return thread;
}
});
executorService.execute(this);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] init udp socket failed", e);
}
}
@Override
public void run() {
while (true) {
try {
// byte[] is initialized with 0 full filled by default
byte[] buffer = new byte[UDP_MSS];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
udpSocket.receive(packet);
String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
String ack;
if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
hostReactor.processServiceJSON(pushPacket.data);
// send ack to server
ack = "{\"type\": \"push-ack\""
+ ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
} else if ("dump".equals(pushPacket.type)) {
// dump data to server
ack = "{\"type\": \"dump-ack\""
+ ", \"lastRefTime\": \"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\""
+ StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
+ "\"}";
} else {
// do nothing send ack only
ack = "{\"type\": \"unknown-ack\""
+ ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
}
udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
} catch (Exception e) {
NAMING_LOGGER.error("[NA] error while receiving push data", e);
}
}
}
public static class PushPacket {
public String type;
public long lastRefTime;
public String data;
}
public int getUDPPort() {
return udpSocket.getLocalPort();
}
}
复制代码
- hostReactor主要方法介绍
public class HostReactor {
//获取服务map
public Map<String, ServiceInfo> getServiceInfoMap() {
return serviceInfoMap;
}
//添加更新任务
public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}
//处理服务器返回json,并且调用eventDispatcher下发相应的事件
public ServiceInfo processServiceJSON(String json);
//从本地map中获取服务信息
private ServiceInfo getServiceInfo0(String serviceName, String clusters);
//直接从服务器获取服务信息
public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters);
//获取服务信息
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
//1.failoverReactor.isFailoverSwitch(),根据容灾开关决定是否直接从本地容灾缓存取数据
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
//2.getServiceInfo0(serviceName, clusters)从本地缓存取数据
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
//3.如果没取到数据将更新服务数据,updateServiceNow
if (null == serviceObj) {
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
}
//当前服务正在更新则等待
else if (updatingMap.containsKey(serviceName)) {
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
//启动更新任务任务更新服务
scheduleUpdateIfAbsent(serviceName, clusters);
//返回服务
return serviceInfoMap.get(serviceObj.getKey());
}
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
}
//请求服务器同步更新服务
public void updateServiceNow(String serviceName, String clusters) {
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
if (StringUtils.isNotEmpty(result)) {
processServiceJSON(result);
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}
//只请求服务不做任何事
public void refreshOnly(String serviceName, String clusters) {
try {
serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
}
}
//更新任务,调用一次后以固定的速度执行任务
public class UpdateTask implements Runnable {
long lastRefTime = Long.MAX_VALUE;
private String clusters;
private String serviceName;
public UpdateTask(String serviceName, String clusters) {
this.serviceName = serviceName;
this.clusters = clusters;
}
@Override
public void run() {
try {
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
if (serviceObj == null) {
updateServiceNow(serviceName, clusters);
executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
return;
}
if (serviceObj.getLastRefTime() <= lastRefTime) {
updateServiceNow(serviceName, clusters);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
// if serviceName already updated by push, we should not override it
// since the push data may be different from pull through force push
refreshOnly(serviceName, clusters);
}
executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
lastRefTime = serviceObj.getLastRefTime();
} catch (Throwable e) {
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
}
}
}
}
复制代码
近期评论