第 8 章 点击流分析

第 8 章 点击流分析

点击流分析通常分析用户浏览网站产生的事件,即点击数据。这种分析的目的一般是了解网站访问者的用户行为,从而向基于数据的后续决策提供信息。

点击流分析能够发现用户与线上展示的交互方式、网站流量的地域来源、用户访问网站的高频设备和操作系统、用户完成特定行为(这里一般指某种目标,比如订阅邮件列表、注册新账户、加入购物车)的通常路径等。有了点击数据和市场推广信息,你便可以进行进一步的分析,比如分析各个营销渠道(自然检索、付费搜索、社交媒体支出等)的投资回报率(Return On Investment,ROI)。另外,还可以将点击数据与操作数据存储层(Operational Data Store,ODS)或客户关系管理系统(Customer Relationship Management,CRM)数据相关联,基于更丰富的用户信息深入研究。

本章以点击流数据为例,讲解如何将第一部分中的各种工具结合在一起,但是本章涉及的各个概念适用于任何对机器产生的数据进行批处理的系统。此类数据包括但不仅限于以下几种:广告展示数据、性能或其他日志,还有网络日志及通信日志。

8.1 用例场景定义

假设你是出售自行车部件的网络零售商。用户可以访问你的网站,浏览自行车的零部件、浏览评论,还可以下单付款。你的网站程序会将用户的页面浏览和链接点击全部记录到日志中。一般情况下,网站产生的是纯文本的 Apache 或 Nginx 日志,有些时候点击日志是网络分析工具通过跟踪网站形成的定制化的日志。但无论怎样,点击日志会严格按照时间戳进行追加。

作为网络零售商,你肯定想更加了解自己的用户群,想进行营销和市场方面的优化。为了达成这样的目标,你需要回答如下问题。

  • 上个月我的网站上有多少页面浏览量(Page View,PV)?环比增长率如何?

  • 上个月我的网站上有多少独立访客数(Unique Visitor,UV)?环比增长率如何?

  • 访客购物时的平均停留时间有多长?

  • 网站的跳出率(bounce rate)是多少?换句话说,有多少用户打开本网站后尚未跳转至其他页面便已结束访问?

  • 对于每个页面,新用户登录并在会话中发生购买行为的概率是多少?

  • 对于每个页面,新用户登录并在七天内发生购买行为的概率是多少?

  • 购买行为是通过哪一种营销渠道(直接访问、自然检索、付费搜索等)发生的?

想要回答以上乃至更多的问题,则需扫描、处理和分析 Web 服务器记录的日志。

为便于后续讨论,我们假定网络日志均以最常见的格式出现,即组合日志格式(combined log format)。此类格式中的每一条日志皆如下所示(注意:限于排版和篇幅,如下示例会在多行呈现,但实际情况下,一条日志记录即为连续的一行)。

127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /
apache_pb.gif HTTP/1.0" 200 2326 "http://www.example.com/start.html"
"Mozilla/4.08 [en] (Win98; I  ;Nav)"

以上日志具体包含以下部分。

  • 127.0.0.1

    此处是源 IP 地址(即发生点击的设备的 IP 地址)。

  • -

    第二个字段在例子中使用连字符表示,它一般指客户端标识。实际情况下我们通常认为这不可信,而且会直接被服务端忽略。

  • frank

    如果需要的话,在此处写明用户名,即 HTTP 认证的身份标识。

  • 10/Oct/2000:13:55:36 -0700

    该字段记录服务完成处理请求的时间,并带有时区。

  • GET /apache_pb.gif HTTP/1.0

    该字段表示请求的类型,以及客户端请求的页面。

  • 200

    此处是 Web 服务器返回给客户端的状态标识码。

  • 2326

    该字段表示返回给客户端的对象大小,不包括响应报文头部的大小。

  • http://www.example.com/start.html

    如果需要的话,用该字段代表引用地址(若存在),即引导用户进入当前地址的上一地址。

  • Mozilla/4.08 [en] (Win98; I ;Nav)

    本字段是用户代理字符串,能够标识发生点击行为的设备具有的浏览器和操作系统。

8.2 使用Hadoop进行点击流分析

一个活跃的站点每天能够产生数 GB 的数据。存储和分析如此庞大的数据需要一个十分健壮的分布式系统。另外,日志数据的生成是非常迅速的。通常情况下,日志在一天甚至一个小时之内需要多次轮转更替。这是因为分析人员想要尽早知道最新的变化与总体的趋势。此外,日志数据是半结构化的,一条日志记录可能有需要解析的用户代理字符串,也可能有新增或者不久便会移除的字段,如测试用的参数。考虑到这些特点,Hadoop 仍不失为一个点击流分析的利器。

本章会展示如何使用 Apache Hadoop 及生态圈中相关的其他项目,构建一个能够对点击流数据进行收集、处理和分析的应用。

8.3 设计概述

我们将此案例的整体架构分为五大部分:数据存储、数据采集、数据处理、数据分析及协调调度。

  • 数据存储模块对存储系统(如 HDFS 或 HBase)、数据格式、压缩算法以及数据模型等进行技术选型。

  • 数据采集模块从网络服务器或二级数据源(如 CRM、ODS、市场推广数据)等获取点击流数据,并将其加载到 Hadoop 中待处理。

  • 数据处理模块将原始数据导入 Hadoop,并对其进行转换加工,形成支持分析和查询的数据集。此处的处理一词包括但不仅限于以下内容:数据去重、去除无效点击、将点击数据转化为列式存储格式、生成会话(通过关联每次点击中代表这次访问的会话 ID,将单个用户单次访问的多个点击过程合并)及聚合等。

  • 数据分析模块在预处理后的数据集上进行各种分析类的查询,以找到本章前面提到的问题的答案。

  • 协调调度模块自动组织和协调进行数据采集、处理、分析的各个过程。

图 8-1 展示了这一设计的整体规划。

{%}

图 8-1:设计概览

我们的设计实现将采用 HDFS 作为数据存储,使用 Flume 收集 Apache 日志,使用 Sqoop 将其他的二级数据源(如 CRM、ODS、市场推广数据)导入 Hadoop。我们还将采用 Spark 进行预处理,采用 Impala 分析处理后的数据。将 BI 工具连接到 Impala 之后,我们就能够在这些数据上进行交互式的查询。我们还将使用 Oozie 协调调度多个操作,使其成为一个完整的工作流。

