geerpc笔记

GeeRPC笔记

RPC基础知识

为什么要使用RPC?RPC有什么优点?

可以把不同功能的模块通过进程隔离开,那么就可以对这些模块进行不同的形式的变换,而且可以确保不同模块的安全和稳定。

RPC的类型

RPC根据运行模式分为静态模式和动态模式:

静态模式:预编译接口,生成存根代码stub,与应用代码编译。C++ JAVA OBJc

image-20240719213225997

动态模式:依赖语言特性实现动态调用(反射、内省)。 python java php go

没有静态模式中的IDL编译器编译为STUB的过程,程序代码就是IDL,所以IDL+User Code构成了动态模式的RPC程序。

image-20240719213616213

同步调用和异步调用

同步调用

1
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) err

异步调用

1
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan*Call) *Call

Geerpc架构

这个项目的层次结构如下:

  • codec:编解码器
  • geerpc:RPC框架的核心实现
  • registry:注册中心
  • client:客户端
  • server:服务端
  • service:服务
  • xclient:客户端的扩展

重要结构定义如下:

header:

1
2
3
4
5
6
7
type Header struct {
ServiceMethod string // 格式 "Service.Method"
Seq uint64 // 序列号
Error string
}

// 没有专门定义body结构体,body是一个interface{}类型

client

1
2
3
4
5
6
7
8
9
10
11
type Client struct {
cc codec.Codec
opt *Option
sending sync.Mutex // 保证请求的有序发送
header codec.Header
mu sync.Mutex // 操作 client.pending 的互斥锁
seq uint64
pending map[uint64]*Call // 存储未处理完的请求
closing bool // user has called Close
shutdown bool // server has told us to stop
}

server:

1
2
3
type Server struct {
serviceMap sync.Map
}

Service:

1
2
3
4
5
6
7
8
9
10
11
12
13
type methodType struct {
method reflect.Method // 方法
ArgType reflect.Type // 参数类型
ReplyType reflect.Type // 返回值类型
numCalls uint64 // 调用次数
}

type service struct {
name string // 服务名
typ reflect.Type // 服务类型
rcvr reflect.Value // 服务实例
method map[string]*methodType // 服务方法
}

Option:

1
2
3
4
5
6
type Option struct {
MagicNumber int // MagicNumber 标记表示这是一个geerpc request
CodecType codec.Type // client可以选择不同的Codec来encode body
ConnectTimeout time.Duration // 0 means no limit
HandleTimeout time.Duration
}

Codec:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type Codec interface {
io.Closer // 关闭连接
ReadHeader(*Header) error // 读取请求头
ReadBody(interface{}) error // 读取请求体
Write(*Header, interface{}) error // 写入请求
}

const (
GobType Type = "application/gob"
JsonType Type = "application/json" // not implemented
)


type GobCodec struct {
conn io.ReadWriteCloser // 连接实例
buf *bufio.Writer // 写入缓冲区
dec *gob.Decoder // 解码器
enc *gob.Encoder // 编码器
}

registry

1
2
3
4
5
6
7
8
// GeeRegistry 是一个简单的注册中心,提供以下功能:
// - 添加服务实例并接收心跳以保持其存活。
// - 返回所有存活的服务实例,并同步删除死亡的服务实例。
type GeeRegistry struct {
timeout time.Duration
mu sync.Mutex // protect following
servers map[string]*ServerItem
}

request:

1
2
3
4
5
6
7
// request 存储了调用的所有信息
type request struct {
h *codec.Header // header of request
argv, replyv reflect.Value // argv and replyv of request
mtype *service.MethodType
svc *service.Service
}

消息的序列化与反序列化

1
2
3
4
5
6
type Codec interface {
io.Closer
ReadHeader(*Header) error
ReadBody(interface{}) error
Write(*Header, interface{}) error
}

紧接着,抽象出 Codec 的构造函数,客户端和服务端可以通过 Codec 的 Type 得到构造函数,从而创建 Codec 实例。这部分代码和工厂模式类似,与工厂模式不同的是,返回的是构造函数,而非实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type NewCodecFunc func(io.ReadWriteCloser) Codec

type Type string

const (
GobType Type = "application/gob"
JsonType Type = "application/json" // not implemented
)

var NewCodecFuncMap map[Type]NewCodecFunc

func init() {
NewCodecFuncMap = make(map[Type]NewCodecFunc)
NewCodecFuncMap[GobType] = NewGobCodec
}

编码协议

GeeRPC 客户端固定采用 JSON 编码 Option,后续的 header 和 body 的编码方式由 Option 中的 CodeType 指定,服务端首先使用 JSON 解码 Option,然后通过 Option 的 CodeType 解码剩余的内容。即报文将以这样的形式发送:

1
2
| Option{MagicNumber: xxx, CodecType: xxx} | Header{ServiceMethod ...} | Body interface{} |
| <------ 固定 JSON 编码 ------> | <------- 编码方式由 CodeType 决定 ------->|

客户端

