UP | HOME

golang版本grpc服务端浅析

Table of Contents

1 前言

前面分析完了客户端,在经过一段时间的颓废后,我觉得我至少应该做到有始有终,所以今天写写服务的流程.

2 注册流程

想象一个rpc的流程, 客户端指明一个函数的函数名跟参数,服务端找到这个函数,然后把参数应用到这个函数上。所以一个注册服务流程,就是告诉grpc让收到一个rpc调用的时候,如何找到我们的函数。看一个最简单的服务端程序.

type server struct{}
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
      return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}
func main() {
      lis, err := net.Listen("tcp", port)
      if err != nil {
          log.Fatalf("failed to listen: %v", err)
      }
      s := grpc.NewServer()
      pb.RegisterGreeterServer(s, &server{})
      reflection.Register(s)
      if err := s.Serve(lis); err != nil {
          log.Fatalf("failed to serve: %v", err)
      }
}

一般的流程是先定义个proto文件,然后根据protoc工具生成go的grpc代码,这里的SayHello函数是你proto文件中声明的函数。然后你实现你声明的接口,然后再注册到server中,就可以供客户端调用了。 在grpc注册的过程中有几个概念:

  1. Server
  2. grpc Service
  3. ServiceDesc
  4. 自己的service

自己的service很简单,就是上面的server结构体,这个结构实现了相应的interface。servicedesc相当于是一个根据proto文件来描述你server需要实现的功能的结构。grpc service, 你proto文件可能会声明多个service,你的Server也可以注册多个service。Server 原则上来说只需要一个server,一个server下有多个service,一个service下有多个方法。 我们看下RegisterGreeterServer这个方法

//描述一个service具有哪些方法
var _Greeter_serviceDesc = grpc.ServiceDesc{
  ServiceName: "helloworld.Greeter",
  HandlerType: (*GreeterServer)(nil),
  Methods: []grpc.MethodDesc{
      {
          MethodName: "SayHello",
          Handler:    _Greeter_SayHello_Handler,
      },
  },
  Streams:  []grpc.StreamDesc{},
  Metadata: "helloworld.proto",
}
func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
  s.RegisterService(&_Greeter_serviceDesc, srv)
}
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
      ht := reflect.TypeOf(sd.HandlerType).Elem()
      st := reflect.TypeOf(ss)
      //判断是否实现相应的接口
      if !st.Implements(ht) {
          grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
      }
      s.register(sd, ss)
}
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
      s.mu.Lock()
      defer s.mu.Unlock()
      s.printf("RegisterService(%q)", sd.ServiceName)
      if _, ok := s.m[sd.ServiceName]; ok {
          grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
      }
      //初始化一个服务
      srv := &service{
          server: ss,
          md:     make(map[string]*MethodDesc),
          sd:     make(map[string]*StreamDesc),
          mdata:  sd.Metadata,
      }
      //根据名字映射方法
      for i := range sd.Methods {
          d := &sd.Methods[i]
          srv.md[d.MethodName] = d
      }
      for i := range sd.Streams {
          d := &sd.Streams[i]
          srv.sd[d.StreamName] = d
      }
      把服务注册到server中
      s.m[sd.ServiceName] = srv
}

其实根据这个注册名字,我们已经能够大概猜到当客户端的请求到来时如何找到我们的方法。首先根据service name找到相应的service再根据方法名找到相应的方法,然后拿到我们自定义的结构调用这个方法。

3 调用流程

注册流程看完了,接下来再来看下让一个请求到达服务端的时候是如何处理。 在Server中的Serve方法中,服务端通过监听请求的到来,通过for循环不断接受到来的连接。

//去掉一些错误处理的代码后,十分简洁
  func (s *Server) Serve(lis net.Listener) error {
    ..........

    for {
      rawConn, err := lis.Accept()
      .............

      // Start a new goroutine to deal with rawConn
      // so we don't stall this Accept loop goroutine.
      go s.handleRawConn(rawConn)
    }
  }
//最后通过http2创建一个stream,来收取消息

func (s *Server) serveStreams(st transport.ServerTransport) {
  defer s.removeConn(st)
  defer st.Close()
  var wg sync.WaitGroup
  st.HandleStreams(func(stream *transport.Stream) {
      wg.Add(1)
      go func() {
          defer wg.Done()
          s.handleStream(st, stream, s.traceInfo(st, stream))
      }()
  }, func(ctx context.Context, method string) context.Context {
      if !EnableTracing {
          return ctx
      }
      tr := trace.New("grpc.Recv."+methodFamily(method), method)
      return trace.NewContext(ctx, tr)
  })
  wg.Wait()
}

