从0-1手写RPC框架

前言

什么是RPC

RPC(Remote Procedure Call)远程过程调用,简言之就是像调用本地方法一样调用远程服务。目前外界使用较多的有gRPC、Dubbo、Spring Cloud等。相信大家对RPC的概念都已经很熟悉了,这里不做过多介绍。

为啥要自己写

为什么要自己写一个RPC框架,dubbo难道满足不了你?我觉得从个人成长上说,如果一个程序员能清楚的了解RPC框架所具备的要素,掌握RPC框架中涉及的服务注册发现、负载均衡、序列化协议、RPC通信协议、Socket通信、异步调用、熔断降级等技术,可以全方位的提升基本素质。虽然也有相关源码,但是只看源码容易眼高手低,动手写一个才是自己真正掌握这门技术的最优路径。(换个角度想想dubbo真的适合你们项目吗?有多少人觉得dubbo只是比feign性能好,更主流,大公司都用等等理由才用的?)

RPC框架要素

一款分布式RPC框架离不开三个基本要素:

  • 服务提供方 Serivce Provider
  • 服务消费方 Servce Consumer
  • 注册中心 Registery

围绕上面三个基本要素可以进一步扩展服务路由、负载均衡、服务熔断降级、序列化协议、通信协议等等。

  1. 注册中心

    主要是用来完成服务注册和发现的工作。虽然服务调用是服务消费方直接发向服务提供方的,但是现在服务都是集群部署,服务的提供者数量也是动态变化的,所以服务的地址也就无法预先确定。因此如何发现这些服务就需要一个统一注册中心来承载。

  2. 服务提供方(RPC服务端)

    其需要对外提供服务接口,它需要在应用启动时连接注册中心,将服务名及其服务元数据发往注册中心。同时需要提供服务服务下线的机制。需要维护服务名和真正服务地址映射。服务端还需要启动Socket服务监听客户端请求。

  3. 服务消费方(RPC客户端)

    客户端需要有从注册中心获取服务的基本能力,它需要在应用启动时,扫描依赖的RPC服务,并为其生成代理调用对象,同时从注册中心拉取服务元数据存入本地缓存,然后发起监听各服务的变动做到及时更新缓存。在发起服务调用时,通过代理调用对象,从本地缓存中获取服务地址列表,然后选择一种负载均衡策略筛选出一个目标地址发起调用。调用时会对请求数据进行序列化,并采用一种约定的通信协议进行socket通信。

技术选型

注册中心

目前成熟的注册中心有Zookeeper,Nacos,Consul,Eureka,它们的主要比较如下:

我们这里采用nacos

IO通信框架

本实现采用Netty作为底层通信框架,Netty是一个高性能事件驱动型的非阻塞的IO(NIO)框架。

通信协议

TCP通信过程中会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。所以需要对发送的数据包封装到一种通信协议里。

业界的主流协议的解决方案可以归纳如下:

  1. 消息定长,例如每个报文的大小为固定长度100字节,如果不够用空格补足。
  2. 在包尾特殊结束符进行分割。
  3. 将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段。

很明显1,2都有些局限性,本实现采用方案3,自定义协议

序列化协议

常见的协议有JavaSerializer、json、Protobuf及Hessian。建议选用Protobuf,其序列化后码流小性能高,非常适合RPC调用,Google自家的gRPC也是用其作为通信协议。但是我们这里采用Hessian2序列化(懒,其他后续慢慢实现)

整体架构

下面就来看看实现吧

通信相关

通信协议

  • 第一个是魔法数,比如我定义为0x01F1。
  • 第二个代表时间戳,以便对时间进行校验
  • 第三个是消息类型,如0代表请求1代表响应。
  • 第四个是加密序列号,采用随机的方式对消息体加密和解密
  • 第五个表示消息长度,即此后面的内容是消息content。

对应实体类如下:

编码器

也就是需要按顺序写入消息的字节到缓冲器,如下:

public class EasyRpcEncoder extends MessageToByteEncoder<RpcRemoteMsg> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, RpcRemoteMsg rpcRemoteMsg, ByteBuf out) throws Exception {
        // 写入开头的标志
        out.writeShort(rpcRemoteMsg.getStartSign());
        // 写入秒时间戳
        out.writeInt(rpcRemoteMsg.getTimeStamp());
        // 写消息类型
        out.writeShort(rpcRemoteMsg.getMsgType());
        // 写入加密序列号
        out.writeShort(rpcRemoteMsg.getEncryptSequence());
        // 写入消息长度
        out.writeInt(rpcRemoteMsg.getContentLength());
        // 写入消息主体
        out.writeBytes(rpcRemoteMsg.getContent());
    }
}