接下来介绍设计方案的具体内容,包括文件格式与数据模型的选型、使用 Web 应用将点击流信息发送到 Flume 中、配置 Flume 将这些数据存储成文本格式等细节。我们还会讨论这里涉及的各种处理算法(尤其是会话生成算法及其实现方式),如何使用 Impala(或其他 SQL-on-Hadoop 工具)在海量数据集上进行高效的聚合操作。我们也将在合适的地方讲解数据的自动处理(如数据采集、数据加工等)。另外,我们还会比较方案中选择的工具与候选工具,并讲述选择的理由。

接下来我们会逐个讲解上面提到的设计。

8.4 数据存储

正如前面提到的,我们的应用程序将使用 Flume 来收集原始数据,进行若干步转换操作,生成清洗后的更加丰富的数据集,以供数据分析使用。迈出这一步之前,首先需要确定原始数据、数据转换中间结果及最终数据集的存储方式。由于这几类数据集面向的需求不同,我们希望最终确定的存储选型,即数据格式及数据模型,能满足数据集的需要。

首先,由 Flume 采集到的原始数据以纯文本的格式存储至 HDFS 中。我们之所以选择 HDFS 是因为后续的数据加工需要对多种记录进行批量转换操作。正如前面章节提到的,对于扫描大数据集这样的批处理操作,HDFS 的性能不错。我们之所以选择文本格式,是因为该格式处理简单,对任何日志类型均通用,且不需在 Flume 进行额外的处理。

但是,我们对处理后的海量数据进行聚合查询和分析时,通常只涉及一部分列。分析类的查询通常需要扫描一个月的数据,最少也得扫描一天的数据。出于数据扫描性能的考虑,这里的确应该使用 HDFS。许多分析类查询每次执行时只会选择一部分列的数据,因此,对于处理后的数据,最好选择一种列式存储格式。我们使用的列存储格式是 Parquet,用来存储处理(如生成会话)后待分析的数据。考虑到 Snappy 算法对 CPU 性能有所提升,我们还会使用该算法对处理后的数据进行压缩。关于所有文件类型和压缩格式的讨论,及最优文件的选择方法,参见第 1 章。

我们计划长期存储原始数据,而不是处理后便将其删除。Hadoop 应用的设计通常都是如此。这样做有以下几点好处。

  • 在 ETL 过程中,如果出现 bug 或失败,数据的重新处理会更为容易。

  • 先前由原始数据生成处理后的数据集时,我们可能忽略掉了一些有意思的特征。保留原始数据,可以直接对其进行分析。另外,设计 ETL 流水线时需要浏览原始数据,决定将哪些特征包含到处理后的数据集中,所以这对于数据发现和探索是有意义的。

  • 出于审计的考虑,保留原始日志也是很有意义的。

我们将采用 1.2 节中提到的目录结构来存储数据集。因为分析是基于“天”级别(一天或者多天)的,所以我们决定按照日期对点击数据进行分区,叶子节点的分区对应某一天的点击数据。对于点击数据,以下是存储原始数据和处理后数据的目录结构。

  • /etl/BI/casualcyclist/rawlogs/year=2014/month=10/day=10

    该路径对应的是原始数据集,直接由 Flume 产生。

  • /data/bikeshop/clickstream/year=2014/month=10/day=10

    该路径对应的是处理后的数据集,即直接供分析所用,已去除噪声的会话数据集。

如你所见,我们使用了三级(年、月、日),对以上两类数据按照日期进行了分区。

 处理后的数据集大小

处理后的数据集一般比原始数据集要小一些。如果处理后的数据集以更为方便压缩的格式(如 Parquet 这样的列式存储格式)进行存储,则更是如此。于是,叶子节点分区的平均大小(如代表一天数据的分区)可能太小,不能充分利用 Hadoop。正如第 1 章中提到的那样,每个分区的平均大小应当至少是 HDFS 块大小(默认 HDFS 块大小为 64MB)的几倍,这样才能更好地利用 Hadoop 的处理能力。如果发现处理后的数据集中每个天级分区较小,那么最好不要使用天级分区,只保留年和月两级分区即可。原始数据可以仍选择年、月、日三级分区的形式。

在本章其余的内容中,我们假定处理后的数据集中,天级分区足够大,数据以三级分区的形式呈现。

另外,你可能看到过这样一种一级分区模式,形如 dt=2014-10-10,而不是使用三级分区模式 year=2014/month=10/day=10。前者实际上是另外一种有效的分区方式,这样原始数据和处理后的数据均只需拥有一个分区列(一个类型为 string 的 : dt 列),而不是三个分区列(列名为 yearmonthday 的三个 int 类型的列)。一级分区模式使得总分区数减少,即便叶分区数等于总分区数目。分区更少,存储的逻辑元数据信息也就更少,这意味着使用访问元数据的 Hive、HCatalog 和 Impala 进行该数据集上的查询,性能可能会稍有提升。但是,一级分区模式与三级分区模式相比,前者查询时的灵活性更差。举例来说,如果在带有三级分区的数据集上进行年级别的分析的话,我们可以简单地指定 WHERE year=2014,而不是 WHERE dt LIKE '2014-%'

不过就天级数据来看,二者均是合理的分区模式,使用起来也基本没有差别。你可以根据偏好和风格选择其中一个。本章将使用三级分区模式。

图 8-2 和图 8-3 分别展示了三级分区模式和一级分区模式。

图 8-2:三级分区模式

图 8-3:一级分区模式

想要了解更多关于 HDFS 和 Hive 模式设计的内容,请参考《Hive 编程指南》。

8.5 数据采集

正如我们在第 2 章中提到的那样,就导入数据到 Hadoop 而言,我们有多种方式。下面就我们的架构来评估这些工具,了解它们是否符合我们的需求。

  • 文件传输

    该方式适合一次性的文件传输,对于海量点击流数据的采集并不可靠。

  • Sqoop

    Sqoop 是数据导入和导出至外部存储(如关系型数据库管理系统)的利器。很明显,Sqoop 并不适合用于日志数据采集。

  • Kafka

    正如第 2 章中讨论的,Kafka 的架构决定了它是将海量日志数据从网络服务器移动到各种消费者(包括 HDFS)的优秀可靠方案。因此,我们将其作为架构中数据采集选型的重要候选之一。

  • Flume

    类似于 Kafka,将海量事件类型数据(如日志)移动至 HDFS 时,Flume 也是一个优秀可靠的选择。

