手写简单RPC框架实践
一、前言
随着系统业务的增长,往往将系统从单体应用升级为分布式多体应用。在单体应用中,服务调用局限于本地,而变为多体应用后,相互通信就得依赖远程。RPC(Remote Procedure Call Protocol)则提供了一种方式,能让我们像调用本地服务一样调用远程服务,而无需关心网络通讯细节,大大提高了生产力。主流的RPC框架主要有Dubbo、Thrift、GRPC等,但这些框架都大而全,对于初学者学习不太友好,参考网上资料,通过实践手写一个简易的RPC框架,不仅加深了对RPC基本实现原理的理解,也能够帮助其他主流RPC框架的学习理解。
本文主要是介绍了整个RPC框架的实现思路,框架示意图如下,有木有和Dubbo的结构图很像,毕竟万变不离其宗么,实现原理都是很相似的。主要实现内容有:
- 基于netty实现了一套自定义远程调用;
- 基于Zookeeper实现了服务的自动注册与发现;
- 实现了服务的多版本支持与负载均衡。
二、远程调用的实现
1. 实现思路
远程调用实现流程如下图所示,主要实现思路如下:
- 远程调用分为本地调用端与远程服务端;
- 调用者根据服务接口获得对应的代理对象,然后直接调用接口的方法即可获得返回结果,可以实现像调用本地服务一样调用远程服务;
- 本地调用端主要通过动态代理的方式来实现上述功能,调用接口方法的时候,其代理对象实现了具体的网络通讯细节,将接口名、方法名、方法参数等请求信息发送给远程服务端并等待远程服务端的返回信息;
- 远程服务端根据请求信息通过反射获得具体的服务实现类,执行实现类的相应方法后并将调用结果返回给调用端;
- 调用端接收到返回值,代理对象将其封装为返回结果给调用者, 整个远程调用即结束。
原先使用原生socket的BIO多线程方案来实现本地调用端和远程服务端的交互,实现简单且便于理解,在github上打了该
tag
,有兴趣的同学可以去看一下。基于Netty的高性能,后来采用了Netty作为其网络传输框架,使用
JSON
格式的字符串作为传输数据的编解码。由于Netty的异步调用与请求同步返回结果的需求不一致,因此需要实现一套异步调用的伪同步机制,这一套在Dubbo中实现的很完善,有兴趣的同学可以去看一下,之前写的文章中的MQTT同步调用就是借鉴了Dubbo底层将Netty的异步调用转化成同步的方式。在本文中,为了实现方便,使用了一个讨巧的方法,具体实现在后面阐述。
2. 远程服务端实现
远程服务端的服务绑定和启动较简单,主要代码如下:
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
/** * 开启一个rpc远程服务 * * @author 丁许 * @date 2019/7/14 19:03 */@Slf4jpublic class RpcServer { /** * 服务发布的ip地址 * 这边自定义因为 InetAddress.getLocalHost().getHostAddress()可能获得是127.0.0.1 */ private String serviceIp; /** * 服务发布端口 */ private int servicePort; /** * 服务名称和服务对象的关系 */ private MapString, Object handlerMap = new HashMap(); /** * 绑定服务名以及服务对象 * * @param services 服务列表 */ public void bindService(ListObject services) { for (Object service : services) { RpcService anno = service.getClass().getAnnotation(RpcService.class); if (null == anno) { //注解为空的情况,version就是空,serviceName就是 throw new RuntimeException("服务并没有注解,请检查。" + service.getClass().getName()); } String serviceName = anno.value().getName(); String version = anno.version(); if (!"".equals(version)) { serviceName += "-" + version; } handlerMap.put(serviceName, service); } } /** * 发布服务 */ public void publish() throws InterruptedException { //使用netty开启一个服务 ServerBootstrap bootstrap = new ServerBootstrap(); NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); bootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializerSocketChannel() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); //数据分包,组包,粘包 p.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4)); p.addLast(new LengthFieldPrepender(4)); p.addLast(new StringDecoder(CharsetUtil.UTF_8)); p.addLast(new StringEncoder(CharsetUtil.UTF_8)); p.addLast(new ProcessRequestHandler(handlerMap)); } }); bootstrap.bind(serviceIp, servicePort).sync(); log.info("成功启动服务,host:{},port:{}", serviceIp, servicePort); //省略代码... }}
/**
- 开启一个rpc远程服务
- @author 丁许
- @date 2019/7/14 19:03
/
@Slf4j
public class RpcServer {
/*
* 服务发布的ip地址
* 这边自定义因为 InetAddress.getLocalHost().getHostAddress()可能获得是127.0.0.1
*/
private String serviceIp;
/**
* 服务发布端口
*/
private int servicePort;
/**
* 服务名称和服务对象的关系
*/
private MapString, Object handlerMap = new HashMap();
/**
* 绑定服务名以及服务对象
*
* @param services 服务列表
*/
public void bindService(ListObject services) {
for (Object service : services) {
RpcService anno = service.getClass().getAnnotation(RpcService.class);
if (null == anno) {
//注解为空的情况,version就是空,serviceName就是
throw new RuntimeException("服务并没有注解,请检查。" + service.getClass().getName());
}
String serviceName = anno.value().getName();
String version = anno.version();
if (!"".equals(version)) {
serviceName += "-" + version;
}
handlerMap.put(serviceName, service);
}
}
/**
* 发布服务
*/
public void publish() throws InterruptedException {
//使用netty开启一个服务
ServerBootstrap bootstrap = new ServerBootstrap();
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
bootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializerSocketChannel() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//数据分包,组包,粘包
p.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
p.addLast(new LengthFieldPrepender(4));
p.addLast(new StringDecoder(CharsetUtil.UTF_8));
p.addLast(new StringEncoder(CharsetUtil.UTF_8));
p.addLast(new ProcessRequestHandler(handlerMap));
}
});
bootstrap.bind(serviceIp, servicePort).sync();
log.info("成功启动服务,host:{},port:{}", serviceIp, servicePort);
//省略代码...
}
}
这块主要包含了服务绑定和服务启动两部分。
服务绑定主要是遍历传入的服务实现类,通过
@RpcService
自定义注解获得其服务接口和版本信息,并得到serviceName,作为
handlerMap
的key将其存入
handlerMap
中,
handlerMap
即存储了服务名称和服务对象的关系。
@RpcService
自定义注解如下,用于提供具体服务实现类的服务接口和版本信息。
123456789101112131415161718
/** * 提供服务的注解 */@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)public @interface RpcService { /** * 对外发布的服务接口地址 */ Class? value(); /** * 版本 */ String version() default ""; }
/**
- 提供服务的注解
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcService {
/**
* 对外发布的服务接口地址
*/
Class? value();
/**
* 版本
*/
String version() default "";
}
服务启动则是常规的Netty起服务的代码,其中
serviceIp
和
servicePort
代表该服务绑定的
host
和
port
,
LengthFieldBasedFrameDecoder
和
LengthFieldPrepender
这两个
ChannelHandler
用于处理数据的粘包、拆包问题,具体的业务处理逻辑由
ProcessRequestHandler
这个
ChannelHandler
来实现。
ProcessRequestHandler
代码如下,用于处理具体的业务逻辑。
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
@Slf4jpublic class ProcessRequestHandler extends SimpleChannelInboundHandlerString { /** * 服务映射 */ private MapString, Object handlerMap; public ProcessRequestHandler(MapString, Object handlerMap) { this.handlerMap = handlerMap; } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception { log.debug("收到request:{}", s); Object result = this.invoke(JSON.parseObject(s, RpcRequest.class)); ChannelFuture future = channelHandlerContext.writeAndFlush(JSON.toJSONString(result)); future.addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error("Unexpected exception from downstream.", cause); ctx.close(); } /** * 服务调用返回处理结果 * * @param request 服务请求 * * @return 处理结果 */ private Object invoke(RpcRequest request) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { //获得服务名称 String serviceName = request.getClassName(); //获得版本号 String version = request.getVersion(); //获得方法名 String methodName = request.getMethodName(); //获得参数数组 Object[] params = request.getParams(); //获得参数类型数据 Class?[] argTypes = Arrays.stream(params).map(Object::getClass).toArray(Class?[]::new); if (version != null && !"".equals(version)) { serviceName = serviceName + "-" + version; } Object service = handlerMap.get(serviceName); if (null == service) { return RpcResponse.fail(ResponseCode.ERROR404, "未找到服务"); } Method method = service.getClass().getMethod(methodName, argTypes); if (null == method) { return RpcResponse.fail(ResponseCode.ERROR404, "未找到服务方法"); } return RpcResponse.success(method.invoke(service, params)); }}
@Slf4j
public class ProcessRequestHandler extends SimpleChannelInboundHandlerString {
/**
* 服务映射
*/
private MapString, Object handlerMap;
public ProcessRequestHandler(MapString, Object handlerMap) {
this.handlerMap = handlerMap;
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
log.debug("收到request:{}", s);
Object result = this.invoke(JSON.parseObject(s, RpcRequest.class));
ChannelFuture future = channelHandlerContext.writeAndFlush(JSON.toJSONString(result));
future.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("Unexpected exception from downstream.", cause);
ctx.close();
}
/**
* 服务调用返回处理结果
*
* @param request 服务请求
*
* @return 处理结果
*/
private Object invoke(RpcRequest request)
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
//获得服务名称
String serviceName = request.getClassName();
//获得版本号
String version = request.getVersion();
//获得方法名
String methodName = request.getMethodName();
//获得参数数组
Object[] params = request.getParams();
//获得参数类型数据
Class?[] argTypes = Arrays.stream(params).map(Object::getClass).toArray(Class?[]::new);
if (version != null && !"".equals(version)) {
serviceName = serviceName + "-" + version;
}
Object service = handlerMap.get(serviceName);
if (null == service) {
return RpcResponse.fail(ResponseCode.ERROR404, "未找到服务");
}
Method method = service.getClass().getMethod(methodName, argTypes);
if (null == method) {
return RpcResponse.fail(ResponseCode.ERROR404, "未找到服务方法");
}
return RpcResponse.success(method.invoke(service, params));
}
}
首先将接收到的字符串转为
RpcRequest
对象,根据
RpcRequest
对象的服务名称、方法名、参数数组等信息从
handlerMap
中获得对应的服务实现类并通过反射的方式获得其方法的调用结果,最终将调用结果
RpcResponse
对象转为
JSON
字符串后通过
channelHandlerContext.channel().writeAndFlush(JSON.toJSONString(result));
将数据写入,写入成功后即可关闭该channel。自此完成整个流程的调用。
RpcRequest
和
RpcResponse
对象分别定义如下,约束了RPC的请求实体和响应实体。
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
@Datapublic class RpcRequest implements Serializable { private static final long serialVersionUID = 5661720043123218215L; /** * 请求接口名 */ private String className; /** * 方法名 */ private String methodName; /** * 参数数组 */ private Object[] params; /** * 版本号 */ private String version;}@Datapublic class RpcResponseT implements Serializable { private static final long serialVersionUID = 715745410605631233L; /** * 响应码 */ private Integer code; /** * 响应错误消息体 */ private String message; /** * 响应数据 */ private T data; /** * 成功响应 * * @param data 数据 * @param 数据泛型 * * @return RpcResponse */ public static T RpcResponseT success(T data) { RpcResponseT response = new RpcResponse(); response.setCode(ResponseCode.SUCCESS.getValue()); if (null != data) { response.setData(data); } return response; } /** * 失败响应 * @param responseCode 响应码枚举 * @param errorMessage 错误消息 * @param 泛型 * * @return RpcResponse */ public static T RpcResponseT fail(ResponseCode responseCode, String errorMessage) { RpcResponseT response = new RpcResponse(); response.setCode(responseCode.getValue()); response.setMessage(errorMessage); return response; }}
@Data
public class RpcRequest implements Serializable {
private static final long serialVersionUID = 5661720043123218215L;
/**
* 请求接口名
*/
private String className;
/**
* 方法名
*/
private String methodName;
/**
* 参数数组
*/
private Object[] params;
/**
* 版本号
*/
private String version;
}
@Data
public class RpcResponse implements Serializable {
private static final long serialVersionUID = 715745410605631233L;
/**
* 响应码
*/
private Integer code;
/**
* 响应错误消息体
*/
private String message;
/**
* 响应数据
*/
private T data;
/**
* 成功响应
*
* @param data 数据
* @param 数据泛型
*
* @return RpcResponse
*/
public static RpcResponse success(T data) {
RpcResponse response = new RpcResponse();
response.setCode(ResponseCode.SUCCESS.getValue());
if (null != data) {
response.setData(data);
}
return response;
}
/**
* 失败响应
* @param responseCode 响应码枚举
* @param errorMessage 错误消息
* @param 泛型
*
* @return RpcResponse
*/
public static RpcResponse fail(ResponseCode responseCode, String errorMessage) {
RpcResponse response = new RpcResponse();
response.setCode(responseCode.getValue());
response.setMessage(errorMessage);
return response;
}
}
3. 本地调用端实现
本地调用端实现主要包括动态代理和Netty请求调用两部分。
首先是动态代理的实现,采用的是
JDK
的动态代理,实现代码如下:
1234
public T T clientProxy(ClassT interfaceCls, String version) { return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class[] { interfaceCls }, new RpcInvocationHandler(serverDiscover, version));}
public T clientProxy(Class interfaceCls, String version) {
return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class[] { interfaceCls },
new RpcInvocationHandler(serverDiscover, version));
}
这边就是
JDK
动态代理的写法,根据传入的接口的Class对象生成相关代理类,具体执行方法由
RpcInvocationHandler
提供,具体代码如下:
123456789101112131415161718192021222324252627282930313233343536
@Slf4jpublic class RpcInvocationHandler implements InvocationHandler { //省略代码... @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RpcRequest request = new RpcRequest(); request.setClassName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParams(args); request.setVersion(version); String serviceName = method.getDeclaringClass().getName(); if (null != version && !"".equals(version)) { serviceName += "-" + version; } String servicePath = serverDiscover.disvover(serviceName); if (null == servicePath) { log.error("并未找到服务地址,className:{}", serviceName); throw new RuntimeException("未找到服务地址"); } String host = servicePath.split(":")[0]; int port = Integer.parseInt(servicePath.split(":")[1]); RpcResponse response = new NettyTransport(host, port).send(request); if (response == null) { throw new RuntimeException("调用服务失败,servicePath:" + servicePath); } if (response.getCode() == null || !response.getCode().equals(ResponseCode.SUCCESS.getValue())) { log.error("调用服务失败,servicePath:{},RpcResponse:{}", servicePath, JSONObject.toJSONString(JSON.toJSONString(response))); throw new RuntimeException(response.getMessage()); } else { return response.getData(); } }}
@Slf4j
public class RpcInvocationHandler implements InvocationHandler {
//省略代码...
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest request = new RpcRequest();
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParams(args);
request.setVersion(version);
String serviceName = method.getDeclaringClass().getName();
if (null != version && !"".equals(version)) {
serviceName += "-" + version;
}
String servicePath = serverDiscover.disvover(serviceName);
if (null == servicePath) {
log.error("并未找到服务地址,className:{}", serviceName);
throw new RuntimeException("未找到服务地址");
}
String host = servicePath.split(":")[0];
int port = Integer.parseInt(servicePath.split(":")[1]);
RpcResponse response = new NettyTransport(host, port).send(request);
if (response == null) {
throw new RuntimeException("调用服务失败,servicePath:" + servicePath);
}
if (response.getCode() == null || !response.getCode().equals(ResponseCode.SUCCESS.getValue())) {
log.error("调用服务失败,servicePath:{},RpcResponse:{}", servicePath,
JSONObject.toJSONString(JSON.toJSONString(response)));
throw new RuntimeException(response.getMessage());
} else {
return response.getData();
}
}
}
动态代理方法在执行时,会调用
RpcInvocationHandler
里面的invoke方法去执行,主要做的事情就是获得类名、方法名、方法参数等信息并将其封装为
RpcRequest
对象并使用
RpcResponse response = new NettyTransport(host, port).send(request);
该请求调用返回相应结果。
NettyTransport
则封装了Netty请求调用的实现细节,代码如下:
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
@Slf4jpublic class NettyTransport { private static Bootstrap bootstrap; private String host; private int port; public NettyTransport(String host, int port) { this.host = host; this.port = port; } static { bootstrap = new Bootstrap(); EventLoopGroup group = new NioEventLoopGroup(); bootstrap.group(group).channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializerChannel() { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //处理数据的粘包、拆包问题 pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast(new LengthFieldPrepender(4)); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new ClientHandler()); } }); } public RpcResponse send(RpcRequest request) throws InterruptedException { ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); Channel channel = channelFuture.channel(); channel.writeAndFlush(JSON.toJSONString(request)); //当通道关闭了,就继续往下走 channelFuture.channel().closeFuture().sync(); AttributeKeyRpcResponse key = AttributeKey.valueOf("rpcResponse"); return channel.attr(key).get(); } public static class ClientHandler extends SimpleChannelInboundHandlerString { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception { log.debug("收到response:{}", s); RpcResponse response = JSON.parseObject(s, RpcResponse.class); AttributeKeyRpcResponse key = AttributeKey.valueOf("rpcResponse"); channelHandlerContext.channel().attr(key).set(response); channelHandlerContext.channel().close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("Unexpected exception from upstream.", cause); super.exceptionCaught(ctx, cause); } } }
@Slf4j
public class NettyTransport {
private static Bootstrap bootstrap;
private String host;
private int port;
public NettyTransport(String host, int port) {
this.host = host;
this.port = port;
}
static {
bootstrap = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group).channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializerChannel() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//处理数据的粘包、拆包问题
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new ClientHandler());
}
});
}
public RpcResponse send(RpcRequest request) throws InterruptedException {
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
Channel channel = channelFuture.channel();
channel.writeAndFlush(JSON.toJSONString(request));
//当通道关闭了,就继续往下走
channelFuture.channel().closeFuture().sync();
AttributeKeyRpcResponse key = AttributeKey.valueOf("rpcResponse");
return channel.attr(key).get();
}
public static class ClientHandler extends SimpleChannelInboundHandlerString {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
log.debug("收到response:{}", s);
RpcResponse response = JSON.parseObject(s, RpcResponse.class);
AttributeKeyRpcResponse key = AttributeKey.valueOf("rpcResponse");
channelHandlerContext.channel().attr(key).set(response);
channelHandlerContext.channel().close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("Unexpected exception from upstream.", cause);
super.exceptionCaught(ctx, cause);
}
}
}
这边实现思路也很简单,根据传入的
host
和
port
,使用
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
与服务端进行连接,连接成功后即发送传入的
RcpRequest
对象并等待远程服务端的返回。前面说过这边使用了一个讨巧的方法来实现伪同步的调用。
在发送完请求后即阻塞在这里
123
channel.writeAndFlush(JSON.toJSONString(request));//当通道关闭了,就继续往下走channelFuture.channel().closeFuture().sync();
channel.writeAndFlush(JSON.toJSONString(request));
//当通道关闭了,就继续往下走
channelFuture.channel().closeFuture().sync();
在接收到返回结果后即将结果设置到channel的attr中并关闭该channel
123
AttributeKeyRpcResponse key = AttributeKey.valueOf("rpcResponse");channelHandlerContext.channel().attr(key).set(response);channelHandlerContext.channel().close();
AttributeKeyRpcResponse key = AttributeKey.valueOf(“rpcResponse”);
channelHandlerContext.channel().attr(key).set(response);
channelHandlerContext.channel().close();
此时channel关闭后阻塞被放开,通过该channel的attr获得数据
1234
channelFuture.channel().closeFuture().sync();//当通道关闭了,就继续往下走AttributeKeyRpcResponse key = AttributeKey.valueOf("rpcResponse");return channel.attr(key).get();
channelFuture.channel().closeFuture().sync();
//当通道关闭了,就继续往下走
AttributeKeyRpcResponse key = AttributeKey.valueOf(“rpcResponse”);
return channel.attr(key).get();
该方法比较讨巧,并不能控制同步调用的超时处理,因此大家可以参考前面说的Dubbo实现机制来完善。
三、服务注册与服务发现
1. 服务注册
服务注册,就是当一个服务节点上线的时候,能够注册到配置服务当中去,通过配置服务节点向外部正式提供服务,其他需要依赖该服务的服务可以通过查询配置服务获得该节点的具体地址,并发起服务请求。服务注册所描述的对象是被依赖的服务节点。
该例中使用Zookeeper来实现服务的注册,将个服务名称节点作为永久节点注册在根节点
/rpc
下,同时将
ip
+
port
组成的服务地址作为临时节点注册在对应的服务名称节点下,Zookeeper节点示意图如下。
首先定义了个注册中心的接口
12345678910
public interface IregisterCenter { /** * 基于服务名和服务地址注册一个服务 * @param serviceName 服务名称 * @param serviceAddress 服务地址 * @throws Exception 节点创建失败 */ void register(String serviceName,String serviceAddress) throws Exception;}
public interface IregisterCenter {
/**
* 基于服务名和服务地址注册一个服务
* @param serviceName 服务名称
* @param serviceAddress 服务地址
* @throws Exception 节点创建失败
*/
void register(String serviceName,String serviceAddress) throws Exception;
}
然后使用Zookeeper实现该注册中心的接口
12345678910111213141516171819202122232425262728
@Slf4jpublic class ZkRegisterCenter implements IregisterCenter { public static final String ZK_REGISTER_PATH = "/rpc"; private String connectionAddress; private CuratorFramework curatorFramework; public ZkRegisterCenter(String connectionAddress) { this.connectionAddress = connectionAddress; //初始化curator curatorFramework = CuratorFrameworkFactory.builder().connectString(connectionAddress).sessionTimeoutMs(15000) .retryPolicy(new ExponentialBackoffRetry(1000, 10)).build(); curatorFramework.start(); } @Override public void register(String serviceName, String serviceAddress) throws Exception { //需要注册的服务根节点 String servicePath = ZK_REGISTER_PATH + "/" + serviceName; //注册服务,创建临时节点 String serviceAddr = servicePath + "/" + serviceAddress; String nodePath = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL) .forPath(serviceAddr, "".getBytes()); log.debug("节点创建成功,节点为:{}", nodePath); }}
@Slf4j
public class ZkRegisterCenter implements IregisterCenter {
public static final String ZK_REGISTER_PATH = "/rpc";
private String connectionAddress;
private CuratorFramework curatorFramework;
public ZkRegisterCenter(String connectionAddress) {
this.connectionAddress = connectionAddress;
//初始化curator
curatorFramework = CuratorFrameworkFactory.builder().connectString(connectionAddress).sessionTimeoutMs(15000)
.retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();
curatorFramework.start();
}
@Override
public void register(String serviceName, String serviceAddress) throws Exception {
//需要注册的服务根节点
String servicePath = ZK_REGISTER_PATH + "/" + serviceName;
//注册服务,创建临时节点
String serviceAddr = servicePath + "/" + serviceAddress;
String nodePath = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
.forPath(serviceAddr, "".getBytes());
log.debug("节点创建成功,节点为:{}", nodePath);
}
}
本例中使用的是
Curator
的ZooKeeper客户端框架,上述代码就是根据服务名称和服务地址创建相关的临时节点,比较简单,节点结构可以参考上图。
调用注册中心来注册服务是在服务启动成功之后完成的,代码如下。通过遍历
handlerMap
,获得个服务名称,将该服务名称和启动成功的服务
ip
+
port
使用注册中心的
register
方法注册到Zookeeper中。该过程对应了上面RPC框架示意图中服务端与注册中心的服务注册。
12345678910111213141516171819202122232425262728293031323334
@Slf4jpublic class RpcServer { /** * 注册中心 */ private IregisterCenter registerCenter; /** * 服务名称和服务对象的关系 */ private MapString, Object handlerMap = new HashMap(); //省略代码... /** * 发布服务 */ public void publish() throws InterruptedException { //使用netty开启一个服务 //省略代码... //服务注册 handlerMap.keySet().forEach(serviceName - { try { registerCenter.register(serviceName, serviceIp + ":" + servicePort); } catch (Exception e) { log.error("服务注册失败,e:{}", e.getMessage()); throw new RuntimeException("服务注册失败"); } log.info("成功注册服务,服务名称:{},服务地址:{}", serviceName, serviceIp + ":" + servicePort); }); }}
@Slf4j
public class RpcServer {
/**
* 注册中心
*/
private IregisterCenter registerCenter;
/**
* 服务名称和服务对象的关系
*/
private MapString, Object handlerMap = new HashMap();
//省略代码...
/**
* 发布服务
*/
public void publish() throws InterruptedException {
//使用netty开启一个服务
//省略代码...
//服务注册
handlerMap.keySet().forEach(serviceName - {
try {
registerCenter.register(serviceName, serviceIp + ":" + servicePort);
} catch (Exception e) {
log.error("服务注册失败,e:{}", e.getMessage());
throw new RuntimeException("服务注册失败");
}
log.info("成功注册服务,服务名称:{},服务地址:{}", serviceName, serviceIp + ":" + servicePort);
});
}
}
2. 服务发现
服务发现就是当一个服务需要依赖别的服务的时候,可以通过配置服务的查询,获得依赖可用的节点,进而进行调用,服务发现所描述得对象是需要依赖其他服务的节点。
服务发现的目的就是能够根据服务名称获得对应的服务地址信息,定义服务发现的接口如下:
1234567891011
public interface IServerDiscover { /** * 基于服务名称获得一个远程地址 * * @param serviceName 服务名称 * * @return 远程地址 */ String disvover(String serviceName);}
public interface IServerDiscover {
/**
* 基于服务名称获得一个远程地址
*
* @param serviceName 服务名称
*
* @return 远程地址
*/
String disvover(String serviceName);
}
使用Zookeeper实现服务发现的代码如下,基本思路就是本地维护一个服务名称与服务地址列表的关系
serviceAddressMap
,若要找到该服务名称对应的服务地址列表,则可直接通过
serviceAddressMap.get(serviceName)
获得,如果获取不到,则使用
Curator
获得该服务名称节点下的所有子节点(即所有服务地址节点),并注册该服务名称节点的监听,若Zookeeper中该节点的子节点发生变化的时候则会重新更新本地
serviceAddressMap
中服务名称对应的服务地址列表。该过程对应了上面RPC框架示意图中客户端与注册中心的订阅与通知。
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
@Slf4jpublic class ZkServerDiscover implements IServerDiscover { MapString, ListString serviceAddressMap = new ConcurrentHashMap(); //省略代码... @Override public String disvover(String serviceName) { ListString serviceAddresses; if (!serviceAddressMap.containsKey(serviceName)) { String path = ZkRegisterCenter.ZK_REGISTER_PATH + "/" + serviceName; try { serviceAddresses = curatorFramework.getChildren().forPath(path); serviceAddressMap.put(serviceName, serviceAddresses); registerWatcher(serviceName); } catch (Exception e) { if (e instanceof KeeperException.NoNodeException) { log.error("未获得该节点,serviceName:{}", serviceName); serviceAddresses = null; } else { throw new RuntimeException("获取子节点异常:" + e); } } } else { serviceAddresses = serviceAddressMap.get(serviceName); } //这边可以先不管,后面讲负载均衡的时候会说 return iLoadBalance.selectServiceAddress(serviceAddresses); } /** * 注册监听 * * @param serviceName 服务名称 */ private void registerWatcher(String serviceName) { String path = ZkRegisterCenter.ZK_REGISTER_PATH + "/" + serviceName; PathChildrenCache childrenCache = new PathChildrenCache(curatorFramework, path, true); PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildrenCacheEvent) - { ListString serviceAddresses = curatorFramework.getChildren().forPath(path); serviceAddressMap.put(serviceName, serviceAddresses); }; childrenCache.getListenable().addListener(pathChildrenCacheListener); try { childrenCache.start(); } catch (Exception e) { throw new RuntimeException("注册PatchChild Watcher 异常" + e); } }}
@Slf4j
public class ZkServerDiscover implements IServerDiscover {
MapString, ListString serviceAddressMap = new ConcurrentHashMap();
//省略代码...
@Override
public String disvover(String serviceName) {
ListString serviceAddresses;
if (!serviceAddressMap.containsKey(serviceName)) {
String path = ZkRegisterCenter.ZK_REGISTER_PATH + "/" + serviceName;
try {
serviceAddresses = curatorFramework.getChildren().forPath(path);
serviceAddressMap.put(serviceName, serviceAddresses);
registerWatcher(serviceName);
} catch (Exception e) {
if (e instanceof KeeperException.NoNodeException) {
log.error("未获得该节点,serviceName:{}", serviceName);
serviceAddresses = null;
} else {
throw new RuntimeException("获取子节点异常:" + e);
}
}
} else {
serviceAddresses = serviceAddressMap.get(serviceName);
}
//这边可以先不管,后面讲负载均衡的时候会说
return iLoadBalance.selectServiceAddress(serviceAddresses);
}
/**
* 注册监听
*
* @param serviceName 服务名称
*/
private void registerWatcher(String serviceName) {
String path = ZkRegisterCenter.ZK_REGISTER_PATH + "/" + serviceName;
PathChildrenCache childrenCache = new PathChildrenCache(curatorFramework, path, true);
PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildrenCacheEvent) - {
ListString serviceAddresses = curatorFramework.getChildren().forPath(path);
serviceAddressMap.put(serviceName, serviceAddresses);
};
childrenCache.getListenable().addListener(pathChildrenCacheListener);
try {
childrenCache.start();
} catch (Exception e) {
throw new RuntimeException("注册PatchChild Watcher 异常" + e);
}
}
}
四、服务多版本与负载均衡
1. 服务多版本
当同一个服务的接口实现出现不兼容升级时,可以用版本号过渡,版本号不同的服务相互间不引用。 可以按照以下的步骤进行版本迁移:
- 在低压力时间段,先升级一半提供者为新版本
- 再将所有消费者升级为新版本
- 然后将剩下的一半提供者升级为新版本
本例中也实现了服务多版本的支持,在服务绑定的时候
serviceName
是根据接口的类全名和版本号来生成的,
handlerMap
中的key即生成的
serviceName
,具体生成规则可以看下面的代码。
123456789101112131415161718192021222324252627
@Slf4jpublic class RpcServer { //省略代码... /** * 绑定服务名以及服务对象 * * @param services 服务列表 */ public void bindService(ListObject services) { for (Object service : services) { RpcService anno = service.getClass().getAnnotation(RpcService.class); if (null == anno) { //注解为空的情况,version就是空,serviceName就是 throw new RuntimeException("服务并没有注解,请检查。" + service.getClass().getName()); } String serviceName = anno.value().getName(); String version = anno.version(); if (!"".equals(version)) { serviceName += "-" + version; } handlerMap.put(serviceName, service); } } //省略代码...}
@Slf4j
public class RpcServer {
//省略代码…
/**
* 绑定服务名以及服务对象
*
* @param services 服务列表
*/
public void bindService(ListObject services) {
for (Object service : services) {
RpcService anno = service.getClass().getAnnotation(RpcService.class);
if (null == anno) {
//注解为空的情况,version就是空,serviceName就是
throw new RuntimeException("服务并没有注解,请检查。" + service.getClass().getName());
}
String serviceName = anno.value().getName();
String version = anno.version();
if (!"".equals(version)) {
serviceName += "-" + version;
}
handlerMap.put(serviceName, service);
}
}
//省略代码...
}
本地调用端则根据传入的version和接口的全类名生成
serviceName
使用服务发现接口获得对应的服务地址
1234567891011121314151617181920
@Slf4jpublic class RpcInvocationHandler implements InvocationHandler { //省略代码... @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RpcRequest request = new RpcRequest(); request.setClassName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParams(args); request.setVersion(version); String serviceName = method.getDeclaringClass().getName(); if (null != version && !"".equals(version)) { serviceName += "-" + version; } String servicePath = serverDiscover.disvover(serviceName); //省略代码... }}
@Slf4j
public class RpcInvocationHandler implements InvocationHandler {
//省略代码...
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest request = new RpcRequest();
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParams(args);
request.setVersion(version);
String serviceName = method.getDeclaringClass().getName();
if (null != version && !"".equals(version)) {
serviceName += "-" + version;
}
String servicePath = serverDiscover.disvover(serviceName);
//省略代码...
}
}
2. 负载均衡
客户端负载均衡,即消费者客户端根据服务提供者列表进行算法分配,来选择调用的服务端。在前面服务注册的时候,可以针对同一个服务名称注册多个服务地址,通过服务发现可以获得该服务地址的列表,通过相关的负载均衡算法,在该列表中找到最合适的一个服务地址。
定义的负载算法接口如下:
1234567891011
public interface ILoadBalance { /** * 在已有服务列表中选择一个服务路径 * @param serviceAddresses 服务地址列表 * * @return 服务地址 */ String selectServiceAddress(ListString serviceAddresses); }
public interface ILoadBalance {
/**
* 在已有服务列表中选择一个服务路径
* @param serviceAddresses 服务地址列表
*
* @return 服务地址
*/
String selectServiceAddress(ListString serviceAddresses);
}
负载算法抽象类如下:
123456789101112131415
public abstract class AbstractLoadBalance implements ILoadBalance { @Override public String selectServiceAddress(ListString serviceAddresses) { if (serviceAddresses == null || serviceAddresses.size() == 0) { return null; } if (serviceAddresses.size() == 1) { return serviceAddresses.get(0); } return doSelect(serviceAddresses); } protected abstract String doSelect(ListString serviceAddresses);}
public abstract class AbstractLoadBalance implements ILoadBalance {
@Override
public String selectServiceAddress(ListString serviceAddresses) {
if (serviceAddresses == null || serviceAddresses.size() == 0) {
return null;
}
if (serviceAddresses.size() == 1) {
return serviceAddresses.get(0);
}
return doSelect(serviceAddresses);
}
protected abstract String doSelect(ListString serviceAddresses);
}
这边实现了一个最简单的负载算法,即随机负载,代码如下,比较简单。其他比如轮询 、最少活跃调用数(权重)、一致性Hash等负载算法可自行扩展实现。
12345678
public class RandomLoadBalance extends AbstractLoadBalance { @Override protected String doSelect(ListString serviceAddresses) { Random random = new Random(); return serviceAddresses.get(random.nextInt(serviceAddresses.size())); }}
public class RandomLoadBalance extends AbstractLoadBalance {
@Override
protected String doSelect(ListString serviceAddresses) {
Random random = new Random();
return serviceAddresses.get(random.nextInt(serviceAddresses.size()));
}
}
在服务发现那块,获得了服务地址列表后即可调用该负载算法获得一个服务地址,代码如下:
1234567891011
@Slf4jpublic class ZkServerDiscover implements IServerDiscover { //省略代码... @Override public String disvover(String serviceName) { //省略代码... return iLoadBalance.selectServiceAddress(serviceAddresses); //省略代码... } }
@Slf4j
public class ZkServerDiscover implements IServerDiscover {
//省略代码…
@Override
public String disvover(String serviceName) {
//省略代码…
return iLoadBalance.selectServiceAddress(serviceAddresses);
//省略代码…
}
}
五、Demo体验
1. 项目结构
rpc-core
模块是rpc框架的核心包,其项目结构如图所示,结构比较简单,根据各个包名也能了解其代表的含义,里面的代码在前面也详细的进行了讲解。
rpc-demo
模块则是demo测试,其项目结构如图所示。
api
包中提供了一个服务接口,定义如下:
1234
public interface Ihello { String sayHello(String name);}
public interface Ihello {
String sayHello(String name);
}
provider
包中提供了该接口的三个实现服务,用于测试多版本和负载均衡,分别如下
12345678910111213141516171819202122232425262728
//1正常实现@RpcService(Ihello.class)public class HelloImpl1 implements Ihello { @Override public String sayHello(String name) { return "HelloImpl1.sayHello:" + name; }} //2相对于1多了一个版本,用于测试多版本@RpcService(value = Ihello.class,version = "1.0")public class HelloImpl1Version implements Ihello { @Override public String sayHello(String name) { return "HelloImpl1 version 1.0.sayHello:" + name; }}//3与1定义的一样,但是实在另一个服务中注册,用于测试负载均衡@RpcService(Ihello.class)public class HelloImpl2 implements Ihello { @Override public String sayHello(String name) { return "HelloImpl2.sayHello:" + name; }}
//1正常实现
@RpcService(Ihello.class)
public class HelloImpl1 implements Ihello {
@Override
public String sayHello(String name) {
return "HelloImpl1.sayHello:" + name;
}
}
//2相对于1多了一个版本,用于测试多版本
@RpcService(value = Ihello.class,version = “1.0”)
public class HelloImpl1Version implements Ihello {
@Override
public String sayHello(String name) {
return "HelloImpl1 version 1.0.sayHello:" + name;
}
}
//3与1定义的一样,但是实在另一个服务中注册,用于测试负载均衡
@RpcService(Ihello.class)
public class HelloImpl2 implements Ihello {
@Override
public String sayHello(String name) {
return "HelloImpl2.sayHello:" + name;
}
}
为了模拟2个服务,写了两个
ServerDemo
,在同一个机器同时启动,
ip
一样,区别在于定义的
port
不一样,代码分别如下:
12345678910111213141516171819202122232425
public class ServerDemo1 { public static void main(String[] args) throws InterruptedException { //这边注册HelloImpl1和HelloImpl1Version Ihello helloService = new HelloImpl1(); Ihello helloService2=new HelloImpl1Version(); IregisterCenter registerCenter = new ZkRegisterCenter("192.168.40.14:2181"); RpcServer rpcServer = new RpcServer(registerCenter, "127.0.0.1", 8888); rpcServer.bindService(Arrays.asList(helloService,helloService2)); rpcServer.publish(); }} public class ServerDemo2 { public static void main(String[] args) throws InterruptedException { //这边注册HelloImpl2 Ihello helloService = new HelloImpl2(); IregisterCenter registerCenter = new ZkRegisterCenter("192.168.40.14:2181"); //这边端口不一样 RpcServer rpcServer = new RpcServer(registerCenter, "127.0.0.1", 9999); rpcServer.bindService(Arrays.asList(helloService)); rpcServer.publish(); }}
public class ServerDemo1 {
public static void main(String[] args) throws InterruptedException {
//这边注册HelloImpl1和HelloImpl1Version
Ihello helloService = new HelloImpl1();
Ihello helloService2=new HelloImpl1Version();
IregisterCenter registerCenter = new ZkRegisterCenter("192.168.40.14:2181");
RpcServer rpcServer = new RpcServer(registerCenter, "127.0.0.1", 8888);
rpcServer.bindService(Arrays.asList(helloService,helloService2));
rpcServer.publish();
}
}
public class ServerDemo2 {
public static void main(String[] args) throws InterruptedException {
//这边注册HelloImpl2
Ihello helloService = new HelloImpl2();
IregisterCenter registerCenter = new ZkRegisterCenter("192.168.40.14:2181");
//这边端口不一样
RpcServer rpcServer = new RpcServer(registerCenter, "127.0.0.1", 9999);
rpcServer.bindService(Arrays.asList(helloService));
rpcServer.publish();
}
}
consumer
包中则提供了客户端代码,既测试了服务多版本,也测试了负载均衡的实现。代码如下:
12345678910111213141516171819202122232425262728
@Slf4jpublic class ClientDemo { public static void main(String[] args) throws InterruptedException { IServerDiscover serverDiscover = new ZkServerDiscover("192.168.40.14:2181"); RpcClientProxy rpcClientProxy = new RpcClientProxy(serverDiscover); //测试服务版本 try { Ihello ihello = rpcClientProxy.clientProxy(Ihello.class, "1.0"); System.out.println(ihello.sayHello("dd")); } catch (Exception e) { log.error("调用失败:e:{}",e.toString()); } //测试负载均衡 for (int i = 0; i 10; i++) { Ihello helloService = rpcClientProxy.clientProxy(Ihello.class); try { String result=helloService.sayHello("xxx"); System.out.println(result); } catch (Exception e) { log.error("调用失败:e:{}",e.toString()); } Thread.sleep(2000); } }}
@Slf4j
public class ClientDemo {
public static void main(String[] args) throws InterruptedException {
IServerDiscover serverDiscover = new ZkServerDiscover("192.168.40.14:2181");
RpcClientProxy rpcClientProxy = new RpcClientProxy(serverDiscover);
//测试服务版本
try {
Ihello ihello = rpcClientProxy.clientProxy(Ihello.class, "1.0");
System.out.println(ihello.sayHello("dd"));
} catch (Exception e) {
log.error("调用失败:e:{}",e.toString());
}
//测试负载均衡
for (int i = 0; i 10; i++) {
Ihello helloService = rpcClientProxy.clientProxy(Ihello.class);
try {
String result=helloService.sayHello("xxx");
System.out.println(result);
} catch (Exception e) {
log.error("调用失败:e:{}",e.toString());
}
Thread.sleep(2000);
}
}
}
2. demo体验
- 启动服务器上的Zookeeper,修改
serverDemo1
、serverDemo1
和ClientDemo
的Zookeeper地址 - 分别启动
serverDemo1
和serverDemo2
,可分别看到下图日志,证明服务启动并成功注册服务
通过工具查看Zookeeper节点信息,如下图所示,可以看到具体的服务节点信息。
- 启动
ClientDemo
,可看到下图日志,证明服务版本与负载均衡均达到了效果
源码地址如下,仅供学习参考
参考
-
-
本文原创,欢迎转载,转载请注明出处,如有不正确或侵权的地方恳请各位看官指正。
原文始发于: