ETCD源码分析Client端启动流程分析

ETCD源码基于v3.5,在分析之前,需要搭建好源码分析的环境。首先,从GitHub的仓库中克隆下ETCD的源码,再利用docker搭建我们的ETCD测试集群,命令如下:

REGISTRY=quay.io/coreos/etcd
NAME_1=etcd-node-0
NAME_2=etcd-node-1
NAME_3=etcd-node-2
# IP在不同机器上不同,请查看docker的子网网段
HOST_1=172.20.0.2  
HOST_2=172.20.0.3
HOST_3=172.20.0.4
PORT_1=2379
PORT_2=12379
PORT_3=22379
PORT_C_1=2380
PORT_C_2=12380
PORT_C_3=22380
CLUSTER=${NAME_1}=http://${HOST_1}:${PORT_C_1},${NAME_2}=http://${HOST_2}:${PORT_C_2},${NAME_3}=http://${HOST_3}:${PORT_C_3}
# 需要保证目录存在并可写
DATA_DIR=/var/folders/

# 需要创建docker网络,用于模拟集群网络分区的情况。
docker network create etcd_cluster

docker run \
  -p $PORT_1:$PORT_1 \
  -p $PORT_C_1:$PORT_C_1 \
  --volume "${DATA_DIR}${NAME_1}:/etcd-data" \
  --name ${NAME_1} \
  --network etcd_cluster \
  ${REGISTRY}:v3.5.0 \
  /usr/local/bin/etcd \
  --name ${NAME_1} \
  --data-dir /etcd-data \
  --listen-client-urls http://0.0.0.0:$PORT_1 \
  --advertise-client-urls http://$HOST_1:$PORT_1 \
  --listen-peer-urls http://0.0.0.0:$PORT_C_1 \
  --initial-advertise-peer-urls http://$HOST_1:$PORT_C_1 \
  --initial-cluster ${CLUSTER} \
  --initial-cluster-token tkn \
  --initial-cluster-state new \
  --log-level info \
  --logger zap \
  --log-outputs stderr

docker run \
  -p $PORT_2:$PORT_2 \
  -p $PORT_C_2:$PORT_C_2 \
  --volume=${DATA_DIR}${NAME_2}:/etcd-data \
  --name ${NAME_2} \
  --network etcd_cluster \
  ${REGISTRY}:v3.5.0 \
  /usr/local/bin/etcd \
  --name ${NAME_2} \
  --data-dir /etcd-data \
  --listen-client-urls http://0.0.0.0:$PORT_2 \
  --advertise-client-urls http://$HOST_2:$PORT_2 \
  --listen-peer-urls http://0.0.0.0:$PORT_C_2 \
  --initial-advertise-peer-urls http://$HOST_2:$PORT_C_2 \
  --initial-cluster ${CLUSTER} \
  --initial-cluster-token tkn \
  --initial-cluster-state new \
  --log-level info \
  --logger zap \
  --log-outputs stderr

docker run \
  -p $PORT_3:$PORT_3 \
  -p $PORT_C_3:$PORT_C_3 \
  --volume=${DATA_DIR}${NAME_3}:/etcd-data \
  --name ${NAME_3} \
  --network etcd_cluster \
  ${REGISTRY}:v3.5.0 \
  /usr/local/bin/etcd \
  --name ${NAME_3} \
  --data-dir /etcd-data \
  --listen-client-urls http://0.0.0.0:$PORT_3 \
  --advertise-client-urls http://$HOST_3:$PORT_3 \
  --listen-peer-urls http://0.0.0.0:$PORT_C_3 \
  --initial-advertise-peer-urls http://$HOST_3:$PORT_C_3 \
  --initial-cluster ${CLUSTER} \
  --initial-cluster-token tkn \
  --initial-cluster-state new \
  --log-level info \
  --logger zap \
  --log-outputs stderr

复制代码

如上,我们创建了三个ETCD节点,组成了一个集群。接下来我们正式进入源码分析流程。

ETCD Client启动流程分析

