全网最全的dubbo教程,万字讲解服务引用

「本文已参与好文召集令活动,点击查看后端、大前端双赛道投稿,2万元奖池等你挑战!」

好吧,又是一篇吃力不讨好的超长源码解析文章,肝的让阅读完成率低入谷底,不过竟然说好的写一系列dubbo,跪着也得写,虽然阅读量低的可怜,但是我相信,这一系列写完,自成体系了,应该阅读量就上来了,应该吧,实在不应该,就当我没说了。

所以dubbo是啥?dubbo就是你要的banana。看看前两篇

一起玩dubbo,先入个门

一起玩dubbo,万字长文揭秘服务暴露

好了,废话不多说了,也说了那么多了,接下来正经点聊聊服务引入这块。

什么是服务引入

我在我两千平方的小房间里边,想了好久,发现还是有点难解答这个问题。

这么说吧,学java的都知道对象引入的概念吧,服务引入其实就是将该对象引入和服务绑定在一起,你以为你调的是A对象,其实该对象已经被代理了,最终你调用的是很远很远某个角落的某台服务器上的某个对象。

话糙理也糙,接下来还是按照我日常思考人生的姿势,分为几个方向说说服务引入吧,希望大家看完,可以和我一样,用一样的姿势思考人生。

接下来会涉及到源码部分,以下源码的示例接来自dubbo2.6x,源码方面的注释都已经提交到github上,有需要的可以clone:

github.com/wiatingpub/…

什么时候触发的服务引入

依旧是从服务引入的配置入手

在说服务暴露的时候有说过,dubbo采用了比较经典的xml配置,并理所当然的使用了NamespaceHandlerSupport将xml中的节点配置映射成了对应对象,配置我们已经看到了,接下来继续看看NamespaceHandlerSupport解析了哪个对象

哦,是它;

看看这个对象有什么特别的。

对着类来一波Ctrl + H,可以看到该类的实现结构

哦豁,看到实现了FactoryBean和InitializingBean,

首先实现了接口InitializingBean,该接口提供了方法afterPropertiesSet,看看有什么用

看到了吧,其实这个是提供给恶汉模式用的,如果在配置中配置了预处理则直接引入服务,也就是init参数

竟然有饿汉模式了,那么懒汉在哪里呢?懒汉懒汉你在哪啊

这个时候FactoryBean出场了,看看这个接口提供的方法,也就是getObject方法了

看到了吧,如果是懒汉模式则在ReferenceBean对应的服务被注入到其他类中时被引入。

好了,文章到这就结束了,我也困了。

这个时候肯定一堆人吐槽了,就这?你以为我会怕吗?不可能告诉你,这么肝的文章,基本就没人会往后看完,甚至可能也没人会看到这里。也是,那就这样吧,先睡觉了,毕竟现在凌晨一点了,不如早点睡对身体好。

(⊙o⊙)…三点了,睡不着,算了,继续写吧,反正睡不着,当做自我消遣就可以了。

貌似还少画了一个流程图,那就勉为其难的画吧,说不定画完就想睡觉了。

还是那样,想看整体流程图的可以点击链接:
流程图链接

接下来进入少儿不宜,毁灭你的人生观、世界观、爱情观三观的超长服务引入过程,该过程可能引起晕厥,不适应者可以提前下车。

提一波URL

上篇文章有说这个,如果看过了,可以直接忽略,一模一样的,放在这里是为了避免有读者不看上一篇。

在说服务引入之前必须先提一波URL,否则主线没了,后续不好讲。

在我没有接触到dubbo之前,我对URL的定位是指网络地址,而在dubbo中,可以认为是一种约定,几乎dubbo的所有模块都是通过URL来传参,这有什么好处呢?

我们可以想想,如果没有约定好,那么不同的接口之间进行交互的参数便会乱掉,一会是字符串,一会是map,而有了统一的约定后,代码便会更加的规范和统一,我们在看代码的时候也会比较清晰,也容易拓展,比如如果你想拓展什么东西,直接往URL上拼接参数就可以了。

我们可以看到,除了几个基础的参数外,很多参数其实最终都放到了parameters中。

而在我司项目中,我们参考了URL的设计,构建了元数据的结构,也就是map,将服务的部分动态参数通过map进行传递。

服务引入过程

看代码的最好方式就是带着疑问去看。----帅饭饭

很有道理,那么请问服务引入的时候是什么时候连接上远程服的?

为了解答这个问题,我们直接看答案

