NSQ代码阅读
NSQ分三大模块:nsqd、nsqlookupd、nsqadmin,从哪个开始着手很重要。
本人阅读源码习惯从功能少的、尽量闭环的程序着手。纵观上面三个模块,最提纲挈领,最有桥梁作用的应该是nsqlookupd,但nsqlookupd仍然有很多的功能开发,在没有搞清楚其功能之前,不适合直接阅读。因此,我把第一个阅读的源码定为nsq-to-file。
nsq-to-file
注意:本地源码地址为:github.com/nsqio/nsq/apps/nsq_to_file
nsq-to-file作为NSQ的一种客户端,其实是在go-nsq的基础上做的功能开发。
nsq-to-file功能是订阅消费指定的话题(topic)/通道(channel),并写到文件中,对输出文件进行有选择的滚动和/或压缩文件。
支持的命令行参数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| -channel="nsq_to_file": nsq 通道(channel) -consumer-opt=: 传递给 nsq.Consumer 的参数 (可能会给多次, http://godoc.org/github.com/bitly/go-nsq -datetime-format="%Y-%m-%d_%H": strftime,和 filename 里 <DATETIME> 格式兼容 -filename-format="<TOPIC>.<HOST><GZIPREV>.<DATETIME>.log": output 文件名格式 (<TOPIC>, <HOST>, <DATETIM -gzip=false: gzip 输出文件 -gzip-compression=3: (已经抛弃) 使用 --gzip-level, gzip 压缩级别(1 = 速度最佳, 2 = 最近压缩, 3 = 默认压缩) -gzip-level=6: gzip 压缩级别 (1-9, 1=BestSpeed, 9=BestCompression) -host-identifier="": 输出到 log 文件,提供主机名。 <SHORT_HOST> 和 <HOSTNAME> 是有效的替换者 -lookupd-http-address=: lookupd HTTP 地址 (可能会给多次) -max-in-flight=200: 最大的消息数 to allow in flight -nsqd-tcp-address=: nsqd TCP 地址 (可能会给多次) -output-dir="/tmp": 输出文件所在的文件夹 -reader-opt=: (已经抛弃) 使用 --consumer-opt -skip-empty-files=false: 忽略写空文件 -topic=: nsq 话题(topic) (可能会给多次) -topic-refresh=1m0s: 话题(topic)列表刷新的频率是多少? -version=false: 打印版本信息
|
nsq-to-file功能分析
首先理清nsq-to-file功能模块和逻辑思路,有助于提纲挈领,也会对比出,假如自己实现这程序时要面临的问题。
功能拆分
1.缓存管理、日志标识的控制、支持文件压缩及压缩级别控制、日期格式管理、输出文件名格式等nsq-to-file本身的功能控制;
2.指定监听channel,指定监听的topic;
3.挂载到lookupd,或直接监听至nsqd;
4.话题列表刷新频率;
5.其他。
逻辑分析
第1条基本是nsq-to-file自身功能的实现,2、3、4则是与nsq或go-nsq(nsq的golib)API的使用挂钩,归纳起来需要实现以下功能:
- 挂载到lookupd或nsqd
- 定义channel,并告知lookupd或nsqd
- 获取topics
代码文件可以围绕这些功能及逻辑进行拆分阅读。
代码文件
两个文件:nsq_to_file.go 和 strftime.go,均为main package。
nsq_to_file.go
先找到main函数
1 2 3 4 5 6 7 8
| func main() { cfg := nsq.NewConfig()
flag.Var(&nsq.ConfigFlag{cfg}, "reader-opt", "(deprecated) use --consumer-opt") flag.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/nsqio/go-nsq#Config)")
flag.Parse()
|
第一个函数调用flag.Parse()是golang标准库中的命令行参数解析,可以追出允许使用什么命令行参数。
nsq_to_file flag参数功能解析
nsq_to_file opt参数设置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| var ( showVersion = flag.Bool("version", false, "print version string")
channel = flag.String("channel", "nsq_to_file", "nsq channel") maxInFlight = flag.Int("max-in-flight", 200, "max number of messages to allow in flight")
outputDir = flag.String("output-dir", "/tmp", "directory to write output files to") datetimeFormat = flag.String("datetime-format", "%Y-%m-%d_%H", "strftime compatible format for <DATETIME> in filename format") filenameFormat = flag.String("filename-format", "<TOPIC>.<HOST><REV>.<DATETIME>.log", "output filename format (<TOPIC>, <HOST>, <PID>, <DATETIME>, <REV> are replaced. <REV> is increased when file already exists)") hostIdentifier = flag.String("host-identifier", "", "value to output in log filename in place of hostname. <SHORT_HOST> and <HOSTNAME> are valid replacement tokens") gzipLevel = flag.Int("gzip-level", 6, "gzip compression level (1-9, 1=BestSpeed, 9=BestCompression)") gzipEnabled = flag.Bool("gzip", false, "gzip output files.") skipEmptyFiles = flag.Bool("skip-empty-files", false, "Skip writing empty files") topicPollRate = flag.Duration("topic-refresh", time.Minute, "how frequently the topic list should be refreshed") topicPattern = flag.String("topic-pattern", ".*", "Only log topics matching the following pattern")
rotateSize = flag.Int64("rotate-size", 0, "rotate the file when it grows bigger than `rotate-size` bytes") rotateInterval = flag.Duration("rotate-interval", 0*time.Second, "rotate the file every duration")
nsqdTCPAddrs = app.StringArray{} lookupdHTTPAddrs = app.StringArray{} topics = app.StringArray{}
gzipCompression = flag.Int("gzip-compression", 3, "(deprecated) use --gzip-level, gzip compression level (1 = BestSpeed, 2 = BestCompression, 3 = DefaultCompression)") )
|
consumer-opt
最近两个是reader-opt和consumer-opt,其实功能相同,建议使用consumer-opt。
这个命令行功能是为go-nsq设置config参数,因为nsq_to_file是在其基础上开发出来的。而且参数说明中也说了,有可能已经被设置好了的。
以下是config结构体的定义(文件:github.com/nsqio/go-nsq/config.go):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| type Config struct { initialized bool
configHandlers []configHandler
DialTimeout time.Duration `opt:"dial_timeout" default:"1s"`
ReadTimeout time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s"` WriteTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m" default:"1s"`
LocalAddr net.Addr `opt:"local_addr"`
LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s"` LookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"`
MaxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"` DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"`
BackoffStrategy BackoffStrategy `opt:"backoff_strategy" default:"exponential"` MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"` BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"`
MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`
LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"`
RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s"`
ClientID string `opt:"client_id"` Hostname string `opt:"hostname"` UserAgent string `opt:"user_agent"`
HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s"` SampleRate int32 `opt:"sample_rate" min:"0" max:"99"`
TlsV1 bool `opt:"tls_v1"` TlsConfig *tls.Config `opt:"tls_config"`
Deflate bool `opt:"deflate"` DeflateLevel int `opt:"deflate_level" min:"1" max:"9" default:"6"` Snappy bool `opt:"snappy"`
OutputBufferSize int64 `opt:"output_buffer_size" default:"16384"` OutputBufferTimeout time.Duration `opt:"output_buffer_timeout" default:"250ms"`
MaxInFlight int `opt:"max_in_flight" min:"0" default:"1"`
MsgTimeout time.Duration `opt:"msg_timeout" min:"0"`
AuthSecret string `opt:"auth_secret"` }
|
从这里看出,使用golang默认标识的方式,已经设置好了参数的key(即opt后的值)和default值了。暂且不看各功能的含义(太多了),我们先看如何设置。
原文解释:
The only valid way to create a Config is via NewConfig, using a struct literal will panic. After Config is passed into a high-level type (like Consumer, Producer, etc.) the values are no longer mutable (they are copied).
Use Set(option string, value interface{}) as an alternate way to set parameters.
Set的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
func (c *ConfigFlag) Set(opt string) (err error) { parts := strings.SplitN(opt, ",", 2) key := parts[0]
switch len(parts) { case 1: err = c.Config.Set(key, true) case 2: err = c.Config.Set(key, parts[1]) } return }
|
可以看出,对命令行的要求是“key,value”,即使用“,”分隔。
如设置timeout的启动命令:
./nsq_to_file --lookupd-http-address=127.0.0.1:4161 --consumer-opt="read_timeout,100s"
继续main函数代码阅读
channel
1 2 3 4 5 6 7 8
| if *showVersion { fmt.Printf("nsq_to_file v%s\n", version.Binary) return }
if *channel == "" { log.Fatal("--channel is required") }
|
showVersion没什么好说的,关键是channelopt。
通过之前的准备工作,了解到NSQ消息分发是无单点的分布式拓扑结构,其中Topic和channel是其重要的组成单元,一个Topic下的信息会同时发送给不同的channel中,而同一个channel下的多个消费者中,只有一个可以获得此channel下的消息。
做一个测试:
再开启一个消费者终端,与之前不同的是,此终端监听Channel为"test-channel":
nsq_to_file --lookupd-http-address=127.0.0.1:4161 --channel="test-channel" --output-dir=tmp
向NSQ-http端口发送消息:
curl -d "hello world test5" "http://127.0.0.1:4151/put?topic=test"
可以看到输出文件中,有两行hello world test5,这是因为channel nsq_to_file(默认channel)和test-channel中都会有此条消息,而虽然channel nsq_to_file中有三个消费者在等待接收,但只有一个消费者获取到,其余继续等待。test-channel中的消费者获取信息后,也会写到/tmp目录下的文件中,因此两条消息均会被记录。
其他OPT
明白channel的作用后,在继续阅读channel的使用代码之前,将main后面几个同类型opt判定也看完:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| var topicsFromNSQLookupd bool
if len(nsqdTCPAddrs) == 0 && len(lookupdHTTPAddrs) == 0 { log.Fatal("--nsqd-tcp-address or --lookupd-http-address required.") } if len(nsqdTCPAddrs) != 0 && len(lookupdHTTPAddrs) != 0 { log.Fatal("use --nsqd-tcp-address or --lookupd-http-address not both") }
if *gzipLevel < 1 || *gzipLevel > 9 { log.Fatalf("invalid --gzip-level value (%d), should be 1-9", *gzipLevel) }
if hasArg("gzip-compression") { log.Printf("WARNING: --gzip-compression is deprecated in favor of --gzip-level") switch *gzipCompression { case 1: *gzipLevel = gzip.BestSpeed case 2: *gzipLevel = gzip.BestCompression case 3: *gzipLevel = gzip.DefaultCompression default:
|