GeeRPC笔记
RPC基础知识
为什么要使用RPC?RPC有什么优点?
可以把不同功能的模块通过进程隔离开,那么就可以对这些模块进行不同的形式的变换,而且可以确保不同模块的安全和稳定。
RPC的类型
RPC根据运行模式分为静态模式和动态模式:
静态模式 :预编译接口,生成存根代码stub,与应用代码编译。C++
JAVA OBJc
image-20240719213225997
动态模式 :依赖语言特性实现动态调用(反射、内省)。
python java php go
没有静态模式中的IDL编译器编译为STUB的过程,程序代码就是IDL,所以IDL+User
Code构成了动态模式 的RPC程序。
image-20240719213616213
同步调用和异步调用
同步调用 :
1 func (client *Client) Call(serviceMethod string , args interface {}, reply interface {}) err
异步调用 :
1 func (client *Client) Go(serviceMethod string , args interface {}, reply interface {}, done chan *Call) *Call
Geerpc架构
这个项目的层次结构如下:
codec:编解码器
geerpc:RPC框架的核心实现
registry:注册中心
client:客户端
server:服务端
service:服务
xclient:客户端的扩展
重要结构定义如下:
header :
1 2 3 4 5 6 7 type Header struct { ServiceMethod string Seq uint64 Error string }
client :
1 2 3 4 5 6 7 8 9 10 11 type Client struct { cc codec.Codec opt *Option sending sync.Mutex header codec.Header mu sync.Mutex seq uint64 pending map [uint64 ]*Call closing bool shutdown bool }
server :
1 2 3 type Server struct { serviceMap sync.Map }
Service :
1 2 3 4 5 6 7 8 9 10 11 12 13 type methodType struct { method reflect.Method ArgType reflect.Type ReplyType reflect.Type numCalls uint64 }type service struct { name string typ reflect.Type rcvr reflect.Value method map [string ]*methodType }
Option :
1 2 3 4 5 6 type Option struct { MagicNumber int CodecType codec.Type ConnectTimeout time.Duration HandleTimeout time.Duration }
Codec :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 type Codec interface { io.Closer ReadHeader(*Header) error ReadBody(interface {}) error Write(*Header, interface {}) error }const ( GobType Type = "application/gob" JsonType Type = "application/json" )type GobCodec struct { conn io.ReadWriteCloser buf *bufio.Writer dec *gob.Decoder enc *gob.Encoder }
registry :
1 2 3 4 5 6 7 8 type GeeRegistry struct { timeout time.Duration mu sync.Mutex servers map [string ]*ServerItem }
request :
1 2 3 4 5 6 7 type request struct { h *codec.Header argv, replyv reflect.Value mtype *service.MethodType svc *service.Service }
消息的序列化与反序列化
1 2 3 4 5 6 type Codec interface { io.Closer ReadHeader(*Header) error ReadBody(interface {}) error Write(*Header, interface {}) error }
紧接着,抽象出 Codec 的构造函数,客户端和服务端可以通过 Codec 的
Type
得到构造函数,从而创建 Codec
实例。这部分代码和工厂模式类似,与工厂模式不同的是,返回的是构造函数,而非实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 type NewCodecFunc func (io.ReadWriteCloser) Codectype Type string const ( GobType Type = "application/gob" JsonType Type = "application/json" )var NewCodecFuncMap map [Type]NewCodecFuncfunc init () { NewCodecFuncMap = make (map [Type]NewCodecFunc) NewCodecFuncMap[GobType] = NewGobCodec }
编码协议
GeeRPC 客户端固定采用 JSON 编码 Option,后续的 header 和 body
的编码方式由 Option 中的 CodeType 指定,服务端首先使用 JSON 解码
Option,然后通过 Option 的 CodeType
解码剩余的内容。即报文将以这样的形式发送:
1 2 | Option{MagicNumber: xxx, CodecType: xxx} | Header{ServiceMethod ...} | Body interface{} | | <------ 固定 JSON 编码 ------> | <------- 编码方式由 CodeType 决定 ------->|
客户端
在使用 Go 语言的 net/rpc
包进行远程过程调用(RPC)时,一个函数需要满足以下五个条件才能被远程调用:
函数类型是导出的 :函数必须是一个导出的函数,也就是说函数名的首字母必须大写。例如,Func
是可以被导出的,而 func
则不行。
函数所属类型必须是导出的 :函数必须属于一个导出的类型,即该类型的名字首字母也必须大写。例如,type MyType struct {}
是可以被导出的,而 type myType struct {}
则不行。
函数必须有两个参数 ,而且这两个参数都必须是导出的类型或者内建类型的。第一个参数是输入参数,第二个参数是返回参数。举例来说,func (t *MyType) MyMethod(arg1 ArgType1, arg2 *ArgType2) error
是一个符合条件的函数。
第二个参数必须是指针类型 :函数的第二个参数必须是指向一个类型的指针,这个类型的首字母必须大写。例如,func (t *MyType) MyMethod(arg1 ArgType1, arg2 *ArgType2) error
,其中
arg2
是指向 ArgType2
的指针。
函数必须返回一个错误类型 :函数的返回值必须是一个
error
类型,这样才能在调用过程中传递错误信息。例如,func (t *MyType) MyMethod(arg1 ArgType1, arg2 *ArgType2) error
中,函数返回一个 error
类型的值。
更直观一些:
1 func (t *T) MethodName(argType T1, replyType *T2) error
根据上述要求,首先我们封装了结构体 Call 来承载一次 RPC
调用所需要的信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 type Call struct { Seq uint64 ServiceMethod string Args interface {} Reply interface {} Error error Done chan *Call }func (call *Call) done() { call.Done <- call }
为了支持异步调用,Call 结构体中添加了一个字段 Done,Done 的类型是
chan *Call
,当调用结束时,会调用 call.done()
通知调用方。
实现Client :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 type Client struct { cc codec.Codec opt *geerpc.Option sendingMtx sync.Mutex header codec.Header mtx sync.Mutex seq uint64 pending map [uint64 ]*Call closing bool shutdown bool }var _ io.Closer = (*Client)(nil )var ErrShutdown = errors.New("connection is shut down" )func (client *Client) Close() error { client.mu.Lock() defer client.mu.Unlock() if client.closing { return ErrShutdown } client.closing = true return client.cc.Close() }func (client *Client) IsAvailable() bool { client.mu.Lock() defer client.mu.Unlock() return !client.shutdown && !client.closing }
Client 的字段比较复杂:
cc
是消息的编解码器,和服务端类似,用来序列化将要发送出去的请求,以及反序列化接收到的响应。
sending
是一个互斥锁,和服务端类似,为了保证请求的有序发送,即防止出现多个请求报文混淆。
header 是每个请求的消息头,header
只有在请求发送时才需要,而请求发送是互斥的,因此每个客户端只需要一个,声明在
Client 结构体中可以复用。
seq 用于给发送的请求编号,每个请求拥有唯一编号。
pending 存储未处理完的请求,键是编号,值是 Call 实例。
closing 和 shutdown 任意一个值置为 true,则表示 Client
处于不可用的状态,但有些许的差别,closing 是用户主动关闭的,即调用
Close
方法,而 shutdown 置为 true 一般是有错误发生。
然后声明了三个函数:
registerCall:将参数 call 添加到 client.pending 中,并更新
client.seq。
removeCall:根据 seq,从 client.pending 中移除对应的
call,并返回。
terminateCalls:服务端或客户端发生错误时调用,将 shutdown 设置为
true,且将错误信息通知所有 pending 状态的 call。
对一个客户端端来说,接收响应、发送请求是最重要的 2
个功能。那么首先实现接收功能,接收到的响应有三种情况:
call
不存在,可能是请求没有发送完整,或者因为其他原因被取消,但是服务端仍旧处理了。
call 存在,但服务端处理出错,即 h.Error 不为空。
call 存在,服务端处理正常,那么需要从 body 中读取 Reply 的值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 func (client *Client) receive() { var err error for err == nil { var h codec.Header if err = client.cc.ReadHeader(&h); err != nil { break } call := client.removeCall(h.Seq) switch { case call == nil : err = client.cc.ReadBody(nil ) case h.Error != "" : call.Error = fmt.Errorf(h.Error) err = client.cc.ReadBody(nil ) call.done() default : err = client.cc.ReadBody(call.Reply) if err != nil { call.Error = errors.New("reading body " + err.Error()) } call.done() } } client.terminateCalls(err) }
创建 Client 实例时,首先需要完成一开始的协议交换,即发送
Option
信息给服务端。协商好消息的编解码方式之后,再创建一个子协程调用
receive()
接收响应。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func NewClient (conn net.Conn, opt *Option) (*Client, error ) { f := codec.NewCodecFuncMap[opt.CodecType] if f == nil { err := fmt.Errorf("invalid codec type %s" , opt.CodecType) log.Println("rpc client: codec error:" , err) return nil , err } if err := json.NewEncoder(conn).Encode(opt); err != nil { log.Println("rpc client: options error: " , err) _ = conn.Close() return nil , err } return newClientCodec(f(conn), opt), nil }func newClientCodec (cc codec.Codec, opt *Option) *Client { client := &Client{ seq: 1 , cc: cc, opt: opt, pending: make (map [uint64 ]*Call), } go client.receive() return client }
send函数 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 func (client *Client) send(call *Call) { client.sending.Lock() defer client.sending.Unlock() seq, err := client.registerCall(call) if err != nil { call.Error = err call.done() return } client.header.ServiceMethod = call.ServiceMethod client.header.Seq = seq client.header.Error = "" if err := client.cc.Write(&client.header, call.Args); err != nil { call := client.removeCall(seq) if call != nil { call.Error = err call.done() } } }func (client *Client) Go(serviceMethod string , args, reply interface {}, done chan *Call) *Call { if done == nil { done = make (chan *Call, 10 ) } else if cap (done) == 0 { log.Panic("rpc client: done channel is unbuffered" ) } call := &Call{ ServiceMethod: serviceMethod, Args: args, Reply: reply, Done: done, } client.send(call) return call }func (client *Client) Call(serviceMethod string , args, reply interface {}) error { call := <-client.Go(serviceMethod, args, reply, make (chan *Call, 1 )).Done return call.Error }
Go
和 Call
是客户端暴露给用户的两个 RPC
服务调用接口,Go
是一个异步接口,返回 call 实例。
Call
是对 Go
的封装,阻塞
call.Done,等待响应返回,是一个同步接口。
服务器端
首先定义了结构体 Server
,没有任何的成员字段。
实现了 Accept
方式,net.Listener
作为参数,for 循环等待 socket 连接建立,并开启子协程处理,处理过程交给了
ServerConn
方法。
DefaultServer 是一个默认的 Server
实例,主要为了用户使用方便。
ServeConn
的实现就和之前讨论的通信过程紧密相关了,首先使用
json.NewDecoder
反序列化得到 Option 实例,检查 MagicNumber
和 CodeType 的值是否正确。然后根据 CodeType
得到对应的消息编解码器,接下来的处理交给 serverCodec
。
serveCodec
的过程非常简单。主要包含三个阶段
读取请求 readRequest
处理请求 handleRequest
回复请求 sendResponse
之前提到过,在一次连接中,允许接收多个请求,即多个 request header 和
request body,因此这里使用了 for
无限制地等待请求的到来,直到发生错误(例如连接被关闭,接收到的报文有问题等),这里需要注意的点有三个:
handleRequest 使用了协程并发执行请求。
处理请求是并发的,但是回复请求的报文必须是逐个发送的,并发容易导致多个回复报文交织在一起,客户端无法解析。在这里使用锁(sending)保证。
尽力而为,只有在 header 解析失败时,才终止循环。
服务注册
假设客户端发过来一个请求,包含 ServiceMethod 和 Argv。
1 2 3 4 { "ServiceMethod" : "T.MethodName" "Argv" :"0101110101..." }
通过 “T.MethodName” 可以确定调用的是类型 T 的
MethodName,如果硬编码实现这个功能,很可能是这样:
1 2 3 4 5 6 7 8 9 10 11 12 switch req.ServiceMethod { case "T.MethodName" : t := new (t) reply := new (T2) var argv T1 gob.NewDecoder(conn).Decode(&argv) err := t.MethodName(argv, reply) server.sendMessage(reply, err) case "Foo.Sum" : f := new (Foo) ... }
也就是说,如果使用硬编码的方式来实现结构体与服务的映射,那么每暴露一个方法,就需要编写等量的代码。那有没有什么方式,能够将这个映射过程自动化呢?可以借助反射。
通过反射,我们能够非常容易地获取某个结构体的所有方法,并且能够通过方法,获取到该方法所有的参数类型与返回值。例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func main () { var wg sync.WaitGroup typ := reflect.TypeOf(&wg) for i := 0 ; i < typ.NumMethod(); i++ { method := typ.Method(i) argv := make ([]string , 0 , method.Type.NumIn()) returns := make ([]string , 0 , method.Type.NumOut()) for j := 1 ; j < method.Type.NumIn(); j++ { argv = append (argv, method.Type.In(j).Name()) } for j := 0 ; j < method.Type.NumOut(); j++ { returns = append (returns, method.Type.Out(j).Name()) } log.Printf("func (w *%s) %s(%s) %s" , typ.Elem().Name(), method.Name, strings.Join(argv, "," ), strings.Join(returns, "," )) } }
运行的结果是:
1 2 3 func (w *WaitGroup) Add(int )func (w *WaitGroup) Done()func (w *WaitGroup) Wait()
这里通过反射实现了服务注册:
第一步 ,定义结构体 methodType:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 type methodType struct { method reflect.Method ArgType reflect.Type ReplyType reflect.Type numCalls uint64 }func (m *methodType) NumCalls() uint64 { return atomic.LoadUint64(&m.numCalls) }func (m *methodType) newArgv() reflect.Value { var argv reflect.Value if m.ArgType.Kind() == reflect.Ptr { argv = reflect.New(m.ArgType.Elem()) } else { argv = reflect.New(m.ArgType).Elem() } return argv }func (m *methodType) newReplyv() reflect.Value { replyv := reflect.New(m.ReplyType.Elem()) switch m.ReplyType.Elem().Kind() { case reflect.Map: replyv.Elem().Set(reflect.MakeMap(m.ReplyType.Elem())) case reflect.Slice: replyv.Elem().Set(reflect.MakeSlice(m.ReplyType.Elem(), 0 , 0 )) } return replyv }
每一个 methodType 实例包含了一个方法的完整信息。包括
method:方法本身
ArgType:第一个参数的类型
ReplyType:第二个参数的类型
numCalls:后续统计方法调用次数时会用到
另外,我们还实现了 2 个方法 newArgv
和
newReplyv
,用于创建对应类型的实例。newArgv
方法有一个小细节,指针类型和值类型创建实例的方式有细微区别。
第二步 ,定义结构体 service:
1 2 3 4 5 6 type service struct { name string typ reflect.Type rcvr reflect.Value method map [string ]*methodType }
service 的定义也是非常简洁的,name 即映射的结构体的名称,比如
T
,比如 WaitGroup
;typ 是结构体的类型;rcvr
即结构体的实例本身,保留 rcvr 是因为在调用时需要 rcvr 作为第 0
个参数;method 是 map 类型,存储映射的结构体的所有符合条件的方法。
1 2 3 4 5 6 7 8 9 10 11 12 func newService (rcvr interface {}) *service { s := new (service) s.rcvr = reflect.ValueOf(rcvr) s.name = reflect.Indirect(s.rcvr).Type().Name() s.typ = reflect.TypeOf(rcvr) if !ast.IsExported(s.name) { log.Fatalf("rpc server: %s is not a valid service name" , s.name) } s.registerMethods() return s }
超时处理
超时处理是 RPC
框架一个比较基本的能力,如果缺少超时处理机制,无论是服务端还是客户端都容易因为网络或其他错误导致挂死,资源耗尽,这些问题的出现大大地降低了服务的可用性。因此,我们需要在
RPC 框架中加入超时处理的能力。
纵观整个远程调用的过程,需要客户端处理超时的地方有:
与服务端建立连接,导致的超时
发送请求到服务端,写报文导致的超时
等待服务端处理时,等待处理导致的超时(比如服务端已挂死,迟迟不响应)
从服务端接收响应时,读报文导致的超时
需要服务端处理超时的地方有:
读取客户端请求报文时,读报文导致的超时
发送响应报文时,写报文导致的超时
调用映射服务的方法时,处理报文导致的超时
GeeRPC 在 3 个地方添加了超时处理机制。分别是:
1)客户端创建连接时 2)客户端 Client.Call()
整个过程导致的超时(包含发送报文,等待处理,接收报文所有阶段)
3)服务端处理报文,即 Server.handleRequest
超时。
Q:为什么在其他地方不添加超时处理呢?有什么难度?
创建连接超时
为了实现上的简单,将超时设定放在了 Option 中。ConnectTimeout 默认值为
10s,HandleTimeout 默认值为 0,即不设限。
1 2 3 4 5 6 7 8 9 10 11 12 type Option struct { MagicNumber int CodecType codec.Type ConnectTimeout time.Duration HandleTimeout time.Duration }var DefaultOption = &Option{ MagicNumber: MagicNumber, CodecType: codec.GobType, ConnectTimeout: time.Second * 10 , }
客户端连接超时,只需要为 Dial 添加一层超时处理的外壳即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 type clientResult struct { client *Client err error }type newClientFunc func (conn net.Conn, opt *Option) (client *Client, err error )func dialTimeout (f newClientFunc, network, address string , opts ...*Option) (client *Client, err error ) { opt, err := parseOptions(opts...) if err != nil { return nil , err } conn, err := net.DialTimeout(network, address, opt.ConnectTimeout) if err != nil { return nil , err } defer func () { if err != nil { _ = conn.Close() } }() ch := make (chan clientResult) go func () { client, err := f(conn, opt) ch <- clientResult{client: client, err: err} }() if opt.ConnectTimeout == 0 { result := <-ch return result.client, result.err } select { case <-time.After(opt.ConnectTimeout): return nil , fmt.Errorf("rpc client: connect timeout: expect within %s" , opt.ConnectTimeout) case result := <-ch: return result.client, result.err } }func Dial (network, address string , opts ...*Option) (*Client, error ) { return dialTimeout(NewClient, network, address, opts...) }
重点是使用select实现超时
1 2 3 4 5 6 select { case <-time.After(opt.ConnectTimeout): return nil , fmt.Errorf("rpc client: connect timeout: expect within %s" , opt.ConnectTimeout) case result := <-ch: return result.client, result.err }
支持http协议
Web 开发中,我们经常使用 HTTP 协议中的 HEAD、GET、POST
等方式发送请求,等待响应。但 RPC 的消息格式与标准的 HTTP
协议并不兼容,在这种情况下,就需要一个协议的转换过程。HTTP
协议的 CONNECT 方法恰好提供了这个能力 ,CONNECT
一般用于代理服务。
假设浏览器与服务器之间的 HTTPS
通信都是加密的,浏览器通过代理服务器发起 HTTPS
请求时,由于请求的站点地址和端口号都是加密保存在 HTTPS
请求报文头中的,代理服务器如何知道往哪里发送请求呢?为了解决这个问题,浏览器通过
HTTP 明文形式向代理服务器发送一个 CONNECT
请求告诉代理服务器目标地址和端口,代理服务器接收到这个请求后,会在对应端口与目标站点建立一个
TCP 连接,连接建立成功后返回 HTTP 200
状态码告诉浏览器与该站点的加密通道已经完成。接下来代理服务器仅需透传浏览器和服务器之间的加密数据包即可,代理服务器无需解析
HTTPS 报文。
举一个简单例子:
浏览器向代理服务器发送 CONNECT 请求。
1 CONNECT geektutu.com:443 HTTP/1.0
代理服务器返回 HTTP 200 状态码表示连接已经建立。
1 HTTP /1 .0 200 Connection Established
之后浏览器和服务器开始 HTTPS
握手并交换加密数据,代理服务器只负责传输彼此的数据包,并不能读取具体数据内容(代理服务器也可以选择安装可信根证书解密
HTTPS 报文)。
事实上,这个过程其实是通过代理服务器将 HTTP 协议转换为 HTTPS
协议的过程。对 RPC 服务端来,需要做的是将 HTTP 协议转换为 RPC
协议,对客户端来说,需要新增通过 HTTP CONNECT 请求创建连接的逻辑。
那通信过程应该是这样的:
客户端向 RPC 服务器发送 CONNECT 请求
1 CONNECT 10.0.0.1:9999/_geerpc_ HTTP/1.0
RPC 服务器返回 HTTP 200 状态码表示连接建立。
1 HTTP /1 .0 200 Connected to Gee RPC
客户端使用创建好的连接发送 RPC 报文,先发送 Option,再发送 N
个请求报文,服务端处理 RPC 请求并响应。
在 server.go
中新增如下的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 const ( connected = "200 Connected to Gee RPC" defaultRPCPath = "/_geeprc_" defaultDebugPath = "/debug/geerpc\ y ) // ServeHTTP implements an http.Handler that answers RPC requests. func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) { if req.Method != " CONNECT" { w.Header().Set(" Content-Type", " text/plain; charset=utf-8 ") w.WriteHeader(http.StatusMethodNotAllowed) _, _ = io.WriteString(w, " 405 must CONNECT\n") return } conn, _, err := w.(http.Hijacker).Hijack() // 将w转换为http.Hijacker接口类型,然后调用Hijack方法返回底层的网络连接 if err != nil { log.Print(" rpc hijacking ", req.RemoteAddr, " : ", err.Error()) return } _, _ = io.WriteString(conn, " HTTP/1.0 "+connected+" \n\n") server.ServeConn(conn) } // HandleHTTP registers an HTTP handler for RPC messages on rpcPath. // It is still necessary to invoke http.Serve(), typically in a go statement. func (server *Server) HandleHTTP() { http.Handle(defaultRPCPath, server) } // HandleHTTP is a convenient approach for default server to register HTTP handlers func HandleHTTP() { DefaultServer.HandleHTTP() }
关键代码:
1 conn, _, err := w.(http.Hijacker).Hijack()
为了实现上的简单,GeeRegistry 采用 HTTP
协议提供服务,且所有的有用信息都承载在 HTTP Header 中。
Get:返回所有可用的服务列表,通过自定义字段 X-Geerpc-Servers
承载。
Post:添加服务实例或发送心跳,通过自定义字段 X-Geerpc-Server
承载。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func (r *GeeRegistry) ServeHTTP(w http.ResponseWriter, req *http.Request) { switch req.Method { case "GET" : w.Header().Set("X-Geerpc-Servers" , strings.Join(r.aliveServers(), "," )) case "POST" : addr := req.Header.Get("X-Geerpc-Server" ) if addr == "" { w.WriteHeader(http.StatusInternalServerError) return } r.putServer(addr) default : w.WriteHeader(http.StatusMethodNotAllowed) } }
负载均衡
1 2 3 4 5 6 7 8 9 10 11 12 13 14 type Discovery interface { Refresh() error Update(servers []string ) error Get(mode SelectMode) (string , error ) GetAll() ([]string , error ) }type XClient struct { d Discovery mode SelectMode opt *Option mu sync.Mutex clients map [string ]*Client }
同时封装了一个XClient,使用clients的map来存储addr到client的映射,如何获取一个client,使用discovery.Get函数获取.
我们将复用 Client 的能力封装在方法 dial
中,dial
的处理逻辑如下:
检查 xc.clients
是否有缓存的
Client,如果有,检查是否是可用状态,如果是则返回缓存的
Client,如果不可用,则从缓存中删除。
如果步骤 1) 没有返回缓存的 Client,则说明需要创建新的
Client,缓存并返回。
紧接着,我们实现一个不需要注册中心,服务列表由手工维护的服务发现的结构体:MultiServersDiscovery
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 type MultiServersDiscovery struct { r *rand.Rand mu sync.RWMutex servers []string index int }func NewMultiServerDiscovery (servers []string ) *MultiServersDiscovery { d := &MultiServersDiscovery{ servers: servers, r: rand.New(rand.NewSource(time.Now().UnixNano())), } d.index = d.r.Intn(math.MaxInt32 - 1 ) return d }
r
是一个产生随机数的实例,初始化时使用时间戳设定随机数种子,避免每次产生相同的随机数序列。
index 记录 Round Robin 算法已经轮询到的位置,为了避免每次从 0
开始,初始化时随机设定一个值。
首先实现一个最基础的服务发现模块
Discovery。为了与通信部分解耦,这部分的代码统一放置在 xclient
子目录下。
定义 2 个类型:
SelectMode 代表不同的负载均衡策略,简单起见,GeeRPC 仅实现 Random 和
RoundRobin 两种策略。
Discovery 是一个接口类型,包含了服务发现所需要的最基本的接口。
Refresh() 从注册中心更新服务列表
Update(servers []string) 手动更新服务列表
Get(mode SelectMode) 根据负载均衡策略,选择一个服务实例
GetAll() 返回所有的服务实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 package xclientimport ( "errors" "math" "math/rand" "sync" "time" )type SelectMode int const ( RandomSelect SelectMode = iota RoundRobinSelect )type Discovery interface { Refresh() error Update(servers []string ) error Get(mode SelectMode) (string , error ) GetAll() ([]string , error ) }
服务发现和注册中心
geerpc registry
注册中心的位置如上图所示。注册中心的好处在于,客户端和服务端都只需要感知注册中心的存在,而无需感知对方的存在。更具体一些:
服务端启动后,向注册中心发送注册消息,注册中心得知该服务已经启动,处于可用状态。一般来说,服务端还需要定期向注册中心发送心跳,证明自己还活着。
客户端向注册中心询问,当前哪天服务是可用的,注册中心将可用的服务列表返回客户端。
客户端根据注册中心得到的服务列表,选择其中一个发起调用。
如果没有注册中心,客户端需要硬编码服务端的地址,而且没有机制保证服务端是否处于可用状态。当然注册中心的功能还有很多,比如配置的动态同步、通知机制等。比较常用的注册中心有
etcd 、zookeeper 、consul ,一般比较出名的微服务或者
RPC 框架,这些主流的注册中心都是支持的。
主流的注册中心 etcd、zookeeper
等功能强大,与这类注册中心的对接代码量是比较大的,需要实现的接口很多。GeeRPC
选择自己实现一个简单的支持心跳保活的注册中心。
GeeRegistry 的代码独立放置在子目录 registry 中。
首先定义 GeeRegistry 结构体,默认超时时间设置为 5
min,也就是说,任何注册的服务超过 5 min,即视为不可用状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 type GeeRegistry struct { timeout time.Duration mu sync.Mutex servers map [string ]*ServerItem }type ServerItem struct { Addr string start time.Time }const ( defaultPath = "/_geerpc_/registry" defaultTimeout = time.Minute * 5 )func New (timeout time.Duration) *GeeRegistry { return &GeeRegistry{ servers: make (map [string ]*ServerItem), timeout: timeout, } }var DefaultGeeRegister = New(defaultTimeout)
为 GeeRegistry 实现添加服务实例和返回服务列表的方法。
putServer:添加服务实例,如果服务已经存在,则更新 start。
aliveServers:返回可用的服务列表,如果存在超时的服务,则删除。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func (r *GeeRegistry) putServer(addr string ) { r.mu.Lock() defer r.mu.Unlock() s := r.servers[addr] if s == nil { r.servers[addr] = &ServerItem{Addr: addr, start: time.Now()} } else { s.start = time.Now() } }func (r *GeeRegistry) aliveServers() []string { r.mu.Lock() defer r.mu.Unlock() var alive []string for addr, s := range r.servers { if r.timeout == 0 || s.start.Add(r.timeout).After(time.Now()) { alive = append (alive, addr) } else { delete (r.servers, addr) } } sort.Strings(alive) return alive }
为了实现上的简单,GeeRegistry 采用 HTTP
协议提供服务,且所有的有用信息都承载在 HTTP Header 中。
Get:返回所有可用的服务列表,通过自定义字段 X-Geerpc-Servers
承载。
Post:添加服务实例或发送心跳,通过自定义字段 X-Geerpc-Server
承载。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 func (r *GeeRegistry) ServeHTTP(w http.ResponseWriter, req *http.Request) { switch req.Method { case "GET" : w.Header().Set("X-Geerpc-Servers" , strings.Join(r.aliveServers(), "," )) case "POST" : addr := req.Header.Get("X-Geerpc-Server" ) if addr == "" { w.WriteHeader(http.StatusInternalServerError) return } r.putServer(addr) default : w.WriteHeader(http.StatusMethodNotAllowed) } }func (r *GeeRegistry) HandleHTTP(registryPath string ) { http.Handle(registryPath, r) log.Println("rpc registry path:" , registryPath) }func HandleHTTP () { DefaultGeeRegister.HandleHTTP(defaultPath) }
有哪些问题待解决?
面试问题
介绍一下你的项目
我做的这个Geerpc是一个从零实现 Go 语言官方的标准库
net/rpc
的项目,并在此基础上,新增了协议交换(protocol
exchange) 、注册中心(registry) 、服务发现(service
discovery) 、负载均衡(load
balance) 、超时处理(timeout
processing) 等特性。
项目的结构大致分为codec(编解码器)、client、server、registry、discovery。
抽象出 Codec 的构造函数,客户端和服务端可以通过 Codec 的
Type
得到构造函数,从而创建 Codec
实例。这部分代码和工厂模式类似,与工厂模式不同的是,返回的是构造函数,而非实例。项目可以支持多种不同的codeC但是目前只支持了Gob。
一般来说,涉及协议协商的这部分信息,需要设计固定的字节来传输的。但是为了实现上更简单,GeeRPC
客户端固定采用 JSON 编码 Option,后续的 header 和 body 的编码方式由
Option 中的 CodeType 指定,服务端首先使用 JSON 解码 Option,然后通过
Option 的 CodeType 解码剩余的内容。即报文将以这样的形式发送:
1 2 | Option {MagicNumber: xxx, CodecType: xxx} | Header {ServiceMethod ...} | Body interface{} || <------ 固定 JSON 编码 ------> | <------- 编码方式由 CodeType 决定 ------->|
通过go语言的反射实现服务注册,首先定义了一个methodType类型 ,包括method:方法本身、ArgType:第一个参数的类型、ReplyType:第二个参数的类型、numCalls:方法调用次数。还实现了
2 个方法 newArgv
和
newReplyv
,用于创建对应类型的实例。newArgv
方法有一个小细节,指针类型和值类型创建实例的方式有细微区别。然后定义了service 结构体,service
的定义也是非常简洁的,name 即映射的结构体的名称,比如
T
,比如 WaitGroup
;typ 是结构体的类型;rcvr
即结构体的实例本身,保留 rcvr 是因为在调用时需要 rcvr 作为第 0
个参数;method 是 map
类型,存储映射的结构体的所有符合条件的方法。构造函数
newService
,入参是任意需要映射为服务的结构体实例。
如何实现的超时处理?
纵观整个远程调用的过程,需要客户端处理超时的地方有:
与服务端建立连接,导致的超时
发送请求到服务端,写报文导致的超时
等待服务端处理时,等待处理导致的超时(比如服务端已挂死,迟迟不响应)
从服务端接收响应时,读报文导致的超时
需要服务端处理超时的地方有:
读取客户端请求报文时,读报文导致的超时
发送响应报文时,写报文导致的超时
调用映射服务的方法时,处理报文导致的超时
GeeRPC 在 3 个地方添加了超时处理机制。分别是:
1)客户端创建连接时 2)客户端 Client.Call()
整个过程导致的超时(包含发送报文,等待处理,接收报文所有阶段)
3)服务端处理报文,即 Server.handleRequest
超时。
为了实现上的简单,将超时设定放在了 Option 中。ConnectTimeout 默认值为
10s,HandleTimeout 默认值为 0,即不设限。
创建连接 :
客户端连接超时,只需要为 Dial
添加一层超时处理的外壳即可。在这里实现了一个超时处理的外壳
dialTimeout
,这个壳将 NewClient 作为入参,在 2
个地方添加了超时处理的机制。
将 net.Dial
替换为
net.DialTimeout
,如果连接创建超时,将返回错误。
使用子协程执行 NewClient,执行完成后则通过信道 ch 发送结果,如果
time.After()
信道先接收到消息,则说明 NewClient
执行超时,返回错误。怎么实现?(使用select语句)
关键代码:
1 2 3 4 5 6 select { case <-time.After(opt.ConnectTimeout): return nil , fmt.Errorf("rpc client: connect timeout: expect within %s" , opt.ConnectTimeout) case result := <-ch: return result.client, result.err }
Client.Call 超时 :
Client.Call
的超时处理机制,使用 context
包实现,控制权交给用户,控制更为灵活。
1 2 3 4 5 6 7 8 9 10 11 12 func (client *Client) Call(ctx context.Context, serviceMethod string , args, reply interface {}) error { call := client.Go(serviceMethod, args, reply, make (chan *Call, 1 )) select { case <-ctx.Done(): client.removeCall(call.Seq) return errors.New("rpc client: call failed: " + ctx.Err().Error()) case call := <-call.Done: return call.Error } }
用户可以使用 context.WithTimeout
创建具备超时检测能力的
context 对象来控制。例如:
1 2 3 ctx, _ := context.WithTimeout(context.Background(), time.Second)var reply int err := client.Call(ctx, "Foo.Sum" , &Args{1 , 2 }, &reply)
服务端处理超时 :
这一部分的实现与客户端很接近,使用 time.After()
结合
select+chan
完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration) { defer wg.Done() called := make (chan struct {}) sent := make (chan struct {}) go func () { err := req.svc.call(req.mtype, req.argv, req.replyv) called <- struct {}{} if err != nil { req.h.Error = err.Error() server.sendResponse(cc, req.h, invalidRequest, sending) sent <- struct {}{} return } server.sendResponse(cc, req.h, req.replyv.Interface(), sending) sent <- struct {}{} }() if timeout == 0 { <-called <-sent return } select { case <-time.After(timeout): req.h.Error = fmt.Sprintf("rpc server: request handle timeout: expect within %s" , timeout) server.sendResponse(cc, req.h, invalidRequest, sending) case <-called: <-sent } }
这里需要确保 sendResponse
仅调用一次,因此将整个过程拆分为 called
和
sent
两个阶段,在这段代码中只会发生如下两种情况:
called 信道接收到消息,代表处理没有超时,继续执行
sendResponse。
time.After()
先于 called
接收到消息,说明处理已经超时,called 和 sent 都将被阻塞。在
case <-time.After(timeout)
处调用
sendResponse
。
如何实现服务注册?
因为是仿写的net/rpc,对 net/rpc
而言,一个函数需要能够被远程调用,需要满足如下五个条件:
the method’s type is exported. – 方法所属类型是导出的。
the method is exported. – 方式是导出的。
the method has two arguments, both exported (or builtin) types. –
两个入参,均为导出或内置类型。
the method’s second argument is a pointer. –
第二个入参必须是一个指针。
the method has return type error. – 返回值为 error 类型。
更直观一些:
1 func (t *T) MethodName(argType T1, replyType *T2) error
为了实现服务注册,首先定义了methodType :
1 2 3 4 5 6 type methodType struct { method reflect.Method ArgType reflect.Type ReplyType reflect.Type numCalls uint64 }
method:方法本身
ArgType:第一个参数的类型
ReplyType:第二个参数的类型
numCalls:后续统计方法调用次数时会用到
然后定义结构体 service:
1 2 3 4 5 6 type service struct { name string typ reflect.Type rcvr reflect.Value method map [string ]*methodType }
service 的定义也是非常简洁的,name 即映射的结构体的名称,比如
T
,比如 WaitGroup
;typ 是结构体的类型;rcvr
即结构体的实例本身,保留 rcvr 是因为在调用时需要 rcvr 作为第 0
个参数;method 是 map 类型,存储映射的结构体的所有符合条件的方法。
如何支持HTTP协议?
服务器端 :
那通信过程应该是这样的:
客户端向 RPC 服务器发送 CONNECT 请求
1 CONNECT 10.0.0.1:9999/_geerpc_ HTTP/1.0
RPC 服务器返回 HTTP 200 状态码表示连接建立。
1 HTTP /1 .0 200 Connected to Gee RPC
客户端使用创建好的连接发送 RPC 报文,先发送 Option,再发送 N
个请求报文,服务端处理 RPC 请求并响应。
server端使用hijack函数。
客户端
如何实现服务注册
反射
如何实现负载均衡
discovery,目前设置了随机和轮询算法。
服务发现和注册中心
为什么要用注册中心?
geerpc registry
注册中心的位置如上图所示。注册中心的好处在于,客户端和服务端都只需要感知注册中心的存在,而无需感知对方的存在。更具体一些:
服务端启动后,向注册中心发送注册消息,注册中心得知该服务已经启动,处于可用状态。一般来说,服务端还需要定期向注册中心发送心跳,证明自己还活着。
客户端向注册中心询问,当前哪天服务是可用的,注册中心将可用的服务列表返回客户端。
客户端根据注册中心得到的服务列表,选择其中一个发起调用。
注册中心实现
为 GeeRegistry 实现添加服务实例和返回服务列表的方法。
putServer:添加服务实例,如果服务已经存在,则更新 start。
aliveServers:返回可用的服务列表,如果存在超时的服务,则删除。
为了实现上的简单,GeeRegistry 采用 HTTP
协议提供服务,且所有的有用信息都承载在 HTTP Header 中。
Get:返回所有可用的服务列表,通过自定义字段 X-Geerpc-Servers
承载。
Post:添加服务实例或发送心跳,通过自定义字段 X-Geerpc-Server
承载。