grpc流服务实现传输文件及源码解析|JavaDebu

本文正在参加「Java主题月 - Java Debug笔记活动」,详情查看 活动链接

前言

由于需要传送200M的压缩包文件,之前的同步存根Stub无法满足需求。再经过调研后,发现客户端流能够很好的解决这个问题。
注:流服务本质上就是通过异步存根Stub来实现的,具体到服务端和客户端只需实现观察者的接口来处理业务逻辑即可


一、grpc是什么?

在 gRPC 里客户端应用可以像调用本地对象一样直接调用另一台不同的机器上服务端应用的方法,使得您能够更容易地创建分布式应用和服务。与许多 RPC 系统类似,gRPC 也是基于以下理念:定义一个服务,指定其能够被远程调用的方法(包含参数和返回类型)。在服务端实现这个接口,并运行一个 gRPC 服务器来处理客户端调用。在客户端拥有一个存根能够像服务端一样的方法。

在这里插入图片描述

二、简单的grpc接口

一个grpc接口包括以下几个部分
注:由于不需要流,所以此处使用的为同步阻塞存根

1.定义proto文件

service RouteGuide {
  // 一个简单的rpc接口
  rpc GetFeature(Point) returns (Feature) {}
}
// 消息定义
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}
message Feature {
  string name = 1;

  Point location = 2;
}
复制代码

2.服务端代码

public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
            responseObserver.onNext(checkFeature(request));
            responseObserver.onCompleted();
        }
复制代码

3.客户端代码

public void getFeature(int lat, int lon) {
        Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();

        Feature feature = blockingStub.getFeature(request);
        // 对返回数据进行校验
        if (RouteGuideUtil.exists(feature)) {
            info("Found feature called \"{0}\" at {1}, {2}",
                    feature.getName(),
                    RouteGuideUtil.getLatitude(feature.getLocation()),
                    RouteGuideUtil.getLongitude(feature.getLocation()));
        } else {
            info("Found no feature at {0}, {1}",
                    RouteGuideUtil.getLatitude(feature.getLocation()),
                    RouteGuideUtil.getLongitude(feature.getLocation()));
        }
    }
复制代码

三、流服务接口

此处使用为客户端流来实现文件传输的

1.proto文件

service RouteGuide {
  // 客户端文件流例子
  rpc sendFile(stream FileInfo) returns (Info) {}
}
message FileInfo {
    int32 index = 1;

    bytes arrs = 2;
}

message Info {
    string msg = 1;
}
复制代码

