nsq源码阅读(一)
Table of Contents
1 nsq是什么
nsq是一个使用纯go实现的消息队列,具有以下特点:
- 支持分布式拓扑
- 支持水平扩展
- 低延时消息发送
- 主要操作在内存中,达到一定条件落到磁盘
nsq保证消息至少被消费一次,不保证重复消费(需要客户端自己保持操作幂等),不保证顺序。根据官方的测试,单机情况下每秒pub可以达到40w,sub可以达到10w。(ps: 由于nsq是为了分布式而设计,作者建议不要太在意单机的执行速度)
2 nsq的设计
nsq主要包含三个二进制程序: nsqd, nsqlookupd, nsqadmin。nsqd就是主要的用于消息队列的主程序,nsqlookupd是通过topic来寻找nsqd的地址,相当于一个服务发现机制,只在客户端订阅的时候会查找一次。nsqadmin主要提供一个后台管理界面,方便用户查看与统计。 nsqlookupd的实现可以说是很简单,主要是管理topic,channel跟nsqd的关系。主要看下nsqd的实现:
2.1 nsqd如何保证消息不丢
当我们把消息发送给消息队列后如何保证消息不丢失呢,一般做法是持久化,把数据落地磁盘,但是这样当磁盘坏掉的时候也可能丢失,这时候就需要写多个机器。nsq有个选项–mem-queue-size来指示内存消息队列的大小,大于这个大小的消息会被落到磁盘。而磁盘是单文件写入。不支持写入多个节点。总的来说nsqd处理比较简单。
func (t *Topic) put(m *Message) error { select { case t.memoryMsgChan <- m: default: b := bufferPoolGet() //如果消息满了写入backend, 使用go-diskqueue 写入文件。 err := writeMessageToBackend(b, m, t.backend) bufferPoolPut(b) t.ctx.nsqd.SetHealth(err) if err != nil { t.ctx.nsqd.logf(LOG_ERROR, "TOPIC(%s) ERROR: failed to write message to backend - %s", t.name, err) return err } } return nil }
官方也介绍一种使用nsqtofile来备份消息,主要是利用一个topic的message会被多个channel消费。这种方式我个人觉得有点不大好,一般来说用恢复数据还比较好。但是用于软件中的错误容忍感觉不大妥当。这就是一个数据备份的问题。
2.2 nsqd如何水平扩展
nsqd pub消息的时候都会要求指定nsqd服务。nsqd并没有提供负载均衡的方式。需要手动确定使用的nsqd服务地址。也就是说需要客户端自己负载均衡。每一个topic对应唯一的nsqd,consumer端倒是可以通过nsqlookupd来进行服务发现。
func main(){ cfg := nsq.NewConfig() p, _ := nsq.NewProducer("localhost:addr", cfg) p.Publish("hello world", []byte("hello world")) }
2.3 nsq支持延迟消息
func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error { absTs := time.Now().Add(timeout).UnixNano() item := &pqueue.Item{Value: msg, Priority: absTs} err := c.pushDeferredMessage(item) if err != nil { return err } c.addToDeferredPQ(item) return nil }
延迟消息放在内存中,所以不能存放大量的延迟消息,并且长时间的延迟消息也需要注意。维护了一个简单的优先队列。每一次会拿到所有的channel,进行遍历查看是否有需要延迟发送的消息。
func (n *NSQD) queueScanLoop() { workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount) responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount) closeCh := make(chan int) // 通过配置的时间间隔来调整tick workTicker := time.NewTicker(n.getOpts().QueueScanInterval) refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval) channels := n.channels() n.resizePool(len(channels), workCh, responseCh, closeCh) for { select { case <-workTicker.C: if len(channels) == 0 { continue } case <-refreshTicker.C: //对每个channel来更新消息 channels = n.channels() n.resizePool(len(channels), workCh, responseCh, closeCh) continue case <-n.exitChan: goto exit } num := n.getOpts().QueueScanSelectionCount if num > len(channels) { num = len(channels) } loop: for _, i := range util.UniqRands(num, len(channels)) { workCh <- channels[i] } numDirty := 0 for i := 0; i < num; i++ { if <-responseCh { numDirty++ } } if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent { goto loop } } exit: n.logf(LOG_INFO, "QUEUESCAN: closing") close(closeCh) workTicker.Stop() refreshTicker.Stop() }