没错,就是这里了,那么流程又是什么样的呢?

看了下这么长的流程,我知道你已经准备关闭掉这篇该死的文章了,莫急,大哥先喝口茶,我这边已经整理了下几个重要的步骤了,只需要搞清楚以下几个步骤就可以了, 其他的全是包装以及SPI的查找

  • init做了啥
  • createProxy做了啥
  • RegistryProtocol做了啥
  • RegistryDirectory做了啥
  • DubboProtocol做了啥

什么?还多?那其实这整个流程可以浓缩为:

不皮了,爱看不看,反正看完不点赞,接下来详细讲讲了。

1、init做了啥

首先是服务引入入口无论是饿汉模式还是懒汉模式,都是走向了get方法,get方法最终又走向了init方法,所以就从init方法开始分析了,先看下Init方法做了啥

先直接炸死人不赔偿的代码

private void init() {
    // 如果已经初始化过了,则直接return
    if (initialized) {
        return;
	/**设置各种参数**/
    checkApplication();
    // 本地存根合法性校验
    checkStub(interfaceClass);
    // mock合法性校验
    checkMock(interfaceClass);
    // 使用map来存放配置
    Map<String, String> map = new HashMap<String, String>();
    Map<Object, Object> attributes = new HashMap<Object, Object>();
    // 表明这是消费者端
    map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
    // 添加dubbo版本
    map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
    // 添加时间戳
    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
    if (ConfigUtils.getPid() > 0) {
        // pid等
        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
    }
    if (!isGeneric()) {
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            // 设置版本号
            map.put("revision", revision);
        }
        // 获得所有方法,并将所有方法签名拼接起来放入map中
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if (methods.length == 0) {
            logger.warn("NO method found in service interface " + interfaceClass.getName());
            map.put("methods", Constants.ANY_VALUE);
        } else {
            map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }
    // 加入服务接口名称
    map.put(Constants.INTERFACE_KEY, interfaceName);
    // 继续添加各种信息到map中
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, consumer, Constants.DEFAULT_KEY);
    appendParameters(map, this);
    String prefix = StringUtils.getServiceKey(map);
    if (methods != null && !methods.isEmpty()) {
        // 遍历方法配置
        for (MethodConfig method : methods) {
            // 把方法配置放入map中
            appendParameters(map, method, method.getName());
            // 生成重试的配置key
            String retryKey = method.getName() + ".retry";
            // 如果配置中已经有配置了,则移除配置
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                // 如果配置为false,意味着不重试,则设置重试次数为0
                if ("false".equals(retryValue)) {
                    map.put(method.getName() + ".retries", "0");
                }
            }
            appendAttributes(attributes, method, prefix + "." + method.getName());
            // 设置异步配置
            checkAndConvertImplicitConfig(method, map, attributes);
        }
    }
    // 获取服务消费者ip地址等
    String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
    if (hostToRegistry == null || hostToRegistry.length() == 0) {
        // 可以看到如果为空,则获取本地ip
        hostToRegistry = NetUtils.getLocalHost();
    } else if (isInvalidLocalHost(hostToRegistry)) {
        throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
    }
    // 将消费者ip放入map中
    map.put(Constants.REGISTER_IP_KEY, hostToRegistry);

    //attributes are stored by system context.
    StaticContext.getSystemContext().putAll(attributes);
    // 创建代理对象
    ref = createProxy(map);
    // 生成ConsumerModel放入AppliacitonModel中
    ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
    ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}
复制代码

可以看到我浓缩了一些代码,最后总结下三个个步骤

  • 检测本地存根和mock的合法性
  • 添加协议版本、时间戳、发布版本、方法配置、重试配置、异步配置、消费者ip等信息到map中
  • 创建代理对象

补充下流程图

在我司的rpc组件中,我们也考虑是否支持两种模式的服务引入,最终放弃了,觉得没什么必要,在我司的项目中,我们是在起服的时候便去做了服务的引入,属于饿汉模式的这种。

2、createProxy做了啥

接下来重头戏主要还是在创建代理对象这里,我们继续分析