解码器

这里省事,直接继承LengthFieldBasedFrameDecoder实现自己逻辑,如下:

public class EasyRpcDecoder extends LengthFieldBasedFrameDecoder {
    private static final Logger log = LoggerFactory.getLogger(EasyRpcDecoder.class);
    // 开始标记
    private final short HEAD_START = (short) 0x01F1;


    public EasyRpcDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
    }

    public EasyRpcDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);

    }

    public EasyRpcDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
    }

    public EasyRpcDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
        super(byteOrder, maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        // 经过父解码器的处理 我们就不需要在考虑沾包和半包了
        // 当然,想要自己处理沾包和半包问题也不是不可以
        ByteBuf decode = (ByteBuf) super.decode(ctx, in);
        if (decode == null) {
            return null;
        }
        // 开始标志校验  开始标志不匹配直接 过滤此条消息
        short startIndex = decode.readShort();
        if (startIndex != HEAD_START) {
            return null;
        }
        // 时间戳
        int timeStamp = decode.readInt();
        // 请求还是响应  0:请求  1:响应
        short msgType = decode.readShort();
        // 加密序列号
        int encryptSequence = decode.readShort();
        // 消息体长度
        int contentLength = decode.readInt();
        // 读取消息
        byte[] msgByte = new byte[contentLength];
        decode.readBytes(msgByte);

        // 将消息转成实体类 传递给下面的数据处理器
        return msgType == 0 ? EncryptUtil.remoteMsgToRequest(encryptSequence,msgByte) : EncryptUtil.remoteMsgToResponse(encryptSequence,msgByte);
    }
}

nacos相关

nacos其实很简单一共四个方法:

  • registerInstance :注册
  • deregisterInstance : 下线
  • getAllInstances : 获取所有实例
  • subscribe :订阅服务

我们nacos全局就一个使用类:

public class NacosEasyRpcCenter implements EasyRpcCenter {
    private final static Logger log = LoggerFactory.getLogger(NacosEasyRpcCenter.class);

    private final static String DEFAULT_NAMESPACE = "public";
    private final static String DEFAULT_META_PARAM = "easyRpcMeta";

    private NacosNamingService nacosNamingService;

    private EasyRpcApplicationConfig applicationConfig;

    private EasyRpcCenterConfig centerConfig;

    public NacosEasyRpcCenter(EasyRpcCenterConfig centerConfig, EasyRpcApplicationConfig rpcApplicationConfig) {
        Assert.isTrue(StrUtil.isNotEmpty(centerConfig.getAddress()), "registry address cannot be empty");
        Assert.isTrue(StrUtil.isNotEmpty(centerConfig.getGroup()), "registry group cannot be empty");
        this.applicationConfig = rpcApplicationConfig;
        this.centerConfig = centerConfig;
        Properties properties = new Properties();
        properties.setProperty("serverAddr", String.format("%s:%d", centerConfig.getAddress(), centerConfig.getPort()));
        properties.setProperty("namespace", StrUtil.emptyToDefault(centerConfig.getNamespace(), DEFAULT_NAMESPACE));
        try {
            this.nacosNamingService = new NacosNamingService(properties);
        } catch (NacosException e) {
            log.error("Easy-Rpc -> nacos center init error:{}", e.getErrMsg(), e);
            throw new EasyRpcRunException(e.getErrMsg());
        }
    }


    @Override
    public void registerInstance(ServiceInstance serviceInstance) {
        Instance instance = new Instance();
        instance.setIp(serviceInstance.getIp());
        instance.setPort(serviceInstance.getPort());
        Map<String, String> meteData=new HashMap<>(4);
        meteData.put(DEFAULT_META_PARAM,JSONObject.toJSONString(serviceInstance.getMetaDataSet()));
        instance.setMetadata(meteData);
        try {
            nacosNamingService.registerInstance(applicationConfig.getName(), centerConfig.getGroup(),instance);
        } catch (NacosException e) {
            log.error("Easy-Rpc -> nacos center register error:{}", e.getErrMsg(), e);
            throw new EasyRpcRunException(e.getErrMsg());
        }
        log.info("Easy-Rpc -> nacos center register:[ serviceName:{} group:{} ] success",applicationConfig.getName(),centerConfig.getGroup());
    }

