golang-NSQ源码阅读 5-nsq_to_file源码解析-3 输出 | KaiQ.Gu|KerwinKoo Blog

JerryXia 发表于 , 阅读 (0)

nsq_to_file 输出文件管理

本文在NSQ代码阅读-nsq_to_file代码阅读的基础上,对NSQ消息获取及获取之后的文件操作进行代码解析。涉及解析文件为:

nsq_to_file.gostrftime.go

功能解析

1 文件名格式化:

1
2
datetimeFormat = flag.String("datetime-format", "%Y-%m-%d_%H", "strftime compatible format for <DATETIME> in filename format")
filenameFormat = flag.String("filename-format", "<TOPIC>.<HOST><REV>.<DATETIME>.log", "output filename format (<TOPIC>, <HOST>, <PID>, <DATETIME>, <REV> are replaced. <REV> is increased when file already exists)")

2 文件压缩

1
2
gzipLevel      = flag.Int("gzip-level", 6, "gzip compression level (1-9, 1=BestSpeed, 9=BestCompression)")
gzipEnabled = flag.Bool("gzip", false, "gzip output files.")

3 文件滚动

1
2
rotateSize     = flag.Int64("rotate-size", 0, "rotate the file when it grows bigger than `rotate-size` bytes")
rotateInterval = flag.Duration("rotate-interval", 0*time.Second, "rotate the file every duration")

文件滚动指的是,在到达指定文件size,或指定文件读写时间段的情况下,创建新的文件,续写新的message。

4 跳过空白文件

1
skipEmptyFiles = flag.Bool("skip-empty-files", false, "Skip writing empty files")

功能实现

文件名称格式化

函数NewFileLogger主要负责对输出的文件名进行格式化。

函数NewFileLogger在nsq_to_file中只被使用一次,但会根据topic的个数来调用多次。其功能是获取日志文件句柄,并对文件名、文件操作(是否压缩及压缩level、文件滚动写入等)做规范。

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func NewFileLogger(gzipEnabled bool, compressionLevel int, filenameFormat, topic string) (*FileLogger, error) {
// TODO: remove, deprecated, for compat <GZIPREV>
// 默认格式为:"<TOPIC>.<HOST><REV>.<DATETIME>.log"
// strings.Replace:If n < 0, there is no limit on the number of replacements.
filenameFormat = strings.Replace(filenameFormat, "<GZIPREV>", "<REV>", -1) // filenameFormat中的<GZIPREV>替换为<REV>,
if gzipEnabled || *rotateSize > 0 || *rotateInterval > 0 {

// 指定压缩、文件滚动后,文件名格式中必须存在<REV>字段
// strings.Index:如果字符串中没有出现指定字段,函数返回-1
if strings.Index(filenameFormat, "<REV>") == -1 {
return nil, errors.New("missing <REV> in --filename-format when gzip or rotation enabled")
}

} else { // remove <REV> as we don't need it
filenameFormat = strings.Replace(filenameFormat, "<REV>", "", -1)
}

hostname, err := os.Hostname() // 获取操作系统的主机名称
if err != nil {
return nil, err
}
shortHostname := strings.Split(hostname, ".")[0]
identifier := shortHostname
if len(*hostIdentifier) != 0 { // 根据opt需要,决定HOSTNAME的长度
identifier = strings.Replace(*hostIdentifier, "<SHORT_HOST>", shortHostname, -1)
identifier = strings.Replace(identifier, "<HOSTNAME>", hostname, -1)
}
filenameFormat = strings.Replace(filenameFormat, "<TOPIC>", topic, -1)
filenameFormat = strings.Replace(filenameFormat, "<HOST>", identifier, -1)
// 通过区分PID,实现支持多终端运行时,对不同文件的写操作。
filenameFormat = strings.Replace(filenameFormat, "<PID>", fmt.Sprintf("%d", os.Getpid()), -1)

// 判断是否需要加压缩后缀.gz
if gzipEnabled && !strings.HasSuffix(filenameFormat, ".gz") {
filenameFormat = filenameFormat + ".gz"
}

f := &FileLogger{
logChan: make(chan *nsq.Message, 1),
compressionLevel: compressionLevel,
filenameFormat: filenameFormat,
gzipEnabled: gzipEnabled,
ExitChan: make(chan int),
termChan: make(chan bool),
hupChan: make(chan bool),
}
return f, nil
}

函数最终会返回一个FileLogger结构体实例,但结构体中没有topicchannel的属性,因此可以看做是比较独立的输出文件句柄结构体。但是该结构体承接着信息的读取、文件的编辑等功能,则其围绕的功能函数需要将功能一一实现。

句柄结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type FileLogger struct {
out *os.File
writer io.Writer
gzipWriter *gzip.Writer
logChan chan *nsq.Message
compressionLevel int
gzipEnabled bool
filenameFormat string

ExitChan chan int
termChan chan bool
hupChan chan bool

// for rotation
lastFilename string
lastOpenTime time.Time
filesize int64
rev uint
}

获取时间戳做输出文件名

1
2
3
4
5
func (f *FileLogger) calculateCurrentFilename() string {
t := time.Now()
datetime := strftime(*datetimeFormat, t) //根据opt的时间格式要求,生成时间戳文件名
return strings.Replace(f.filenameFormat, "<DATETIME>", datetime, -1)
}

注意

  • 输出文件名格式要求是用户在opt中进行指定的,默认为:"%Y-%m-%d_%H",实现过程中需要转换为go的time format(在文件strftime.go中实现);
  • 生成的时间戳最终会替换掉文件名格式化占位符中的<DATETIME>

nsq_to_file的其他代码阅读

hasArg

在main函数中首次用到,用于检索opt中是否有指定的key

1
2
3
4
5
6
7
8
9
10
func hasArg(s string) bool {
argExist := false
flag.Visit(func(f *flag.Flag) {
if f.Name == s {
argExist = true
}
})

return argExist
}

之前版本的实现有字符串包含的bug,上面的代码实现是阅读代码后我在github中做出的提交(目前已commit主版本)。函数的核心是下面这段代码:

1
2
3
4
5
flag.Visit(func(f *flag.Flag) {
if f.Name == s {
argExist = true
}
})

其中参数func(f *flag.Flag)中,参数f会在flag包中被赋值:

1
2
3
4
5
6
7
// Visit visits the flags in lexicographical order, calling fn for each.
// It visits only those flags that have been set.
func (f *FlagSet) Visit(fn func(*Flag)) {
for _, flag := range sortFlags(f.actual) {
fn(flag)
}
}

上面代码中可以看到,函数参数fn会被for循环调用,因此fn函数内部的比较if f.Name == s亦会被多次调用,轮询对比。