2.服务端代码

		// 测试文件流
        @Override
        public StreamObserver<FileInfo> sendFile(StreamObserver<Info> responseObserver) {

            try {
                return new StreamObserver<FileInfo>() {
                    final long startTime = System.nanoTime();
                    OutputStream os = new FileOutputStream(new File(System.currentTimeMillis() + ".zip"));
                    @Override
                    public void onNext(FileInfo fileInfo) {
                        try {
                            logger.log(Level.INFO, "接收到文件流");
                            fileInfo.getArrs().writeTo(os);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        System.out.println("发生错误");
                        logger.log(Level.WARNING, "sendFile cancelled");
                    }

                    @Override
                    public void onCompleted() {
                        System.out.println("完成");
                        // 关闭流
                        try {
                            os.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                        long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
                        responseObserver.onNext(Info.newBuilder().setMsg("success, spend time :" + seconds).build());
                        responseObserver.onCompleted();
                    }
                };
            } catch (Exception e) {
                e.printStackTrace();
            }
            return null;
        }
复制代码

3.客户端代码

public void sendFile() {
        final CountDownLatch finishLatch = new CountDownLatch(1);
        StreamObserver<Info> responseObserver = new StreamObserver<Info>() {
            @Override
            public void onNext(Info info) {
                logger.info("end :"+info.getMsg());
            }

            @Override
            public void onError(Throwable t) {
                warning("sendFile Failed: {0}", Status.fromThrowable(t));
                if (testHelper != null) {
                    testHelper.onRpcError(t);
                }
                finishLatch.countDown();
            }

            @Override
            public void onCompleted() {
                info("Finished RecordRoute");
                finishLatch.countDown();
            }
        };

        StreamObserver<FileInfo> requestObserver = asyncStub.sendFile(responseObserver);
        try {
            InputStream is = new FileInputStream(new File("/home/test/test.zip"));
            byte[] buff = new byte[2048];
            int len;
            int index = 0;
            while ((len = is.read(buff)) != -1) {
                requestObserver.onNext(FileInfo.newBuilder().setIndex(index).setArrs(ByteString.copyFrom(buff)).build());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Mark the end of requests
        requestObserver.onCompleted();

        // Receiving happens asynchronously
        try {
            if (!finishLatch.await(1, TimeUnit.MINUTES)) {
                warning("send file can not finish within 1 minutes");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.info("sendFile success");
    }
复制代码

4.运行结果
在这里插入图片描述

四、流服务原理

直接看源码

1.调用开始的地方
ServerCalls.class(io.grpc:grpc-stub:1.26.0)

public Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) {
            ServerCalls.ServerCallStreamObserverImpl<ReqT, RespT> responseObserver = new ServerCalls.ServerCallStreamObserverImpl(call);
            // 通过RouteGuideGrpc的invoke方法,拿到StreamObserver
            StreamObserver<ReqT> requestObserver = this.method.invoke(responseObserver);
            responseObserver.freeze();
            if (responseObserver.autoFlowControlEnabled) {
                call.request(1);
            }
            // 将观察者添加到上下文中
            return new ServerCalls.StreamingServerCallHandler.StreamingServerCallListener(requestObserver, responseObserver, call);
        }
复制代码

2.实际添加观察者到上下文的代码
ServerCallImpl.class(io.grpc:grpc-core:1.26.0)

public ServerStreamListenerImpl(ServerCallImpl<ReqT, ?> call, Listener<ReqT> listener, CancellableContext context) {
			// 校验
            this.call = (ServerCallImpl)Preconditions.checkNotNull(call, "call");
            this.listener = (Listener)Preconditions.checkNotNull(listener, "listener must not be null");
            this.context = (CancellableContext)Preconditions.checkNotNull(context, "context");
            // 实际添加
            this.context.addListener(new CancellationListener() {
                public void cancelled(Context context) {
                    ServerStreamListenerImpl.this.call.cancelled = true;
                }
            }, MoreExecutors.directExecutor());
        }
复制代码

Context.class (io.grpc:grpc-context:1.26.0)

public void addListener(Context.CancellationListener cancellationListener, Executor executor) {
        checkNotNull(cancellationListener, "cancellationListener");
        checkNotNull(executor, "executor");
        if (this.canBeCancelled()) {
            Context.ExecutableListener executableListener = new Context.ExecutableListener(executor, cancellationListener);
            synchronized(this) {
                if (this.isCancelled()) {
                    executableListener.deliver();
                } else if (this.listeners == null) {
                    this.listeners = new ArrayList();
                    this.listeners.add(executableListener);
                    if (this.cancellableAncestor != null) {
                        this.cancellableAncestor.addListener(this.parentListener, Context.DirectExecutor.INSTANCE);
                    }
                } else {
                    this.listeners.add(executableListener);
                }
            }
        }
    }
复制代码

3.每次收到客户端的消息调用发起的地方
在ServerImpl.class(io.grpc:grpc-core:1.26.0)中,主要为getListener().messagesAvailable(producer)发起调用

public void messagesAvailable(final MessageProducer producer) {
      PerfMark.startTask("ServerStreamListener.messagesAvailable", tag);
      final Link link = PerfMark.linkOut();

      final class MessagesAvailable extends ContextRunnable {

        MessagesAvailable() {
          super(context);
        }

        @Override
        public void runInContext() {
          PerfMark.startTask("ServerCallListener(app).messagesAvailable", tag);
          PerfMark.linkIn(link);
          try {
          	// 获取当前的监听者队列中的观察者,并消费客户端发来的消息
            getListener().messagesAvailable(producer);
          } catch (RuntimeException e) {
            internalClose();
            throw e;
          } catch (Error e) {
            internalClose();
            throw e;
          } finally {
            PerfMark.stopTask("ServerCallListener(app).messagesAvailable", tag);
          }
        }
      }

      try {
        callExecutor.execute(new MessagesAvailable());
      } finally {
        PerfMark.stopTask("ServerStreamListener.messagesAvailable", tag);
      }
    }
复制代码

五、总结

以上就是我对grpc流服务的一些理解了,欢迎在评论区留言讨论~

六、参考资料

  1. gRPC 官方文档中文版V1.0
  2. 官方github地址

七、下载

CSDN下载链接:下载👈