Consumer服务调用过程分析

1. 前言

之前的文章对Dubbo服务暴露和引用流程进行了分析,再然后了解了Dubbo网络通讯协议的设计细节,现在终于到了最激动人心的时刻,我们将分析Dubbo是如何实现服务调用的。篇幅原因,文章会拆分成两篇,本篇从Consumer的视角,分析服务的调用过程和对响应结果的处理,下一篇会从Provider的视角分析Dubbo是如何处理RPC请求并响应结果的。
image.png
这是官方文档给的图,从中就可以看出大概的处理流程。代理对象通过客户端发送网络请求,请求/响应对象会经过Codec编解码、序列化发送到对端,对端接收到数据后进行解码和反序列化,通过Dispatcher将请求派发到具体的线程处理,最终响应结果。

2. 源码分析

在这里插入图片描述

通过一个简单的Demo开始分析,如下:

public interface HelloService {
	R<String> say(String name);
}
public class HelloServiceImpl implements HelloService {
	@Override
	public R<String> say(String name) {
		return R.ok("hello " + name);
	}
}
复制代码

Consumer在只有接口的情况下是不能实例化的,接口对象是Dubbo帮我们生成的代理对象,这个代理对象的实现细节是怎样的呢?能否窥探一下实现类的源码呢?我这里借助Arthas工具,反编译代理类的源码,如下:

public class proxy0 implements ClassGenerator.DC, Destroyable,EchoService,HelloService {
	public static Method[] methods;
	private InvocationHandler handler;

	public R say(String string) {
		Object[] objectArray = new Object[]{string};
		Object object = this.handler.invoke(this, methods[0], objectArray);
		return (R)object;
	}

	public void $destroy() {
		Object[] objectArray = new Object[]{};
		Object object = this.handler.invoke(this, methods[1], objectArray);
	}

	public Object $echo(Object object) {
		Object[] objectArray = new Object[]{object};
		Object object2 = this.handler.invoke(this, methods[2], objectArray);
		return object2;
	}

	public proxy0() {
	}

	public proxy0(InvocationHandler invocationHandler) {
		this.handler = invocationHandler;
	}
}
复制代码

Dubbo生成的代理类自动帮我们实现了很多接口:DC接口,它是一个标记接口,仅代表它是动态生成的类。还实现了Destroyable接口,代表它是可被销毁的,它的$destroy()方法会调用对应Invoker的destroy()方法。EchoService接口用于回声测试,用来测试服务的可用性。

通过查看代理类的源码,我们发现,调用say()方法它会帮我们转交给InvocationHandler处理,这个InvocationHandler对象就是在生成代理对象时指定的,代码如下:

public class JavassistProxyFactory extends AbstractProxyFactory {
    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
}
复制代码

综上所述,当我们调用接口的自定义方法时,代理对象会触发InvokerInvocationHandler#invoke()方法,我们以此为入口分析Dubbo服务调用的奥秘。

2.1 发送请求

RPC调用最终就是要发送网络请求,但是在那之前,Dubbo需要做大量的前置处理,例如服务降级、拦截器、过滤器、集群容错、异步转同步等等。Dubbo在设计上,遵循了单一职责,这些功能全都通过一个个Invoker类来实现,采用装饰者模式,将最基础的DubboInvoker经过一层层的包装,最终实现这一整套复杂的功能。

2.1.1 InvokerInvocationHandler

InvokerInvocationHandler#invoke()方法里,逻辑很简单,如果调用的方法来自Object,则直接调用Invoker本身,无需触发RPC调用。如果是自定义的方法,则需要创建RpcInvocation对象,交给Invoker处理。

RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
String serviceKey = invoker.getUrl().getServiceKey();
rpcInvocation.setTargetServiceUniqueName(serviceKey);

if (consumerModel != null) {
    rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
    rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
}
return invoker.invoke(rpcInvocation).recreate();
复制代码

2.1.2 MockClusterInvoker

该类的职责是服务降级、数据Mock。当接口调用失败时,Dubbo会尝试Mock数据并返回,适用于非关键流程。
需要注意的是,对于业务异常,Dubbo是不会Mock的,只针对非业务异常,例如超时。
具体实现就是对invoke逻辑进行try catch,如果捕获到非业务异常,则执行Mock逻辑,返回Mock数据。

2.1.3 InterceptorInvokerNode

该类的职责是实现Cluster层的拦截器,通过实现ClusterInterceptor接口,来对invoke调用做拦截。
Dubbo目前提供了两个实现类:ConsumerContextClusterInterceptor和ZoneAwareClusterInterceptor。前者用于设置和清理RpcContext,后者猜测是让Invoker具备区域感知的能力,可以优先调用同机房的服务。

Result asyncResult;
try {
    interceptor.before(next, invocation);
    asyncResult = interceptor.intercept(next, invocation);
} catch (Exception e) {
    if (interceptor instanceof ClusterInterceptor.Listener) {
        ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
        listener.onError(e, clusterInvoker, invocation);
    }
    throw e;
} finally {
    interceptor.after(next, invocation);
}
复制代码

2.1.4 AbstractClusterInvoker

该类的职责是实现ClusterInvoker的基础逻辑,采用模板方法模式,实现一套算法骨架,子类只需要实现自己特有的逻辑。
AbstractClusterInvoker#invoke()方法首先会将RpcContext的attachments写入到RpcInvocation,然后通过Directory过滤可调用的服务列表得到一组Invoker,再初试化LoadBalance,后续负载均衡将在这一组Invoker里面选出一个最终的Invoker进行调用。