在使用 Go 语言的 net/rpc 包进行远程过程调用(RPC)时,一个函数需要满足以下五个条件才能被远程调用:

  • 函数类型是导出的:函数必须是一个导出的函数,也就是说函数名的首字母必须大写。例如,Func 是可以被导出的,而 func 则不行。
  • 函数所属类型必须是导出的:函数必须属于一个导出的类型,即该类型的名字首字母也必须大写。例如,type MyType struct {} 是可以被导出的,而 type myType struct {} 则不行。
  • 函数必须有两个参数,而且这两个参数都必须是导出的类型或者内建类型的。第一个参数是输入参数,第二个参数是返回参数。举例来说,func (t *MyType) MyMethod(arg1 ArgType1, arg2 *ArgType2) error 是一个符合条件的函数。
  • 第二个参数必须是指针类型:函数的第二个参数必须是指向一个类型的指针,这个类型的首字母必须大写。例如,func (t *MyType) MyMethod(arg1 ArgType1, arg2 *ArgType2) error,其中 arg2 是指向 ArgType2 的指针。
  • 函数必须返回一个错误类型:函数的返回值必须是一个 error 类型,这样才能在调用过程中传递错误信息。例如,func (t *MyType) MyMethod(arg1 ArgType1, arg2 *ArgType2) error 中,函数返回一个 error 类型的值。

更直观一些:

1
func (t *T) MethodName(argType T1, replyType *T2) error

根据上述要求,首先我们封装了结构体 Call 来承载一次 RPC 调用所需要的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
// Call 表示一个活跃的RPC
type Call struct {
Seq uint64
ServiceMethod string // format "<service>.<method>"
Args interface{} // arguments to the function
Reply interface{} // reply from the function
Error error // if error occurs, it will be set
Done chan *Call // Strobes when call is complete.
}

func (call *Call) done() {
call.Done <- call
}

为了支持异步调用,Call 结构体中添加了一个字段 Done,Done 的类型是 chan *Call,当调用结束时,会调用 call.done() 通知调用方。

实现Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Client 是一个 RPC 客户端,它可以有多个未处理完的 Call,且可以被多个 goroutine 同时使用
// Client 通过发送 Call 实例来调用远程服务
// Client 会将 Call 实例放入 client.pending 中,等待服务端处理
// Client 会通过 client.receive() 方法来接收服务端的响应
// Client 会通过 client.send() 方法来发送请求
type Client struct {
cc codec.Codec
opt *geerpc.Option
sendingMtx sync.Mutex // 保证请求的有序发送
header codec.Header
mtx sync.Mutex // 操作 client.pending 的互斥锁
seq uint64
pending map[uint64]*Call // 存储未处理完的请求
closing bool // user has called Close
shutdown bool // server has told us to stop
}

var _ io.Closer = (*Client)(nil)

var ErrShutdown = errors.New("connection is shut down")

// Close the connection
func (client *Client) Close() error {
client.mu.Lock()
defer client.mu.Unlock()
if client.closing {
return ErrShutdown
}
client.closing = true
return client.cc.Close()
}

// IsAvailable return true if the client does work
func (client *Client) IsAvailable() bool {
client.mu.Lock()
defer client.mu.Unlock()
return !client.shutdown && !client.closing
}

Client 的字段比较复杂:

  • cc 是消息的编解码器,和服务端类似,用来序列化将要发送出去的请求,以及反序列化接收到的响应。
  • sending 是一个互斥锁,和服务端类似,为了保证请求的有序发送,即防止出现多个请求报文混淆。
  • header 是每个请求的消息头,header 只有在请求发送时才需要,而请求发送是互斥的,因此每个客户端只需要一个,声明在 Client 结构体中可以复用。
  • seq 用于给发送的请求编号,每个请求拥有唯一编号。
  • pending 存储未处理完的请求,键是编号,值是 Call 实例。
  • closing 和 shutdown 任意一个值置为 true,则表示 Client 处于不可用的状态,但有些许的差别,closing 是用户主动关闭的,即调用 Close 方法,而 shutdown 置为 true 一般是有错误发生。

然后声明了三个函数:

  • registerCall:将参数 call 添加到 client.pending 中,并更新 client.seq。
  • removeCall:根据 seq,从 client.pending 中移除对应的 call,并返回。
  • terminateCalls:服务端或客户端发生错误时调用,将 shutdown 设置为 true,且将错误信息通知所有 pending 状态的 call。

