flume与语音交互领域日志数据的ETL

JerryXia 发表于 , 阅读 (0)

我厂是一家国内知名的语音交互方案提供商。我厂的车联网客户端每天共计能生成上百G的语音识别、语义解析、对话、NLP等各种环节的日志。通过这些数据可以挖掘出很多有价值的信息。但是之前我厂一直没有对这些日志进行收集和分析,诚为可惜!

笔者于是承担起了这个任务。本文将着眼于ETL部分,介绍语音识别日志的收集与解析过程,以及分享我们在flume使用中的一些经验、教训以及对flume的改进方案。

ETL部分架构设计

我们收集的这部分日志来自公司的各种提供语音交互服务的车载智能产品,说白了都是物联网产品,需要这些客户端自己上报日志,目前我们在云端部署了一个web server集群来接收它们。当用户结束一次对话,设备进入休眠时就会上报。这套web server使用目前web届的新秀openresty技术开发。该技术直接将lua脚本嵌入nginx服务器执行。充分利用了nginx服务器的异步非阻塞特性,对高并发的支持非常好,而且系统资源占用少,非常轻量。

收集节点拿到上报的日志后包装一些元数据,之后通过kafka传输到大数据平台。引入kafka是出于以下几点的考虑:

  • 日志采集后的处理方式多样化,以后只要客户端改为实时上报日志,我们也可以直接将kafka对接storm进行流式分析

  • 线上日志产生量巨大,尤其在使用高峰期,如果直接同步日志对下游服务负荷较重,容易因为故障导致日志阻塞延迟和丢失,所以引入了kafka

  • 消息可以持久化,并且可以进行日志回溯。有了消息队列,上游服务和下游服务完全解藕,网络传输会更稳定、更高效、更均衡,避免级联效应。尤其我们这里是异地机房之间的数据传输,通过kafka可以有效防止网络异常导致的数据丢失。

数据进入大数据平台后首先进入flume集群进行ETL工作。这里就是本文的重点。接下来我会详细介绍我们在这里的工作。

flume应用与改造

flume架构如下图:
flume主要由source、channel、sink三部分组成,所不同的是我们这里flume的上游是kafka而不是web server。

但是这里的flume并不能很好地满足我们的需求。这主要是我们的数据格式造成的。我们客户端的日志在经过openresty收集系统的包装之后是一个非常大的json,基本上每次上报过来的都是一个上百KB的纯文本,json里面有部分是元数据,记录会话的id,上报设备的deviceId,上报时的服务器时间,上报设备的版本等等。另一部分则是真正的数据,被包裹在一个json的key中,通过换行符分隔每一条具体的日志,一般一次普通的持续5分钟的闲聊就会产生大概2500条以上的日志。每一行又用制表符分隔开了一些信息,比如日志的时间戳(当然,这里的时间是客户端时间),日志类型,日志内容等。

所以我们要做的工作是:

  • 解析外层json,拿到元数据走一个channel和sink组,存到hdfs中相关路径,我们称之为session数据
  • 拿到真正日志数据部分,按换行符split,变成多条日志数据,并把时间戳校准为服务器时间,还需要把元数据中的相关id加到这里的每一行作为外键,然后存入另一个channel和sink组,存到hdfs中相关路径,我们称之为timeline数据,有点像拉链表
  • timeline数据解析成多行数据后,有些特殊事件类型对应的内容还要做进一步解析,由于语音交互各环节日志内容多种多样,这里要根据不同的事件类型做不同类型的解析,有些内容是json,有些是某种特殊规则的字符串,甚至一些不太规范的数据,需要我们通过正则表达式匹配。这种数据解析后走另一个channel和sink组,存到hdfs中相关路径,我们称之为event数据。

这样,我们的设计类似于flumeflume本身支持将一个source的数据分发到多个channel和sink组,但是只支持对同一份数据做分发。如果要满足我上述的需求,就必须至少使用两级flume,第一级把一个日志json拆分成多行,第二级再解析每一行的日志。这样第一级的flume功能过于单一,浪费了资源,于是我决定改造flume,让flume在source进程中既完成json拆解,又能按不同种类对多行日志数据进行分发。

首先观察flume源码,我们使用的是1.7.0版本。由于我们的flume是接收kafka的数据,所以观察KafkaSource.java的doProcess方法。这个方法在flume启动之后会定期被执行,它从kafka中取数据,组装成Event数组,然后通过

