Go Thrift RPC

2018-02-09 12:26:29来源:https://www.jianshu.com/p/3cee4074302a作者:carlSQ人点击

分享


引言

年底了,最近也闲了,顺便给自己接近一年的后端生涯,做一些知识总结。


Thrift Server 架构流程图


image.png
Thrift Server

Go 里面开启一个TCP Server 服务很简单,这得益于go net 库对底层socket的封装。一个典型的Go Server端程序大致如下:


func handleConn(conn net.Conn) {
defer conn.Close()
defer func() {
if e := recover(); e != nil {
fmt.Printf("panic in processor: %s: %s", e, debug.Stack())
}
}()
for {
// read request data from the connection do something
// write response data to the connection
}
}
func main() {
l, err := net.Listen("tcp", ":16888")
if err != nil {
fmt.Println("listen error:", err)
return
}
for {
conn, err := l.Accept()
if err != nil {
fmt.Println("accept error:", err)
break
}
go handleConn(conn)
}
}

而用Thrift库 RPC Server端程序大致如下:


    handler := &Sharpshooter.Sharpshooter{}
processor := player.NewPlayerServerProcessor(handler)
serverTransport, err := thrift.NewTServerSocket("0.0.0.0:80")
if err != nil {
log.Fatalln("Error:", err)
}
server := thrift.NewTSimpleServer2(processor, serverTransport)
err = server.Serve()
if err != nil {
log.Fatalln("Error:", err)
}

上面的代码到底做了啥?下面我们具体分析。
其中Sharpshooter 提供如下接口供远程调用,对应上面架构流程图的Handler


type PlayerServer interface {
Ping() (r bool, err error)
UploadMap(gamemap [][]int32) (err error)
UploadParamters(arguments *Args_) (err error)

AssignTanks(tanks []int32) (err error)

LatestState(state *GameState) (err error)

GetNewOrders() (r []*Order, err error)
}

而在Thrift 源码中,定义了一个 TServer interface如下:


type TServer interface {
ProcessorFactory() TProcessorFactory
ServerTransport() TServerTransport
InputTransportFactory() TTransportFactory
OutputTransportFactory() TTransportFactory
InputProtocolFactory() TProtocolFactory
OutputProtocolFactory() TProtocolFactory
// Starts the server
Serve() error
// Stops the server. This is optional on a per-implementation basis. Not
// all servers are required to be cleanly stoppable.
Stop() error
}

满足以上的接口都可以认为是一个Server(前提是相应方法的实现要正确)。
TProcessorFactory 对应上面架构流程图的Processor 一个连接处理器。
TServerTransport 对应流程图Socket。
TTransportFactory 对应流程图Transport。
TProtocolFactory 对应流程图Protocol。


Thrift 定义了一个TSimpleServer 实现了个简单的Server


type TSimpleServer struct {
quit chan struct{}
processorFactory TProcessorFactory
serverTransport TServerTransport
inputTransportFactory TTransportFactory
outputTransportFactory TTransportFactory
inputProtocolFactory TProtocolFactory
outputProtocolFactory TProtocolFactory
sync.WaitGroup
}

TSimpleServer 的Serve() error 函数实现


func (p *TSimpleServer) Listen() error {
return p.serverTransport.Listen()
}
func (p *TSimpleServer) AcceptLoop() error {
for {
client, err := p.serverTransport.Accept()
if err != nil {
select {
case <-p.quit:
return nil
default:
}
return err
}
if client != nil {
p.Add(1)
go func() {
if err := p.processRequests(client); err != nil {
log.Println("error processing request:", err)
}
}()
}
}
}
func (p *TSimpleServer) Serve() error {
err := p.Listen()
if err != nil {
return err
}
p.AcceptLoop()
return nil
}

其实就是通过serverTransport监听端口号,接受连接, processRequests 方法处理连接。


TServerSocket

上面简单的Thrift Server代码实例 serverTransport 初始化如下:
serverTransport, err := thrift.NewTServerSocket("0.0.0.0:80")


type TServerSocket struct {
listener net.Listener
addr net.Addr
clientTimeout time.Duration
// Protects the interrupted value to make it thread safe.
mu sync.RWMutex
interrupted bool
}

其实就是一个TServerSocket 实例,实现了 TServerTransport interface ,监听端口号,接受连接,具体实现代码就是调用net 库,监听端口号,接受连接。


type TServerTransport interface {
Listen() error
Accept() (TTransport, error)
Close() error
// Optional method implementation. This signals to the server transport
// that it should break out of any accept() or listen() that it is currently
// blocked on. This method, if implemented, MUST be thread safe, as it may
// be called from a different thread context than the other TServerTransport
// methods.
Interrupt() error
}

这里的TServerSocket 不是 Posix标准里的socket,而是Thrift 对Go的net库网络操作的抽象用于Server端。


Processor

processor 处理接受的每个连接,看看 processRequests()函数的具体实现:


func (p *TSimpleServer) processRequests(client TTransport) error {
defer p.Done()
processor := p.processorFactory.GetProcessor(client)
inputTransport := p.inputTransportFactory.GetTransport(client)
outputTransport := p.outputTransportFactory.GetTransport(client)
inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
outputProtocol := p.outputProtocolFactory.GetProtocol(outputTransport)
defer func() {
if e := recover(); e != nil {
log.Printf("panic in processor: %s: %s", e, debug.Stack())
}
}()
if inputTransport != nil {
defer inputTransport.Close()
}
if outputTransport != nil {
defer outputTransport.Close()
}
for {
select {
case <-p.quit:
return nil
default:
}
ok, err := processor.Process(inputProtocol, outputProtocol)
if err, ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE {
return nil
} else if err != nil {
return err
}
if err, ok := err.(TApplicationException); ok && err.TypeId() == UNKNOWN_METHOD {
continue
}
if !ok {
break
}
}
return nil
}

方法主要获取处理器processor,获取输入io inputTransport, 输出io outputTransport, 输入数据协议inputProtocol,输出数据协议outputProtocol,最后由处理器 processor.Process(inputProtocol, outputProtocol)处理输入输出。
上面的例子中构造了一个 processor := player.NewPlayerServerProcessor(handler) 处理器,看看代码具体实现:


func NewPlayerServerProcessor(handler PlayerServer) *PlayerServerProcessor {
self14 := &PlayerServerProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)}
self14.processorMap["ping"] = &playerServerProcessorPing{handler: handler}
self14.processorMap["uploadMap"] = &playerServerProcessorUploadMap{handler: handler}
self14.processorMap["uploadParamters"] = &playerServerProcessorUploadParamters{handler: handler}
self14.processorMap["assignTanks"] = &playerServerProcessorAssignTanks{handler: handler}
self14.processorMap["latestState"] = &playerServerProcessorLatestState{handler: handler}
self14.processorMap["getNewOrders"] = &playerServerProcessorGetNewOrders{handler: handler}
return self14
}

构造processor时主要给RPC调用接口做了接口名到接口处理一个key-value 映射。
接下来看看processor的Process(inputProtocol, outputProtocol)函数。


func (p *PlayerServerProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
name, _, seqId, err := iprot.ReadMessageBegin()
if err != nil {
return false, err
}
if processor, ok := p.GetProcessorFunction(name); ok {
return processor.Process(seqId, iprot, oprot)
}
iprot.Skip(thrift.STRUCT)
iprot.ReadMessageEnd()
x15 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name)
oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId)
x15.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush()
return false, x15
}

只要是从输入数据协议里面读取rpc 接口名字,和这次rpc 调用id,通过GetProcessorFunction获取对接接口的处理单元,处理这次请求调用。


func (p *PlayerServerProcessor) GetProcessorFunction(key string) (processor thrift.TProcessorFunction, ok bool) {
processor, ok = p.processorMap[key]
return processor, ok
}

以getNewOrders 接口为例,


type playerServerProcessorGetNewOrders struct {
handler PlayerServer
}
func (p *playerServerProcessorGetNewOrders) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
args := PlayerServerGetNewOrdersArgs{}
if err = args.Read(iprot); err != nil {
iprot.ReadMessageEnd()
x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error())
oprot.WriteMessageBegin("getNewOrders", thrift.EXCEPTION, seqId)
x.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush()
return false, err
}
iprot.ReadMessageEnd()
result := PlayerServerGetNewOrdersResult{}
var retval []*Order
var err2 error
if retval, err2 = p.handler.GetNewOrders(); err2 != nil {
x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing getNewOrders: "+err2.Error())
oprot.WriteMessageBegin("getNewOrders", thrift.EXCEPTION, seqId)
x.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush()
return true, err2
} else {
result.Success = retval
}
if err2 = oprot.WriteMessageBegin("getNewOrders", thrift.REPLY, seqId); err2 != nil {
err = err2
}
if err2 = result.Write(oprot); err == nil && err2 != nil {
err = err2
}
if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil {
err = err2
}
if err2 = oprot.Flush(); err == nil && err2 != nil {
err = err2
}
if err != nil {
return
}
return true, err
}

从输入数据协议里面读取数据,反序列化构造getNewOrders 的接口参数,再调用用户业务层实现的p.handler.GetNewOrders()接口函数,将返回值通过输出数据协议序列化,通过TServerTransport 返回给客户端。


Transport

Transport 类似java里面的各种io,可以通过装饰器从一种io到另一种,到现在笔者还没搞懂java 的各种io 。


// Encapsulates the I/O layer
type TTransport interface {
io.ReadWriteCloser
Flusher
ReadSizeProvider
// Opens the transport for communication
Open() error
// Returns true if the transport is open
IsOpen() bool
}