    @Override
    public void deregisterInstance(ServiceInstance serviceInstance) {
        try {
            nacosNamingService.deregisterInstance(applicationConfig.getName(), centerConfig.getGroup(),serviceInstance.getIp(),serviceInstance.getPort());
        } catch (NacosException e) {
            log.error("Easy-Rpc -> nacos center deregister error:{}", e.getErrMsg(), e);
        }
        log.info("Easy-Rpc -> nacos center deregister:[ serviceName:{} group:{} ] success",applicationConfig.getName(),centerConfig.getGroup());
    }

    @Override
    public List<ServiceInstance> getAllInstances(String serviceId) {
        List<ServiceInstance> serviceInstanceList = EasyRpcInstanceCache.getServiceInstanceList(serviceId);
        if(CollectionUtil.isEmpty(serviceInstanceList)){
            serviceInstanceList = new CopyOnWriteArrayList<>(new ArrayList<>(8));
            try {
                List<Instance> allInstances = nacosNamingService.getAllInstances(serviceId, centerConfig.getGroup());
                if(CollectionUtil.isNotEmpty(allInstances)){
                    for(Instance instance:allInstances){
                        serviceInstanceList.add(new ServiceInstance(instance.getIp(), instance.getPort()));
                    }
                }
            } catch (NacosException e) {
                log.error("Easy-Rpc -> nacos center deregister error:{}", e.getErrMsg(), e);
            }
            EasyRpcInstanceCache.updateServiceInstanceInfo(serviceId,serviceInstanceList);
        }
        return serviceInstanceList;
    }

    @Override
    public void subscribeInstance(String serviceId) {
        try {
            nacosNamingService.subscribe(serviceId, centerConfig.getGroup(), new AbstractEventListener() {
                @Override
                public Executor getExecutor() {
                    return ThreadPoolUtils.subscribeInstancePool;
                }

                @Override
                public void onEvent(Event event) {
                    NamingEvent namingEvent = (NamingEvent) event;
                    List<Instance> allInstances = namingEvent.getInstances();
                    List<ServiceInstance> serviceInstanceList = new CopyOnWriteArrayList<>(new ArrayList<>(8));
                    if(CollectionUtil.isNotEmpty(allInstances)){
                        for(Instance instance:allInstances){
                            serviceInstanceList.add(new ServiceInstance(instance.getIp(), instance.getPort()));
                        }
                    }
                    // 直接把本地的全量替换
                    EasyRpcInstanceCache.updateServiceInstanceInfo(serviceId,serviceInstanceList);
                }
            });
        } catch (NacosException e) {
            log.error("Easy-Rpc -> nacos center subscribe error:{}", e.getErrMsg(), e);
        }
        log.info("Easy-Rpc -> nacos center subscribe:[{}] success",serviceId);
    }
}

消费者相关

消费者不用想,凡是这种RPC的几乎都是动态代理,问题是用什么样的方法为它生成代理,我们以Feign为例看看它是怎么做的?

启动注解→扫描包下被注解标识的接口→获取封装信息→生成FeignClientFactoryBean注入容器

感兴趣的可以自己去看看,最终我们采用注解调用时,获取的无非是factoryBean.getObject(); 返回的动态代理对象罢了

想想这样合适吗?他需要扫描整个包然后生成代理对象再放入容器,要是这个对象压根没人用岂不是白生成了? 所以我这里改了一下,不再扫描包生成了,我在你属性注入时才生成!

自定义注解

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
@Inherited
public @interface EasyRpcServiceInvoke {
    // 服务项ID
    String serviceId();
    // 服务发布的Bean 名称
    String beanRefName() default "";

}

此注解代表我需要获取远程调用的代理对象,如:

消费者后置处理器

所以我要在Bean属性注入的时候,为它注入一个动态代理对象,同时添加需要订阅的服务项

这个服务项在Nacos中就是服务名,就好比feign也需要指定一个服务名是吧

为什么dubbo不需要?因为dubbo在nacos不是以服务为单位,而是以暴露的接口服务为单位,如下:

代理对象生成

这里采用了Cglib动态代理,为什么不用jdk动态代理,主要是为了避免反射耗时,也方便以后拓展的灵活性

代理工厂:

public class CglibInvokeBeanProxyFactory {

    /**
     * @Author colins
     * @Description  获取客户端远程调用代理对象
     * @return T
     **/
    public static <T> T getClientInvokeProxy(Class<T> interfaceClass, String serviceId, String beanRef,String interfaces) throws Exception {
        return (T) Enhancer.create(interfaceClass, new CglibInvocationHandler(serviceId, beanRef,interfaces));
    }

}

代理处理类:

public class CglibInvocationHandler implements InvocationHandler {


    private final String serviceId;

    private final String beanRef;

    private final String interfaces;