private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp", "localhost", 0, map);
        final boolean isJvmRefer;
        // 根据配置检查是否为本地调用
        if (isInjvm() == null) {
            if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
                isJvmRefer = false;
            } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                // by default, reference local service if there is
                isJvmRefer = true;
            } else {
                isJvmRefer = false;
            }
        } else {
            isJvmRefer = isInjvm().booleanValue();
        }

        if (isJvmRefer) {
            // 如果是本地调用,生成URL
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            // 利用InjvmProtocol生成InjvmInvoker实例
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
            // 如果不是则判断是否为直连,url不为空则表明想通过直连来调用
            if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (url.getPath() == null || url.getPath().length() == 0) {
                            url = url.setPath(interfaceName);
                        }
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else {
                // 为空则意味着想用注册中心的方式来调用,这里现在注册中心的URL
                List<URL> us = loadRegistries(false);
                if (us != null && !us.isEmpty()) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                if (urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }

            if (urls.size() == 1) {
                // 有多个URL连接的情况,则调用RegistryProtocl的refer构建Invoker实例
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                // 遍历所有连接的URL
                for (URL url : urls) {
                    // 调用RegistryProtocl的refer构建Invoker实例
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url;
                    }
                }
                // 由集群进行多个invoker的合并
                if (registryURL != null) {
                    URL u = registryURL.addParameterIfAbsent(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // not a registry url
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }

        Boolean c = check;
        if (c == null && consumer != null) {
            c = consumer.isCheck();
        }
        if (c == null) {
            c = true; // default true
        }
        if (c && !invoker.isAvailable()) {
            // make it possible for consumer to retry later if provider is temporarily unavailable
            initialized = false;
            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }
        // 创建服务代理
        return (T) proxyFactory.getProxy(invoker);
    }
复制代码

代码就不用看了,直接看注释可能好点,之所以贴这么多代码,可能是我为了让文章看起来比较多内容。

该方法的大致逻辑可以分为以下几步:

  • 判断是否是本地调用,如果是则用InjvmProtocol的refer方法生成Invoker实例
  • 如果不是本地调用,根据直连配置或者注册中心配置,合并远程连接的urls
  • 遍历每个url,加入监控中心的配置
  • 如果是单个url,则直接通过SPI引入RegistryProtocol的refer构建Invoker实例,如果是多个urls则遍历urls,对每个url生成Invoker,并利用集群进行多个invoker的合并。
  • 最终输出一个invoker,因为如果是多个的话,集群也会将其封装成一个。

继续补充流程图:

为啥一定要封装Invoker?

上篇文章也有说过这个,其实就是为了屏蔽本地调用或者远程调用或者集群调用的细节,统一暴露出一个可执行体,方便调用者调用,而不管怎么封装,其实最终都是调向目标方法,可以看到,就算是有多个invoker,最终也会通过集群Cluster封装成一个invoker。

dubbo的设计上来说,可以说invoker承载的逻辑的调度,而URL则承载了配置的传参,可以这么认为,URL就是材料,而最终生成了对应的食物,也就是Invoker,在我司,我们也是通过invoker包装了逻辑。

3、RegistryProtocol做了啥

接下来看RegistryProtocol,在这里有具体生成invoker的逻辑,至于SPI机制是如何找到RegistryProtocol,有兴趣可以持续关注,后续会单独写

@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // 取registry参数值,并将其设置成协议头,默认是dubbo
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    // 获得注册中心实例
    Registry registry = registryFactory.getRegistry(url);
    // 如果是注册中心服务,则返回注册中心服务的invoker
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // 将url配置转撑map
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
    // 获得group值
    String group = qs.get(Constants.GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                || "*".equals(group)) {
            // 如果有多个组,则使用MergeableCluster调用doRefer继续执行服务引入逻辑
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    // 只有一个组则继续执行doRefer
    return doRefer(cluster, registry, type, url);
}
复制代码

这里的逻辑比较简答,如果是注册服务中心,则直接创建代理,如果不是则处理配置,根据配置来决定Cluster的实现方式,最终调用doRefer方法

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // 创建RegistryDirectory
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    // 设置注册中心
    directory.setRegistry(registry);
    // 设置协议
    directory.setProtocol(protocol);
    // 所有属性放到map中
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    // 生成服务消费者连接
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
        // 注册服务,在consumers目录下生成新节点
        registry.register(registeredConsumerUrl);
        directory.setRegisteredConsumerUrl(registeredConsumerUrl);
    }
    // 订阅providers、configuratios、routers等节点数据
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
                    + "," + Constants.CONFIGURATORS_CATEGORY
                    + "," + Constants.ROUTERS_CATEGORY));
    // 一个注册中心有可能有多个服务提供者,因此需要使用cluster集群合并成一个invoker
    Invoker invoker = cluster.join(directory);
    // 在服务提供者处注册消费者
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}
复制代码

这个方法核心几个步骤如下:

  • 创建RegistryDirectory,再生成服务消费者连接,并向注册中心注册服务,在consumers目录下生成新节点

  • 订阅订阅providers、configuratios、routers等节点数据,这样diretory便可以感知到这几个节点的变更信息,这点为了后续的服务治理,后续再聊,有兴趣持续关注

  • 合并invoker,一个注册中心有可能有多个服务提供者,因此需要使用cluster集群合并成一个invoker

这里得反问一句,Directory又是啥?

Directory简称服务目录, 简单来说就是消费者将自己能够调用的服务提供者的信息缓存到本地Directory中,当服务提供者有所变化时会通知到注册中心,消费者会监听注册中心相关服务的消息,当收到相关服务提供者变动的消息时会更新本地服务目录Directory。去掉也是可以的,就是模型没那么好看。

继续补充流程

4、RegistryDirectory做了啥

可以先看下RegistryDirectory.subscribe做了啥

public void subscribe(URL url) {
    setConsumerUrl(url);
    registry.subscribe(url, this);
}
复制代码

由于我们配置了注册中心是ZookeeperRegistry,因此订阅的逻辑最终走向ZookeeperRegistry,而ZookeeperRegistry是继承了FailbackRegistry的,因此最终是走向FailbackRegistry.subscribe

public void subscribe(URL url, NotifyListener listener) {
    super.subscribe(url, listener);
    removeFailedSubscribed(url, listener);
    try {
        // 真正做订阅的地方
        doSubscribe(url, listener);
    } catch (Exception e) {
        Throwable t = e;

        List<URL> urls = getCacheUrls(url);
        if (urls != null && !urls.isEmpty()) {
            notify(url, listener, urls);
            logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
        } else {
            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true);
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
        }

        // 订阅失败,则放入失败容器中
        addFailedSubscribed(url, listener);
    }
}
复制代码

其实我们可以根据FailbackRegistry这个命名便可以猜到,如果订阅失败了,则会有个重试机制再继续跑,诺,最后一行代码已经证实了这一点,给你看看这个FailbackRegistry的结构,看看构造方法

// TODO: 2021/5/24 从url中获取重试频率参数,启动定时器进行重试逻辑
public FailbackRegistry(URL url) {
    super(url);
    this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
    this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                // TODO: 2021/5/29 定时重试 
                retry();
            } catch (Throwable t) { // Defensive fault tolerance
                logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
            }
        }
    }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
复制代码

果不其然,最终如果订阅发生了异常,则会进行定时重试。

细心的小伙伴可能看到了,void subscribe(URL url, NotifyListener listener),NotifyListener 这个又是啥呢?

可以看到

其实就是RegistryDirectory本身,这里用了监听者模式,最终实现了RegistryDirectory可以感知服务实例的变更。

接下来即将进入高潮了

@Override
protected void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        // 处理URL参数中interface为*的订阅,例如监控中心的订阅
        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            // 获得根目录
            String root = toRootPath();
            //根据url获得对应的监听器
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
            if (listeners == null) {
                // 创建监听器
                zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                listeners = zkListeners.get(url);
            }
            // 获得节点监听器
            ChildListener zkListener = listeners.get(listener);
            // 如果节点监听器为空则创建
            if (zkListener == null) {
                listeners.putIfAbsent(listener, new ChildListener() {
                    @Override
                    public void childChanged(String parentPath, List<String> currentChilds) {
                        for (String child : currentChilds) {
                            child = URL.decode(child);
                            if (!anyServices.contains(child)) {
                                anyServices.add(child);
                                subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                        Constants.CHECK_KEY, String.valueOf(false)), listener);
                            }
                        }
                    }
                });
                zkListener = listeners.get(listener);
            }
            //创建service节点
            zkClient.create(root, false);
            // 向zookeeper的service节点发起订阅
            List<String> services = zkClient.addChildListener(root, zkListener);
            if (services != null && !services.isEmpty()) {
                for (String service : services) {
                    service = URL.decode(service);
                    anyServices.add(service);
                    // 发起该service层的订阅
                    subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                }
            }
        } else {
            List<URL> urls = new ArrayList<URL>();
            // 遍历分类数组
            for (String path : toCategoriesPath(url)) {
                // 获得监听器集合
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                // 如果没有则创建
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                // 获得监听器
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        @Override
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            // 通知服务变化,回调NotifyListener
                            ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                // 创建节点,如:/dubbo/com.alibaba.dubbo.demo.DemoService/providers
                zkClient.create(path, false);
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            // 通知数据变更,如RegistryDirectory
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
复制代码

代码很多,总结下不过是以下几个步骤

  • 构建监听器,在注册中心上创建几个节点,
  • 监听这几个节点的服务变更
  • 在拿到具体的服务URL后触发监听器的变更,也就是RegistryDirectory感知到

接下来继续看RegistryDirectory感知到后做了啥

private void refreshInvoker(List<URL> invokerUrls) {
    if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
            && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        this.forbidden = true; // Forbid to access
        this.methodInvokerMap = null; // Set the method invoker map to null
        // 关闭所有invoker
        destroyAllInvokers(); // Close all invokers
    } else {
        this.forbidden = false; // Allow to access
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
            invokerUrls.addAll(this.cachedInvokerUrls);
        } else {
            this.cachedInvokerUrls = new HashSet<URL>();
            this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
        }
        if (invokerUrls.isEmpty()) {
            return;
        }
        // 将传入的invokers转成新的urlInvokerMap
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
        // 转换出新的methodInvokerMap
        Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
        // state change
        // If the calculation is wrong, it is not processed.
        if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
            return;
        }
        this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
        this.urlInvokerMap = newUrlInvokerMap;
        try {
            // 注销不再使用的invoker
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}
复制代码

对比其实销毁旧的,根据新的构建invoker,我们可以看到,接下来就到叻DubboProtocol了

继续补充流程图

5、DubboProtocol做了啥

最终其实就是通过了几个包装类,然后通过SPI走向了DubboProtocol.refer

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);
    // 创建DubboInvoker,并且连接远程服,将连接包装ExchangeClient放入DubboInvoker
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}
复制代码

核心便是创建DubboInvoker,并连接远程服将连接包装进DubboInvoker中,那么发起连接的行为是什么样的,继续看最后几个步骤

private ExchangeClient[] getClients(URL url) {
    // 是否一个连接对应一个服务
    boolean service_share_connect = false;
    // 获得url中连接共享的配置,默认0
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    // 没有配置,则是共享类,并且连接数为1
    if (connections == 0) {
        service_share_connect = true;
        connections = 1;
    }

    // 创建ExchangeClient数组
    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (service_share_connect) {
            // 如果共享,则获得共享客户端对象
            clients[i] = getSharedClient(url);
        } else {
            // 创建新连接
            clients[i] = initClient(url);
        }
    }
    return clients;
}
复制代码

可以看到这里会根据URL的配置取出是否是共享的连接,如果是则取出缓存中的连接,如果不是共享的,则创建新连接,我们直接看创建新连接

private ExchangeClient initClient(URL url) {

    // client type setting.
    String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    // enable heartbeat by default
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

    // BIO is not allowed since it has severe performance issue.
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported client type: " + str + "," +
                " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
    }

    ExchangeClient client;
    try {
        // 是否需要延时连接,默认为false
        if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
            client = new LazyConnectExchangeClient(url, requestHandler);
        } else {
            // 直接发起连接
            client = Exchangers.connect(url, requestHandler);
        }
    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }
    return client;
}
复制代码

终于到了最后一步了,这里边是真正做了发起连接的地方,并且还做了是否延时的支持。

继续流程图

生成后具体的Invoker后,接下来就是这么将invoker暴露给外部进行调用了,我们可以看到

最终是通过代理机制实现了这过程,这部分逻辑在上篇文章说过了,有兴趣可以看看

看到这里服务暴露流程基本理完了,还是有点东西在里面的,并且还需要掌握 Dubbo SPI,不然有些点例如自适应什么的还是很难理解的,为了写这篇文章,我前前后后也是花了不少的时间。

最后我再来一张完整的流程图带大家再过一遍,具体还是有很多细节,不过不是主干我就不做分析了,不然文章散掉了。

dubbo流程2

后续服务治理、APO、SPI机制也会在该流程图上进行拓展,有兴趣的也可以关注流程图链接:

流程图链接

总结

虽然看完了该篇文章,但是还是建议大家自己打断点过一遍,可以更加清晰,而如果是为了应付面试官提问的话,基本上记住上面流程图的内容就差不多了,当你研究完了dubbo后,其实会发现dubbo有很多东西可以写,后续安排

  • 服务消费方如何远程调用服务提供方

  • SPI

  • dubbo中的AOP机制

  • 服务治理

  • ....

    等好几个模块,最后就是带大家撸一个RPC框架了,还是那句话,想学dubbo的可以持续关注这一系列。