对一个客户端端来说,接收响应、发送请求是最重要的 2 个功能。那么首先实现接收功能,接收到的响应有三种情况:

  • call 不存在,可能是请求没有发送完整,或者因为其他原因被取消,但是服务端仍旧处理了。
  • call 存在,但服务端处理出错,即 h.Error 不为空。
  • call 存在,服务端处理正常,那么需要从 body 中读取 Reply 的值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (client *Client) receive() {
var err error
for err == nil {
var h codec.Header
if err = client.cc.ReadHeader(&h); err != nil {
break
}
call := client.removeCall(h.Seq)
switch {
case call == nil:
// it usually means that Write partially failed
// and call was already removed.
err = client.cc.ReadBody(nil)
case h.Error != "":
call.Error = fmt.Errorf(h.Error)
err = client.cc.ReadBody(nil)
call.done()
default:
err = client.cc.ReadBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
}
// error occurs, so terminateCalls pending calls
client.terminateCalls(err)
}

创建 Client 实例时,首先需要完成一开始的协议交换,即发送 Option 信息给服务端。协商好消息的编解码方式之后,再创建一个子协程调用 receive() 接收响应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func NewClient(conn net.Conn, opt *Option) (*Client, error) {
f := codec.NewCodecFuncMap[opt.CodecType]
if f == nil {
err := fmt.Errorf("invalid codec type %s", opt.CodecType)
log.Println("rpc client: codec error:", err)
return nil, err
}
// send options with server
if err := json.NewEncoder(conn).Encode(opt); err != nil {
log.Println("rpc client: options error: ", err)
_ = conn.Close()
return nil, err
}
return newClientCodec(f(conn), opt), nil
}

func newClientCodec(cc codec.Codec, opt *Option) *Client {
client := &Client{
seq: 1, // seq starts with 1, 0 means invalid call
cc: cc,
opt: opt,
pending: make(map[uint64]*Call),
}
go client.receive()
return client
}

send函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
func (client *Client) send(call *Call) {
// make sure that the client will send a complete request
client.sending.Lock()
defer client.sending.Unlock()

// register this call.
seq, err := client.registerCall(call)
if err != nil {
call.Error = err
call.done()
return
}

// prepare request header
client.header.ServiceMethod = call.ServiceMethod
client.header.Seq = seq
client.header.Error = ""

// encode and send the request
if err := client.cc.Write(&client.header, call.Args); err != nil {
call := client.removeCall(seq)
// call may be nil, it usually means that Write partially failed,
// client has received the response and handled
if call != nil {
call.Error = err
call.done()
}
}
}

// Go invokes the function asynchronously.
// It returns the Call structure representing the invocation.
func (client *Client) Go(serviceMethod string, args, reply interface{}, done chan *Call) *Call {
if done == nil {
done = make(chan *Call, 10)
} else if cap(done) == 0 {
log.Panic("rpc client: done channel is unbuffered")
}
call := &Call{
ServiceMethod: serviceMethod,
Args: args,
Reply: reply,
Done: done,
}
client.send(call)
return call
}

// Call invokes the named function, waits for it to complete,
// and returns its error status.
func (client *Client) Call(serviceMethod string, args, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
  • GoCall 是客户端暴露给用户的两个 RPC 服务调用接口,Go 是一个异步接口,返回 call 实例。
  • Call 是对 Go 的封装,阻塞 call.Done,等待响应返回,是一个同步接口。

服务器端

  • 首先定义了结构体 Server,没有任何的成员字段。
  • 实现了 Accept 方式,net.Listener 作为参数,for 循环等待 socket 连接建立,并开启子协程处理,处理过程交给了 ServerConn 方法。
  • DefaultServer 是一个默认的 Server 实例,主要为了用户使用方便。

ServeConn 的实现就和之前讨论的通信过程紧密相关了,首先使用 json.NewDecoder 反序列化得到 Option 实例,检查 MagicNumber 和 CodeType 的值是否正确。然后根据 CodeType 得到对应的消息编解码器,接下来的处理交给 serverCodec

serveCodec 的过程非常简单。主要包含三个阶段

  • 读取请求 readRequest
  • 处理请求 handleRequest
  • 回复请求 sendResponse

之前提到过,在一次连接中,允许接收多个请求,即多个 request header 和 request body,因此这里使用了 for 无限制地等待请求的到来,直到发生错误(例如连接被关闭,接收到的报文有问题等),这里需要注意的点有三个:

  • handleRequest 使用了协程并发执行请求。
  • 处理请求是并发的,但是回复请求的报文必须是逐个发送的,并发容易导致多个回复报文交织在一起,客户端无法解析。在这里使用锁(sending)保证。
  • 尽力而为,只有在 header 解析失败时,才终止循环。

服务注册

假设客户端发过来一个请求,包含 ServiceMethod 和 Argv。

1
2
3
4
{
"ServiceMethod""T.MethodName"
"Argv""0101110101..." // 序列化之后的字节流
}

通过 “T.MethodName” 可以确定调用的是类型 T 的 MethodName,如果硬编码实现这个功能,很可能是这样:

1
2
3
4
5
6
7
8
9
10
11
12
switch req.ServiceMethod {
case "T.MethodName":
t := new(t)
reply := new(T2)
var argv T1
gob.NewDecoder(conn).Decode(&argv)
err := t.MethodName(argv, reply)
server.sendMessage(reply, err)
case "Foo.Sum":
f := new(Foo)
...
}

也就是说,如果使用硬编码的方式来实现结构体与服务的映射,那么每暴露一个方法,就需要编写等量的代码。那有没有什么方式,能够将这个映射过程自动化呢?可以借助反射。

通过反射,我们能够非常容易地获取某个结构体的所有方法,并且能够通过方法,获取到该方法所有的参数类型与返回值。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func main() {
var wg sync.WaitGroup
typ := reflect.TypeOf(&wg)
for i := 0; i < typ.NumMethod(); i++ {
method := typ.Method(i)
argv := make([]string, 0, method.Type.NumIn())
returns := make([]string, 0, method.Type.NumOut())
// j 从 1 开始,第 0 个入参是 wg 自己。
for j := 1; j < method.Type.NumIn(); j++ {
argv = append(argv, method.Type.In(j).Name())
}
for j := 0; j < method.Type.NumOut(); j++ {
returns = append(returns, method.Type.Out(j).Name())
}
log.Printf("func (w *%s) %s(%s) %s",
typ.Elem().Name(),
method.Name,
strings.Join(argv, ","),
strings.Join(returns, ","))
}
}

运行的结果是:

1
2
3
func (w *WaitGroup) Add(int)
func (w *WaitGroup) Done()
func (w *WaitGroup) Wait()

这里通过反射实现了服务注册:

第一步,定义结构体 methodType:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type methodType struct {
method reflect.Method
ArgType reflect.Type
ReplyType reflect.Type
numCalls uint64
}

func (m *methodType) NumCalls() uint64 {
return atomic.LoadUint64(&m.numCalls)
}

func (m *methodType) newArgv() reflect.Value {
var argv reflect.Value
// arg may be a pointer type, or a value type
if m.ArgType.Kind() == reflect.Ptr {
argv = reflect.New(m.ArgType.Elem())
} else {
argv = reflect.New(m.ArgType).Elem()
}
return argv
}

func (m *methodType) newReplyv() reflect.Value {
// reply must be a pointer type
replyv := reflect.New(m.ReplyType.Elem())
switch m.ReplyType.Elem().Kind() {
case reflect.Map:
replyv.Elem().Set(reflect.MakeMap(m.ReplyType.Elem()))
case reflect.Slice:
replyv.Elem().Set(reflect.MakeSlice(m.ReplyType.Elem(), 0, 0))
}
return replyv
}

每一个 methodType 实例包含了一个方法的完整信息。包括

  • method:方法本身
  • ArgType:第一个参数的类型
  • ReplyType:第二个参数的类型
  • numCalls:后续统计方法调用次数时会用到

另外,我们还实现了 2 个方法 newArgvnewReplyv,用于创建对应类型的实例。newArgv 方法有一个小细节,指针类型和值类型创建实例的方式有细微区别。

第二步,定义结构体 service:

1
2
3
4
5
6
type service struct {
name string
typ reflect.Type
rcvr reflect.Value
method map[string]*methodType
}

service 的定义也是非常简洁的,name 即映射的结构体的名称,比如 T,比如 WaitGroup;typ 是结构体的类型;rcvr 即结构体的实例本身,保留 rcvr 是因为在调用时需要 rcvr 作为第 0 个参数;method 是 map 类型,存储映射的结构体的所有符合条件的方法。

1
2
3
4
5
6
7
8
9
10
11
12
// newService 把 rcvr 对象转换为 service 实例
func newService(rcvr interface{}) *service {
s := new(service)
s.rcvr = reflect.ValueOf(rcvr)
s.name = reflect.Indirect(s.rcvr).Type().Name()
s.typ = reflect.TypeOf(rcvr)
if !ast.IsExported(s.name) {
log.Fatalf("rpc server: %s is not a valid service name", s.name)
}
s.registerMethods()
return s
}

超时处理

超时处理是 RPC 框架一个比较基本的能力,如果缺少超时处理机制,无论是服务端还是客户端都容易因为网络或其他错误导致挂死,资源耗尽,这些问题的出现大大地降低了服务的可用性。因此,我们需要在 RPC 框架中加入超时处理的能力。

纵观整个远程调用的过程,需要客户端处理超时的地方有:

  • 与服务端建立连接,导致的超时
  • 发送请求到服务端,写报文导致的超时
  • 等待服务端处理时,等待处理导致的超时(比如服务端已挂死,迟迟不响应)
  • 从服务端接收响应时,读报文导致的超时

需要服务端处理超时的地方有:

  • 读取客户端请求报文时,读报文导致的超时
  • 发送响应报文时,写报文导致的超时
  • 调用映射服务的方法时,处理报文导致的超时

GeeRPC 在 3 个地方添加了超时处理机制。分别是:

1)客户端创建连接时 2)客户端 Client.Call() 整个过程导致的超时(包含发送报文,等待处理,接收报文所有阶段) 3)服务端处理报文,即 Server.handleRequest 超时。