日志采集的候选项正是 Kafka 和 Flume。二者均提供了我们所需要的可靠可扩展日志数据采集功能。我们的应用场景只需将日志数据加载到 HDFS 中,所以这里选择 Flume,因为它是面向 HDFS 的。它内置了写 HDFS 的模块,这意味着我们不需做任何定制化的开发,就可以直接搭建 Flume 流水线。Flume 还支持拦截器机制,支持数据的基本变换,比如过滤搜索爬虫的无效点击。而且,这种变换是在数据采集后、写入 HDFS 之前完成的。

如果需要建设一个更加通用的流水线,并使其支持除了 HDFS 以外的多种数据持久化方式,我们则会考虑 Kafka。

既然已经确定了数据采集的工具,下面我们讨论一下将数据导入 Flume 流水线的一些选项。

  • 如果网络服务器是用 Java 语言编写的,并且使用 Log4j 记录日志,那么我们可以使用 Flume Log4j 输出将数据发送给 Flume 的 Avro 数据源。这是将事件数据发送到 Flume 的最简方式,只需将一个文件(flume-ng-sdk\2.jar)加入应用环境的 Classpath 中,并对 Log4j 的配置文件稍作修改即可。不过,在本例中,我们使用的网络服务器是一个第三方的应用程序,我们无法修改其代码,或许它并不是用 Java 语言编写的,所以 Log4j 输出并不适合当前场景。

  • 对于当前这种 Log4j 输出并不适合的应用场景,还是有其他一些不错的选择。你可以用 Avro 数据源或 Thrift 数据源分别给 Flume 发送 Avro 或 Thrift 消息。Flume 也可以从 JMS 消息队列中将消息拉取出来。另外一个办法是向 HTTP 数据源发送 JSON 格式的消息。选取哪种方式集成 Flume,取决于你使用的应用框架。这里有多种集成方式可供选择,便于 Flume 与已有的架构更好地融合。举例来说,如果你原本就会把事件发送到 JMS 队列中,那么就直接集成 Flume 去队列中拉取数据。如果你之前并未使用 JMS 队列,也没必要为集成 Flume 引入它,还有其他的选项可供选择。以上这些都不适合通过磁盘读取日志的应用场景。

  • Flume 提供了 Syslog 数据源,这种数据源能够从 Syslog 数据流中读取事件,并将其转化成 Flume 事件。Syslog 是系统日志采集和移动的标准化工具。Apache HTTP 服务器支持将日志输出到 Syslog 中,不过仅限错误级别的日志。要将访问日志页发送到 Syslog,你需要一个变通方案,比如借助管道对访问日志重定向。

  • 我们的应用场景针对一个第三方的 Web 应用,无法通过修改应用来灵活地定制化输出点击流数据,又需要处理已经存储到磁盘上的日志文件。我们来关注一下 Flume 的 Spooling Directory Source。它从一个目录中读取文件,并将文件中的每一行转换成一个 Flume 事件。注意,在网络上流传着一些使用 exec 数据源读取日志文件尾部的做法。这不是一个可靠的解决方案,很不可取。直接读取日志文件尾部,上一次读到的具体位置不会记录。Flume 程序崩溃会导致多读或者少读一些,这两种情况都是我们不希望看到的。相比之下,Spooling Directory Source 只读取已经关闭的完整文件,所以在遇到失败的情况下,它会重试整个文件并标记为成功,从而保证不会采集两次。对于从磁盘上采集数据来讲,这是一个更为可靠的方案。

我们已经选定了生成事件到 Flume 流水线的数据源,接下来需要考虑一个合适的 Flume Channel 承接上一步数据。正如第 2 章中提到的那样,如果性能比稳定性更为重要,那么 Memory Channel 就是最好的选择。如果稳定性更为重要则应当选择 File Channel。对于本章的例子,稳定性是非常重要的。在流量高峰时,Memory Channel 可能会丢失部分点击,当站点负载较高时,也可能出现这类数据丢失问题。因此,我们在 Flume 分层选型时,均选用 File Channel。

显然,我们要把数据存储到 HDFS 上,因此要采用 HDFS Sink。另外,为了更加清晰地了解处理过程,我们将数据存储为纯文本。数据会按照各自的时间戳存储在 HDFS 上的不同目录内。后面深入了解采集层的架构时,我们会继续讨论更多细节。按照日期和时间进行分区的方式能够减少数据处理时的磁盘 I/O,使数据处理任务更加高效。

现在我们已经确定了数据采集层的架构,下面来深入了解一下 Flume 流水线的架构。我们首先看一下整个顶层视图,再来认识流水线中每层的具体配置细节。图 8-4 展示了我们的采集工作流的顶层视图。在这个流程中,你会看到,我们采用的是扇入的模式(第 2 章中有所提及)。

{%}

图 8-4:Flume 采集架构的整体视图

在图 8-4 中,整个流水线主要分成了三层。

  • 客户端层

    本章示例中,客户端层(client tier)由产生日志的网络服务器构成。这些日志需要采集到 HDFS 中,以便按照要求对其进行分析。此处需要注意的是,每个网络服务器的主机上都有一个 Flume agent,承担着获取网络服务器产生的日志事件,并发送到收集器层的责任。

  • 收集器层

    收集器层(collector tier)的主机能汇聚从客户端层发过来的事件,并传递到最终的目的地,即 HDFS。本章的例子假定收集器层的 agent 在集群的边缘节点上运行——这些节点在集群网络之中,它们拥有 Hadoop 的客户端配置,能够访问 Hadoop 集群并提交 Hadoop 命令、向 HDFS 写数据等。

  • HDFS 层

    这是数据的最终目的地。收集器层的 Flume agent 承担着将事件持久化到 HDFS 文件的职责。作为持久化操作的一部分,Flume agent 要进行配置,以确保 HDFS 上文件的分区以及文件名正确无误。

在如上配置中,Flume agent 在网络服务器和收集器层上运行。注意:我们不在 HDFS 节点上运行 Flume agent,收集器层的 agent 会使用 Hadoop 客户端将事件写入 HDFS。

