这是我参与8月更文挑战的第19天,活动详情查看: 8月更文挑战
本文使用源码地址:simple-rpc
分布式服务框架部署在多台不同的机器上,如下图所示:
这将面临如下问题需要解决:
- 集群A中的服务调用者如何发现集群B中的服务提供者。
- 集群A中的服务调用者如何选择集群B中的某一台服务提供者机器发起调用。
- 集群B中的服务某台提供者机器下线后,集群A中的服务调用者如何感知到这台机器的下线,不再对已下线的机器发起调用。
- 集群B提供的某个服务如何获知集群A中的那些机器正在消费改服务。
以上问题都通过注册中心来解决,我们采用服务注册中心来实时存储更新服务提供者信息及该服务的实时调用者信息。
服务注册中心有如下优点:
- 软负载及透明化路由:服务提供者和服务调用者之间互相解耦,服务调用者不需要硬编码服务提供者地址。
- 服务动态发现及可伸缩扩展能力:服务提供者机器增减能被服务调用者通过注册中心动态感知,而且通过增减机制可以实现服务的弹性伸缩。
- 通过注册中心可以动态监控服务运行质量及服务依赖,为服务提供服务治理能力。
Zookeeper实现服务注册中心
如何部署Zookeeper可以参考使用 Docker 一步搞定 ZooKeeper 集群的搭建。
ZkClient
ZkClient是一个开源的ZooKeeper客户端,是在原生的ZooKeeper API接口之上进行包装,是一个更易使用的ZooKeeper客户端。ZkClient在内部实现了Session超时重连、Watcher反复注册等功能,使得ZooKeeper客户端的繁琐细节对开发人员透明。如何使用可以参考zookeeper之ZkClient。
服务注册中心实现
目标:
- 服务端服务启动时将服务提供者信息(主机IP地址、服务端口、服务接口类全限类名等)组成znode作为临时节点写入Zookeeper叶子节点,这样就完成了服务的注册动作。
- 服务的消费端在发起服务调用之前,先连接到Zookeeper,对服务提供者节点路径注册监听器,同时获取服务提供者信息到本地缓存。
- 服务注册中心能够感知服务提供者集群中某一台机器下线,将该机器服务提供者信息从服务注册中心删除,并主动通知服务调用者集群中的每一台机器,使得服务调用者不再调用该机器。
- 可以通过服务注册中心收集服务消费者信息。
主要方法
public interface IRegisterCenter {
/**
* 注册服务提供者信息
* @param providers 服务提供者
*/
void registerProvider(List<Provider> providers);
/**
* 获取服务提供者列表
* key:接口
* @return
*/
Map<String, List<Provider>> getProvidersMap();
/**
* 销毁
* @param serviceItfName 接口名称
*/
void destroy(String serviceItfName);
/**
* 消费端初始化本地服务缓存
* @param remoteAppKey 服务提供者唯一标识
* @param groupName 服务组名
*/
void loadProviderMap(String remoteAppKey, String groupName);
/**
* 获取服务提供者信息
* @return
*/
Map<String, List<Provider>> getServiceMetadata();
/**
* 注册服务消费者信息 用于监控
* @param invoker
*/
void registerInvoker(Invoker invoker);
}
复制代码
整个实现比较长,我们分段来看,服务提供者和服务消费者注册比较类似就不看了,销毁操作也只是本地缓存的清楚,所以我们只看两个最重要的方法void registerProvider(List<Provider> providers)
和void loadProviderMap(String remoteAppKey, String groupName)
。
首先看一下相关变量设置:
private volatile ZkClient zkClient = null;
/**
* zk 服务列表 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
*/
private static final String ZK_SERVERS = SimpleRpcPropertiesUtil.getZkServers();
/**
* zk会话超时时间
*/
private static final int ZK_SESSION_TIMEOUT = SimpleRpcPropertiesUtil.getSessionTimeout();
/**
* zk 连接超时时间
*/
private static final int ZK_CONNECT_TIMEOUT = SimpleRpcPropertiesUtil.getConnectionTimeout();
/**
* zk 根目录
*/
private static final String ROOT_PATH = "/config_register";
private static final String PROVIDER_TYPE = "/provider";
private static final String INVOKER_TYPE = "/invoker";
private static final int SERVER_PORT = SimpleRpcPropertiesUtil.getServerPort();
private static final String LOCAL_IP = IpUtil.getLocalIP();
/**
* 服务提供者列表
* key:interface name
* value: List<Provider> (methods)
*/
private static final Map<String, List<Provider>> PROVIDER_MAP = Maps.newConcurrentMap();
/**
* 客户端引入的服务列表,启动时加入本地缓存
* key:interface name
*/
private static final Map<String, List<Provider>> SERVICE_METADATA = Maps.newConcurrentMap();
复制代码
最重要肯定还是PROVIDER_MAP
和SERVICE_METADATA
,使用ConcurrentHashMap
来保存服务提供者和客户端引入的服务列表保证线程安全,保存的key
为服务的全限类名,value
则是服务的具体信息(目前的服务发现还是类级别,一个类下有多个方法,就代表有多个服务)。
接着我们继续看两个重要方法的实现。
registerProvider-完成服务注册
这个方法看名字也知道是干什么的,这是执行服务注册的方法。我们看下具体执行步骤如下:
具体代码如下:
@Override
public void registerProvider(List<Provider> providers) {
if (CollectionUtils.isEmpty(providers)) {
log.debug("RegisterCenterImpl registerProvider providers is empty, ignore it, providers={}", providers);
} else {
synchronized (RegisterCenterImpl.class) {
// 加载zkClient
this.lazyInitZkClient();
// 设置本地服务列表
this.setLocalCache(providers);
String rootNode = ROOT_PATH + UrlConstants.SLASH + providers.get(0).getAppKey();
// 创建根节点
this.createRootNode(rootNode);
for (Map.Entry<String, List<Provider>> entry : PROVIDER_MAP.entrySet()) {
// 服务接口类路径
String serviceItfName = entry.getKey();
Provider provider = entry.getValue().get(0);
// 服务组名
String groupName = entry.getValue().get(0).getGroupName();
// 创建服务提供者节点
String servicePath = rootNode + UrlConstants.SLASH + groupName + UrlConstants.SLASH + serviceItfName + PROVIDER_TYPE;
this.createServiceNode(servicePath);
// 创建当前服务器临时节点
String currentServiceIpNode = servicePath + UrlConstants.SLASH + LOCAL_IP + UrlConstants.VERTICAL_LINE
+ SERVER_PORT + UrlConstants.VERTICAL_LINE + provider.getWeight() + UrlConstants.VERTICAL_LINE
+ provider.getWorkerThreads() + UrlConstants.VERTICAL_LINE + groupName;
this.createCurrentServiceIpNode(currentServiceIpNode);
log.debug("create current service node success, node path = {}", currentServiceIpNode);
// 监听本地服务变化,加入本地缓存
this.subscribeChildChanges(servicePath, PROVIDER_MAP);
}
}
}
}
复制代码
整个逻辑还是较为简单的,本质上就是将服务发现(这个后面会说)中获取到的服务列表List<Provider>
封装成znode
,作为临时节点(Zookeeper会监听服务端的存活,一旦服务端下线,该临时节点会被自动删除,同时推送给服务消费端,从而达到服务提供者自动下线的母的)保存到zookeeper
中,并且注册监听器监听服务变化的过程。这个过程中可能有一些特殊的处理,如锁、懒加载等也都是为了安全和性能上的考量。
loadProviderMap-加载服务列表
具体代码如下:
@Override
public void loadProviderMap(String remoteAppKey, String groupName) {
if (MapUtils.isEmpty(SERVICE_METADATA)) {
SERVICE_METADATA.putAll(fetchOrUpdateServiceMetaData(remoteAppKey, groupName));
}
}
/**
* 获取或更新服务元数据信息
* @param remoteAppKey 服务提供者appkey
* @param groupName 服务组名
* @return
*/
private Map<String, List<Provider>> fetchOrUpdateServiceMetaData(String remoteAppKey, String groupName) {
final Map<String, List<Provider>> providerServiceMap = Maps.newHashMap();
// 连接ZK
this.lazyInitZkClient();
// 服务提供者节点
String providerNode = ROOT_PATH + UrlConstants.SLASH + remoteAppKey + UrlConstants.SLASH + groupName;
// 从ZK获取方服务提供者列表
List<String> providerServices = zkClient.getChildren(providerNode);
for (String serviceName : providerServices) {
String servicePath = providerNode + UrlConstants.SLASH + serviceName + PROVIDER_TYPE;
List<String> nodeList = zkClient.getChildren(servicePath);
log.info("get zk nodeList={} from path={}", nodeList, servicePath);
for (String node : nodeList) {
// 封装服务信息 : ip|port|weight|workerThreads|groupName
String[] serverAddress = StringUtils.split(node, UrlConstants.VERTICAL_LINE);
if (ArrayUtils.isNotEmpty(serverAddress)) {
String serverIp = serverAddress[0];
int serverPort = Integer.parseInt(serverAddress[1]);
int weight = Integer.parseInt(serverAddress[2]);
int workerThreads = Integer.parseInt(serverAddress[3]);
String group = serverAddress[4];
List<Provider> providerList = providerServiceMap.get(serviceName);
if (providerList == null) {
providerList = new ArrayList<>();
}
Provider provider = Provider.builder().serverIp(serverIp).serverPort(serverPort).weight(weight)
.workerThreads(workerThreads).groupName(group).build();
try {
provider.setServiceItf(ClassUtils.getClass(serviceName));
} catch (Exception e) {
log.error("get service interface class error", e);
throw new SRpcException("get service interface class error", e);
}
providerList.add(provider);
providerServiceMap.put(serviceName, providerList);
} else {
log.debug("illegal service address, ignore it");
}
}
// 监听远程服务变化,加入本地缓存
this.subscribeChildChanges(servicePath, SERVICE_METADATA);
}
return providerServiceMap;
}
复制代码
加载服务列表是在服务引入时的重要代码,它根据appKey
和groupName
两个信息加载所有服务到本地缓存,并且注册监听器监听远程服务变化,以此保证服务能够正常使用。
小结
注册中心是整个分布式服务框架中的核心功能,它为在分布式服务中的服务质量提供了保障,并且为服务监控和服务治理提供了入口。
近期评论