记录一下nsq的部分组件,nsq_stat,nsq_tail,nsq_to_file,nsq_to_http等
COMPONENTS
nsq_stat
Polls /stats for all the producers of the specified topic/channel and displays aggregate stats
实现很简单
读取基本的参数,然后执行statLoop函数
核心代码实现如下:
func main() {
flag.Parse()
....
termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
go statLoop(intvl, connectTimeout, requestTimeout, *topic, *channel, nsqdHTTPAddrs, lookupdHTTPAddrs)
<-termChan
}
statLoop函数是组件的核心
func statLoop(...) {
ci := clusterinfo.New(...)
for i := 0; !countNum.isSet || countNum.value >= i; i++ {
...
producers, err = ci.GetLookupdTopicProducers(topic, lookupdHTTPAddrs)
...
if o == nil {
o = c
time.Sleep(interval)
continue
}
// 输出status
fmt.Printf()
o = c
time.Sleep(interval)
}
os.Exit(0)
}
nsq_tail
消费指定topic/channel ,并写入到标准输出中
应用官方client libraries,github.com/nsqio/go-nsq
核心代码实现如下:
func main() {
flag.Parse()
...
consumer, err := nsq.NewConsumer(*topic, *channel, cfg)
...
consumer.AddHandler(&TailHandler{totalMessages: *totalMessages})
// AddHandler sets the Handler for messages
consumer.AddHandler(&TailHandler{totalMessages: *totalMessages})
// 连接nsqd
err = consumer.ConnectToNSQDs(nsqdTCPAddrs)
// 连接 nsqlookupd
err = consumer.ConnectToNSQLookupds(lookupdHTTPAddrs)
// stop
for {
select {
case <-consumer.StopChan:
return
case <-sigChan:
consumer.Stop()
}
}
}
nsq_to_file
消费指定的 topci/channel 写入一个新的按行分隔的文件,可滚动和压缩文件
核心代码实现如下:
func main() {
discoverer := newTopicDiscoverer(cfg)
topics, err = clusterinfo.New(...)
for _, topic := range topics {
logger, err := newConsumerFileLogger(topic, cfg)
discoverer.topics[topic] = logger
go discoverer.startTopicRouter(logger)
}
// 同步topic中消息到文件
discoverer.watch(...)
}
func (t *TopicDiscoverer) watch(...) {
ticker := time.Tick(*topicPollRate)
for {
select {
case <-ticker:
if sync {
t.syncTopics(...)
}
case <-t.termChan:
...
case <-t.hupChan:
t.hup()
}
}
}
func (t *TopicDiscoverer) syncTopics(...) {
newTopics, err := clusterinfo.New(...).GetLookupdTopics()
...
for _, topic := range newTopics {
...
// 写文件的逻辑在这里
logger, err := newConsumerFileLogger(topic, t.cfg)
...
go t.startTopicRouter(logger)
}
}
func newConsumerFileLogger(...) {
// NewFileLogger里实现了写文件的逻辑,并支持gzip
f, err := NewFileLogger(*gzipEnabled, *gzipLevel, *filenameFormat, topic)
...
consumer, err := nsq.NewConsumer(topic, *channel, cfg)
consumer.AddHandler(f)
err = consumer.ConnectToNSQDs(nsqdTCPAddrs)
...
err = consumer.ConnectToNSQLookupds(lookupdHTTPAddrs)
...
return &ConsumerFileLogger{
C: consumer,
F: f,
}, nil
}
nsq_to_http
消费指定topic/channel ,用http去pub,代码实现与上边类似
nsq_to_nsq
通过TCP消费指定topic/channel,并在目标nsqd重新发布
核心代码实现如下:
func main() {
...
consumer, err := nsq.NewConsumer(*topic, *channel, cCfg)
...
// 一系列producer
producers := make(map[string]*nsq.Producer)
for _, addr := range destNsqdTCPAddrs {
producer, err := nsq.NewProducer(addr, pCfg)
...
producers[addr] = producer
}
// 消费消息,发布消息
consumer.AddConcurrentHandlers(handler, len(destNsqdTCPAddrs))
}
to_nsq
从stdin中获取流,重新pub到一个nsq topic上
核心代码实现如下:
func main() {
producers := make(map[string]*nsq.Producer)
for _, addr := range destNsqdTCPAddrs {
producer, err := nsq.NewProducer(addr, cfg)
...
}
// 读取stdin
r := bufio.NewReader(os.Stdin)
go func() {
for {
...
err = readAndPublish(r, delim, producers)
...
}
}()
}
// 读取流,发布
func readAndPublish(r *bufio.Reader, delim byte, producers map[string]*nsq.Producer) error {
...
for _, producer := range producers {
err := producer.Publish(*topic, line)
...
}
...
}