正如第 2 章中提到的,这种扇入式的架构设计有如下好处。

  • 允许控制连接到集群的 Flume agent 数目。如果我们的网络服务器屈指可数,那么将服务器的 agent 直接连到集群中可能不会带来什么问题。但是如果有成千上百台服务器连到集群,就会引发集群的资源问题。

  • 该架构能够利用收集器层节点较大的本地磁盘空间缓存事件,缓解网络服务器磁盘空间紧张的问题。

  • 负载能够在多个收集器 agent 之间进行均衡。如果一个或多个收集器 agent 意外退出,该架构还可以支持事件采集的故障恢复。

接下来,我们来关注 Flume 流水线中各个层次的更多细节。

8.5.1 客户端层

图 8-5 描述客户端层 Flume agent 的具体细节。

{%}

图 8-5:客户端层细节

下面介绍客户端层 Flume agent 的各个组件。

  • 数据源

    数据源即指进入 Flume 流水线的事件,也就是本例中的 Apache 日志记录。正如前面提到的,本章示例中使用的是 Spooling Directory Source,所以我们会从每个网络服务器的特定目录上拉取数据源。

  • Flume 数据源

    此处的 Flume 数据源即为 Spooling Directory Source,能够读取磁盘的特定目录下要采集的文件,并将其转化为事件,还会在处理完事件后重命名文件。

  • 时间戳拦截器

    该拦截器的作用是向每个 Flume 事件的头部插入一个时间戳。后面的 HDFS Sink 会用到该时间戳,从而保证按照日期分区存储结果文件。

  • Channel

    出于稳定性考虑,我们使用 File Channel 作为 Flume Channel。

  • Avro Sink

    此处的 Avro Sink,再加上后面的 Avro 数据源,为 Flume 客户端层之间的事件传输提供了一个序列化机制。此处需要注意的是,之前我们提到的故障恢复(failover)功能就是通过设置多个 Sink 来保证的,并且由一个支持负载均衡(load balancing)的 Sink 组实现在所有可用 Sink 之间的负载分发。

 选择 Flume 的 Avro 数据源关乎通过 HDFS Sink 存储到 HDFS 上的文件格式。正如第 1 章中提到的那样,Avro 是一种序列化格式,可用于进程间的数据传输及文件系统(如 HFDS)的数据存储。例子中的 Avro Sink 与 Avro 数据源就扮演着这样一种角色,序列化传输过程中的事件数据。以什么格式将点击数据存储到 HDFS 中,取决于最终的 Flume Sink。

下面是用于客户端层的 Flume agent 的配置文件示例。该配置文件将部署到所有的网络服务器上。

# 定义客户端层数据源为Spooling Directory Source:
client.sources=r1
client.sources.r1.channels=ch1
client.sources.r1.type=spooldir
client.sources.r1.spoolDir=/opt/Weblogs
# 使用时间戳拦截器向所有的事件头部添加时间戳:
client.sources.r1.interceptors.i1.type=timestamp
client.sources.r1.interceptors=i1
# 定义客户端层的Channel为File Channel:
client.channels=ch1
client.channels.ch1.type=FILE
# 定义客户端层的Sink为两个Avro Sink:
client.sinks=k1 k2
client.sinks.k1.type=avro
client.sinks.k1.hostname=collector1.hadoopapplicationarch.com
# 发送数据前配置其压缩:
client.sinks.k1.compression-type=deflate
client.sinks.k1.port=4141
client.sinks.k2.type=avro
client.sinks.k2.hostname=collector2.hadoopapplicationarch.com
client.sinks.k2.port=4141
client.sinks.k2.compression-type=deflate
client.sinks.k1.channel=ch1
client.sinks.k2.channel=ch1
# 定义一个负载均衡的Sink组,以保证在收集器层的多个节点间能够分散负载:
client.sinkgroups=g1
client.sinkgroups.g1.sinks=k1 k2
client.sinkgroups.g1.processor.type=load_balance
client.sinkgroups.g1.processor.selector=round_robin
client.sinkgroups.g1.processor.backoff=true

8.5.2 收集器层

图 8-6 所示为收集器层的 Flume agent 的具体细节。

{%}

图 8-6:收集器层细节

下面介绍收集器层 Flume agent 的各个组件。

  • Avro 数据源

    我们在讨论 Avro Sink 的时候就提到过,此处的 Avro 数据源就是数据在客户端层序列化后,到收集器层进行反序列化的“关键一跳”。

  • Channel

    此处我们还是选择使用 File Channel 作为 Flume Channel,以确保事件在传递过程中的可靠性。为了进一步提高整体的可靠性,你可以利用集群边缘节点上的多块磁盘,详情参见第 2 章。

  • HDFS Sink

    到了 Flume 流水线的最后一个环节,我们将日志事件持久化到 HDFS。值得注意的是,HDFS Sink 的配置使用 hdfs.pathhdfs.filePrefix 以及 hdfs.fileSuffix 等参数实现最终文件按日期分区和文件名定义。配置这些参数后,最终文件将会呈现这样的格式:/Weblog/combined/YEAR/MONTH/DAY/combined.EPOCH_TIMESTAMP.log。另外,这里要用到纯文本文件,应设置相应的 HDFS Sink 文件格式参数:hdfs.fileType=DataStreamhdfs.writeFormat=Text

下面是用于收集器层的 Flume agent 的配置文件示例。该配置文件将部署到所有从属于收集器层的集群边缘节点上。