我们先看一段启动代码样例:

        cli, err := clientv3.New(clientv3.Config{
		Endpoints:   exampleEndpoints(),
		DialTimeout: dialTimeout,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	_, err = cli.Put(ctx, "sample_key", "sample_value")
	cancel()
	if err != nil {
		log.Fatal(err)
	}
复制代码

一个最简单的程序只需要提供集群的所有节点的ip和端口就能访问,这里需要注意的是,一定要填写ETCD集群的所有节点,这样才能有故障转移、负载均衡的特性。或者运行一个ETCD的代理节点(ETCD网关)负责转发请求,这样只填写代理节点ip即可,当然性能上会有所损失。

一、ETCD的Client启动流程分析

接下来我们看看Client是如何被创建出来的:


func newClient(cfg *Config) (*Client, error) {
    // -----A-----
	ctx, cancel := context.WithCancel(baseCtx)
	client := &Client{
		conn:     nil,
		cfg:      *cfg,
		creds:    creds,
		ctx:      ctx,
		cancel:   cancel,
		mu:       new(sync.RWMutex),
		callOpts: defaultCallOpts,
		lgMu:     new(sync.RWMutex),
	}
    // -----A-----

    // -----B-----
	client.resolver = resolver.New(cfg.Endpoints...)

	conn, err := client.dialWithBalancer()
	if err != nil {
		client.cancel()
		client.resolver.Close()
		return nil, err
	}
	client.conn = conn
	 // -----B-----

    // -----C-----
	client.Cluster = NewCluster(client)
	client.KV = NewKV(client)
	client.Lease = NewLease(client)
	client.Watcher = NewWatcher(client)
	client.Auth = NewAuth(client)
	client.Maintenance = NewMaintenance(client)
	
	...
    // -----C-----
    
	return client, nil
}

复制代码
A段代码分析

首先来看第A段代码,其主要是初始化了一个client的实例,并把Config结构体传递给它,那么Config中包含了什么配置项呢?

type Config struct {
	// ETCD服务器地址,注意需要提供ETCD集群所有节点的ip
	Endpoints []string `json:"endpoints"`

	// 设置了此间隔时间,每 AutoSyncInterval 时间ETCD客户端都会
	// 自动向ETCD服务端请求最新的ETCD集群的所有节点列表
	// 
	// 默认为0,即不请求
	AutoSyncInterval time.Duration `json:"auto-sync-interval"`

	// 建立底层的GRPC连接的超时时间
	DialTimeout time.Duration `json:"dial-timeout"`

	// 这个配置和下面的 DialKeepAliveTimeoutt
	// 都是用来打开GRPC提供的 KeepAlive
	// 功能,作用主要是保持底层TCP连接的有效性,
	// 及时发现连接断开的异常。
	// 
	// 默认不打开 keepalive
	DialKeepAliveTime time.Duration `json:"dial-keep-alive-time"`

	// 客户端发送 keepalive 的 ping 后,等待服务端的 ping ack 包的时长
	// 超过此时长会报 `translation is closed`
	DialKeepAliveTimeout time.Duration `json:"dial-keep-alive-timeout"`
		
	// 也是 keepalive 中的设置,
	// true则表示无论有没有活跃的GRPC连接,都执行ping
	// false的话,没有活跃的连接也就不会发送ping。
	PermitWithoutStream bool `json:"permit-without-stream"`

	// 最大可发送字节数,默认为2MB
	// 也就是说,我们ETCD的一条KV记录最大不能超过2MB,
	// 如果要设置超过2MB的KV值,
	// 只修改这个配置也是无效的,因为ETCD服务端那边的限制也是2MB。
        // 需要先修改ETCD服务端启动参数:`--max-request-bytes`,再修改此值。
	MaxCallSendMsgSize int

        // 最大可接收的字节数,默认为`Int.MaxInt32`
        // 一般不需要改动
	MaxCallRecvMsgSize int

	// HTTPS证书配置
	TLS *tls.Config
	
	// 上下文,一般用于取消操作
	ctx.Context

	// 设置此值,会拒绝连接到低版本的ETCD
	// 什么是低版本呢?
	// 写死了,小于v3.2的版本都是低版本。
	RejectOldCluster bool `json:"reject-old-cluster"`

	// GRPC 的连接配置,具体可参考GRPC文档
	DialOptions []grpc.DialOption

	// zap包的Logger配置 
	// ETCD用的日志包就是zap
	Logger *zap.Logger
	LogConfig *zap.Config
	
	...
}
复制代码

还有一些常用配置项,比较简单,这里就不再列出了。

B段代码分析

本段是整个代码的核心部分,主要做了两件事:

  1. 创建了 resolver 用于解析ETCD服务的地址
    resolver(解析器)其实是grpc中的概念,比如:DNS解析器,域名转化为真实的ip;服务注册中心,也是一种把服务名转化为真实ip的解析服务。

    具体的概念就不展开了,如果对grpc这方面比较感兴趣,文末会推荐一个讲的很好的grpc源码分析博客。

    总之,etcd自己写了一个解析器,就在resolver包里,这个解析器提供了以下几个功能:

    1. 把Endpoints里的ETCD服务器地址传给grpc框架,这里,因为ETCD自己实现的解析器不支持DNS解析,所以Endpoints只能是ip地址或者unix套接字。
    2. 告诉grpc,如果Endpoints有多个,负载均衡的策略是轮询,这点很重要。
  2. dialWithBalancer() 建立了到ETCD的服务端链接

func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
	creds := c.credentialsForEndpoint(c.Endpoints()[0])
	opts := append(dopts, grpc.WithResolvers(c.resolver))
	return c.dial(creds, opts...)
}
复制代码

这个用于建立到ETCD服务端的连接的方法名很有意思,虽然叫dialWithBalancer但内部代码很简单,可以看到里面并无Balancer(负载均衡器)的出现。但其实因为上面说到,ETCD使用了自己的resolver,其内部已经写好了负载均衡策略:round_robin。所以这里通过grpc.WithResolvers()把resolver传进去,也是达到了负载均衡的效果。

接下来进入dial(),这个方法虽然有些长,但整体逻辑是非常清晰的,省略无关代码后,其内部是做了以下几件事:

func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
	// 首先,ETCD通过这行代码,向GRPC框架加入了一些自己的
    // 配置,比如:KeepAlive特性(配置里提到的配置项)、
    // TLS证书配置、还有最重要的重试策略。
	opts, err := c.dialSetupOpts(creds, dopts...)
    ...
    
    // context 的一段经典样例代码
    // 问:如果我同时把非零的DialTimeout和
    // 带超时的 context 传给客户端,
    // 到底以哪个超时为准?
    // 答: 这里新建了子context(dctx),父context和DialTimeout
    // 哪个先到deadline,就以哪个为准。
	dctx := c.ctx
	if c.cfg.DialTimeout > 0 {
		var cancel context.CancelFunc
		// 同时包含父context和DialTimeout
		// 哪个先倒时间就以哪个为准。
		dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
		defer cancel() 
	}

    // 最终调用grpc.DialContext()建立连接
	conn, err := grpc.DialContext(dctx, target, opts...)
	...
	return conn, nil
}
复制代码
C段代码分析

