golang版本grpc服务端浅析
1 前言
前面分析完了客户端,在经过一段时间的颓废后,我觉得我至少应该做到有始有终,所以今天写写服务的流程.
2 注册流程
想象一个rpc的流程, 客户端指明一个函数的函数名跟参数,服务端找到这个函数,然后把参数应用到这个函数上。所以一个注册服务流程,就是告诉grpc让收到一个rpc调用的时候,如何找到我们的函数。看一个最简单的服务端程序.
type server struct{} func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { return &pb.HelloReply{Message: "Hello " + in.Name}, nil } func main() { lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() pb.RegisterGreeterServer(s, &server{}) reflection.Register(s) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }
一般的流程是先定义个proto文件,然后根据protoc工具生成go的grpc代码,这里的SayHello函数是你proto文件中声明的函数。然后你实现你声明的接口,然后再注册到server中,就可以供客户端调用了。 在grpc注册的过程中有几个概念:
- Server
- grpc Service
- ServiceDesc
- 自己的service
自己的service很简单,就是上面的server结构体,这个结构实现了相应的interface。servicedesc相当于是一个根据proto文件来描述你server需要实现的功能的结构。grpc service, 你proto文件可能会声明多个service,你的Server也可以注册多个service。Server 原则上来说只需要一个server,一个server下有多个service,一个service下有多个方法。 我们看下RegisterGreeterServer这个方法
//描述一个service具有哪些方法 var _Greeter_serviceDesc = grpc.ServiceDesc{ ServiceName: "helloworld.Greeter", HandlerType: (*GreeterServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "SayHello", Handler: _Greeter_SayHello_Handler, }, }, Streams: []grpc.StreamDesc{}, Metadata: "helloworld.proto", } func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) { s.RegisterService(&_Greeter_serviceDesc, srv) } func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) { ht := reflect.TypeOf(sd.HandlerType).Elem() st := reflect.TypeOf(ss) //判断是否实现相应的接口 if !st.Implements(ht) { grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht) } s.register(sd, ss) } func (s *Server) register(sd *ServiceDesc, ss interface{}) { s.mu.Lock() defer s.mu.Unlock() s.printf("RegisterService(%q)", sd.ServiceName) if _, ok := s.m[sd.ServiceName]; ok { grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName) } //初始化一个服务 srv := &service{ server: ss, md: make(map[string]*MethodDesc), sd: make(map[string]*StreamDesc), mdata: sd.Metadata, } //根据名字映射方法 for i := range sd.Methods { d := &sd.Methods[i] srv.md[d.MethodName] = d } for i := range sd.Streams { d := &sd.Streams[i] srv.sd[d.StreamName] = d } 把服务注册到server中 s.m[sd.ServiceName] = srv }
其实根据这个注册名字,我们已经能够大概猜到当客户端的请求到来时如何找到我们的方法。首先根据service name找到相应的service再根据方法名找到相应的方法,然后拿到我们自定义的结构调用这个方法。
3 调用流程
注册流程看完了,接下来再来看下让一个请求到达服务端的时候是如何处理。 在Server中的Serve方法中,服务端通过监听请求的到来,通过for循环不断接受到来的连接。
//去掉一些错误处理的代码后,十分简洁 func (s *Server) Serve(lis net.Listener) error { .......... for { rawConn, err := lis.Accept() ............. // Start a new goroutine to deal with rawConn // so we don't stall this Accept loop goroutine. go s.handleRawConn(rawConn) } } //最后通过http2创建一个stream,来收取消息 func (s *Server) serveStreams(st transport.ServerTransport) { defer s.removeConn(st) defer st.Close() var wg sync.WaitGroup st.HandleStreams(func(stream *transport.Stream) { wg.Add(1) go func() { defer wg.Done() s.handleStream(st, stream, s.traceInfo(st, stream)) }() }, func(ctx context.Context, method string) context.Context { if !EnableTracing { return ctx } tr := trace.New("grpc.Recv."+methodFamily(method), method) return trace.NewContext(ctx, tr) }) wg.Wait() } func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) { sm := stream.Method() if sm != "" && sm[0] == '/' { sm = sm[1:] } pos := strings.LastIndex(sm, "/") ............ //从这里到服务和注册的方法 service := sm[:pos] method := sm[pos+1:] srv, ok := s.m[service] ................. // Unary RPC or Streaming RPC? if md, ok := srv.md[method]; ok { s.processUnaryRPC(t, stream, srv, md, trInfo) return } if sd, ok := srv.sd[method]; ok { s.processStreamingRPC(t, stream, srv, sd, trInfo) return } ........... } //接下来是最主要的处理函数 func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) { if stats.On() { begin := &stats.Begin{ BeginTime: time.Now(), } stats.HandleRPC(stream.Context(), begin) } defer func() { if stats.On() { end := &stats.End{ EndTime: time.Now(), } if err != nil && err != io.EOF { end.Error = toRPCErr(err) } stats.HandleRPC(stream.Context(), end) } }() if trInfo != nil { defer trInfo.tr.Finish() trInfo.firstLine.client = false trInfo.tr.LazyLog(&trInfo.firstLine, false) defer func() { if err != nil && err != io.EOF { trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) trInfo.tr.SetError() } }() } if s.opts.cp != nil { // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. stream.SetSendCompress(s.opts.cp.Type()) } p := &parser{r: stream} for { pf, req, err := p.recvMsg(s.opts.maxMsgSize) if err == io.EOF { // The entire stream is done (for unary RPC only). return err } ............. if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil { switch err := err.(type) { case *rpcError: if e := t.WriteStatus(stream, err.code, err.desc); e != nil { grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e) } return err default: if e := t.WriteStatus(stream, codes.Internal, err.Error()); e != nil { grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e) } // TODO checkRecvPayload always return RPC error. Add a return here if necessary. } } var inPayload *stats.InPayload if stats.On() { inPayload = &stats.InPayload{ RecvTime: time.Now(), } } statusCode := codes.OK statusDesc := "" //df函数主要用于读取http2请求并反序列化到v中(相当于一个解包的操作) df := func(v interface{}) error { if inPayload != nil { inPayload.WireLength = len(req) } if pf == compressionMade { var err error req, err = s.opts.dc.Do(bytes.NewReader(req)) if err != nil { if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil { grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err) } return Errorf(codes.Internal, err.Error()) } } if len(req) > s.opts.maxMsgSize { // TODO: Revisit the error code. Currently keep it consistent with // java implementation. statusCode = codes.Internal statusDesc = fmt.Sprintf("grpc: server received a message of %d bytes exceeding %d limit", len(req), s.opts.maxMsgSize) } if err := s.opts.codec.Unmarshal(req, v); err != nil { return err } if inPayload != nil { inPayload.Payload = v inPayload.Data = req inPayload.Length = len(req) stats.HandleRPC(stream.Context(), inPayload) } if trInfo != nil { trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) } return nil } //这些我们注册的函数 reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt) ..................... opts := &transport.Options{ Last: true, Delay: false, } //发结果发送出去 if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil { switch err := err.(type) { case transport.ConnectionError: // Nothing to do here. case transport.StreamError: statusCode = err.Code statusDesc = err.Desc default: statusCode = codes.Unknown statusDesc = err.Error() } return err } if trInfo != nil { trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true) } errWrite := t.WriteStatus(stream, statusCode, statusDesc) if statusCode != codes.OK { return Errorf(statusCode, statusDesc) } return errWrite } }
grpc服务端的基本流程就是这样了,我只是看的一个基本流程,再没遇到问题时,基本上不会再看grpc源码了。目前公司项目中在使用grpc,等有了坑,我再来总结吧
4 总结
grpc中有很多我比较喜欢的点,一个是grpcsever的配置通过传递函数来修改,这样又不用取回之前的设置,又可以只修改其中一个值,比较精巧,-0- 原谅我见识不多。还有就是错误的处理,context的使用,trace的使用。 不得不吐槽一下,golang的代码看起来真不够美,虽然简单,但是写起来不够优美,源码的错误处理看起来也是乱糟糟的。