# 定义收集器层的数据源为Avro数据源:
collector.sources=r1
collector.sources.r1.type=avro
collector.sources.r1.bind=0.0.0.0
collector.sources.r1.port=4141
collector.sources.r1.channels=ch1
# 对收到的数据进行解压缩:
collector1.sources.r1.compression-type=deflate
# 定义收集器层的Channel为File Channel,并使用多块磁盘以提高可靠性:
collector.channels=ch1
collector.channels.ch1.type=FILE
collector.channels.ch1.checkpointDir=/opt/flume/ch1/cp1,/opt/flume/ch1/cpt2
collector.channels.ch1.dataDirs=/opt/flume/ch1/data1,/opt/flume/ch1/data2
# 定义收集器层的Sink为HDFS Sink,保证将事件以纯文本的形式写到磁盘中。
# 注意:为分散负载,配置使用了多个Sink:
collector.sinks=k1 k2
collector.sinks.k1.type=hdfs
collector.sinks.k1.channel=ch1
# 按照日期对文件进行分区:
collector.sinks.k1.hdfs.path=/Weblogs/combined/%Y/%m/%d
collector.sinks.k1.hdfs.filePrefix=combined
collector.sinks.k1.hdfs.fileSuffix=.log
collector.sinks.k1.hdfs.fileType=DataStream
collector.sinks.k1.hdfs.writeFormat=Text
collector.sinks.k1.hdfs.batchSize=10000
# 对HDFS上的文件,满足10 000条事件或达到30秒即关闭:
collector.sinks.k1.hdfs.rollCount=10000
collector.sinks.k1.hdfs.rollSize=0
collector.sinks.k1.hdfs.rollInterval=30
collector.sinks.k2.type=hdfs
collector.sinks.k2.channel=ch1
# 同样按照日期对文件进行分区:
collector.sinks.k2.hdfs.path=/Weblogs/combined/%Y/%m/%d
collector.sinks.k2.hdfs.filePrefix=combined
collector.sinks.k2.hdfs.fileSuffix=.log
collector.sinks.k2.hdfs.fileType=DataStream
collector.sinks.k2.hdfs.writeFormat=Text
collector.sinks.k2.hdfs.batchSize=10000
collector.sinks.k2.hdfs.rollCount=10000
collector.sinks.k2.hdfs.rollSize=0
collector.sinks.k2.hdfs.rollInterval=30
collector.sinkgroups=g1
collector.sinkgroups.g1.sinks=k1 k2
collector.sinkgroups.g1.processor.type=load_balance
collector.sinkgroups.g1.processor.selector=round_robin
collector.sinkgroups.g1.processor.backoff=true

数据采集话题的讨论即将结束,我们还要谈一下二级数据源导入 Hadoop 的问题。如果你需要关联点击数据与 CRM、ODS 或其他类似系统的数据,那么就应该把这类数据从二级数据源导入到 Hadoop 中。具体的导入方式取决于二级数据源的数据特征。本章假定这类数据存储在传统关系型数据库中。如第 2 章讨论的那样,要将数据从关系型数据库导入到 Hadoop,Sqoop 是不二之选。与点击数据相比,CRM 和 ODS 数据集的数据量很小,也不会像点击数据那样暴增(甚至不会怎么变化)。这样的数据特点决定了它们是 Sqoop 批处理任务的理想选择,即按照一天一次或一天几次的频率,将数据从 CRM 和 ODS 数据库中导入到 Hadoop。如果数据量的确不大,简单删掉后在每个 Sqoop 任务中重新导入即可。当然,如果这些数据的数据量很大,就应该使用 Sqoop 进行增量数据导入了。

8.6 数据处理

图 8-7 展示了本架构设计的数据处理部分。

图 8-7:数据处理的设计

前面提到,点击数据通过 Flume 进入 HDFS。但是,来自网络服务器的原始数据是需要清洗的。举例来讲,不完整和无效的日志行就需要移除。此外,还可能存在一些重复的日志,需要数据去重。还有就是需要根据数据生成会话(如赋予每个点击唯一的会话 ID)。另外,有时还要对这份数据做进一步的预聚合或预分析,比如为了加速后续查询的性能,对点击进行天级别或小时级的聚合、上卷。后续查询通常会涉及市场投资回报率、贡献分析(attribution analysis)的计算(关联营销消耗数据与数据),基于数据源的一些额外属性分析网站活跃度(需将 CRM 或其他数据源与点击关联),等等。预处理的工作需要在这一部分完成。如果进行交互式商业智能(Business Intelligence,BI)分析时还需要做数据预处理,你就会陷入手忙脚乱的窘境,这绝对不是大家希望看到的。最后,我们希望处理后的数据格式在查询过程中有不错的性能表现。

总结起来,对于数据处理的流水线,我们需要达成四点目标。

(1) 清洗(sanitize)和检查原始数据。

(2) 从原始点击流事件中提取(extract)有用的数据。

(3) 转换(tranform)提取到的数据,以生成处理后的数据集。本例需要产出的是会话数据集。

(4) 将结果集以支持高性能查询的格式加载(load)或存储到 Hadoop 中。

你也许已经注意到了,除了第一点的数据清洗检查,后面三点正好对应一个 ETL 流水线的提取、转换、加载步骤,这也解释了为什么要使用 Hadoop 代替 ETL。

在第一步中,我们首先需要移除不完整和无效的日志记录。简单来讲,就是使用 MapReduce 或者 Hive、Pig 这样的工具,确保每条记录(如一行日志)中每个字段均已填充,同时还可以对某些字段或者全部字段做一个快速的字段内容有效性验证。在这里,检查特定的 IP 地址或反向链接可以忽略一些垃圾点击。类似的逻辑可以添加到这个步骤,将对应的点击忽略。

接下来对日志记录进行去重。由于选用了 Flume 的 File Channel 采集数据,所以当 Flume agent 偶发崩溃时,Flume 可以保证全部的日志记录进入 Hadoop。但是,Flume 不能保证所有的日志记录均有且仅有一条,而不发生重复。一个 Flume agent 的崩溃会导致部分重复的日志记录产生,所以有必要对它们进行去重。8.6.1 节会讲解相关内容。

Hive、Pig、Spark 以及 MapReduce 均可以用来去除重复数据。其中,与 MapReduce 相比,Hive 和 Pig 的接口更高一层,实现起来更容易,同时可读性也会更好。因此,这里推荐使用 Hive 或 Pig 进行数据去重,二者皆可。究竟选取哪一个,关键在于开发者的技能背景。你需要考虑哪一个与团队的已有技术选型相符,与其他项目的特定要求相符。无关实现的具体语言,我们推荐将去重后的数据首先写入一个临时表中,然后再将其移动到最终表,以避免对原始数据集造成影响。

谈到提取步骤,我们将时间戳(自 1970 年 1 月 1 日 0 时 0 分 0 秒以来的秒数)、引用 URL 地址、用户代理字符串(包含浏览器和操作系统版本)、IP 地址、语言及 URL 等列抽取出来。之所以选择这些列,是因为我们认为基于这些数据分析结果,足够回答本章一开始提出的问题。如果后续的分析会用到所有的列,你可以不进行过滤,直接将所有的列提取出来。

在转换步骤,我们对点击进行会话生成处理,然后生成一个新的会话数据集。关于会话生成的更多内容,参见 8.6.2 节。