Q:为什么在其他地方不添加超时处理呢?有什么难度?

创建连接超时

为了实现上的简单,将超时设定放在了 Option 中。ConnectTimeout 默认值为 10s,HandleTimeout 默认值为 0,即不设限。

1
2
3
4
5
6
7
8
9
10
11
12
type Option struct {
MagicNumber int // MagicNumber marks this's a geerpc request
CodecType codec.Type // client may choose different Codec to encode body
ConnectTimeout time.Duration // 0 means no limit
HandleTimeout time.Duration
}

var DefaultOption = &Option{
MagicNumber: MagicNumber,
CodecType: codec.GobType,
ConnectTimeout: time.Second * 10,
}

客户端连接超时,只需要为 Dial 添加一层超时处理的外壳即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
type clientResult struct {
client *Client
err error
}

type newClientFunc func(conn net.Conn, opt *Option) (client *Client, err error)

func dialTimeout(f newClientFunc, network, address string, opts ...*Option) (client *Client, err error) {
opt, err := parseOptions(opts...)
if err != nil {
return nil, err
}
conn, err := net.DialTimeout(network, address, opt.ConnectTimeout)
if err != nil {
return nil, err
}
// close the connection if client is nil
defer func() {
if err != nil {
_ = conn.Close()
}
}()
ch := make(chan clientResult)
go func() {
client, err := f(conn, opt)
ch <- clientResult{client: client, err: err}
}()
if opt.ConnectTimeout == 0 {
result := <-ch
return result.client, result.err
}
select {
case <-time.After(opt.ConnectTimeout):
return nil, fmt.Errorf("rpc client: connect timeout: expect within %s", opt.ConnectTimeout)
case result := <-ch:
return result.client, result.err
}
}

// Dial connects to an RPC server at the specified network address
func Dial(network, address string, opts ...*Option) (*Client, error) {
return dialTimeout(NewClient, network, address, opts...)
}

重点是使用select实现超时

1
2
3
4
5
6
select {
case <-time.After(opt.ConnectTimeout):
return nil, fmt.Errorf("rpc client: connect timeout: expect within %s", opt.ConnectTimeout)
case result := <-ch:
return result.client, result.err
}

