这是我参与8月更文挑战的第20天,活动详情查看:8月更文挑战
前言
现在网上有很多使用Netty构建RPC框架的例子。为什么我在这里写一篇文章来讨论它?我很清楚,我可能没有把它们写得那么好。我们写它有两个原因:
- 首先,因为在学习了Netty之后,我们需要不断的练习才能更好的掌握Netty的使用。显然,基于 Netty 实现 RPC 框架是一个很好的实践。
- 其次,因为市面上有很多RPC框架,比如Dubbo,都是框架通信底层的Netty,所以通过这个例子我们也可以更好的体验RPC的设计。
接下来,我将从以下几点来讲解如何基于Netty实现一个简单的RPC框架:
- 什么是RPC?
- 实现RPC框架需要注意哪些方面?
- 如何使用 Netty?
什么是RPC?
RPC,Remote Procedure Call,可以像本地调用一样调用远程服务。它是进程间通信的一种方式。每个人都必须清楚这个概念。你可以看RPC文章,后来发现其实我们可以换一种方式来思考RPC,即从本地调用开始,再派生出RPC调用。
1.本地函数调用
局部函数对我们来说很常见,比如下面的例子:
public String sayHello(String name) { return "hello, " + name; } 复制代码
我们只需要传入一个参数,调用sayHello方法就可以得到一个输出,即输入参数->方法体->输出。输入参数、输出参数和方法体都在同一个进程空间中。这是本地函数调用。
2. 套接字通信
有没有办法在不同的进程之间进行通信?调用者在进程A中,需要调用方法A,而方法A在进程B中。
最容易想到的方法是使用Socket通信,使用Socket可以完成跨进程调用,我们需要约定一个进程通信协议来进行参数、调用函数、参数。写Socket应该知道Socket是一种比较原始的方式,我们需要多注意一些细节,比如参数和函数需要转换成字节流进行网络传输,也就是序列化操作,然后反序列化;使用socket进行底层通信,代码编程更容易出错。
如果一个调用者需要关注这么多问题,那无疑是一场灾难。那么有没有什么简单的方法可以让我们的调用者不需要关注细节,让调用者可以调用本地函数,也可以只调用参数,调用方法,等待返回结果呢?
3.RPC框架
RPC框架就是用来解决上述问题的。它使调用者能够调用远程服务,例如调用本地函数。底层通信细节对调用者是透明的。它屏蔽了各种复杂性,并为呼叫者提供了终极体验。
需要解决 RPC 调用的哪些方面
如前所述,RPC 框架允许调用者像调用本地函数一样调用远程服务。那你怎么做呢?
在使用时,调用者直接调用本地函数并传入相应的参数。它不需要处理其他细节。至于通信细节,交给RPC框架来实现。实际上,RPC 框架采用代理类的方式,特别是动态代理。它在运行时动态创建新的类,即代理类,并在该类中实现通信细节,如参数序列化等。
当然,不仅仅是序列化,我们还需要约定一个双方通信的协议格式,指定协议格式,比如请求参数的数据类型,请求的参数,方法名等请求等,这样就可以按照格式序列化后进行网络传输,然后服务器接收到请求对象,按照指定的格式进行解码,让服务器知道具体的音调。通过哪种方法,会传递哪些参数?
刚才提到网络传输,RPC框架的一个重要部分就是网络传输。服务部署在不同的主机上。如何高效地进行网络传输,尽可能不丢包,保证数据完整准确的快速传输?事实上,它使用了我们今天的主角 Netty 作为一个高性能的网络通信框架,足以完成我们的任务。
前面说了这么多,接下来的RPC框架应该重点关注哪些点呢?
- 代理(动态代理)
- 通讯协议
- 序列化
- 网络传输
当然,一个好的RPC框架需要注意的不止以上几点,但本文旨在做一个简单的RPC框架,了解以上要点就足够了。
基于Netty的RPC框架的实现
最后,本文的重点是根据需要注意的几个关键点(代理、序列化、协议、编解码)使用Netty来一一实现RPC。
1. 协议
首先,我们需要确定通信双方的协议格式、请求对象和响应对象。
请求对象:
@Data
@ToString
public class RpcRequest {
/**
* ID of Request Object
*/
private String requestId;
/**
* Class name
*/
private String className;
/**
* Method name
*/
private String methodName;
/**
* Parameter type
*/
private Class<?>[] parameterTypes;
/**
* Participation
*/
private Object[] parameters;
}
复制代码
- 客户端使用请求对象的 ID 来验证服务器请求和响应是否匹配。
响应对象:
@Data
public class RpcResponse {
/**
* Response ID
*/
private String requestId;
/**
* error message
*/
private String error;
/**
* Return results
*/
private Object result;
}
复制代码
2.序列化
市面上的序列化协议很多,比如jdk、protobuf、kyro、Google的Hessian等,只要不选择JDK的序列化方式(因为它的性能太差,序列化产生的流太大),也可以使用其他方法。为方便起见,选择JSON作为序列化协议,使用fast JSON作为JSON框架。
为方便后续扩展,定义了序列化接口:
public interface Serializer {
/**
* java Object to binary
*
* @param object
* @return
*/
byte[] serialize(Object object) throws IOException;
/**
* Binary conversion to java objects
*
* @param clazz
* @param bytes
* @param <T>
* @return
*/
<T> T deserialize(Class<T> clazz, byte[] bytes) throws IOException;
}
复制代码
因为我们采用的是JSON方式,所以定义了JSONSerializer的实现类:
public class JSONSerializer implements Serializer{ @Override public byte[] serialize(Object object) { return JSON.toJSONBytes(object); } @Override public <T> T deserialize(Class<T> clazz, byte[] bytes) { return JSON.parseObject(bytes, clazz); } } 复制代码
如果以后要使用其他序列化方法,可以自行实现序列化接口。
3. 编解码器
协议格式和序列化方式约定好后,我们还需要编解码器。编码器将请求对象转换为适合传输的格式(通常为字节流),相应的解码器则是将网络字节流转换为应用程序的消息格式。
编码器实现:
public class RpcEncoder extends MessageToByteEncoder {
private Class<?> clazz;
private Serializer serializer;
public RpcEncoder(Class<?> clazz, Serializer serializer) {
this.clazz = clazz;
this.serializer = serializer;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, ByteBuf byteBuf) throws Exception {
if (clazz != null && clazz.isInstance(msg)) {
byte[] bytes = serializer.serialize(msg);
byteBuf.writeInt(bytes.length);
byteBuf.writeBytes(bytes);
}
}
}
复制代码
解码器实现:
public class RpcDecoder extends ByteToMessageDecoder {
private Class<?> clazz;
private Serializer serializer;
public RpcDecoder(Class<?> clazz, Serializer serializer) {
this.clazz = clazz;
this.serializer = serializer;
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
//Because before encoding, write a Int type, 4 bytes to indicate the length.
if (byteBuf.readableBytes() < 4) {
return;
}
//Mark the current reading position
byteBuf.markReaderIndex();
int dataLength = byteBuf.readInt();
if (byteBuf.readableBytes() < dataLength) {
byteBuf.resetReaderIndex();
return;
}
byte[] data = new byte[dataLength];
//Read the data in byteBuf into the data byte array
byteBuf.readBytes(data);
Object obj = serializer.deserialize(clazz, data);
list.add(obj);
}
}
复制代码
4.Netty客户端
下面我们来看看Netty客户端是如何实现的,也就是如何使用Netty打开客户端。
其实熟悉Netty的朋友应该都知道,我们需要注意以下几点:
- 写一个启动方法指定传输使用Channel
- 指定ChannelHandler在网络传输中读写数据
- 添加编解码器
- 添加失败重试机制
- 添加发送请求消息的方法
我们来看一下具体的实现代码:
@Slf4j
public class NettyClient {
private EventLoopGroup eventLoopGroup;
private Channel channel;
private ClientHandler clientHandler;
private String host;
private Integer port;
private static final int MAX_RETRY = 5;
public NettyClient(String host, Integer port) {
this.host = host;
this.port = port;
}
public void connect() {
clientHandler = new ClientHandler();
eventLoopGroup = new NioEventLoopGroup();
//Startup class
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
//Channel for specified transmission
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,5000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//Add encoder
pipeline.addLast(new RpcEncoder(RpcRequest.class, new JSONSerializer()));
//Add Decoder
pipeline.addLast(new RpcDecoder(RpcResponse.class, new JSONSerializer()));
//Request Processing Class
pipeline.addLast(clientHandler);
}
});
connect(bootstrap, host, port, MAX_RETRY);
}
/**
* Failure reconnection mechanism, refer to Netty's entry actual gold digging Brochure
*
* @param bootstrap
* @param host
* @param port
* @param retry
*/
private void connect(Bootstrap bootstrap, String host, int port, int retry) {
ChannelFuture channelFuture = bootstrap.connect(host, port).addListener(future -> {
if (future.isSuccess()) {
log.info("Successful connection to server");
} else if (retry == 0) {
log.error("The number of retries has been exhausted and the connection has been abandoned");
} else {
//The number of reconnections:
int order = (MAX_RETRY - retry) + 1;
//The interval between reconnections
int delay = 1 << order;
log.error("{} : Connection failed, para. {} Reconnect....", new Date(), order);
bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS);
}
});
channel = channelFuture.channel();
}
/**
* send message
*
* @param request
* @return
*/
public RpcResponse send(final RpcRequest request) {
try {
channel.writeAndFlush(request).await();
} catch (InterruptedException e) {
e.printStackTrace();
}
return clientHandler.getRpcResponse(request.getRequestId());
}
@PreDestroy
public void close() {
eventLoopGroup.shutdownGracefully();
channel.closeFuture().syncUninterruptibly();
}
}
复制代码
我们对数据处理的重点是ClientHandler类,它继承了ChannelDuplexHandler类,可以处理出站和入站数据。
public class ClientHandler extends ChannelDuplexHandler { /** * Maintaining mapping relationship between request object ID and response result Future using Map */ private final Map<String, DefaultFuture> futureMap = new ConcurrentHashMap<>(); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof RpcResponse) { //Get the response object RpcResponse response = (RpcResponse) msg; DefaultFuture defaultFuture = futureMap.get(response.getRequestId()); //Write the result to DefaultFuture defaultFuture.setResponse(response); } super.channelRead(ctx,msg); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (msg instanceof RpcRequest) { RpcRequest request = (RpcRequest) msg; //Before sending the request object, save the request ID and construct a mapping relationship with the response Future. futureMap.putIfAbsent(request.getRequestId(), new DefaultFuture()); } super.write(ctx, msg, promise); } /** * Get response results * * @param requsetId * @return */ public RpcResponse getRpcResponse(String requsetId) { try { DefaultFuture future = futureMap.get(requsetId); return future.getRpcResponse(5000); } finally { //After success, remove from map futureMap.remove(requsetId); } } } 复制代码
从上面的实现可以看出,我们定义了一个Map来维护请求ID和响应结果的映射关系。目的是验证服务器响应是否与请求匹配。因为 Netty 通道可能被多个线程使用,当结果返回时,你不知道从哪个线程返回,所以需要一个映射关系。
我们的结果封装在 DefaultFuture 中,因为 Netty 是一个异步框架,所有的返回都是基于 Future 和 Callback 机制。这里我们自定义Future来实现客户端“异步调用”
public class DefaultFuture { private RpcResponse rpcResponse; private volatile boolean isSucceed = false; private final Object object = new Object(); public RpcResponse getRpcResponse(int timeout) { synchronized (object) { while (!isSucceed) { try { object.wait(timeout); } catch (InterruptedException e) { e.printStackTrace(); } } return rpcResponse; } } public void setResponse(RpcResponse response) { if (isSucceed) { return; } synchronized (object) { this.rpcResponse = response; this.isSucceed = true; object.notify(); } } } 复制代码
- 实际上,使用了等待和通知机制,并由布尔变量辅助。
5. Netty 服务器
Netty server 的实现与client 类似,但需要注意的是,在对请求进行解码时,需要通过proxy 调用本地函数。以下是服务器端代码:
public class NettyServer implements InitializingBean { private EventLoopGroup boss = null; private EventLoopGroup worker = null; @Autowired private ServerHandler serverHandler; @Override public void afterPropertiesSet() throws Exception { //zookeeper is used here as the registry, which is not covered in this article and can be ignored. ServiceRegistry registry = new ZkServiceRegistry("127.0.0.1:2181"); start(registry); } public void start(ServiceRegistry registry) throws Exception { //Thread pool responsible for handling client connections boss = new NioEventLoopGroup(); //Thread pool for read and write operations worker = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //Add Decoder pipeline.addLast(new RpcEncoder(RpcResponse.class, new JSONSerializer())); //Add encoder pipeline.addLast(new RpcDecoder(RpcRequest.class, new JSONSerializer())); //Adding Request Processor pipeline.addLast(serverHandler); } }); bind(serverBootstrap, 8888); } /** * If port binding fails, port number + 1, rebind * * @param serverBootstrap * @param port */ public void bind(final ServerBootstrap serverBootstrap,int port) { serverBootstrap.bind(port).addListener(future -> { if (future.isSuccess()) { log.info("port[ {} ] Binding success",port); } else { log.error("port[ {} ] Binding failed", port); bind(serverBootstrap, port + 1); } }); } @PreDestroy public void destory() throws InterruptedException { boss.shutdownGracefully().sync(); worker.shutdownGracefully().sync(); log.info("Close Netty"); } } 复制代码
下面是处理读写操作的Handler类:
@Component @Slf4j @ChannelHandler.Sharable public class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> implements ApplicationContextAware { private ApplicationContext applicationContext; @Override protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) { RpcResponse rpcResponse = new RpcResponse(); rpcResponse.setRequestId(msg.getRequestId()); try { Object handler = handler(msg); log.info("Get the return result: {} ", handler); rpcResponse.setResult(handler); } catch (Throwable throwable) { rpcResponse.setError(throwable.toString()); throwable.printStackTrace(); } ctx.writeAndFlush(rpcResponse); } /** * Server uses proxy to process requests * * @param request * @return */ private Object handler(RpcRequest request) throws ClassNotFoundException, InvocationTargetException { //Use Class.forName to load Class files Class<?> clazz = Class.forName(request.getClassName()); Object serviceBean = applicationContext.getBean(clazz); log.info("serviceBean: {}",serviceBean); Class<?> serviceClass = serviceBean.getClass(); log.info("serverClass:{}",serviceClass); String methodName = request.getMethodName(); Class<?>[] parameterTypes = request.getParameterTypes(); Object[] parameters = request.getParameters(); //Using CGLIB Reflect FastClass fastClass = FastClass.create(serviceClass); FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes); log.info("Start calling CGLIB Dynamic Proxy Execution Server-side Method..."); return fastMethod.invoke(serviceBean, parameters); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } } 复制代码
6. 客户端代理
客户端使用 Java 动态代理来实现代理类中的通信细节。众所周知,Java动态代理需要实现InvocationHandler接口。
@Slf4j public class RpcClientDynamicProxy<T> implements InvocationHandler { private Class<T> clazz; public RpcClientDynamicProxy(Class<T> clazz) throws Exception { this.clazz = clazz; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RpcRequest request = new RpcRequest(); String requestId = UUID.randomUUID().toString(); String className = method.getDeclaringClass().getName(); String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); request.setRequestId(requestId); request.setClassName(className); request.setMethodName(methodName); request.setParameterTypes(parameterTypes); request.setParameters(args); log.info("Request content: {}",request); //Open Netty client and connect directly NettyClient nettyClient = new NettyClient("127.0.0.1", 8888); log.info("Start connecting to the server:{}",new Date()); nettyClient.connect(); RpcResponse send = nettyClient.send(request); log.info("The request call returns the result:{}", send.getResult()); return send.getResult(); } } 复制代码
- 在invoke方法中封装请求对象,构造NettyClient对象,打开客户端并发送请求消息
代理工厂如下:
public class ProxyFactory { public static <T> T create(Class<T> interfaceClass) throws Exception { return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class<?>[] {interfaceClass}, new RpcClientDynamicProxy<T>(interfaceClass)); } } 复制代码
- 通过 Proxy.newProxyInstance 为接口创建代理类
7.RPC远程调用测试
应用程序接口:
public interface HelloService { String hello(String name); } 复制代码
- 准备测试 API 接口
客户:
@SpringBootApplication @Slf4j public class ClientApplication { public static void main(String[] args) throws Exception { SpringApplication.run(ClientApplication.class, args); HelloService helloService = ProxyFactory.create(HelloService.class); log.info("Response results“: {}",helloService.hello("pjmike")); } } 复制代码
- 客户端调用接口的方法
服务器:
//Server-side implementation @Service public class HelloServiceImpl implements HelloService { @Override public String hello(String name) { return "hello, " + name; } } 复制代码
运行结果:
概括
最重要的是,我们基于Netty实现了一个非非常简单的RPC框架,离成熟的RPC框架还很远,甚至基本的registry都没有实现。但是通过这次的实践,我可以说对RPC有了更深入的了解,明白了一个RPC框架需要注意哪些方面。当我们以后使用成熟的RPC框架,比如Dubbo的时候,要能够了解它的底层,就是使用Netty作为基础的通信框架。未来,深入挖掘开源 RPC 框架源代码将相对容易。




近期评论