对于来自 ODS 或 CRM 的二级数据,一般不需对其进行数据清洗,因为这些来自关系型数据库的数据一般是正确的。将此类数据直接导入 HDFS 可以查询的位置即可。目录结构如下。

  • 来自 ODS 的数据存储路径:/data/bikeshop/ods。

  • 来自 CRM 的数据存储路径:/data/bikeshop/crm。

另外,一般来讲,二级数据的数据量通常较小,无需对其进行分区。

我们已经概括性地描述了整个数据处理。接下来我们看两个步骤的具体细节:数据去重与会话生成。

8.6.1 数据去重

前面提到过,如果一个 Flume agent 发生崩溃,可能产生某些重复的日志记录。

采用 Hive、Pig、MapReduce 等工具可以进行数据去重。与 MapReduce 相比,Hive 和 Pig 的接口更高一层,实现起来更为简单易读,是我们的首选。使用 Hive 的话,你需要为原始数据创建一张外表(external table),并为去重后的数据创建另外一张表,便于后续使用。使用 Pig 则不需为去重后的数据建表。重申一下,选择取决于开发者的技能背景。你要考虑哪一个与团队已有的技术选型相符,与其他项目的特定要求相符。

1. 使用Hive进行数据去重

以下 Hive 示例代码完成了点击的去重,并将产生的数据插入到按天分区的表中。

INSERT INTO table $deduped_log
SELECT
  ip,
  ts,
  url,
  referrer,
  user_agent,
  YEAR(FROM_UNIXTIME(unix_ts)) year,
  MONTH(FROM_UNIXTIME(unix_ts)) month
FROM (
  SELECT
    ip,
    ts,
    url,
    referrer,
    user_agent,
    UNIX_TIMESTAMP(ts,'dd/MMM/yyyy:HH:mm:ss') unix_ts
  FROM
    $raw_log
  WHERE
    year=${YEAR} AND
    month=${MONTH} AND
    day=${DAY}
  GROUP BY
    ip,
    ts,
    url,
    referrer,
    user_agent,
    UNIX_TIMESTAMP(ts,'dd/MMM/yyyy:HH:mm:ss')
  ) t;

此处除了抽取月份和日期信息之外,另外一个操作就是将数据集按照所有的列进行分组。如果两行记录是重复的,那么它们的所有列均相同,通过 GROUP BY 操作后就会只剩下一行。

2. 使用Pig进行数据去重

使用 Pig 去重的方法直截了当,几乎不需解释。

rawlogs = LOAD '$raw_log_dir';
dedupedlogs = DISTINCT rawlogs;
STORE dedupedlogs INTO '$deduped_log_dir' USING PigStorage();

8.6.2 会话生成

点击流分析有一个重要环节是对点击进行分组,即将单个用户单次访问(即同一个会话 ID 下)产生的多个点击聚合在一起,这种处理便称为会话生成处理。

通过分析这些生成的会话,你可以了解一次访问中,用户在网站上发生了哪些行为,包括是否购物,以及访问如何引入(如自然检索、付费搜索、友链接入等)。对于一些市场分析师来说,在一些工具中,会话一词的意思等同于访问

你也可以通过网络服务器生成会话。在这种情况下,网络服务器会为每个点击分配一个会话 ID(代表该点击属于哪个会话)。但是,如果该会话 ID 不可靠,或者点击的生成依赖于定制化的逻辑,这里就需要自行实现(依据日志的)会话生成算法。

生成会话之前,首先需要搞清楚以下两件事情。

  • 给定一系列点击,判断哪些点击来自同一个用户。只通过点击日志中的信息通常很难确定点击和用户的从属关系。一些网站会在用户首次访问其站点(或联盟网站)时,将 cookie 写入用户的浏览器。这类 cookie 也会记录到网络服务器中,因此可以据此识别来自同一个用户的点击。值得注意的是,cookie 信息不是百分之百可靠的,因为用户可能会频繁清空 cookie,这种删除甚至会在访问中途发生。还有一个方案:如果没有 cookie 信息,那么可以使用 IP 地址来识别来自同一个用户的点击。同样,IP 地址有不可靠的时候。许多公司采用网络地址转换技术(Network Address Translator,NAT),可以实现多个工作站之间的 IP 地址共享,因此多个用户可能会处理成一个用户。这时,你可以考虑利用日志中的用户代理和语言等内容进一步区分用户。这里的例子仅使用 IP 地址来识别同一个用户的点击。

  • 给定特定用户的一系列点击,判断一个给定的点击是属于一个新访问的点击还是之前访问继续的点击。如果一个用户发生了三次连续点击,相邻两次之间的间隔均不超过五分钟,那么有理由认为这些点击是在一次浏览会话中发生的。然而,如果前两次点击发生在五分钟之内,而第三次点击发生在一个小时之后,那么合理的推测是第三次点击来自用户的另外一次访问,是此后的另外一个会话。大部分的市场分析工具都遵循这样的标准:如果同一用户的两次点击间隔超过 30 分钟,第二次点击应当从属于一个新的会话。另外,同许多市场分析工具一样,这里的实现也认为所有的会话会在当天结束。这样做会导致一些边界上的错误,但对问题的简化还是很有帮助的。举例来说,我们可能会重新生成某一天的会话,有了这种假设就不会影响前一天或后一天的会话了。同时,这样也会简化会话生成的处理流程。

在这一部分中,我们会举例说明如何使用不同工具实现会话的生成。Hadoop 生态圈中的 MapReduce、Hive、Pig、Spark、Impala 等众多工具均可以实现该处理,第 3 章介绍过它们。通常情况下,你可以选择其中的一种来生成会话数据集。Hadoop 中的绝大多数框架都可以满足需求,因此工具选择的主要考量仍在于开发者的技能背景,看哪一个与团队已有的技术选型相符,哪一个能满足其他项目及性能需求。

总之,任何会话生成算法都包含以下步骤。

(1) 遍历整个数据集,提取每个点击对应的相关列(本例中指 IP 地址,也可加上 cookie)。

(2) 将单个用户当天的所有事件收集起来,形成按照时间戳分类的用户维度的序列。

(3) 遍历每个用户序列,对序列中的每一个点击事件分配一个会话 ID。如果连续点击行为之间的间隔超过了 30 分钟,则会话 ID 自增。

