UP | HOME

GRPC golang版源码分析之客户端(二)

Table of Contents

1 前言

前面一篇文章分析了一个grpc call的大致调用流程,顺着源码走了一遍,但是grpc中有一些特性并没有进行分析,这篇blog主要就是分析负载均衡这个特性。负载均衡可以让你在使用grpc调用方法是连接不同的服务器。在启动的时候,grpc client会建立对所有服务器的连接,然后通过轮询算法拿不同的服务器。

2 负载均衡

负载均衡是在客户端实现的,通过在初始化一个grpc客户端传入一个负载均衡器,默认是通过轮询算法每次调用时切换调用的服务器,由于是在客户端做的,所以只能让服务器平摊压力,并不能实时的根据服务器的状态来进行负载均衡。比如某一刻所有的grpc客户端都轮到了同一个服务器(这里只是举例)。

//在clientconn.go的DialContext函数中
go func() {
          var addrs []Address
          if cc.dopts.balancer == nil {
              // Connect to target directly if balancer is nil.
    //如果没有设置负载均衡器,则直接连接
              addrs = append(addrs, Address{Addr: target})
          } else {
              var credsClone credentials.TransportCredentials
              if creds != nil {
                  credsClone = creds.Clone()
              }
              config := BalancerConfig{
                  DialCreds: credsClone,
              }
    //启动一个负载均衡器,start函数会启动一个watch监听地址的变化,而Notify返回一个通道,在每次服务器地址变化后的最新地址信息.
              if err := cc.dopts.balancer.Start(target, config); err != nil {
                  waitC <- err
                  return
              }
              ch := cc.dopts.balancer.Notify()
              if ch == nil {
                  // There is no name resolver installed.
                  addrs = append(addrs, Address{Addr: target})
              } else {
                  addrs, ok = <-ch
                  if !ok || len(addrs) == 0 {
                      waitC <- errNoAddr
                      return
                  }
              }
          }
  //对每一个地址进行连接, 因为这是客户端启动时,所以需要对所有地址操作.
          for _, a := range addrs {
              if err := cc.resetAddrConn(a, false, nil); err != nil {
                  waitC <- err
                  return
              }
          }
          close(waitC)
      }()
..............
//省略一些代码
if ok {
  //这里开启一个监听goroutine,主要是监听服务器地址变化并对新的地址建立连接,对老的地址关闭连接
          go cc.lbWatcher()
      }

上面的代码大致是说明了一个负载均衡器是怎么使用的,下面看看负载均衡具体的结构:

type Balancer interface {
  //启动一个负载均衡,内部会启动一个名称服务器的watcher,不断监听地址的变化
      Start(target string, config BalancerConfig) error
      Up(addr Address) (down func(error))
  //得到下一次连接的地址
      Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error)
  //从服务器得到更新的地址,这个地址是更新后的所有地址
      Notify() <-chan []Address
  //关闭负载均衡器
      Close() error
}

负载均衡器有个默认实现在balancer.go中,使用的轮询算法。这个实现中包含了一个名字服务器的服务,我们可以通过实现一个名字服务器的接口,然后封装到这个负载均衡器中,这样就不需要自己实现整个负载均衡器。名字服务器的接口如下:

type Update struct {
      // Op indicates the operation of the update.
      Op Operation
      // Addr is the updated address. It is empty string if there is no address update.
      Addr string
      // Metadata is the updated metadata. It is nil if there is no metadata update.
      // Metadata is not required for a custom naming implementation.
      Metadata interface{}
}
// Resolver creates a Watcher for a target to track its resolution changes.
type Resolver interface {
      // Resolve creates a Watcher for target.
  //通过一个名字得到一个watcher,监听服务器地址变化。
      Resolve(target string) (Watcher, error)
}
// Watcher watches for the updates on the specified target.
type Watcher interface {
      // Next blocks until an update or error happens. It may return one or more
      // updates. The first call should get the full set of the results. It should
      // return an error if and only if Watcher cannot recover.
  // 得到下次更新的地址
      Next() ([]*Update, error)
      // Close closes the Watcher.
      Close()
}

负载均衡器的全貌大概就这些了,其中还有些细节我没有看到,目前来说我也不希望过早深入这些细节。所以负载均衡就到这里啦。

3 相关链接