getChannelProcessor().processEventBatch(eventList);

将这组事件传递给channel,但是在传递给channel之前,我们可以看到ChannelProcessor.java的148行

events = interceptorChain.intercept(events);

flume将数据先交给了拦截器处理,由于这里我们用了cloudera开源的morphline拦截器来做日志解析,因此,这里会走到MorphlineInterceptor.java的intercept方法。这里有多个该方法的重载,实际执行的是这个:

    @Override    public List<Event> intercept(List<Event> events) {      List results = new ArrayList(events.size());      for (Event event : events) {        event = intercept(event);        if (event != null) {          results.add(event);        }      }      return results;    }

也就是说,kafka一次会批量传输多个大的日志json,这里的for循环会一次处理每个json,但是这里问题出来了,for循环里的intercept方法只支持将每个json解析出来的结果作为一个event对象。我们不能把它解析成成百上千条日志,而且还要分发到不同的channel。

好在这个方法的签名本来就是要返回event数组的,这里不会检查输入和输出的数组长度是否匹配。于是我们可以很简单的做如下修改:

    @Override    public List<Event> intercept(List<Event> events) {      List results = Lists.newArrayList();      for (Event event : events) {        List<Event> eves = interceptDispatch(event);        if (eves != null) {          results.addAll(eves);        }      }      return results;    }    private List<Event> interceptDispatch(Event event) {      collector.reset();      morphline.process(event);      List<Record> results = collector.getRecords();      if (results.size() == 0) {        return null;      }      List<Event> events = Lists.newArrayList();      for (Record record: results) {        Event result = toEvent(record);        events.add(result);      }      return events;    }

那么这样修改是否可行呢?我们需要继续深入morphline的源码。我们可以看到这里也是调用了morphline的process方法来处理数据。并从collector对象中拿到morphline解析后的数据。

morphline是flume的插件,用来解析数据,用户可以通过写配置文件的方式来调用morphline的各种解析器来做相应解析工作,有点类似于logstash,只是配置文件语法不同。它位于cloudera的hadoop工具包项目kite中。

morphline这里值得关注的是AbstractCommand.java中的buildCommandChain方法,从这里我们就可以看到morphline会解析用户的配置文件,然后通过责任链模式,将用户的解析命令组装成链,有趣的是这个方法像三明治一样在每个链条中间夹着一个Connector命令,并且在责任链的最后也加上这个Connector。通过debug,我们发现,执行到最后这个Connector对象时,再执行下一环的process方法时,就会最终回到flume代码中MorphlineInterceptor类的内部类Collector的process方法,代码如下:

private static final class Collector implements Command {    private final List<Record> results = new ArrayList();    public List<Record> getRecords() {      return results;    }    public void reset() {      results.clear();    }    @Override    public Command getParent() {      return null;    }    @Override    public void notify(Record notification) {    }    @Override    public boolean process(Record record) {      Preconditions.checkNotNull(record);      results.add(record);      return true;    }  }

由此我们可以确定,这个内部类本身是支持返回多个记录的,当责任链开始时无论我们怎么将解析过程拆解成多个分支,collector.getRecords()方法都会阻塞,直到所有分支执行完成。而同一个入口的数据会共享这个内部类的results对象,因此最终他将包含一个入口进去的json经解析后生成的所有日志。

接下来我们要做的就是自己实现一个morphline的command,让它将日志拆分,并转发到不同的解析责任链中。

我们实现了一个叫ToLineLogs的command

@Overrideprotected boolean doProcess(Record inputRecord) {      Record template = inputRecord.copy();    template.removeAll(Fields.ATTACHMENT_BODY);    template.removeAll(Fields.ATTACHMENT_MIME_TYPE);    template.removeAll("key");    List logs = inputRecord.get(Fields.ATTACHMENT_BODY);    for (Object log : logs) {        AIOSLog aiosLog = (AIOSLog) log;        Record metadata = genMetadata(template, aiosLog);        // 输出元数据record        if (!getChild().process(metadata)) {            return false;        }        // 输出解析的好的具体日志        List<Record> lineLogs = genLineLogs(template, aiosLog);        for (Record lineLog : lineLogs) {            if (!getChild().process(lineLog)) {                return false;            }