接下来,我们来了解如何使用不同的工具实现这些步骤。

1. 使用Spark生成会话

Spark 是 Hadoop 上的一个新执行引擎,其总体性能比 MapReduce 要好。我们使用 HDFS 上的点击数据创建一个 RDD。然后通过 map() 函数提取每个点击的重要字段,以键值对的方式返回,以 IP 地址为键,以抽取生成的日志字段对象作为值(第一步)。以下为该函数的示例代码。

JavaRDD<String> dataSet =
  (args.length == 2) ? jsc.textFile(args[1]) : jsc.parallelize(testLines);

JavaPairRDD<String, SerializableLogLine> parsed =
  dataSet.map(new PairFunction<String, String, SerializableLogLine>() {
    @Override
    public Tuple2<String, SerializableLogLine> call(String s)
      throws Exception {
        return new Tuple2<String, SerializableLogLine>(getIP(s), getFields(s));
        }
});

接下来,我们将从属于同一个 IP 地址的点击分为一组,并按照时间戳进行排序(第二步),然后使用 sessionize() 方法遍历点击的有序列表,并给每一个点击分配会话 ID(第三步)。以下为上述步骤的代码。

//Sessionize生成函数,输入一个IP的事件序列,
//按照时间戳进行排序,并将30分钟以内的所有事件标记为同一个会话
public static List<SerializableLogLine> sessionize
        (List<SerializableLogLine> lines) {
    List<SerializableLogLine> sessionizedLines = Lists.newArrayList(lines);
    Collections.sort(sessionizedLines);
    int sessionId = 0;
    sessionizedLines.get(0).setSessionid(sessionId);
    for (int i = 1; i < sessionizedLines.size(); i++) {
        SerializableLogLine thisLine = sessionizedLines.get(i);
        SerializableLogLine prevLine = sessionizedLines.get(i - 1);

        if (thisLine.getTimestamp() - prevLine.getTimestamp() > 30 * 60 * 1000) {
            sessionId++;
        }
        thisLine.setSessionid(sessionId);
    }

    return sessionizedLines;
    }

// 此处按照IP地址对点击进行分组
JavaPairRDD<String, List<SerializableLogLine>> grouped = parsed.groupByKey();

JavaPairRDD<String, List<SerializableLogLine>> sessionized =
        grouped.mapValues(new Function<List<SerializableLogLine>,
                List<SerializableLogLine>>() {
@Override
public Iterable<SerializableLogLine> call
        (List<SerializableLogLine> logLines) throws
        Exception {
   return sessionize(logLines);
    }
});

你可以访问本书的 GitHub 仓库,获取使用 Spark 实现会话生成的完整代码。

2. 使用MapReduce生成会话

使用 MapReduce 生成会话,可以从代码层次获得更深一层的控制。使用 map() 函数可以获取点击日志中的相关字段(第一步,参见前面提到的三个步骤)。使用 shuffle() 函数可以按照用户对事件进行集合和分组,并使用一个定制化的比较器(comparator)实现点击列表按照时间戳排序的需求,然后将数据发送给 Reducer 端(第二步)。reduce() 函数会遍历每个用户有序点击的列表,并分配会话 ID(第三步)。

你可以访问本书的 GitHub 仓库,获取使用 MapReduce 实现会话生成的完整代码。

3. 使用Pig生成会话

Pig 中的常用库 DataFu 包含一个 Sessionize 函数。该函数以特定用户特定日期的记录列表作为输入,每条记录的第一个字段是时间戳,点击列表以时间戳升序排列。Sessionize() 函数的输出是一个点击记录列表,每条记录都包含会话 ID。

4. 使用Hive生成会话

虽然仅使用 SQL,利用 SQL 的窗口分析函数的确可以实现会话的生成,但这样难以维护和调试写出的查询。我们不推荐使用 Hive 或 Impala 处理该任务。

8.7 数据分析

在采集和处理完数据之后,就可以进行数据分析,回答本章开头提到的问题了。业务分析人员可以使用若干工具来浏览和分析数据。第 3 章对此有详细介绍,简而言之,这些工具可以分成如下三类。

  • 可视化及 BI 工具,如 Tableau 和 MicroStrategy。

  • 统计分析工具,如 R 或 Python。

  • 面向机器学习的高级分析工具,如 Mahout 或 Spark MLlib。

  • SQL 接口工具,如 Impala。

本章主要关注如何通过 Impala 对处理后的数据集进行 SQL 查询访问和分析。

举例来说,想知道购买者在该网站上平均花了多长时间,可以执行以下查询。

SELECT
  AVG(session_length)/60 avg_min
FROM (
  SELECT
    MAX(ts) - MIN(ts) session_length_in_sec
  FROM
    apache_log_parquet
  GROUP BY
    session_id
  ) t

使用如下查询,还可以计算网站的跳出率(用户打开本网站后尚未跳转至其他页面便已结束访问的百分比)。

SELECT
  (SUM(CASE WHEN count!=1 THEN 0 ELSE 1 END))*100/(COUNT(*)) bounce_rate
FROM (
  SELECT
    session_id,
    COUNT(*) count
  FROM
    apache_log_parquet
  GROUP BY
    session_id)t;

我们还可以把 BI 工具(如 MicroStrategy 或 Tableau)连到 Impala 上,通过 Impala 提供的 ODBC 或 JDBC 驱动,针对点击数据进行更进一步的 BI 查询。

8.8 协调调度

到这里,我们已经将数据采集到了 Hadoop,并针对点击流数据进行了各种各样的处理,最后以终端用户的身份分析处理后的数据。最终的数据分析是以即席查询的方式进行的,但是前面的部分(数据的采集和各种处理行为)应当能够通过多步骤协调、调度的方式自动完成。在这一部分,我们将展示如何协调调度基于 Hadoop 进行点击流分析的各种步骤。

在数据采集方面,我们使用 Flume 获取源源不断的数据,并将其导入系统。在数据处理方面,我们则决定每天运行一次会话生成算法,因为在一天结束的时候会话一般也结束了。考虑到数据延迟与算法复杂性之间的权衡,可以每天运行一次会话生成程序。如果想更频繁地生成会话,则需要维护一个正在运行的会话的列表(因为会话可以持续任意时长),会话生成算法会变得过于复杂。这对于近实时系统来讲是非常慢的,但并不是所有的系统都需要做到实时。第 9 章详细介绍了如何搭建一个近实时分析系统,而且第 9 章中的许多技术也都适用于点击流分析的应用场景。

