NSQ Study Notes (Part 2)

Notes on some of nsq's bundled components: nsq_stat, nsq_tail, nsq_to_file, nsq_to_http, and so on.

COMPONENTS

nsq_stat

Polls /stats for all the producers of the specified topic/channel and displays aggregate stats

The implementation is straightforward.

It parses the basic flags and then runs the statLoop function.

The core code looks like this:

func main() {
    flag.Parse()

    ...
    // exit cleanly on SIGHUP/SIGINT/SIGTERM
    termChan := make(chan os.Signal, 1)
    signal.Notify(termChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)

    go statLoop(intvl, connectTimeout, requestTimeout, *topic, *channel, nsqdHTTPAddrs, lookupdHTTPAddrs)

    <-termChan
}

The statLoop function is the core of this component:

func statLoop(...) {
    ci := clusterinfo.New(...)
    for i := 0; !countNum.isSet || countNum.value >= i; i++ {

        ...
        producers, err = ci.GetLookupdTopicProducers(topic, lookupdHTTPAddrs)
        ...

        // o holds the previous poll and c the current one;
        // there is nothing to diff on the first iteration
        if o == nil {
            o = c
            time.Sleep(interval)
            continue
        }

        // print one line of aggregated stats
        fmt.Printf(...)

        o = c
        time.Sleep(interval)
    }
    os.Exit(0)
}
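
What makes the output useful is the diff between two consecutive polls: o holds the previous aggregate and c the current one, and the per-second rates are just the delta divided by the poll interval. A minimal sketch of that idea (the struct and field names below are illustrative stand-ins, not the real clusterinfo types):

import "time"

// aggregateStats is an illustrative stand-in for the per-poll aggregate
// that nsq_stat builds from the /stats responses.
type aggregateStats struct {
    Depth        int64
    MessageCount int64
}

// messageRate returns messages/second between the previous (o) and current (c) poll.
func messageRate(o, c aggregateStats, interval time.Duration) float64 {
    return float64(c.MessageCount-o.MessageCount) / interval.Seconds()
}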

nsq_tail

Consumes messages from the specified topic/channel and writes them to stdout.
It uses the official client library, github.com/nsqio/go-nsq.

The core code looks like this:

func main() {
    flag.Parse()
    ...
    consumer, err := nsq.NewConsumer(*topic, *channel, cfg)
    ...
    // AddHandler sets the Handler for messages
    consumer.AddHandler(&TailHandler{totalMessages: *totalMessages})

    // connect directly to the nsqd(s) given on the command line
    err = consumer.ConnectToNSQDs(nsqdTCPAddrs)
    // and/or discover nsqd(s) via nsqlookupd
    err = consumer.ConnectToNSQLookupds(lookupdHTTPAddrs)

    // wait until the consumer stops, or a signal asks it to stop
    for {
        select {
        case <-consumer.StopChan:
            return
        case <-sigChan:
            consumer.Stop()
        }
    }
}
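
The TailHandler itself just implements go-nsq's Handler interface: write the message body plus a newline to stdout, and exit once totalMessages messages have been shown. Roughly (a simplified sketch, not the exact upstream code):

import (
    "log"
    "os"

    "github.com/nsqio/go-nsq"
)

type TailHandler struct {
    totalMessages int
    messagesShown int
}

// HandleMessage writes the raw message body, followed by a newline, to stdout.
func (th *TailHandler) HandleMessage(m *nsq.Message) error {
    th.messagesShown++
    if _, err := os.Stdout.Write(m.Body); err != nil {
        log.Fatalf("failed to write to stdout - %s", err)
    }
    os.Stdout.WriteString("\n")
    // stop after the requested number of messages (0 means unlimited)
    if th.totalMessages > 0 && th.messagesShown >= th.totalMessages {
        os.Exit(0)
    }
    return nil
}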

nsq_to_file

Consumes messages from the specified topic/channel and writes them to newline-delimited files, with optional file rolling and gzip compression.

The core code looks like this:

func main() {
    discoverer := newTopicDiscoverer(cfg)
    topics, err = clusterinfo.New(...).GetLookupdTopics(...)
    for _, topic := range topics {
        logger, err := newConsumerFileLogger(topic, cfg)
        discoverer.topics[topic] = logger
        go discoverer.startTopicRouter(logger)
    }
    // keep discovering topics and syncing their messages to files
    discoverer.watch(...)
}

func (t *TopicDiscoverer) watch(...) {
    ticker := time.Tick(*topicPollRate)
    for {
        select {
        case <-ticker:
            if sync {
                t.syncTopics(...)
            }
        case <-t.termChan:
            ...
        case <-t.hupChan:
            t.hup()
        }
    }
}

func (t *TopicDiscoverer) syncTopics(...) {
    newTopics, err := clusterinfo.New(...).GetLookupdTopics()
    ...
    for _, topic := range newTopics {
        ...
        // the file-writing setup happens in newConsumerFileLogger (see below)
        logger, err := newConsumerFileLogger(topic, t.cfg)
        ...
        go t.startTopicRouter(logger)
    }
}

func newConsumerFileLogger(...) (*ConsumerFileLogger, error) {
    // NewFileLogger implements the file-writing logic, including gzip support
    f, err := NewFileLogger(*gzipEnabled, *gzipLevel, *filenameFormat, topic)
    ...

    consumer, err := nsq.NewConsumer(topic, *channel, cfg)
    consumer.AddHandler(f)

    err = consumer.ConnectToNSQDs(nsqdTCPAddrs)
    ...
    err = consumer.ConnectToNSQLookupds(lookupdHTTPAddrs)
    ...

    return &ConsumerFileLogger{
        C: consumer,
        F: f,
    }, nil
}
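
The real FileLogger hands each message to a channel and a router goroutine does the writing, rotation and gzip; the essence of the write path, though, is just appending the message body plus a newline to the current output file. A stripped-down stand-in for that idea (the type below is mine, not the actual FileLogger):

import (
    "os"

    "github.com/nsqio/go-nsq"
)

// fileWriteHandler is an illustrative stand-in for FileLogger:
// append each message body plus a newline to an already-opened file.
type fileWriteHandler struct {
    out *os.File
}

func (h *fileWriteHandler) HandleMessage(m *nsq.Message) error {
    if _, err := h.out.Write(m.Body); err != nil {
        return err // a non-nil error makes go-nsq requeue the message
    }
    _, err := h.out.Write([]byte("\n"))
    return err
}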

nsq_to_http

Consumes messages from the specified topic/channel and republishes them via HTTP; the code is similar to the components above.
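
In other words: a consumer plus a handler whose HandleMessage POSTs the message body to one of the destination URLs. A rough sketch of such a handler (the type, fields and round-robin choice below are mine; the real nsq_to_http adds publish modes, retries and timeouts):

import (
    "bytes"
    "fmt"
    "io"
    "net/http"

    "github.com/nsqio/go-nsq"
)

// httpPublishHandler is an illustrative handler that POSTs each message body
// to one of the destination URLs, e.g. http://127.0.0.1:4151/pub?topic=foo
type httpPublishHandler struct {
    client *http.Client
    addrs  []string
    next   int
}

func (h *httpPublishHandler) HandleMessage(m *nsq.Message) error {
    addr := h.addrs[h.next%len(h.addrs)] // naive round-robin over destinations
    h.next++

    resp, err := h.client.Post(addr, "application/octet-stream", bytes.NewReader(m.Body))
    if err != nil {
        return err // a non-nil error makes go-nsq requeue the message
    }
    defer resp.Body.Close()
    io.Copy(io.Discard, resp.Body)
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("unexpected status %d from %s", resp.StatusCode, addr)
    }
    return nil
}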

nsq_to_nsq

Consumes messages from the specified topic/channel over TCP and republishes them to destination nsqd(s).

The core code looks like this:

func main() {
    ...
    consumer, err := nsq.NewConsumer(*topic, *channel, cCfg)
    ...
    // one producer per destination nsqd
    producers := make(map[string]*nsq.Producer)
    for _, addr := range destNsqdTCPAddrs {
        producer, err := nsq.NewProducer(addr, pCfg)
        ...
        producers[addr] = producer
    }
    // consume messages and republish them via the handler
    consumer.AddConcurrentHandlers(handler, len(destNsqdTCPAddrs))
}
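
The handler passed to AddConcurrentHandlers is where consumer and producers meet: every incoming message is republished to the destination topic on one of the producers. A minimal version of that idea, using synchronous Publish instead of the PublishAsync machinery the real nsq_to_nsq uses:

import "github.com/nsqio/go-nsq"

// relayHandler is an illustrative relay: republish every consumed message
// to the destination topic on one of the destination producers.
type relayHandler struct {
    destTopic string
    producers []*nsq.Producer
    next      int
}

func (h *relayHandler) HandleMessage(m *nsq.Message) error {
    p := h.producers[h.next%len(h.producers)] // naive round-robin
    h.next++
    return p.Publish(h.destTopic, m.Body) // a non-nil error requeues the message
}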

to_nsq

Reads a stream from stdin and republishes it to an nsq topic.

The core code looks like this:

func main() {
    producers := make(map[string]*nsq.Producer)
    for _, addr := range destNsqdTCPAddrs {
        producer, err := nsq.NewProducer(addr, cfg)
        ...
        producers[addr] = producer
    }
    // read from stdin and publish in a background goroutine
    r := bufio.NewReader(os.Stdin)
    go func() {
        for {
            ...
            err = readAndPublish(r, delim, producers)
            ...
        }
    }()
}
// read one delimited chunk from the stream and publish it to every producer
func readAndPublish(r *bufio.Reader, delim byte, producers map[string]*nsq.Producer) error {
    ...
    for _, producer := range producers {
        err := producer.Publish(*topic, line)
        ...
    }
    ...
}
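
The elided part of readAndPublish is just reading one delimiter-terminated chunk from the buffered reader and stripping the delimiter before publishing. A self-contained sketch of that step (the function name and the explicit topic parameter are mine; upstream uses the topic flag directly):

import (
    "bufio"

    "github.com/nsqio/go-nsq"
)

// readAndPublishSketch reads one delim-terminated chunk from r, strips the
// delimiter, and publishes the remaining bytes to every configured producer.
func readAndPublishSketch(r *bufio.Reader, delim byte, topic string, producers map[string]*nsq.Producer) error {
    line, readErr := r.ReadBytes(delim)
    if len(line) > 0 && line[len(line)-1] == delim {
        line = line[:len(line)-1] // drop the trailing delimiter
    }
    if len(line) == 0 {
        return readErr // nothing to publish (e.g. EOF)
    }
    for _, producer := range producers {
        if err := producer.Publish(topic, line); err != nil {
            return err
        }
    }
    return readErr
}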