本文已参与「掘力星计划」,赢取创作大礼包,挑战创作激励金。
简介
泛化接口调用方式主要用于客户端没有 API 接口及模型类元的情况,参数及返回值中的所有 POJO 均用 Map表示。使用泛化调用时,服务提供者应用没有特殊操作,服务消费者应用不再需要引入服务提供者的SDK二方包。
适用于API网关服务、框架集成等场景,提供一个Dubbo服务的统一管理平台,让各个消费方调用在统一管理平台注册了的服务,而不需要引入SDK二方库。
用法示例
public interface GreetingService {
String sayHello(String name);
}
/**
* 通过API方式使用泛化调用
*/
@Test
public void test() {
// 应用配置
ApplicationConfig applicationConfig = new ApplicationConfig();
applicationConfig.setName("dubbo-consumer");
// 注册中心配置
RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setAddress("zookeeper://127.0.0.1:2181");
// 引用远程服务
ReferenceConfig<GenericService> reference = new ReferenceConfig<>();
reference.setApplication(applicationConfig);
reference.setRegistry(registryConfig);
reference.setInterface("com.xxx.GreetingService");
reference.setGeneric(true);
// 获取GenericService,代替
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
GenericService genericService = cache.get(reference);
// 调用服务
String[] parameterTypes = new String[] { "java.lang.String" };
Object[] args = Stream.of("xiaoming").toArray();
Object result = genericService.$invoke("sayHello", parameterTypes, args);
System.out.println("result = " + result);
}
复制代码
源码分析
服务消费方
在dubbo的责任链中,GenericImplFilter会拦截泛化调用,进行参数校验,并发起RPC调用。
org.apache.dubbo.rpc.filter.GenericImplFilter#invoke
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String generic = invoker.getUrl().getParameter(Constants.GENERIC_KEY);
// 判断是否是泛化调用
if (invocation.getMethodName().equals(Constants.$INVOKE)
&& invocation.getArguments() != null
&& invocation.getArguments().length == 3
&& ProtocolUtils.isGeneric(generic)) {
// 调用参数
Object[] args = (Object[]) invocation.getArguments()[2];
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
for (Object arg : args) {
if (!(byte[].class == arg.getClass())) {
error(generic, byte[].class.getName(), arg.getClass().getName());
}
}
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
for (Object arg : args) {
if (!(arg instanceof JavaBeanDescriptor)) {
error(generic, JavaBeanDescriptor.class.getName(), arg.getClass().getName());
}
}
}
// 传递泛化调用方式参数,以便于服务提供方解析请求参数等数据
((RpcInvocation) invocation).setAttachment(
Constants.GENERIC_KEY, invoker.getUrl().getParameter(Constants.GENERIC_KEY));
}
// 发起RPC调用
return invoker.invoke(invocation);
}
复制代码
服务提供方
服务提供方使用GenericFilter拦截请求,把泛化参数进行反序列化解析,将请求转发给具体的服务提供实现类对象进行业务执行。
org.apache.dubbo.rpc.filter.GenericFilter#invoke
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
// 泛化调用
if (inv.getMethodName().equals(Constants.$INVOKE)
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& !GenericService.class.isAssignableFrom(invoker.getInterface())) {
// 参数信息
String name = ((String) inv.getArguments()[0]).trim();
String[] types = (String[]) inv.getArguments()[1];
Object[] args = (Object[]) inv.getArguments()[2];
try {
// 获取调用方法
Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
Class<?>[] params = method.getParameterTypes();
if (args == null) {
args = new Object[params.length];
}
String generic = inv.getAttachment(Constants.GENERIC_KEY);
if (StringUtils.isBlank(generic)) {
generic = RpcContext.getContext().getAttachment(Constants.GENERIC_KEY);
}
// 泛化类型为空,使用默认反序列化方式
if (StringUtils.isEmpty(generic)
|| ProtocolUtils.isDefaultGenericSerialization(generic)) {
args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
// nativejava反序列化方式
} else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
for (int i = 0; i < args.length; i++) {
if (byte[].class == args[i].getClass()) {
try(UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i])) {
args[i] = ExtensionLoader.getExtensionLoader(Serialization.class)
.getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
.deserialize(null, is).readObject();
} catch (Exception e) {
throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e);
}
} else {
throw new RpcException(...);
}
}
// bean反序列化方式
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
for (int i = 0; i < args.length; i++) {
if (args[i] instanceof JavaBeanDescriptor) {
args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]);
} else {
throw new RpcException(...));
}
}
}
// 执行具体服务
Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
if (result.hasException()
&& !(result.getException() instanceof GenericException)) {
return new RpcResult(new GenericException(result.getException()));
}
// 对结果序列化处理并返回
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
try {
UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512);
ExtensionLoader.getExtensionLoader(Serialization.class)
.getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
.serialize(null, os).writeObject(result.getValue());
return new RpcResult(os.toByteArray());
} catch (IOException e) {
throw new RpcException("Serialize result failed.", e);
}
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
return new RpcResult(JavaBeanSerializeUtil.serialize(result.getValue(), JavaBeanAccessor.METHOD));
} else {
return new RpcResult(PojoUtils.generalize(result.getValue()));
}
} catch (NoSuchMethodException e) {
throw new RpcException(e.getMessage(), e);
} catch (ClassNotFoundException e) {
throw new RpcException(e.getMessage(), e);
}
}
// 非泛化调用,把请求传递给下一个过滤器
return invoker.invoke(inv);
}
复制代码
近期评论