前面我们的例子是一个固定的出参和入参,固定的方法实现。
本节将实现通用的调用,让框架具有更广泛的实用性。
基本思路
所有的方法调用,基于反射进行相关处理实现。
服务端
核心类
RpcServer调整如下:
serverBootstrap. group (workerGroup,bossGroup) .channel(NioServerSocketChannel.class) //打印日志 .handler(newLoggingHandler(LogLevel.INFO)) .childHandler(newChannelInitializer (){ @Override protectedvoidinitChannel(Channelch)throwsException{ ch.pipeline() //解码bytes=>resp .addLast(newObjectDecoder( Integer .MAX_VALUE,ClassResolvers.cacheDisabled( null ))) //request=>bytes .addLast(newObjectEncoder()) .addLast(newRpcServerHandler()); } }) //这个参数影响的是还没有被accept取出的连接 . option (ChannelOption.SO_BACKLOG,128) //这个参数只是过一段时间内客户端没有响应,服务端会发送一个ack包,以判断客户端是否还活着。 .childOption(ChannelOption.SO_KEEPALIVE, true );其中 ObjectDecoder 和 ObjectEncoder 都是 netty 内置的实现。
RpcServerHandler
packagecom.github.houbb.rpc.server.handler; importcom.github.houbb.log.integration.core.Log; importcom.github.houbb.log.integration.core.LogFactory; importcom.github.houbb.rpc测试数据mon.rpc.domain.RpcRequest; importcom.github.houbb.rpc测试数据mon.rpc.domain.impl.DefaultRpcResponse; importcom.github.houbb.rpc.server.service.impl.DefaultServiceFactory; importio.netty.channel.ChannelHandlerContext; importio.netty.channel.SimpleChannelInboundHandler; /** *@authorbinbin.hou *@since0.0.1 */ public classRpcServerHandlerextendsSimpleChannelInboundHandler{ private static finalLoglog=LogFactory.getLog(RpcServerHandler.class); @Override public voidchannelActive(ChannelHandlerContextctx)throwsException{ finalStringid=ctx.channel().id().asLongText(); log.info( "[Server]channel{}connected" +id); } @Override protectedvoidchannelRead0(ChannelHandlerContextctx,Objectmsg)throwsException{ finalStringid=ctx.channel().id().asLongText(); log.info( "[Server]channelreadstart:{}" ,id); //接受客户端请求 RpcRequestrpcRequest=(RpcRequest)msg; log.info( "[Server]receivechannel{}request:{}" ,id,rpcRequest); //回写到client端 DefaultRpcResponserpcResponse=handleRpcRequest(rpcRequest); ctx.writeAndFlush(rpcResponse); log.info( "[Server]channel{}response{}" ,id,rpcResponse); } @Override public voidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{ cause.printStackTrace(); ctx. close (); } /** *处理请求信息 *@paramrpcRequest请求信息 *@ return 结果信息 *@since0.0.6 */ privateDefaultRpcResponsehandleRpcRequest(finalRpcRequestrpcRequest){ DefaultRpcResponserpcResponse=newDefaultRpcResponse(); rpcResponse.seqId(rpcRequest.seqId()); try{ //获取对应的service实现类 //rpcRequest=>invocationRequest //执行invoke Objectresult=DefaultServiceFactory.getInstance() .invoke(rpcRequest.serviceId(), rpcRequest.methodName(), rpcRequest.paramTypeNames(), rpcRequest.paramValues()); rpcResponse.result(result); }catch(Exceptione){ rpcResponse.error(e); log.error( "[Server]executemeetexforrequest" ,rpcRequest,e); } //构建结果值 return rpcResponse; } }和以前类似,不过 handleRpcRequest 要稍微麻烦一点。
这里需要根据发射,调用对应的方法。
pojo
其中使用的出参、入参实现如下:
RpcRequest
packagecom.github.houbb.rpc测试数据mon.rpc.domain; importjava.util.List; /** *序列化相关处理 *(1)调用创建时间-createTime *(2)调用方式callType *(3)超时时间timeOut * *额外信息: *(1)上下文信息 * *@authorbinbin.hou *@since0.0.6 */ public interfaceRpcRequestextendsBaseRpc{ /** *创建时间 *@ return 创建时间 *@since0.0.6 */ longcreateTime(); /** *服务唯一标识 *@ return 服务唯一标识 *@since0.0.6 */ StringserviceId(); /** *方法名称 *@ return 方法名称 *@since0.0.6 */ StringmethodName(); /** *方法类型名称列表 *@ return 名称列表 *@since0.0.6 */ List paramTypeNames(); //调用参数信息列表 /** *调用参数值 *@ return 参数值数组 *@since0.0.6 */ Object[]paramValues(); }RpcResponse
packagecom.github.houbb.rpc测试数据mon.rpc.domain; /** *序列化相关处理 *@authorbinbin.hou *@since0.0.6 */ public interfaceRpcResponseextendsBaseRpc{ /** *异常信息 *@ return 异常信息 *@since0.0.6 */ Throwableerror(); /** *请求结果 *@ return 请求结果 *@since0.0.6 */ Objectresult(); }BaseRpc
packagecom.github.houbb.rpc测试数据mon.rpc.domain; importjava.io. Serializable ; /** *序列化相关处理 *@authorbinbin.hou *@since0.0.6 */ public interfaceBaseRpcextends Serializable { /** *获取唯一标识号 *(1)用来唯一标识一次调用,便于获取该调用对应的响应信息。 *@ return 唯一标识号 */ StringseqId(); /** *设置唯一标识号 *@paramtraceId唯一标识号 *@ return this */ BaseRpcseqId(finalStringtraceId); }ServiceFactory-服务工厂
为了便于对所有的 service 实现类统一管理,这里定义 service 工厂类。
ServiceFactory
packagecom.github.houbb.rpc.server.service; importcom.github.houbb.rpc.server.config.service.ServiceConfig; importcom.github.houbb.rpc.server.registry.ServiceRegistry; importjava.util.List; /** *服务方法类仓库管理类-接口 * * *(1)对外暴露的方法,应该尽可能的少。 *(2)对于外部的调用,后期比如telnet治理,可以使用比如有哪些服务列表? *单个服务有哪些方法名称? * *等等基础信息的查询,本期暂时全部隐藏掉。 * *(3)前期尽可能的少暴露方法。 *@authorbinbin.hou *@since0.0.6 *@seeServiceRegistry服务注册,将服务信息放在这个类中,进行统一的管理。 *@seeServiceMethod方法信息 */ public interfaceServiceFactory{ /** *注册服务列表信息 *@paramserviceConfigList服务配置列表 *@ return this *@since0.0.6 */ ServiceFactoryregisterServices(finalList serviceConfigList); /** *直接反射调用 *(1)此处对于方法反射,为了提升性能,所有的class.getFullName()进行拼接然后放进 key 中。 * *@paramserviceId服务名称 *@parammethodName方法名称 *@paramparamTypeNames参数类型名称列表 *@paramparamValues参数值 *@ return 方法调用返回值 *@since0.0.6 */ Objectinvoke(finalStringserviceId,finalStringmethodName, List paramTypeNames,finalObject[]paramValues); }DefaultServiceFactory
作为默认实现,如下:
packagecom.github.houbb.rpc.server.service.impl; importcom.github.houbb.heaven.constant.PunctuationConst; importcom.github.houbb.heaven.util测试数据mon.ArgUtil; importcom.github.houbb.heaven.util.lang.reflect.ReflectMethodUtil; importcom.github.houbb.heaven.util.util.CollectionUtil; importcom.github.houbb.rpc测试数据mon.exception.RpcRuntimeException; importcom.github.houbb.rpc.server.config.service.ServiceConfig; importcom.github.houbb.rpc.server.service.ServiceFactory; importjava.lang.reflect.InvocationTargetException; importjava.lang.reflect.Method; importjava.util.HashMap; importjava.util.List; importjava.util.Map; /** *默认服务仓库实现 *@authorbinbin.hou *@since0.0.6 */ public classDefaultServiceFactoryimplementsServiceFactory{ /** *服务map *@since0.0.6 */ privateMap serviceMap; /** *直接获取对应的method信息 *(1) key :serviceId:methodName:param1@param2@param3 *(2)value:对应的method信息 */ privateMap methodMap; private static finalDefaultServiceFactoryINSTANCE=newDefaultServiceFactory(); privateDefaultServiceFactory(){} public static DefaultServiceFactorygetInstance(){ return INSTANCE; } /** *服务注册一般在项目启动的时候,进行处理。 *属于比较重的操作,而且一个服务按理说只应该初始化一次。 *此处加锁为了保证线程安全。 *@paramserviceConfigList服务配置列表 *@ return this */ @Override public synchronizedServiceFactoryregisterServices(List serviceConfigList){ ArgUtil.notEmpty(serviceConfigList, "serviceConfigList" ); //集合初始化 serviceMap=newHashMap<>(serviceConfigList. size ()); //这里只是预估,一般为2个服务。 methodMap=newHashMap<>(serviceConfigList. size ()*2); for (ServiceConfigserviceConfig:serviceConfigList){ serviceMap.put(serviceConfig.id(),serviceConfig.reference()); } //存放方法名称 for (Map.Entry entry:serviceMap.entrySet()){ StringserviceId=entry.getKey(); Objectreference=entry.getValue(); //获取所有方法列表 Method[]methods=reference.getClass().getMethods(); for (Methodmethod:methods){ StringmethodName=method.getName(); if(ReflectMethodUtil.isIgnoreMethod(methodName)){ continue ; } List paramTypeNames=ReflectMethodUtil.getParamTypeNames(method); String key =buildMethodKey(serviceId,methodName,paramTypeNames); methodMap.put( key ,method); } } return this; } @Override public Objectinvoke(StringserviceId,StringmethodName,List paramTypeNames,Object[]paramValues){ //参数校验 ArgUtil.notEmpty(serviceId, "serviceId" ); ArgUtil.notEmpty(methodName, "methodName" ); //提供cache,可以根据前三个值快速定位对应的method //根据method进行反射处理。 //对于paramTypes进行string连接处理。 finalObjectreference=serviceMap.get(serviceId); finalStringmethodKey=buildMethodKey(serviceId,methodName,paramTypeNames); finalMethodmethod=methodMap.get(methodKey); try{ return method.invoke(reference,paramValues); }catch(IllegalAccessException|InvocationTargetExceptione){ thrownewRpcRuntimeException(e); } } /** *(1)多个之间才用:分隔 *(2)参数之间采用@分隔 *@paramserviceId服务标识 *@parammethodName方法名称 *@paramparamTypeNames参数类型名称 *@ return 构建完整的 key *@since0.0.6 */ privateStringbuildMethodKey(StringserviceId,StringmethodName,List paramTypeNames){ Stringparam=CollectionUtil. join (paramTypeNames,PunctuationConst. AT ); return serviceId+PunctuationConst.COLON+methodName+PunctuationConst.COLON +param; } }ServiceRegistry-服务注册类
接口
packagecom.github.houbb.rpc.server.registry; /** *服务注册类 *(1)每个应用唯一 *(2)每个服务的暴露协议应该保持一致 *暂时不提供单个服务的特殊处理,后期可以考虑添加 * *@authorbinbin.hou *@since0.0.6 */ public interfaceServiceRegistry{ /** *暴露的rpc服务端口信息 *@paramport端口信息 *@ return this *@since0.0.6 */ ServiceRegistryport(final int port); /** *注册服务实现 *@paramserviceId服务标识 *@paramserviceImpl服务实现 *@ return this *@since0.0.6 */ ServiceRegistryregister(finalStringserviceId,finalObjectserviceImpl); /** *暴露所有服务信息 *(1)启动服务端 *@ return this *@since0.0.6 */ ServiceRegistryexpose(); }实现
packagecom.github.houbb.rpc.server.registry.impl; importcom.github.houbb.heaven.util测试数据mon.ArgUtil; importcom.github.houbb.rpc测试数据mon.config.protocol.ProtocolConfig; importcom.github.houbb.rpc.server.config.service.DefaultServiceConfig; importcom.github.houbb.rpc.server.config.service.ServiceConfig; importcom.github.houbb.rpc.server.core.RpcServer; importcom.github.houbb.rpc.server.registry.ServiceRegistry; importcom.github.houbb.rpc.server.service.impl.DefaultServiceFactory; importjava.util.ArrayList; importjava.util.List; /** *默认服务端注册类 *@authorbinbin.hou *@since0.0.6 */ public classDefaultServiceRegistryimplementsServiceRegistry{ /** *单例信息 *@since0.0.6 */ private static finalDefaultServiceRegistryINSTANCE=newDefaultServiceRegistry(); /** *rpc服务端端口号 *@since0.0.6 */ private int rpcPort; /** *协议配置 *(1)默认只实现tcp *(2)后期可以拓展实现web-service/http/https等等。 *@since0.0.6 */ privateProtocolConfigprotocolConfig; /** *服务配置列表 *@since0.0.6 */ privateList serviceConfigList; privateDefaultServiceRegistry(){ //初始化默认参数 this.serviceConfigList=newArrayList<>(); this.rpcPort=9527; } public static DefaultServiceRegistrygetInstance(){ return INSTANCE; } @Override public ServiceRegistryport( int port){ ArgUtil.positive(port, "port" ); this.rpcPort=port; return this; } /** *注册服务实现 *(1)主要用于后期服务调用 *(2)如何根据id获取实现?非常简单,id是唯一的。 *有就是有,没有就抛出异常,直接返回。 *(3)如果根据{@linkcom.github.houbb.rpc测试数据mon.rpc.domain.RpcRequest}获取对应的方法。 * *3.1根据serviceId获取唯一的实现 *3.2根据{@linkClass#getMethod(String,Class[])}方法名称+参数类型唯一获取方法 *3.3根据{@linkjava.lang.reflect.Method#invoke(Object,Object...)}执行方法 * *@paramserviceId服务标识 *@paramserviceImpl服务实现 *@ return this *@since0.0.6 */ @Override @SuppressWarnings( "unchecked" ) public synchronizedDefaultServiceRegistryregister(finalStringserviceId,finalObjectserviceImpl){ ArgUtil.notEmpty(serviceId, "serviceId" ); ArgUtil.notNull(serviceImpl, "serviceImpl" ); //构建对应的其他信息 ServiceConfigserviceConfig=newDefaultServiceConfig(); serviceConfig.id(serviceId).reference(serviceImpl); serviceConfigList. add (serviceConfig); return this; } @Override public ServiceRegistryexpose(){ //注册所有服务信息 DefaultServiceFactory.getInstance() .registerServices(serviceConfigList); //暴露nettyserver信息 newRpcServer(rpcPort).start(); return this; } }ServiceConfig 是一些服务的配置信息,接口定义如下:
packagecom.github.houbb.rpc.server.config.service; /** *单个服务配置类 * *简化用户使用: *在用户使用的时候,这个类应该是不可见的。 *直接提供对应的服务注册类即可。 * *后续拓展 *(1)版本信息 *(2)服务端超时时间 * *@authorbinbin.hou *@since0.0.6 *@param 实现类泛型 */ public interfaceServiceConfig { /** *获取唯一标识 *@ return 获取唯一标识 *@since0.0.6 */ Stringid(); /** *设置唯一标识 *@paramid标识信息 *@ return this *@since0.0.6 */ ServiceConfig id(Stringid); /** *获取引用实体实现 *@ return 实体实现 *@since0.0.6 */ Treference(); /** *设置引用实体实现 *@paramreference引用实现 *@ return this *@since0.0.6 */ ServiceConfig reference(Treference); }测试
maven 引入
引入服务端的对应 maven 包:
com.github.houbb rpc-server 0.0.6
服务端启动
//启动服务 DefaultServiceRegistry.getInstance() .register(ServiceIdConst.CALC,newCalculatorServiceImpl()) .expose();这里注册了一个计算服务,并且设置对应的实现。
和以前实现类似,此处不再赘述。
启动日志:
[DEBUG][2021-10-0513:39:42.638][main][c.g.h.l.i.c.LogFactory.setImplementation]-Logginginitializedusing 'classcom.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter. [INFO][2021-10-0513:39:42.645][Thread-0][c.g.h.r.s.c.RpcServer.run]-RPC服务开始启动服务端 十月05,20211:39:43下午io.netty.handler.logging.LoggingHandlerchannelRegistered 信息:[id:0xec4dc74f]REGISTERED 十月05,20211:39:43下午io.netty.handler.logging.LoggingHandlerbind 信息:[id:0xec4dc74f]BIND:0.0.0.0/0.0.0.0:9527 十月05,20211:39:43下午io.netty.handler.logging.LoggingHandlerchannelActive 信息:[id:0xec4dc74f,L:/0:0:0:0:0:0:0:0:9527]ACTIVE [INFO][2021-10-0513:39:43.893][Thread-0][c.g.h.r.s.c.RpcServer.run]-RPC服务端启动完成,监听【9527】端口ps: 写到这里忽然发现忘记添加对应的 register 日志了,这里可以添加对应的 registerListener 拓展。
原文链接:https://HdhCmsTesttoutiao测试数据/a7017765348539892256/
查看更多关于Java 从零开始手写 RPC—Reflect 反射实现通用调用之服务端的详细内容...
声明:本文来自网络,不代表【好得很程序员自学网】立场,转载请注明出处:http://haodehen.cn/did213947