Thrift 里面提供了, TBufferedTransport,TFramedTransport,TMemoryBuffer,RichTransport,也可以自己扩展自己需要的Transport。


Protocol

Protocol 主要用来数据的序列化和反序列化。Thrift 里面提供了TBinaryProtocol,TCompactProtocol,TDebugProtocol,TJSONProtocol等,具体格式就不细说,笔者这篇博客简单描述了下TBinaryProtocol。



type TProtocol interface {
WriteMessageBegin(name string, typeId TMessageType, seqid int32) error
WriteMessageEnd() error
WriteStructBegin(name string) error
WriteStructEnd() error
WriteFieldBegin(name string, typeId TType, id int16) error
WriteFieldEnd() error
WriteFieldStop() error
WriteMapBegin(keyType TType, valueType TType, size int) error
WriteMapEnd() error
WriteListBegin(elemType TType, size int) error
WriteListEnd() error
WriteSetBegin(elemType TType, size int) error
WriteSetEnd() error
WriteBool(value bool) error
WriteByte(value int8) error
WriteI16(value int16) error
WriteI32(value int32) error
WriteI64(value int64) error
WriteDouble(value float64) error
WriteString(value string) error
WriteBinary(value []byte) error
ReadMessageBegin() (name string, typeId TMessageType, seqid int32, err error)
ReadMessageEnd() error
ReadStructBegin() (name string, err error)
ReadStructEnd() error
ReadFieldBegin() (name string, typeId TType, id int16, err error)
ReadFieldEnd() error
ReadMapBegin() (keyType TType, valueType TType, size int, err error)
ReadMapEnd() error
ReadListBegin() (elemType TType, size int, err error)
ReadListEnd() error
ReadSetBegin() (elemType TType, size int, err error)
ReadSetEnd() error
ReadBool() (value bool, err error)
ReadByte() (value int8, err error)
ReadI16() (value int16, err error)
ReadI32() (value int32, err error)
ReadI64() (value int64, err error)
ReadDouble() (value float64, err error)
ReadString() (value string, err error)
ReadBinary() (value []byte, err error)
Skip(fieldType TType) (err error)
Flush() (err error)
Transport() TTransport
}

Thrift Client

Client 端就简单多了,与服务端建立一个连接,再在连接上构建TTransport,选择与服务端对应的in Protocol 和 out Protocol。发送数据。以 getNewOrders为例。


func (p *PlayerServerClient) GetNewOrders() (r []*Order, err error) {
if err = p.sendGetNewOrders(); err != nil {
return
}
return p.recvGetNewOrders()
}
func (p *PlayerServerClient) sendGetNewOrders() (err error) {
oprot := p.OutputProtocol
if oprot == nil {
oprot = p.ProtocolFactory.GetProtocol(p.Transport)
p.OutputProtocol = oprot
}
p.SeqId++
if err = oprot.WriteMessageBegin("getNewOrders", thrift.CALL, p.SeqId); err != nil {
return
}
args := PlayerServerGetNewOrdersArgs{}
if err = args.Write(oprot); err != nil {
return
}
if err = oprot.WriteMessageEnd(); err != nil {
return
}
return oprot.Flush()
}
func (p *PlayerServerClient) recvGetNewOrders() (value []*Order, err error) {
iprot := p.InputProtocol
if iprot == nil {
iprot = p.ProtocolFactory.GetProtocol(p.Transport)
p.InputProtocol = iprot
}
method, mTypeId, seqId, err := iprot.ReadMessageBegin()
if err != nil {
return
}
if method != "getNewOrders" {
err = thrift.NewTApplicationException(thrift.WRONG_METHOD_NAME, "getNewOrders failed: wrong method name")
return
}
if p.SeqId != seqId {
err = thrift.NewTApplicationException(thrift.BAD_SEQUENCE_ID, "getNewOrders failed: out of sequence response")
return
}
if mTypeId == thrift.EXCEPTION {
error12 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception")
var error13 error
error13, err = error12.Read(iprot)
if err != nil {
return
}
if err = iprot.ReadMessageEnd(); err != nil {
return
}
err = error13
return
}
if mTypeId != thrift.REPLY {
err = thrift.NewTApplicationException(thrift.INVALID_MESSAGE_TYPE_EXCEPTION, "getNewOrders failed: invalid message type")
return
}
result := PlayerServerGetNewOrdersResult{}
if err = result.Read(iprot); err != nil {
return
}
if err = iprot.ReadMessageEnd(); err != nil {
return
}
value = result.GetSuccess()
return
}

sendGetNewOrders 会将远程接口名,参数序列化之后发到Server. recvGetNewOrders 接受Server返回的数据反序列化。调用完成。


总结

先不评价Thrift 在众多RPC框架中怎么样(笔者用过的RPC太少)。但Thrift整个设计面向接口,层次清楚,很易于扩展,维护,可以学习下。








微信扫一扫

第七城市微信公众平台