C段代码无非就是做一些功能接口的初始化,比如:KV接口(用于提供Put、Get等)、Wathcer接口(用于监听Key)等,具体如何初始化到分析各接口再讲。

再回到启动流程,初始化功能完毕后,就是getToken了,这个token是我们开启了ETCD的账号密码功能后,通过账号密码获取到了token,然后才能访问ETCD提供的GRPC接口。

然后是提供 RejectOldCluster 和 autoSync 功能,这个在介绍Config时也提过,这里就不再赘述了。

ETCD Client重试策略分析

对ETCD客户端提供的自动重试策略的分析,是本文的重点。自动重试是ETCD能提供高可用特性的重要保证,在往下分析之前,一定要记住以下两个概念:

  1. 自动重试不会在ETCD集群的同一节点上进行,这跟我们平常做的重试不同,因为前面说了ETCD是通过GRPC框架提供对集群访问的负载均衡策略的,所以会轮询的重试集群的每个节点。
  2. 自动重试只会重试一些特定的错误,比如:codes.Unavailable

接下来,就让我们来看看ETCD是如何利用GRPC提供的拦截器做自动重试的,学会这个,我们也能在自己的GRPC项目中用上同样的套路:

    // 这段代码在dialWithBalancer->dial->dialSetupOpts中
    rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction))
	
	opts = append(opts,
		grpc.WithStreamInterceptor(c.streamClientInterceptor(withMax(0), rrBackoff)),
		grpc.WithUnaryInterceptor(c.unaryClientInterceptor(withMax(defaultUnaryMaxRetries), rrBackoff)),
	)
	
复制代码

看以上的代码,要自动重试只需两步:

  1. 创建backoff函数,也就是计算重试等待时间的函数。
  2. 通过WithXXXInterceptor(),注册重试拦截器,这样每次GRPC有请求都会回调该拦截器。

这里,grpc.WithStreamInterceptor(c.streamClientInterceptor(withMax(0), rrBackoff)),我们看到Stream的重试拦截器,其最大重试次数设置为了0(withMax()),也就是不重试,这其实是故意为之,因为Client端的Stream重试不被支持。(Client端需要重试Stream,需要自己做单独处理,不能通过拦截器。)