    public CglibInvocationHandler(String serviceId, String beanRef, String interfaces) {
        this.serviceId = serviceId;
        this.beanRef = beanRef;
        this.interfaces = interfaces;
    }

    @Override
    public Object invoke(Object o, Method method, Object[] objects) throws Throwable {
        List<ServiceInstance> serviceInstanceList = EasyRpcInstanceCache.getServiceInstanceList(serviceId);
        if (CollectionUtil.isEmpty(serviceInstanceList)) {
            throw new EasyRpcException(String.format("[ %d ] No corresponding service found ", serviceId));
        }

        // 构建请求参数
        EasyRpcRequest easyRpcRequest = new EasyRpcRequest(UUID.randomUUID().toString(),beanRef, interfaces, method.getName(), method.getParameterTypes(), objects);
        // 获取会话
        EasyRpcSession easyRpcSession = EasyRpcSessionFactory.getInstance().openSession(serviceId, easyRpcRequest, serviceInstanceList);
        // 会话执行调用
        return easyRpcSession.exec();
    }
}

生产者相关

自定义注解

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Inherited
@Component
public @interface EasyRpcServicePublish {
    // 暴露出去的bean名称 默认就是bean默认名称
    String beanRefName() default "";
}

生产者后置处理器

主要两件事:

  • 收集暴露出去的bean集合
  • 收集需要暴露出去的元数据

为什么要收集暴露出去的服务bean?

仔细看一下上面的注解是一个复合注解,我暴露出去提供远程调用服务的对象在容器中不就是一个bean吗?所以如果有消费者调用暴露的服务,本质不就是调用远程容器中的bean对象吗? 所以我给它收集缓存起来,要是有别的服务远程调用过来,我直接走缓存反射执行方法就好了

容器相关

容器启动

需要做三件事,启动netty server 、发布服务、订阅服务

public class EasyRpcStartEvent implements ApplicationListener<ContextRefreshedEvent> {

    private EasyRpcConfig rpcConfig;

    private EasyRpcCenter rpcCenter;

    public EasyRpcStartEvent(EasyRpcConfig rpcConfig, EasyRpcCenter rpcCenter) {
        this.rpcConfig = rpcConfig;
        this.rpcCenter = rpcCenter;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // 启动NettyServer
        ThreadPoolUtils.startNettyPool.execute(new EasyRpcServer(rpcConfig.getProtocol().getPort(), new EasyRpcServerHandlerInit()));
        // 发布服务
        rpcCenter.registerInstance(new ServiceInstance(rpcConfig.getProtocol().getPort(), EasyRpcSpringConstant.serviceMetaDataList));
        // 订阅服务
        EasyRpcSpringConstant.serviceIdList.forEach(item->{
            rpcCenter.subscribeInstance(item);
        });
    }
}

容器关闭

只需要让该服务注册下线即可,如果想优雅一点,可以把Netty相关也全关闭(这里先不处理)

public class EasyRpcCloseEvent implements ApplicationListener<ContextClosedEvent> {

    private EasyRpcConfig rpcConfig;

    private EasyRpcCenter rpcCenter;

    public EasyRpcCloseEvent(EasyRpcConfig rpcConfig, EasyRpcCenter rpcCenter) {
        this.rpcConfig = rpcConfig;
        this.rpcCenter = rpcCenter;
    }


    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        // 注销服务即可 其他中断无所谓
        // netty server下线 client会断开重连
        rpcCenter.deregisterInstance(new ServiceInstance(rpcConfig.getProtocol().getPort()));
    }
}

调用相关

消费者

消费者在调用的时候本质是执行代理对象中的逻辑,也就是往netty管道中写入数据,不在多说,看看发送数据的实体类

生产者

生产者就是根据传来的信息,拿到缓存中对象的bean对象反射执行方法,然后返回

测试结果

消费端16线程,所以均为一台机器(生产、消费、jmeter都在一台机器)

测试结果仅供参考,在好的条件下,某些配置设置合适点,吞吐量应该更高

无传参 无返回POJO传参 简单返回简单传参 POJO返回无传参 POJO_list返回
100并发 1000次请求 (共计10000)1W左右QPS9350左右QPS8900左右QPS7500左右QPS
300并发 1000次请求 (共计30000)9200左右QPS8700左右QPS8000左右QPS6800左右QPS
500并发 1000次请求 (共计50000)8500左右QPS7800左右QPS7800左右QPS6800左右QPS

后续拓展

从以上可以看到很多东西都还没实现,只能说基本雏形搭完了,后续什么SPI机制、服务治理相关、多协议拓展、配置丰富化、多注册中心等等都是可以做的事,最后再看一下架构吧

Last Updated: