用最简单的方式实现 Rust RPC 服务

RPC是对进程内函数调用的分布式开展,它将进程内的函数调用,扩展成对远程机器上的相应的函数的调用。如何使用最少的代码,用最容易的方式调用远程函数呢?

首先需要知道远程机器的IP地址和端口,然后呢,使用原先进程内的方式直接调用即可,这是最容易的RPC调用。容易不意味着功能简单,而是rpc库背后默默为你承担了序列化、网络传输、路由、服务调用、服务治理的功能。

rpcx 秉承 Go 标准库简单而强大的设计理念,为 Rust 提供了一个原生的 rpc 库。

为什么说是原生呢?因为你可以使用任何你熟悉的编程语言通过HTTP或者JSON-RPC2.0的方式访问Go或者Java实现的rpcx服务,但是除了Go/Java编程语言你没有办法使用raw的rpcx protocol实现TCP的网络调用,而基于TCP的RPC性能要远远高于Request-Response这种类HTTP的调用模型。

现在rpcx for rust的库也发布了: rpcx-rs 。 你可以使用它原生的访问Go或者Java实现的rpcx服务,也可以使用Rust提供rpcx服务。

关键是,依然是那么的简单。

目前 rpcx-rs 发布了 0.1.2 版本,可以方便的将 Rust 函数暴露成rpcx服务,也可以使用Rust访问rpcx服务,支持 JSON 和 MessagePack 两种序列化方式。

rpcx-rs秉承着Go语言版本的信念,采用最简单的方式实现rpc服务,小到一年级的小学生,大到花甲之年的老太太,都能轻松的实现rpc服务。

后续的功能会持续增加,包括protobuf序列化的支持、服务治理各种功能(路由、失败处理、重试、熔断器、限流)、监控(metrics、trace)、注册中心(etcd、consul)等众多的功能。

那么,让我们看看如何开发一个rust实现的rpcx服务,以及如何调用,相应的Go语言的代码也一并列出。

我们的例子是一个 乘法 服务,客户端传入两个整数, 服务器端计算出两个数的 ,然后返回给客户端。

客户端和服务器端公用的数据

客户端和服务器端交互,一般不会使用字节数组,而是封装的数据对象,像Rust和Go语言中的 struct 、Java中的Class等,所以我们先定义好交流用的数据结构,这样服务器和客户端都可以使用了。

你要在你的Cargo.toml文件中引入rpcx库:

rpcx =  "0.1.2"

然后定义数据结构。

use std::error::Error as StdError;

use rmp_serde as rmps; 
use serde::{Deserialize, Serialize};

use rpcx::*;

#[derive(RpcxParam, Default, Debug, Copy, Clone, Serialize, Deserialize)]
pub struct ArithAddArgs {
    #[serde(rename = "A")]
    pub a: u64,
    #[serde(rename = "B")]
    pub b: u64,
}
#[derive(RpcxParam, Default, Debug, Copy, Clone, Serialize, Deserialize)]
pub struct ArithAddReply {
    #[serde(rename = "C")]
    pub c: u64,
}

type Args struct {
    A int
    B int
}

type Reply struct {
    C int
}

type Arith int

func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
    reply.C = args.A * args.B
    fmt.Printf("call: %d * %d = %d\n", args.A, args.B, reply.C)
    return nil
}

我们使用 serde 实现 JSON 和 MessagePack 序列化格式的支持。你不必手工实现 RpcxParam trait,通过 derive 属性就可以自动为你的数据架构加上JSON 和 MessagePack 序列化格式的支持。同时你也需要在derive属性上加上 DefaultSerialize , Deserialize ,以便rpcx实现自动的序列化和初始化。

当然这一切都是通过属性完成的,你只需要定义数据结构即可。为了和Go默认的JSON属性相一致(GO默认序列化好的字段名称首字母是大写的),我们这里也加上了serde的属性,让serde进行 JSON序列化的时候使用指定的名称。

这里我们定义了传入参数 ArithAddArgs 和传出参数( ArithAddReply )。

服务端实现

服务器端实现了一个函数 mul ,函数的名称不重要,因为我们注册的时候会手工传入服务的路径名称和方法名称,也就是Go语言中实现的 service_pathservice_method

函数的参数类型是 ArithAddArgs ,输出结果的类型是 ArithAddReply

在main函数中我们新建了一个服务器,线程池中线程的数量默认是CPU的两倍,你也可以根据需要设置更大的线程数量。

然后我们注册了这个服务(函数),使用 register_func! 宏来进行注册,第一个参数是服务器实例,第二个参数是 service_path 、第三个参数是 service_method ,第四个参数是要注册的函数,第五个和第六个参数是函数的传入参数和传出参数的类型。

然后调用服务的 start 就可以提供服务了。

use mul_model::{ArithAddArgs, ArithAddReply};
use rpcx::*;

fn mul(args: ArithAddArgs) -> ArithAddReply {
    ArithAddReply { c: args.a * args.b }
}

fn main() {
    let mut rpc_server = Server::new("0.0.0.0:8972".to_owned(), 0);

    register_func!(rpc_server, "Arith", "Mul", mul, ArithAddArgs, ArithAddReply);

    rpc_server.start().unwrap();
}

package main

import (
    "context"
    "flag"
    "fmt"

    example "github.com/rpcx-ecosystem/rpcx-examples3"
    "github.com/smallnest/rpcx/server"
)

var (
    addr = flag.String("addr", "localhost:8972", "server address")
)

type Arith struct{}

// the second parameter is not a pointer
func (t *Arith) Mul(ctx context.Context, args example.Args, reply *example.Reply) error {
    reply.C = args.A * args.B
    fmt.Println("C=", reply.C)
    return nil
}

func main() {
    flag.Parse()

    s := server.NewServer()
    //s.Register(new(Arith), "")
    s.RegisterName("Arith", new(Arith), "")
    err := s.Serve("tcp", *addr)
    if err != nil {
        panic(err)
    }
}

客户端实现

客户端使用 Client::new 先连接一个服务器,你可以指定序列化格式,服务器也会使用你的这个序列化格式返回结果,然后指定要调用的 service_pathservice_method , 使用 call 返回结果(Option类型,因为有的调用不需要返回结果,返回结果是Result类型,可能是成功的结果,也可能是Error); 使用 acall 异步调用,会返回一个future。

你可以和Go语言的版本交叉着互相调用,可以看到很容易的我们就实现了rpc服务和调用。

use std::collections::hash_map::HashMap;

use mul_model::*;
use rpcx::Client;
use rpcx::{Result, SerializeType};

pub fn main() {
    let mut c: Client = Client::new("127.0.0.1:8972");
    c.start().map_err(|err| println!("{}", err)).unwrap();
    c.opt.serialize_type = SerializeType::JSON;

    let mut a = 1;
    loop {
        let service_path = String::from("Arith");
        let service_method = String::from("Mul");
        let metadata = HashMap::new();
        let args = ArithAddArgs { a: a, b: 10 };
        a = a + 1;

        let reply: Option<Result> =
            c.call(service_path, service_method, false, metadata, &args);
        if reply.is_none() {
            continue;
        }

        let result_reply = reply.unwrap();
        match result_reply {
            Ok(r) => println!("received: {:?}", r),
            Err(err) => println!("received err:{}", err),
        }
    }
}

package main

import (
    "context"
    "flag"
    "log"

    "github.com/smallnest/rpcx/protocol"

    example "github.com/rpcx-ecosystem/rpcx-examples3"
    "github.com/smallnest/rpcx/client"
)

var (
    addr = flag.String("addr", "localhost:8972", "server address")
)

func main() {
    flag.Parse()

    d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
    opt := client.DefaultOption
    opt.SerializeType = protocol.JSON

    xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, opt)
    defer xclient.Close()

    args := example.Args{
        A: 10,
        B: 20,
    }

    reply := &example.Reply{}
    err := xclient.Call(context.Background(), "Mul", args, reply)
    if err != nil {
        log.Fatalf("failed to call: %v", err)
    }

    log.Printf("%d * %d = %d", args.A, args.B, reply.C)

}