那我们首先看看如何计算等待时间:

// waitBetween 重试间隔时长
// jitterFraction 随机抖动率,
// 比如:默认重试间隔为25ms,抖动率:0.1,
// 那么实际重试间隔就在 25土2.5ms 之间。
// attempt 实际重试了多少次
func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration,
    jitterFraction float64) backoffFunc {
    
	return func(attempt uint) time.Duration {
		n := uint(len(c.Endpoints()))
		quorum := (n/2 + 1)
		if attempt%quorum == 0 {
			c.lg.Debug("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction))
			return jitterUp(waitBetween, jitterFraction)
		}
		c.lg.Debug("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum))
		return 0
	}
}
复制代码

可以看到roundRobinQuorumBackoff返回了一个闭包,内部是重试间隔时长计算逻辑,这个逻辑说来也简单:

    1. 若重试次数已经达到集群的法定人数(quorum),则真正的计算间隔时长,
       间隔时长到期后,才进行重试。
    2. 否则,直接返回0,也就是马上重试。
复制代码

还记得刚才说的必须记住的两个概念吗?其中一点就是负载均衡策略写死是轮询,而这个重试逻辑一定要配合负载均衡是轮询策略,达到的效果是:假如你访问集群中的一台节点失败,可能是那台节点出问题了,但如果整个集群是好的,这时候马上重试,轮询到下台节点就行。

但是,如果重试多次,集群大多数节点(法定人数)都失败了,那应该是集群出问题了,这时候就需要计算间隔时间,等会儿再重试看看问题能不能解决。

这里也可以看到ETCD的Client端,考虑的细节问题是非常多的,一个简单的重试时间计算,也能进行逻辑上的小小优化。

那么重试拦截器又是如何实现的呢?接着看拦截器的相关代码:

func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClientInterceptor {
		... 
		// 如果最大重试次数设置为0,那就不重试。
		if callOpts.max == 0 {
			return invoker(ctx, method, req, reply, cc, grpcOpts...)
		}
		var lastErr error
		// 开始重试计数
		for attempt := uint(0); attempt < callOpts.max; attempt++ {
		    // 计算重试间隔时间,并阻塞代码,等待
		    // 这里最终会调用到 roundRobinQuorumBackoff 来计算时间
			if err := waitRetryBackoff(ctx, attempt, callOpts); err != nil {
				return err
			}
		
		    // 再次重新执行GRPC请求
			lastErr = invoker(ctx, method, req, reply, cc, grpcOpts...)
			if lastErr == nil {
			    // 重试成功,退出
				return nil
			}
			
			// 这段代码分析了两种情况
			// 1. 服务端返回了 Context Error(超时、被取消),直接重试
			// 2. 客户端的 Context 也出现了Error
			if isContextError(lastErr) {
				if ctx.Err() != nil {
					// 客户端本身的ctx也报错了,不重试了,退出。
					return lastErr
				}
				// 服务端返回,直接重试
				continue
			}
			
			if callOpts.retryAuth && rpctypes.Error(lastErr) == rpctypes.ErrInvalidAuthToken {
				// 是AuthToken不正确,重新获取Token
				gterr := c.getToken(ctx)
				...
				continue
			}
			// 只有在特定错误才重试(code.Unavailable)
			// 否则返回Err,不重试。
			if !isSafeRetry(c.lg, lastErr, callOpts) {
				return lastErr
			}
		}
		return lastErr
	}
}
复制代码

代码做了一定程度的精简,但是主要流程都是保留的。

由此,ETCD的整体重试流程也介绍完毕了。

总结

通过对ETCD整个启动流程的代码分析,我们可以总结出以下几点:

1. Endpoints 用来做负载均衡和重试策略计算法定人数,一定要填写集群的全部节点,
或者打开AutoSync功能。

2. ETCD 自己编写了GRPC的resolver和balancer,可以借鉴到GRPC的相关项目中去。
resolver只能解析ip和unix套接字,balancer策略写死是轮询策略。

3. ETCD 重试流程只重试部分错误,所以不要完全指望ETCD的自动重试,一定要自己做好错误处理。

复制代码

启动流程图,其中列出的函数就是整个启动流程上的重要函数:

Config

New

newClient

resolver.New

dialWithBanlancer

dial

grpc.DialContext

最后,本文涉及到一些GRPC的基础知识,不了解的小伙伴可以去(blog.csdn.net/u011582922/… )这里看看,讲的很详细。