UP | HOME

nsq源码阅读(一)

Table of Contents

1 nsq是什么

nsq是一个使用纯go实现的消息队列,具有以下特点:

  • 支持分布式拓扑
  • 支持水平扩展
  • 低延时消息发送
  • 主要操作在内存中,达到一定条件落到磁盘

nsq保证消息至少被消费一次,不保证重复消费(需要客户端自己保持操作幂等),不保证顺序。根据官方的测试,单机情况下每秒pub可以达到40w,sub可以达到10w。(ps: 由于nsq是为了分布式而设计,作者建议不要太在意单机的执行速度)

2 nsq的设计

nsq主要包含三个二进制程序: nsqd, nsqlookupd, nsqadmin。nsqd就是主要的用于消息队列的主程序,nsqlookupd是通过topic来寻找nsqd的地址,相当于一个服务发现机制,只在客户端订阅的时候会查找一次。nsqadmin主要提供一个后台管理界面,方便用户查看与统计。 nsqlookupd的实现可以说是很简单,主要是管理topic,channel跟nsqd的关系。主要看下nsqd的实现:

2.1 nsqd如何保证消息不丢

当我们把消息发送给消息队列后如何保证消息不丢失呢,一般做法是持久化,把数据落地磁盘,但是这样当磁盘坏掉的时候也可能丢失,这时候就需要写多个机器。nsq有个选项–mem-queue-size来指示内存消息队列的大小,大于这个大小的消息会被落到磁盘。而磁盘是单文件写入。不支持写入多个节点。总的来说nsqd处理比较简单。

func (t *Topic) put(m *Message) error {
  select {
  case t.memoryMsgChan <- m:
  default:
    b := bufferPoolGet()
    //如果消息满了写入backend, 使用go-diskqueue 写入文件。
    err := writeMessageToBackend(b, m, t.backend)
    bufferPoolPut(b)
    t.ctx.nsqd.SetHealth(err)
    if err != nil {
      t.ctx.nsqd.logf(LOG_ERROR,
    "TOPIC(%s) ERROR: failed to write message to backend - %s",
    t.name, err)
      return err
    }
  }
  return nil
}

官方也介绍一种使用nsqtofile来备份消息,主要是利用一个topic的message会被多个channel消费。这种方式我个人觉得有点不大好,一般来说用恢复数据还比较好。但是用于软件中的错误容忍感觉不大妥当。这就是一个数据备份的问题。

2.2 nsqd如何水平扩展

nsqd pub消息的时候都会要求指定nsqd服务。nsqd并没有提供负载均衡的方式。需要手动确定使用的nsqd服务地址。也就是说需要客户端自己负载均衡。每一个topic对应唯一的nsqd,consumer端倒是可以通过nsqlookupd来进行服务发现。

func main(){
  cfg := nsq.NewConfig()
  p, _ := nsq.NewProducer("localhost:addr", cfg)
  p.Publish("hello world", []byte("hello world"))
}

2.3 nsq支持延迟消息

  func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error {
    absTs := time.Now().Add(timeout).UnixNano()
    item := &pqueue.Item{Value: msg, Priority: absTs}
    err := c.pushDeferredMessage(item)
    if err != nil {
        return err
    }
    c.addToDeferredPQ(item)
    return nil
} 

延迟消息放在内存中,所以不能存放大量的延迟消息,并且长时间的延迟消息也需要注意。维护了一个简单的优先队列。每一次会拿到所有的channel,进行遍历查看是否有需要延迟发送的消息。

func (n *NSQD) queueScanLoop() {
  workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
  responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
  closeCh := make(chan int)
  // 通过配置的时间间隔来调整tick
  workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
  refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)

  channels := n.channels()
  n.resizePool(len(channels), workCh, responseCh, closeCh)

  for {
    select {
    case <-workTicker.C:
      if len(channels) == 0 {
    continue
      }
    case <-refreshTicker.C:
      //对每个channel来更新消息
      channels = n.channels()
      n.resizePool(len(channels), workCh, responseCh, closeCh)
      continue
    case <-n.exitChan:
      goto exit
    }

    num := n.getOpts().QueueScanSelectionCount
    if num > len(channels) {
      num = len(channels)
    }

  loop:
    for _, i := range util.UniqRands(num, len(channels)) {
      workCh <- channels[i]
    }

    numDirty := 0
    for i := 0; i < num; i++ {
      if <-responseCh {
    numDirty++
      }
    }

    if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
      goto loop
    }
  }

exit:
  n.logf(LOG_INFO, "QUEUESCAN: closing")
  close(closeCh)
  workTicker.Stop()
  refreshTicker.Stop()
}