支持http协议

Web 开发中,我们经常使用 HTTP 协议中的 HEAD、GET、POST 等方式发送请求,等待响应。但 RPC 的消息格式与标准的 HTTP 协议并不兼容,在这种情况下,就需要一个协议的转换过程。HTTP 协议的 CONNECT 方法恰好提供了这个能力,CONNECT 一般用于代理服务。

假设浏览器与服务器之间的 HTTPS 通信都是加密的,浏览器通过代理服务器发起 HTTPS 请求时,由于请求的站点地址和端口号都是加密保存在 HTTPS 请求报文头中的,代理服务器如何知道往哪里发送请求呢?为了解决这个问题,浏览器通过 HTTP 明文形式向代理服务器发送一个 CONNECT 请求告诉代理服务器目标地址和端口,代理服务器接收到这个请求后,会在对应端口与目标站点建立一个 TCP 连接,连接建立成功后返回 HTTP 200 状态码告诉浏览器与该站点的加密通道已经完成。接下来代理服务器仅需透传浏览器和服务器之间的加密数据包即可,代理服务器无需解析 HTTPS 报文。

举一个简单例子:

  1. 浏览器向代理服务器发送 CONNECT 请求。
1
CONNECT geektutu.com:443 HTTP/1.0
  1. 代理服务器返回 HTTP 200 状态码表示连接已经建立。
1
HTTP/1.0 200 Connection Established
  1. 之后浏览器和服务器开始 HTTPS 握手并交换加密数据,代理服务器只负责传输彼此的数据包,并不能读取具体数据内容(代理服务器也可以选择安装可信根证书解密 HTTPS 报文)。

事实上,这个过程其实是通过代理服务器将 HTTP 协议转换为 HTTPS 协议的过程。对 RPC 服务端来,需要做的是将 HTTP 协议转换为 RPC 协议,对客户端来说,需要新增通过 HTTP CONNECT 请求创建连接的逻辑。

那通信过程应该是这样的:

  1. 客户端向 RPC 服务器发送 CONNECT 请求
1
CONNECT 10.0.0.1:9999/_geerpc_ HTTP/1.0
  1. RPC 服务器返回 HTTP 200 状态码表示连接建立。
1
HTTP/1.0 200 Connected to Gee RPC
  1. 客户端使用创建好的连接发送 RPC 报文,先发送 Option,再发送 N 个请求报文,服务端处理 RPC 请求并响应。

server.go 中新增如下的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
const (
connected = "200 Connected to Gee RPC"
defaultRPCPath = "/_geeprc_"
defaultDebugPath = "/debug/geerpc\ y
)

// ServeHTTP implements an http.Handler that answers RPC requests.
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "CONNECT" {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusMethodNotAllowed)
_, _ = io.WriteString(w, "405 must CONNECT\n")
return
}
conn, _, err := w.(http.Hijacker).Hijack() // 将w转换为http.Hijacker接口类型,然后调用Hijack方法返回底层的网络连接
if err != nil {
log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
return
}
_, _ = io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
server.ServeConn(conn)
}

// HandleHTTP registers an HTTP handler for RPC messages on rpcPath.
// It is still necessary to invoke http.Serve(), typically in a go statement.
func (server *Server) HandleHTTP() {
http.Handle(defaultRPCPath, server)
}

// HandleHTTP is a convenient approach for default server to register HTTP handlers
func HandleHTTP() {
DefaultServer.HandleHTTP()
}

关键代码:

1
conn, _, err := w.(http.Hijacker).Hijack() // 将w转换为http.Hijacker接口类型,然后调用Hijack方法返回底层的网络连接

为了实现上的简单,GeeRegistry 采用 HTTP 协议提供服务,且所有的有用信息都承载在 HTTP Header 中。

  • Get:返回所有可用的服务列表,通过自定义字段 X-Geerpc-Servers 承载。
  • Post:添加服务实例或发送心跳,通过自定义字段 X-Geerpc-Server 承载。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Runs at /_geerpc_/registry
// - params
// - w http.ResponseWriter: write response
// - req *http.Request: read request
func (r *GeeRegistry) ServeHTTP(w http.ResponseWriter, req *http.Request) {
switch req.Method {
case "GET": // 获取所有服务实例
// keep it simple, server is in req.Header
w.Header().Set("X-Geerpc-Servers", strings.Join(r.aliveServers(), ","))
case "POST": // 添加服务实例或发送心跳
// keep it simple, server is in req.Header
addr := req.Header.Get("X-Geerpc-Server")
if addr == "" {
w.WriteHeader(http.StatusInternalServerError)
return
}
r.putServer(addr) // putServer 方法用于添加服务实例或发送心跳,如果已经添加的服务实例,更新其最后一次发送心跳的时间
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
}

负载均衡

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type Discovery interface {
Refresh() error // refresh from remote registry
Update(servers []string) error
Get(mode SelectMode) (string, error)
GetAll() ([]string, error)
}

type XClient struct {
d Discovery
mode SelectMode
opt *Option
mu sync.Mutex // protect following
clients map[string]*Client
}

同时封装了一个XClient,使用clients的map来存储addr到client的映射,如何获取一个client,使用discovery.Get函数获取.

我们将复用 Client 的能力封装在方法 dial 中,dial 的处理逻辑如下:

  1. 检查 xc.clients 是否有缓存的 Client,如果有,检查是否是可用状态,如果是则返回缓存的 Client,如果不可用,则从缓存中删除。
  2. 如果步骤 1) 没有返回缓存的 Client,则说明需要创建新的 Client,缓存并返回。

