第 1 章 Spark与大数据

第 1 章 Spark与大数据

数据只是工具,最终还是要用于创造价值,大数据只是一种新的实践。

1.1 大数据的发展及现状

大数据是一个很热门的话题,但它是从什么时候开始兴起的呢?

“大数据”(big data)这个词最早在UNIX用户协会的会议上被使用,来自SGI公司的科学家在其文章“大数据与下一代基础架构”(“Big Data and the Next Wave of InfraStress”,参见http://static.usenix.org/event/usenix99/invited_talks/mashey.pdf)中用它描述数据的快速增长。那么,怎样才叫“大数据”呢?META集团(已被Gartner收购)的一份报告提出大数据的特征“3V”,即大量(volume)、多样(variety)、快速(velocity)。如今,大家更普遍地使用“4V”来描述大数据的特征,即增加了第四个V——价值(value)。

从数据量上来看,人们通常认为当一个计算单元容纳不下要处理的数据,就是在面对大数据了。其实早在电脑被发明之前,我们做的人口统计、气象预报等工作就属于大数据范畴了。下面,让我们看看大数据时代所面临的问题。

1.1.1 大数据时代所面临的问题

我们拿搜索引擎举例。搜索引擎需要把网上的多数网页抓取下来进行分析,并建立索引,这样在我们搜索一个词的时候,它才能在毫秒级别返回结果。我们假设每个网页平均的大小为20 KB,大概有价值的中文网页有200亿个页面,不过考虑到压缩情况,大概总共有400 TB的大小。那么,一台计算机以30 MB/s~35 MB/s的速度从硬盘读写,大概需要4个多月,这还不包括在这些数据上做复杂的分析工作。因此,我们通常需要一个计算机集群来完成相关的工作。

有人可能会说,我们的计算机处理能力在以指数级增长,这样机器能够处理更多的数据。是的,这也是事实,但是这些计算机生产数据的能力也在以指数级增长。而且,每人拥有的设备数在增加,如今除了个人电脑,越来越多的人拥有智能手机、平板电脑、智能家居、穿戴设备等。有数据表明,90%的数据是过去两年左右时间内产生的。

我们面对的最直接的问题是如何存储和计算大数据。接下来仍以搜索引擎作例子。假设每台机器存储400 GB的数据,那么我们需要1000台机器来存储这些数据。下面来分析下1000台服务器的故障率。

X表示一台服务器是否正常工作,X = 0表示正常工作,X = 1表示不正常工作。服务器一个月内发生故障的概率是εP(X = 0) = 1 - ε,那么1000台电脑都正常工作的概率是(1 - ε)1000。假设故障率ε = 0.001,所有1000台服务器1个月内正常工作的概率只有0.37,不到一半。

所以,随着数据的增加、机器的增加,处理数据的技术也变得越来越复杂了。为了解决这些问题,各种大数据解决方案诞生了。如今,程序员在100台机器上编程和在1000台上面编程没有太多区别,不需要考虑容错性、并发等问题。

随着云计算技术的发展,不仅大数据处理的复杂度降低了,成本也大大降低,云计算服务商的IaaS(Infrastructure as a Service,基础设施即服务)支持按天付费,而且可以动态按需扩容。在云计算服务商的帮助下,如今一个小创业公司都能够快速开发和部署大数据应用。云计算服务商的国外领导者是Amazon AWS,国内主要是阿里云、腾讯云和UCloud。

另外一个困扰我们的问题是:数据这么多,我们要存储哪些数据,以及存储多长时间的数据。通常我们认为,从一个字节可以获取多少价值,存储它又要花费多少费用,如果两者的比值大于1,就值得存储更多数据。大数据还有一个特点,就是存储单条数据的价值可能不大,拥有更多数据时,就有价值了。举一个简单的例子。如果我们只有一个用户的访问日志,显然没有太大价值,但是如果拥有全站的用户访问日志,我们就有可能对数据进行分析,从而发现潜在规律和趋势等。

解决了大数据存储和计算的问题之后,如何对数据进行分析呢?这是一个复杂的技术。Spark的很多库就是为了解决不同场景下的分析任务而存在的,比如MLlib库就是为解决各种机器学习问题开发的库,这些问题包括分类问题、回归问题、聚类等。GraphX就是为了分析社交网络等应用开发的库。

大数据时代面临的另外一个问题就是大数据交易,数据本身已成为一种可以交易的商品。2015年9月5日国务院印发了《促进大数据发展行动纲要》,纲要提出要建设公共数据资源开放的统一开放平台,“2020年底前,逐步实现信用、交通、医疗、卫生、就业、社保、地理、文化、教育、科技、资源、农业、环境、安监、金融、质量、统计、气象、海洋、企业登记监管等民生保障服务相关领域的政府数据集向社会开放”,以及“到2020年,培育10家国际领先的大数据核心龙头企业,500家大数据应用、服务和产品制造企业”。

1.1.2 谷歌的大数据解决方案

谷歌的搜索引擎是搜索引擎界的领导者,很重要的原因之一就是谷歌在大数据技术方面领先。2003年开始,谷歌先后发表了GFS、MapReduce、BigTable等几篇论文,其中2004年,谷歌的Jeff等发表了论文“MapReduce: Simplified Data Processing on Large Clusters”,提出了大数据分析的范式MapReduce。虽然MapReduce范式在函数式编程中为人所熟知,但是该论文提供了在集群环境中该范式的可扩展性实现,使得程序员可以利用机器集群处理更多数据,而不需要考虑容灾和并发等问题。谷歌的三大论文引发了人们在大数据领域的大量研究,直接导致了Hadoop的出现——MapReduce范式的开源实现。如今Hadoop,包括MapReduce与分布式文件系统(HDFS),已经成为数据处理的事实标准。大量的工业界应用,例如腾讯、百度、阿里巴巴、华为、迪斯尼、沃尔玛、AT&T都已经有自己的Hadoop集群。

MapReduce能做的事情很多,包括实现各种机器学习算法(参见http://machinelearning.wustl.edu/mlpapers/paper_files/NIPS2006_725.pdf)。

但MapReduce不是唯一的大数据分析范式,有一些场景是不适合使用MapReduce的,比如处理网状的数据结构时,这要求能够处理顶点和边的增加和减少操作,并在所有节点进行运算等。

典型的场景就是在搜索引擎中链接地图计算和社交网络分析。谷歌也建设了基于图的计算系统Pregel,允许连通的节点之间互相交换信息。Pregel基本运算是节点之间的超级步骤(superstep),每个顶点都有一个用户自定义的计算函数和值,所有的边都可以并行计算,顶点可以通过边来发送消息并与其他顶点交互数据,可以聚合所有节点的信息,计算最大最小值等。Spark上也能够很方便地支持图计算应用。事实上,最初AMPLabs团队仅用数百行代码就开发出了整个Pregel,如今图计算库GraphX已经是Spark主要的库之一。

当然,并不是说谷歌出现前就不存在大数据。我们使用的很多方法和技术都是过去已有的,比如分布式系统广泛应用的Paxos算法,是莱斯利·兰伯特(Leslie Lamport)于1990年提出的一种基于消息传递的一致性算法。

1.1.3 Hadoop生态系统

Hadoop是谷歌大数据解决方案的开源实现,使用Java语言开发,其核心主要是两部分:分布式文件系统(HDFS)和MapReduce。为了处理更多的应用场景,在此基础上,经过业界巨头雅虎等,以及开源界其他力量的努力,人们建设了很多其他的重要系统,这里我们简单介绍下。

Hive是在HDFS和MapReduce上提供一个类似于SQL风格的抽象层,非常容易上手。

用户可以用数据库、表的概念来管理数据,使用SQL来访问、计算,不需要写MapReduce程序。SQL语法非常类似于关系型数据库,支持常见的SelectJoinGroup byInsert等操作。

HBase是基于Hadoop的非关系型数据库,具备分布式、可扩展的特点,支持在几十亿行、数百万列的一张大表上进行实时、随机地读写访问。典型场景有各种数据仓库,比如淘宝用户历史订单查询等。

ZooKeeper是提供分布式应用程序协调服务的系统,是谷歌的Chubby一个开源的实现,是Hadoop和HBase的重要组件。比如,Spark为了保证高可用,同时运行多台Master节点,但只有一台是活跃的,其他都处于热备状态,通过ZooKeeper可以协调选择出当前活跃的节点,当这个活跃节点异常时,再从剩下的热备节点中重新选择一台活跃节点。

Hadoop是一个批处理系统,不擅长实时计算,如果需要实时或准实时的分析,可以使用Storm(Twitter)、S4(雅虎)、Akka等系统。另外,Hadoop也不擅长复杂数据结构计算,比如前面提到的图计算,可以利用的开源系统有GraphLab和Spark的GraghX库。

从Hadoop 2.0开始,Hadoop YARN将资源调度从MapReduce范式中分离出来。YARN已经成为通用的资源管理系统,可为上层应用提供统一的资源管理和调度,Spark支持部署在YARN管理的集群上。

在工业界大规模应用Hadoop生态的系统,还要面临部署、排错、升级等问题。为解决这些问题,降低Hadoop应用门槛,我们可以使用Hadoop商用解决方案的提供商的产品,目前比较成熟的提供商有Cloudera、Hortonworks和MapR。

Cloudera由Doug Cutting和Jeff Hammerbacher共同创建,为合作伙伴提供Hadoop的商用解决方案,主要包括支持、咨询服务、培训。Cloudera产品主要为CDH、Cloudera Manager、Cloudera Support。CDH是Cloudera的Hadoop发行版,完全开源,比Apache Hadoop在兼容性、安全性、稳定性上有增强。Cloudera Manager是集群的软件分发及管理监控平台,可以在几个小时内部署好一个Hadoop集群,并对集群的节点及服务进行实时监控。Hortonworks是雅虎与硅谷风投公司Benchmark Capital合资组建的公司,公司成立之初吸纳了大约30名专门研究Hadoop的雅虎工程师,这些工程师均在2005年开始协助雅虎开发Hadoop,贡献了Hadoop 80%的代码。Hortonworks的主打产品是Hortonworks Data Platform(HDP)。

目前CDH和HDP在国内普及率不高,国内巨头使用的都是在Apache Hadoop开源版本上自己修改的定制版本,每家公司的集群规模也都有几千台。国内的公司也开始给开源社区同步部分修改,根据不完全统计,华为有4个Hadoop committer,小米有3个HBase committer,腾讯有一个Storm的committer,阿里巴巴有一个Spark的committer等。相信往开源社区贡献代码是未来的一种趋势。

1.2 Spark应时而生

Hadoop MapReduce可以解决很多通用计算的问题,但在某些场景下效率不高,Spark就是在这个场景下诞生的。

1.2.1 Spark的起源

机器学习算法通常需要对同一个数据集合进行多次迭代计算,而MapReduce中每次迭代都会涉及HDFS的读写,以及缺乏一个常驻的MapReduce作业,因此每次迭代需要初始化新的MapReduce任务。这时,MapReduce就显得效率不高了。同时,基于MapReduce之上的Hive、Pig等技术也存在类似问题。

Spark作为一个研究项目,诞生于加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)。AMP实验室的研究人员发现在机器学习迭代算法场景下,Hadoop MapReduce表现得效率低下。为了迭代算法和交互式查询两种典型的场景,Matei Zaharia和合作伙伴开发了Spark系统的最初版本。2009年Spark论文发布,Spark项目正式诞生,在某些任务表现上,Spark相对于Hadoop MapReduce有10~20倍的性能提升。2010年3月Spark开源,且在开源社区下发展迅速。2014年5月,Spark 1.0正式发布,如今已经是Apache基金会的顶级项目了。

1.2.2 Spark的特点

Spark刚出现时,常常被大家概括为内存计算,这不是没有缘由的。在典型应用中,Spark读取HDFS中的文件,加载到内存,在内存中使用弹性分布式数据集(Resillient Distributed Dataset,RDD)来组织数据。RDD可以重用,支持重复的访问,在机器学习的各个迭代中它都会驻留内存,这样能显著地提升性能。即使是必须使用磁盘进行复杂计算的场景,Spark也常常比Hadoop MapReduce更高效。

Spark频频在效率上表现出色,其官方博客显示,Spark在最近的排序大赛(https://databricks.com/blog/2014/11/05/spark-officially-sets-a-new-record-in-large-scale-sorting.html)中创造了新纪录,参见表1-1。

表1-1 Spark的新纪录

 

Hadoop MR

Spark

Spark

数据规模

102.5 TB

100 TB

1000 TB

耗时

72 min

23 min

234 min

节点数

2100

206

190

核数

50 400(物理的)

6592(虚拟的)

6080(虚拟的)

磁盘吞吐量

3150 GB/s(估计)

618 GB/s

570 GB/s

环境

专用数据中心,10 Gbit/s

虚拟(EC2)10 Gbit/s网络

虚拟(EC2)10 Gbit/s网络

排序效率

1.42 TB/min

4.27 TB/min

4.27 TB/min

节点排序效率

0.67 GB/min

20.7 GB/min

22.5 GB/min

最新的结果可以关注(http://sortbenchmark.org/),最近提交的截止时间是2015-9-1。

Spark一直寻求保持Spark引擎小而紧凑。Spark 0.3版本只有3900行代码,其中1300行为Scala解释器,600行为示例代码,300行为测试代码。即使在今天,Spark核心代码也只有约50 000行,因此更容易为许多开发人员所理解和供我们改变和提高。

Spark是一个通用计算框架,包含了特定场景下的计算库:Streaming、SQL、MLlib、GraphX等。除了支持常见的MapReduce范式,它还能够支持图计算、流式计算等复杂计算场景,在很大程度上弥补了Hadoop的不足。

此外,Spark的接口丰富,提供了Python、Java、Scala等接口,文档清晰,为初学者提供了便利的上手条件。

在容错方面,Spark也有自己的特色。容错机制又称“血统”(Lineage)容错,即记录创建RDD的一系列变换序列,每个RDD都包含了它是如何由其他RDD变换过来的,而且包括如何重建某一块数据的信息。由于Spark只允许进行粗粒度的RDD转换,所以其容错机制相对高效。Spark也支持检查点(checkpoint)的容错机制。容错的机制和原理将在后面详细阐述。

Spark自带调度器,同时能够运行在Hadoop YARN集群、Apache Mesos上,可以很方便地和现有的集群进行融合。

Spark的输入支持本地存储、Hadoop的HDFS,以及其他支持Hadoop接口的系统:S3、Hive、HBase等。Spark还有一个优点,就是当RDD的大小超出集群的所有内存时,可以优雅地进行降级支持,存储在磁盘。

从成本的角度出发,由于Spark能适应多种应用场景,这样一个公司或者组织可能就不需要部署多套大数据处理系统,可大大减低学习、维护、部署、支持等成本。

1.2.3 Spark的未来发展

Spark在过去的5年里发展迅速,社区活跃程度一点儿不亚于Hadoop社区。我们从Matei Zaharia在Spark五周年的总结中可以看到,Spark的contributor呈指数级增长,参见图1-1。

{%}

图1-1 Spark的贡献者在过去5年内的增长情况

Spark峰会(http://spark-summit.org)是Spark社区分享Spark各种应用的重要交流会。在Spark Summit 2015上,来自Databricks、UC Berkeley AMPLab、百度、阿里巴巴、雅虎、英特尔、亚马逊、Red Hat、微软等的数十个机构共分享了近100个精彩纷呈的报告。

目前,Databricks、英特尔、雅虎、加州大学伯克利分校是Spark主要的贡献者。

Spark 1.4中还发布了SparkR,引入了更友好的R语法支持,进一步扩展了Spark数据分析的应用场景。

工业界应用上,从Spark官方提供的用户列表可以看到,国内的BAT都在用Spark,完整的列表可以参考https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark。Spark Summit 2015中,Databricks的CTO Matei Zaharia在Keynotes里指出Spark最大的集群来自腾讯,一共有8000个节点,单个Job最大分别是阿里巴巴和Databricks的1 PB。

如今,Cloudera、Hortonworks和MapR在内的大数据发行版也添加了Spark,Spark已经得到广泛认可,成为一种优秀的大数据处理技术平台。

传统道家思想把事物分为道、法、术、器4个层次。如果说Spark是大数据的一件利器,那么本书最多就算是指导使用Spark的“术”,然后解决大数据的问题,需要的更多是“道”“法”的探索。

目录

  • 序一
  • 序二
  • 前言
  • 第 1 章 Spark与大数据
  • 第 2 章 Spark基础
  • 第 3 章 Spark工作机制
  • 第 4 章 Spark内核讲解
  • 第 5 章 Spark SQL与数据仓库
  • 第 6 章 Spark流式计算
  • 第 7 章 Spark图计算
  • 第 8 章 Spark MLlib
  • 附录 Scala语言参考