public Result invoke(final Invocation invocation) throws RpcException {
    // 确保服务没注销
    checkWhetherDestroyed();
    Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
    }
    // 通过Directory过滤服务列表
    List<Invoker<T>> invokers = list(invocation);
    // 初始化 负载均衡
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}
复制代码

2.1.5 FailoverClusterInvoker

Cluster是集群容错接口,默认的集群容错方案是FailoverCluster,我们这里只拿它举例。
该类的职责是实现服务调用失败重试的逻辑,doInvoke()方法首先会获取重试次数,然后利用Loadbalance进行负载均衡,选择一个最终的Invoker并对它进行调用,如果捕获到非业务异常,则利用其它Invoker做重试。

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyInvokers = invokers;
    checkInvokers(copyInvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    // 重试次数
    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    RpcException le = null;
    // 记录已经调用过的Invoker,重试时规避
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    // 记录已经调用过的Provider
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        // 负载均衡
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            // 服务调用
            Result result = invoker.invoke(invocation);
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
}
复制代码

2.1.6 ProtocolFilterWrapper

该类的职责是执行过滤器链,它是一个包装类,Dubbo SPI机制会自动把Protocol对象进行包装。
buildInvokerChain()方法用于构建FilterChain,利用SPI加载激活的的Filter,将它们编排成一个单向链表,Invoker在链表的末尾,确保在执行invoke方法前,先经过所有的Filter。

public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult;
    try {
        asyncResult = filter.invoke(next, invocation);
    } catch (Exception e) {
        throw e;
    }
}
// 代码有精简...
复制代码

2.1.7 AsyncToSyncInvoker

该类的职责是实现异步转同步的功能,Dubbo在发送网络请求后,服务端什么时候响应结果,客户端是不知道的,所以invoke调用是异步的,返回的是AsyncRpcResult,什么时候服务端返回数据了,Dubbo才会将结果写入AsyncRpcResult。

这是Dubbo底层的实现细节,但是作为开发者而言,RPC调用就应该和调用本地方法一样,必须拿到响应结果了程序才能往下走,所以才有了AsyncToSyncInvoker。它的逻辑很简单,就是调用AsyncRpcResult#get()方法阻塞当前线程,直到服务端响应结果。

@Override
public Result invoke(Invocation invocation) throws RpcException {
    // 调用,获取异步结果
    Result asyncResult = invoker.invoke(invocation);
    if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
        // 同步调用,阻塞等待结果
        asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
    }
    return asyncResult;
}
复制代码

2.1.8 DubboInvoker

该类的职责是实现dubbo协议的远程调用,它是底层的Invoker,没有再包装其它Invoker了。它的逻辑也很简单,利用ExchangeClient将RpcInvocation作为参数发送到Provider,得到CompletableFuture结果,将其包装为AsyncRpcResult并返回。

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);

    // 轮询客户端
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        // 是否单向发送,不期望对端响应数据
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        // 计算超时
        int timeout = calculateTimeout(invocation, methodName);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            // 回调线程池
            ExecutorService executor = getCallbackExecutor(getUrl(), inv);
            // 发送请求
            CompletableFuture<AppResponse> appResponseFuture =
                currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
            FutureContext.getContext().setCompatibleFuture(appResponseFuture);
            AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
            result.setExecutor(executor);
            return result;
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
复制代码

2.1.9 HeaderExchangeChannel

该类的职责是负责发送网络请求,Dubbo所有的网络请求最终都会封装为Request对象,它记录了RequestID,协议版本,请求体等信息。
它底层又会依赖Channel,这就涉及到网络传输层了,以Netty为例,最终会调用channel.writeAndFlush()将数据发送到对端。

public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}
复制代码

至此,服务请求就结束了,再往后就是网络传输层的逻辑,对Request对象进行编码和序列化,这里就不细说了。

2.2 处理响应

请求发送以后,客户端线程会阻塞,直到服务端响应结果。客户端是如何处理响应结果的呢?又是如何将响应结果和每次请求匹配上的呢?

2.2.1 DefaultFuture

前面已经说过,Dubbo所有的请求都会封装成Request对象,Request的构造函数里,会自动生成RequestID,这个ID是全局自增的,利用原子类AtomicLong,这就保证了同一个客户端发出的所有请求,RequestID是唯一的。服务端在响应结果时,会将这个RequestID原样写回,客户端根据服务端响应的RequestID就知道具体是哪个请求的响应结果了。

另外,客户端每次发送请求前,都会创建一个DefaultFuture对象,它继承自CompletableFuture,没有结果时调用get()方法线程会阻塞。DefaultFuture在构造函数中,会将自身放入一个全局Map中,然后客户端阻塞等待结果。服务端响应数据后,客户端根据RequestID取出对应的DefaultFuture并写入结果,客户端线程停止阻塞,程序正常运行。

当然,客户端接收到的依然是字节序列,需要解码成Response对象,Response里的data就是方法的返回值,客户端需要对其进行反序列化才能得到最终的结果,这里不细说。

3. 总结

Dubbo自动为接口生成代理对象Proxy,当触发的是自定义方法时,Proxy会转交给InvokerInvocationHandler执行,它会创建RpcInvocation对象,然后交给后续Invoker执行,这些Invoker包括:服务降级、拦截器、集群容错、Filter、异步转同步等等,最后才到具体协议对应的Invoker,例如DubboInvoker,最终构建Request对象发送网络请求。

在Dubbo设计体系里,大部分逻辑都是在客户端实现的,因此Consumer服务调用比Provider处理请求要复杂的多。Dubbo遵循了单一原则,你会看到有各种各样的Invoker类,但是每个Invoker类的职责都很简单清晰,采用装饰者模式将Invoker一层一层的包装,最终实现了一整套复杂的功能。