紧接着,我们实现一个不需要注册中心,服务列表由手工维护的服务发现的结构体:MultiServersDiscovery

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// MultiServersDiscovery 是一个不需要注册中心的多服务器发现器
// 用户需要显式地提供服务器地址
type MultiServersDiscovery struct {
r *rand.Rand // generate random number
mu sync.RWMutex // protect following
servers []string
index int // record the selected position for robin algorithm
}

// NewMultiServerDiscovery 创建一个MultiServersDiscovery实例
func NewMultiServerDiscovery(servers []string) *MultiServersDiscovery {
d := &MultiServersDiscovery{
servers: servers,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
d.index = d.r.Intn(math.MaxInt32 - 1)
return d
}
  • r 是一个产生随机数的实例,初始化时使用时间戳设定随机数种子,避免每次产生相同的随机数序列。
  • index 记录 Round Robin 算法已经轮询到的位置,为了避免每次从 0 开始,初始化时随机设定一个值。

首先实现一个最基础的服务发现模块 Discovery。为了与通信部分解耦,这部分的代码统一放置在 xclient 子目录下。

定义 2 个类型:

  • SelectMode 代表不同的负载均衡策略,简单起见,GeeRPC 仅实现 Random 和 RoundRobin 两种策略。
  • Discovery 是一个接口类型,包含了服务发现所需要的最基本的接口。
    • Refresh() 从注册中心更新服务列表
    • Update(servers []string) 手动更新服务列表
    • Get(mode SelectMode) 根据负载均衡策略,选择一个服务实例
    • GetAll() 返回所有的服务实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package xclient

import (
"errors"
"math"
"math/rand"
"sync"
"time"
)

type SelectMode int

const (
RandomSelect SelectMode = iota // select randomly
RoundRobinSelect // select using Robbin algorithm
)

type Discovery interface {
Refresh() error // refresh from remote registry
Update(servers []string) error
Get(mode SelectMode) (string, error)
GetAll() ([]string, error)
}

服务发现和注册中心

geerpc registry

注册中心的位置如上图所示。注册中心的好处在于,客户端和服务端都只需要感知注册中心的存在,而无需感知对方的存在。更具体一些:

  1. 服务端启动后,向注册中心发送注册消息,注册中心得知该服务已经启动,处于可用状态。一般来说,服务端还需要定期向注册中心发送心跳,证明自己还活着。
  2. 客户端向注册中心询问,当前哪天服务是可用的,注册中心将可用的服务列表返回客户端。
  3. 客户端根据注册中心得到的服务列表,选择其中一个发起调用。

如果没有注册中心,客户端需要硬编码服务端的地址,而且没有机制保证服务端是否处于可用状态。当然注册中心的功能还有很多,比如配置的动态同步、通知机制等。比较常用的注册中心有 etcdzookeeperconsul,一般比较出名的微服务或者 RPC 框架,这些主流的注册中心都是支持的。

主流的注册中心 etcd、zookeeper 等功能强大,与这类注册中心的对接代码量是比较大的,需要实现的接口很多。GeeRPC 选择自己实现一个简单的支持心跳保活的注册中心。

GeeRegistry 的代码独立放置在子目录 registry 中。

首先定义 GeeRegistry 结构体,默认超时时间设置为 5 min,也就是说,任何注册的服务超过 5 min,即视为不可用状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// GeeRegistry is a simple register center, provide following functions.
// add a server and receive heartbeat to keep it alive.
// returns all alive servers and delete dead servers sync simultaneously.
type GeeRegistry struct {
timeout time.Duration
mu sync.Mutex // protect following
servers map[string]*ServerItem
}

type ServerItem struct {
Addr string
start time.Time
}

const (
defaultPath = "/_geerpc_/registry"
defaultTimeout = time.Minute * 5
)

// New create a registry instance with timeout setting
func New(timeout time.Duration) *GeeRegistry {
return &GeeRegistry{
servers: make(map[string]*ServerItem),
timeout: timeout,
}
}

var DefaultGeeRegister = New(defaultTimeout)

为 GeeRegistry 实现添加服务实例和返回服务列表的方法。

  • putServer:添加服务实例,如果服务已经存在,则更新 start。
  • aliveServers:返回可用的服务列表,如果存在超时的服务,则删除。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (r *GeeRegistry) putServer(addr string) {
r.mu.Lock()
defer r.mu.Unlock()
s := r.servers[addr]
if s == nil {
r.servers[addr] = &ServerItem{Addr: addr, start: time.Now()}
} else {
s.start = time.Now() // if exists, update start time to keep alive
}
}

func (r *GeeRegistry) aliveServers() []string {
r.mu.Lock()
defer r.mu.Unlock()
var alive []string
for addr, s := range r.servers {
if r.timeout == 0 || s.start.Add(r.timeout).After(time.Now()) {
alive = append(alive, addr)
} else {
delete(r.servers, addr)
}
}
sort.Strings(alive)
return alive
}

为了实现上的简单,GeeRegistry 采用 HTTP 协议提供服务,且所有的有用信息都承载在 HTTP Header 中。

  • Get:返回所有可用的服务列表,通过自定义字段 X-Geerpc-Servers 承载。
  • Post:添加服务实例或发送心跳,通过自定义字段 X-Geerpc-Server 承载。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Runs at /_geerpc_/registry
func (r *GeeRegistry) ServeHTTP(w http.ResponseWriter, req *http.Request) {
switch req.Method {
case "GET":
// keep it simple, server is in req.Header
w.Header().Set("X-Geerpc-Servers", strings.Join(r.aliveServers(), ","))
case "POST":
// keep it simple, server is in req.Header
addr := req.Header.Get("X-Geerpc-Server")
if addr == "" {
w.WriteHeader(http.StatusInternalServerError)
return
}
r.putServer(addr)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
}

// HandleHTTP registers an HTTP handler for GeeRegistry messages on registryPath
func (r *GeeRegistry) HandleHTTP(registryPath string) {
http.Handle(registryPath, r)
log.Println("rpc registry path:", registryPath)
}

func HandleHTTP() {
DefaultGeeRegister.HandleHTTP(defaultPath)
}

有哪些问题待解决?

  • 调用超时问题
  • 单向调用
  • 幂等问题:client发送请求R1之后由于网络拥塞导致没有及时送达server,从而重传R2,但是在server端收到了同时收到了R1和R2,这样的幂等问题,即如何在serve端只处理一次请求?
  • 反向调用(NAT穿越):如果一个机器在公网,另一个在内网,那么从内网调用公网是可以的,但是反过来是不行的,所以如何支持反向调用(Go语言的RPC框架就没有支持
  • 更多的协议支持
  • MQ异步

面试问题

介绍一下你的项目

我做的这个Geerpc是一个从零实现 Go 语言官方的标准库 net/rpc的项目,并在此基础上,新增了协议交换(protocol exchange)注册中心(registry)服务发现(service discovery)负载均衡(load balance)超时处理(timeout processing)等特性。

项目的结构大致分为codec(编解码器)、client、server、registry、discovery。

抽象出 Codec 的构造函数,客户端和服务端可以通过 Codec 的 Type 得到构造函数,从而创建 Codec 实例。这部分代码和工厂模式类似,与工厂模式不同的是,返回的是构造函数,而非实例。项目可以支持多种不同的codeC但是目前只支持了Gob。

一般来说,涉及协议协商的这部分信息,需要设计固定的字节来传输的。但是为了实现上更简单,GeeRPC 客户端固定采用 JSON 编码 Option,后续的 header 和 body 的编码方式由 Option 中的 CodeType 指定,服务端首先使用 JSON 解码 Option,然后通过 Option 的 CodeType 解码剩余的内容。即报文将以这样的形式发送:

1
2
| Option{MagicNumber: xxx, CodecType: xxx} | Header{ServiceMethod ...} | Body interface{} |
| <------ 固定 JSON 编码 ------> | <------- 编码方式由 CodeType 决定 ------->|

通过go语言的反射实现服务注册,首先定义了一个methodType类型,包括method:方法本身、ArgType:第一个参数的类型、ReplyType:第二个参数的类型、numCalls:方法调用次数。还实现了 2 个方法 newArgvnewReplyv,用于创建对应类型的实例。newArgv 方法有一个小细节,指针类型和值类型创建实例的方式有细微区别。然后定义了service结构体,service 的定义也是非常简洁的,name 即映射的结构体的名称,比如 T,比如 WaitGroup;typ 是结构体的类型;rcvr 即结构体的实例本身,保留 rcvr 是因为在调用时需要 rcvr 作为第 0 个参数;method 是 map 类型,存储映射的结构体的所有符合条件的方法。构造函数 newService,入参是任意需要映射为服务的结构体实例。

如何实现的超时处理?

纵观整个远程调用的过程,需要客户端处理超时的地方有:

  • 与服务端建立连接,导致的超时
  • 发送请求到服务端,写报文导致的超时
  • 等待服务端处理时,等待处理导致的超时(比如服务端已挂死,迟迟不响应)
  • 从服务端接收响应时,读报文导致的超时

需要服务端处理超时的地方有:

  • 读取客户端请求报文时,读报文导致的超时
  • 发送响应报文时,写报文导致的超时
  • 调用映射服务的方法时,处理报文导致的超时

GeeRPC 在 3 个地方添加了超时处理机制。分别是:

1)客户端创建连接时 2)客户端 Client.Call() 整个过程导致的超时(包含发送报文,等待处理,接收报文所有阶段) 3)服务端处理报文,即 Server.handleRequest 超时。

为了实现上的简单,将超时设定放在了 Option 中。ConnectTimeout 默认值为 10s,HandleTimeout 默认值为 0,即不设限。

创建连接

客户端连接超时,只需要为 Dial 添加一层超时处理的外壳即可。在这里实现了一个超时处理的外壳 dialTimeout,这个壳将 NewClient 作为入参,在 2 个地方添加了超时处理的机制。

  1. net.Dial 替换为 net.DialTimeout,如果连接创建超时,将返回错误。
  2. 使用子协程执行 NewClient,执行完成后则通过信道 ch 发送结果,如果 time.After() 信道先接收到消息,则说明 NewClient 执行超时,返回错误。怎么实现?(使用select语句)

关键代码:

1
2
3
4
5
6
select {		// 使用select判断先超时还是先收到消息
case <-time.After(opt.ConnectTimeout):
return nil, fmt.Errorf("rpc client: connect timeout: expect within %s", opt.ConnectTimeout)
case result := <-ch:
return result.client, result.err
}

Client.Call 超时:

Client.Call 的超时处理机制,使用 context 包实现,控制权交给用户,控制更为灵活。

1
2
3
4
5
6
7
8
9
10
11
12
// Call 调用指定的函数,等待它完成,并返回其错误状态。
// 该函数会阻塞,直到调用完成。
func (client *Client) Call(ctx context.Context, serviceMethod string, args, reply interface{}) error {
call := client.Go(serviceMethod, args, reply, make(chan *Call, 1))
select {
case <-ctx.Done():
client.removeCall(call.Seq)
return errors.New("rpc client: call failed: " + ctx.Err().Error())
case call := <-call.Done:
return call.Error
}
}