我们使用第 6 章中提到的 Oozie 协调与调度本章的会话生成处理。

在本章示例中,Oozie 工作流会先做一些预处理工作,然后再生成会话。以下是 Oozie 工作流的示例,包括使用 Pig 处理数据去重,以及使用 MapReduce 生成会话。

<workflow-app xmlns="uri:oozie:workflow:0.4" name="process-clickstream-data-wf">
  <global>
    <job-tracker>${jobTracker}</job-tracker>
    <name-node>${nameNode}</name-node>
  </global>
  <start to="dedup"/>
  <action name="dedup">
    <pig>
      <prepare>
        <delete path="${dedupPath}"/>
      </prepare>
      <script>dedup.pig</script>
      <argument>-param</argument>
      <argument>raw_log_dir='${wfInput}'</argument>
      <argument>-param</argument>
      <argument>deduped_log_dir='${dedupPath}'</argument>
    </pig>
    <ok to="sessionize"/>
    <error to="fail"/>
  </action>

  <action name="sessionize">
  <java>
    <prepare>
      <delete path="${sessionPath}"/>
    </prepare>
    <main-class>com.hadooparchitecturebook.MRSessionize</main-class>
    <arg>${dedupPath}</arg>
    <arg>${sessionPath}</arg>
  </java>
  <ok to="end"/>
  <error to="fail"/>
  </action>

  <kill name="fail">
    <message>Workflow failed:[${wf:errorMessage(wf:lastErrorNode())}]</message>
  </kill>
  <end name="end"/>

</workflow-app>

有一点需要注意,我们通过传递 yearmonthday 参数形成 dedupPath 变量的值,这几个参数确定了要生成的点击数据的确切日期。它们的值通过负责调度该工作流的协调器任务产生。

在我们的设计中,协调器每天都会触发工作流,触发后该工作流会处理前一天的数据。我们需要确保前一天的数据在进行会话生成时已经收集完毕。这样保证了数据处理前的同步性,是非常常见的流程协调模式。如果在处理当天数据时,这些数据很还没有写完,那么就会出现不一致的情况。前面提到的模式可以避免这一点。

有两种方式可以保证上文提到的同步。

  • 在开始处理前一天的工作流之前,验证 Flume 已经开始写当天的数据。

  • 等数据集写完,生成一个文件标识,让 Oozie 等到该标识生成完毕再继续执行处理。文件标识通常名为 _SUCCESS,表明整个数据集已经成功落地,可以开始后续处理。协调器支持这样的调度规则,即等到指定目录下文件标识生成才执行工作流(使用 <done-flag> 选项)。

在本例中,Flume 不太容易生成一个文件标识。相对而言,检查当天的数据是否已经开始写入是容易实现的,只需检查当天的分区是否存在即可。因此,上文提到的第一种方式就可以保证同步了,协调器这边对应的代码示例如下(下文代码有部分截断,全部代码参见本书 GitHub 仓库)。

<coordinator-app name="prepare-clickstream" frequency="${coord:days(1)}"
                 start="${jobStart}" end="${jobEnd}"
                 timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.1">

  <datasets>
    <dataset name="rawlogs" frequency="${coord:days(1)}"
             initial-instance="${initialDataset}" timezone="America/Los_Angeles">
      <uri-template>/etl/BI/casualcyclist/clicks/rawlogs/year=${YEAR}/...
      <done-flag></done-flag>
    </dataset>
  </datasets>

  <input-events>
    <data-in name="input" dataset="rawlogs">
      <instance>${coord:current(0)}</instance>
    </data-in>
    <data-in name="readyIndicator" dataset="rawlogs">
      <!-- Flume完成某一目录的写操作之后并不会设置写完标识,
           无法得知该目录何时可以作为输入,因此直到开始写第二天的数据,
           才开始通过协调器来调度工作流 -->
      <instance>${coord:current(1)}</instance>
    </data-in>
  </input-events>

  <action>
    <workflow>
      <app-path>${workflowRoot}/processing.xml</app-path>
      <configuration>
        <property>
          <name>wfInput</name>
          <value>${coord:dataIn('input')}</value>
        </property>
        <property>
          <name>wfYear</name>
          <value>${coord:formatTime(coord:dateOffset(
          coord:nominalTime(), tzOffset, 'HOUR'), 'yyyy')}</value>
        </property>
        <property>
          <name>wfMonth</name>
          <value>${coord:formatTime(coord:dateOffset(
          coord:nominalTime(), tzOffset, 'HOUR'), 'MM')}</value>
        </property>
        <property>
          <name>wfDay</name>
          <value>${coord:formatTime(coord:dateOffset(
          coord:nominalTime(), tzOffset, 'HOUR'), 'dd')}</value>
        </property>
      </configuration>
    </workflow>
  </action>
</coordinator-app>

如你所见,readyIndicator 告诉 Oozie,开始写第二天的数据时才开始执行工作流,这意味着前一天的数据已经采集完毕。

8.9 小结

在本章讲解了一个常见的 Hadoop 使用情境:以批处理方式分析机器产生的数据。存储和处理如此海量、高吞吐、多样性的数据,使用 Hadoop 再合适不过了。

我们的架构设计使用 Flume 采集点击流数据,使用 Sqoop 将来自 CRM 或 ODS 的二级数据源导入数据集。接下来,我们讨论了基于点击数据的常见处理,涉及数据去重、数据过滤以及最重要的会话生成。另外,这里还描述了生态系统中各种不同的执行引擎是如何满足这些处理需求的。后面,我们展示了基于 Hadoop 上的数据集进行的分析类查询,如查找一个网站的跳出率。最后,我们演示了如何通过协调调度工具(如 Oozie)组织整个工作流。

虽然本章提到的架构是面向批处理的,但面向近实时处理的点击流分析也可以通过将数据存储到 NoSQL 系统(如 HBase)中实现。本章未讨论该方案,不过本书的 GitHub 仓库中有架构设计和代码可供参考(http://bit.ly/haa-session)。

或许本章提到的案例跟你遇到的实际情况并不完全相符,不过基于 Hadoop 进行机器数据处理大抵皆是如此。我们希望本章能够在设计 Hadoop 应用方面穿针引线,给你提供一些帮助。

目录