func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
  sm := stream.Method()
  if sm != "" && sm[0] == '/' {
      sm = sm[1:]
  }
  pos := strings.LastIndex(sm, "/")

  ............
      //从这里到服务和注册的方法
  service := sm[:pos]
  method := sm[pos+1:]
  srv, ok := s.m[service]

  .................

  // Unary RPC or Streaming RPC?
  if md, ok := srv.md[method]; ok {
      s.processUnaryRPC(t, stream, srv, md, trInfo)
      return
  }
  if sd, ok := srv.sd[method]; ok {
      s.processStreamingRPC(t, stream, srv, sd, trInfo)
      return
  }

  ...........

}

//接下来是最主要的处理函数
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
      if stats.On() {
          begin := &stats.Begin{
              BeginTime: time.Now(),
          }
          stats.HandleRPC(stream.Context(), begin)
      }
      defer func() {
          if stats.On() {
              end := &stats.End{
                  EndTime: time.Now(),
              }
              if err != nil && err != io.EOF {
                  end.Error = toRPCErr(err)
              }
              stats.HandleRPC(stream.Context(), end)
          }
      }()
      if trInfo != nil {
          defer trInfo.tr.Finish()
          trInfo.firstLine.client = false
          trInfo.tr.LazyLog(&trInfo.firstLine, false)
          defer func() {
              if err != nil && err != io.EOF {
                  trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
                  trInfo.tr.SetError()
              }
          }()
      }
      if s.opts.cp != nil {
          // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
          stream.SetSendCompress(s.opts.cp.Type())
      }
      p := &parser{r: stream}
      for {
          pf, req, err := p.recvMsg(s.opts.maxMsgSize)
          if err == io.EOF {
              // The entire stream is done (for unary RPC only).
              return err
          }

          .............

          if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
              switch err := err.(type) {
              case *rpcError:
                  if e := t.WriteStatus(stream, err.code, err.desc); e != nil {
                      grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
                  }
                  return err
              default:
                  if e := t.WriteStatus(stream, codes.Internal, err.Error()); e != nil {
                      grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
                  }
                  // TODO checkRecvPayload always return RPC error. Add a return here if necessary.
              }
          }
          var inPayload *stats.InPayload
          if stats.On() {
              inPayload = &stats.InPayload{
                  RecvTime: time.Now(),
              }
          }
          statusCode := codes.OK
          statusDesc := ""
          //df函数主要用于读取http2请求并反序列化到v中(相当于一个解包的操作)
          df := func(v interface{}) error {
              if inPayload != nil {
                  inPayload.WireLength = len(req)
              }
              if pf == compressionMade {
                  var err error
                  req, err = s.opts.dc.Do(bytes.NewReader(req))
                  if err != nil {
                      if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
                          grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
                      }
                      return Errorf(codes.Internal, err.Error())
                  }
              }
              if len(req) > s.opts.maxMsgSize {
                  // TODO: Revisit the error code. Currently keep it consistent with
                  // java implementation.
                  statusCode = codes.Internal
                  statusDesc = fmt.Sprintf("grpc: server received a message of %d bytes exceeding %d limit", len(req), s.opts.maxMsgSize)
              }
              if err := s.opts.codec.Unmarshal(req, v); err != nil {
                  return err
              }
              if inPayload != nil {
                  inPayload.Payload = v
                  inPayload.Data = req
                  inPayload.Length = len(req)
                  stats.HandleRPC(stream.Context(), inPayload)
              }
              if trInfo != nil {
                  trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
              }
              return nil
          }
          //这些我们注册的函数
          reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)

          .....................

          opts := &transport.Options{
              Last:  true,
              Delay: false,
          }
          //发结果发送出去
          if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
              switch err := err.(type) {
              case transport.ConnectionError:
                  // Nothing to do here.
              case transport.StreamError:
                  statusCode = err.Code
                  statusDesc = err.Desc
              default:
                  statusCode = codes.Unknown
                  statusDesc = err.Error()
              }
              return err
          }
          if trInfo != nil {
              trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
          }
          errWrite := t.WriteStatus(stream, statusCode, statusDesc)
          if statusCode != codes.OK {
              return Errorf(statusCode, statusDesc)
          }
          return errWrite
      }
}

grpc服务端的基本流程就是这样了,我只是看的一个基本流程,再没遇到问题时,基本上不会再看grpc源码了。目前公司项目中在使用grpc,等有了坑,我再来总结吧

4 总结

grpc中有很多我比较喜欢的点,一个是grpcsever的配置通过传递函数来修改,这样又不用取回之前的设置,又可以只修改其中一个值,比较精巧,-0- 原谅我见识不多。还有就是错误的处理,context的使用,trace的使用。 不得不吐槽一下,golang的代码看起来真不够美,虽然简单,但是写起来不够优美,源码的错误处理看起来也是乱糟糟的。