用户可以使用 context.WithTimeout 创建具备超时检测能力的 context 对象来控制。例如:

1
2
3
ctx, _ := context.WithTimeout(context.Background(), time.Second)
var reply int
err := client.Call(ctx, "Foo.Sum", &Args{1, 2}, &reply)

服务端处理超时:

这一部分的实现与客户端很接近,使用 time.After() 结合 select+chan 完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration) {
defer wg.Done()
called := make(chan struct{})
sent := make(chan struct{})
go func() {
err := req.svc.call(req.mtype, req.argv, req.replyv)
called <- struct{}{}
if err != nil {
req.h.Error = err.Error()
server.sendResponse(cc, req.h, invalidRequest, sending)
sent <- struct{}{}
return
}
server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
sent <- struct{}{}
}()

if timeout == 0 {
<-called
<-sent
return
}
select {
case <-time.After(timeout):
req.h.Error = fmt.Sprintf("rpc server: request handle timeout: expect within %s", timeout)
server.sendResponse(cc, req.h, invalidRequest, sending)
case <-called:
<-sent
}
}

这里需要确保 sendResponse 仅调用一次,因此将整个过程拆分为 calledsent 两个阶段,在这段代码中只会发生如下两种情况:

  1. called 信道接收到消息,代表处理没有超时,继续执行 sendResponse。
  2. time.After() 先于 called 接收到消息,说明处理已经超时,called 和 sent 都将被阻塞。在 case <-time.After(timeout) 处调用 sendResponse

如何实现服务注册?

因为是仿写的net/rpc,对 net/rpc 而言,一个函数需要能够被远程调用,需要满足如下五个条件:

  • the method’s type is exported. – 方法所属类型是导出的。
  • the method is exported. – 方式是导出的。
  • the method has two arguments, both exported (or builtin) types. – 两个入参,均为导出或内置类型。
  • the method’s second argument is a pointer. – 第二个入参必须是一个指针。
  • the method has return type error. – 返回值为 error 类型。

更直观一些:

1
func (t *T) MethodName(argType T1, replyType *T2) error

为了实现服务注册,首先定义了methodType :

1
2
3
4
5
6
type methodType struct {
method reflect.Method
ArgType reflect.Type
ReplyType reflect.Type
numCalls uint64
}
  • method:方法本身
  • ArgType:第一个参数的类型
  • ReplyType:第二个参数的类型
  • numCalls:后续统计方法调用次数时会用到

然后定义结构体 service:

1
2
3
4
5
6
type service struct {
name string
typ reflect.Type
rcvr reflect.Value
method map[string]*methodType
}

service 的定义也是非常简洁的,name 即映射的结构体的名称,比如 T,比如 WaitGroup;typ 是结构体的类型;rcvr 即结构体的实例本身,保留 rcvr 是因为在调用时需要 rcvr 作为第 0 个参数;method 是 map 类型,存储映射的结构体的所有符合条件的方法。

如何支持HTTP协议?

服务器端

那通信过程应该是这样的:

  1. 客户端向 RPC 服务器发送 CONNECT 请求
1
CONNECT 10.0.0.1:9999/_geerpc_ HTTP/1.0
  1. RPC 服务器返回 HTTP 200 状态码表示连接建立。
1
HTTP/1.0 200 Connected to Gee RPC
  1. 客户端使用创建好的连接发送 RPC 报文,先发送 Option,再发送 N 个请求报文,服务端处理 RPC 请求并响应。

server端使用hijack函数。

客户端

如何实现服务注册

反射

如何实现负载均衡

discovery,目前设置了随机和轮询算法。

服务发现和注册中心

为什么要用注册中心?

geerpc registry

注册中心的位置如上图所示。注册中心的好处在于,客户端和服务端都只需要感知注册中心的存在,而无需感知对方的存在。更具体一些:

  1. 服务端启动后,向注册中心发送注册消息,注册中心得知该服务已经启动,处于可用状态。一般来说,服务端还需要定期向注册中心发送心跳,证明自己还活着。
  2. 客户端向注册中心询问,当前哪天服务是可用的,注册中心将可用的服务列表返回客户端。
  3. 客户端根据注册中心得到的服务列表,选择其中一个发起调用。

注册中心实现

为 GeeRegistry 实现添加服务实例和返回服务列表的方法。

  • putServer:添加服务实例,如果服务已经存在,则更新 start。
  • aliveServers:返回可用的服务列表,如果存在超时的服务,则删除。

为了实现上的简单,GeeRegistry 采用 HTTP 协议提供服务,且所有的有用信息都承载在 HTTP Header 中。

  • Get:返回所有可用的服务列表,通过自定义字段 X-Geerpc-Servers 承载。
  • Post:添加服务实例或发送心跳,通过自定义字段 X-Geerpc-Server 承载。

geerpc笔记
https://gstarmin.github.io/2024/07/13/geerpc笔记/
作者
Starmin
发布于
2024年7月13日
许可协议