第 3 章 Pig进阶

第 3 章 Pig进阶

在Hadoop上运行Java MapReduce作业,以最少的抽象概念提供了最大的灵活性。 然而,抽象对于推断模式、完成日常的数据操作任务、降低复杂性和扁平化学习曲线来说仍是必要的。而Pig就提供了这样一个框架以及高级抽象,可以在Hadoop上创建MapReduce程序。它包含一种称为Pig Latin的脚本语言。就操作符的功能而言,Pig Latin可以跟SQL相媲美。

Pig由雅虎于2006年左右开发,当时作为一个框架用于指定特别的MapReduce工作流。次年,被迁移到Apache软件基金会。最新的发布版是0.12.1。

目前,Pig的正式发布和Hadoop 2.2.0不兼容。它会从Hadoop 1.2.1中寻找库文件。运行任何Pig脚本都会失败,并抛出下面的异常:

Unexpected System Error Occured:
java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected.

修复这个问题需要重新编译Pig的二进制文件。运行下面的命令,然后改用新生成的pig.jar和pig-withouthadoop.jar文件:

ant clean jar-all -Dhadoopversion=23

本章,我们将通过以下内容来看看Pig的高级特性。

  • 与SQL相比,Pig有何不同。

  • 分析Pig Latin脚本如何被转成MapReduce程序。

  • 深入研究Pig支持的高级关系操作符;我们将深入了解这些关系操作符,并通过实例来看一下它们的应用程序。

  • 学习如何扩展Pig现有的功能:使用用户定义函数(UDF)可以实现多种接口,我们将仔细研究其中的一些接口。

3.1 Pig对比SQL

SQL是一种非常流行的查询和数据处理语言。任何用于数据处理的高级语言都应该与SQL对比一下。在本节中,我们将比较Pig Latin和SQL。对比的结果如下所示。

  • Pig Latin基本上可以算是一种过程化语言。相反,SQL实际上是一种声明式语言。在SQL中,数据转换发生时不需要数据管道,然而,在Pig Latin中,数据管道中的每一步数据转换都是有序的。在SQL中可以通过使用中间临时表来模拟这种行为,但是创建、管理和清理这些中间表却是一件又麻烦又容易犯错的事。尽管Pig Latin脚本是过程化的,但语句却是懒惰执行,也就是说,直到真正需要某个值的时候,相应的语句才会被执行。

  • 开发者使用像SQL这样的声明式语句书写数据流时,会过度地依赖于查询优化器来为数据转换步骤选择正确的实现。SQL引擎确实能够提供帮助,但不具有选择的灵活性,或者说是添加插件的能力。而Pig Latin天生带有这种灵活性。

  • SQL是一种理想的线性数据流——转换产生一个单一的结果集。然而,数据流通常是有向无环图(DAG),常见的操作是将数据分割成流,在每个流上应用不同的转换函数,然后再将这些流连接起来。要在SQL上实现DAG则需要重复操作或是物化中间结果。而Pig由于物化了中间结果,所以能够通过减少磁盘的读写次数来有效地处理数据流的DAG。

  • SQL不是一个抽取-转换-加载(ETL)工具,它只作用于那些已经在数据库中存在的数据。Pig为UDF提供了便捷,用户可以在数据流中自定义Java代码。它考虑到流的情况,也就是说,在数据流中可以随意插入可执行的代码。而流有助于重用数据流管道中现有的工具和代码。Pig的这些特性使它成为一个多用途的平台,不需要为了ETL和数据处理分别使用不同的工具。

  • Pig过程化的特性允许它在数据管道处理过程中在任意点存储数据。这有助于手动引入检查点,并且当发生故障时,也不必从头开始执行整个查询。对于大数据的处理,这一点显得格外重要,因为数据加载和处理的时间特别长。SQL没有赋予开发者对这项功能的控制权,它很有可能需要重新执行查询中最花时间的部分。

3.2 不同的执行模式

Pig有以下三种执行模式。

  • 交互式模式:在这种模式中,用户可以利用一个叫作grunt的外壳程序(shell)。用户可以在一个连接Pig和Hadoop集群的交互式会话中,输入Pig命令。

  • 批处理模式:在这种模式中,用户可以在一个脚本文件中书写一系列的Pig语句,然后提交、执行这个文件。

  • 嵌入式模式:在这种模式中,通过导入Pig的库文件,可以在任何Java程序中调用Pig命令。

除了这几种执行模式以外,Pig还可以在本地的执行环境中以本地模式执行,或者在Hadoop集群的执行环境中以mapreduce模式执行。在本地模式下,所有的命令利用本地文件系统在单一的环境中执行。如果不带-x参数,Pig默认以mapreduce模式运行。如果指定-x参数,用户可以选择运行本地模式还是mapreduce模式,并使用相关的执行环境。环境变量HADOOP_CONF_DIR就是用来指定Pig在哪个Hadoop集群上运行MapReduce作业。

下面的代码演示了如何以本地或mapreduce模式运行Pig脚本:

pig -x local …
pig -x mapreduce … OR pig …

3.3 Pig的复合数据类型

Pig的原生数据类型有intlongfloatdoublechararraybytearray等。此外,Pig也支持复合数据类型。使用这些复合数据类型,可以指定Pig关系操作符的输入及输出。在某些场合下,操作符的行为取决于它所使用的复合数据类型。这些复合数据类型如下。

  • Map:不要将这种数据类型与MapReduce中的map函数相混淆。Map数据类型是一种关联数据类型,它保存chararray类型的键(key)及相应的值(value)。map中,值的数据类型并没有限制,它也可以是一个复合数据类型。如果无法指定值的类型,Pig就会将其默认为bytearray数据类型。键和值的关联通过符号#完成。map中的键值必须是唯一的:

    [key#value, key1#value1…]
    
  • Tuple:Tuple数据类型是一组数据值的集合。它们长度固定且有序。它们就好比是SQL表中的一条记录,只是对列的类型并没有什么限制。每个数据值被称为字段(field)。由于值是按序排列,因此可以在tuple中随机访问某个值:

    (value1, value2, value3…)
    
  • Bag:Bag数据类型是tuple和其他bag的容器。它们是无序的,也就是说无法随机访问一个bag中的某个tuple或bag。对于包含在bag中的tuple,它的结构是没有限制的。另外,一个bag中允许出现重复的tuple或是bag:

    {(tuple1), (tuple2)…}
    
    

 Pig中的Bag数据类型可以溢出保存到磁盘中,这样它就可以拥有数量庞大的tuple,而无需顾虑内存的限制。但Map和Tuple数据类型不属于这种情况。

3.4 编译Pig脚本

Pig的架构被设计成分层结构,这有利于插件式的执行引擎。Hadoop的MapReduce作为一个执行平台,就像插件一样被加载到Pig中。编译和执行Pig脚本有三个主要阶段:准备逻辑计划,将它转成物理计划,最后,将物理计划编译成MapReduce计划,这样就可以在相应的执行环境中运行。

3.4.1 逻辑计划

Pig首先会解析语句的语法错误,同时验证输入文件和输入的数据结构。如果有模式(schema)存在,那么本阶段还会进行类型检查。然后生成一个DAG的逻辑计划,DAG中的节点是操作符,边是数据流。逻辑计划是不能执行的,它并不知道执行层。本阶段还会基于内置的规则进行优化。其中一些规则我们会在稍后讨论。逻辑计划与有效的操作符一一对应。在下面的脚本中,两个文本文件作为数据输入,被加载并保存到Pig的变量中,这也被称为关系(relation)。两组输入的数据经历诸如空值过滤(filter)、连接(join)等转换,最终根据连接键(join key)被分组和聚合。

cc = load 'countrycodes.txt' using PigStorage(',') as
    (ccode:chararray, cname:chararray);
ccity = load 'worldcitiespop.txt' using PigStorage(',') as
    (ccode:chararray, cityName:chararray, cityFullName:chararray,
         region:int, population:long, lat:double, long:double);
filteredCcity = filter ccity by population is not null;
joinCountry = join cc by ccode, ccity by ccode;
generateRecords = foreach joinCountry generate cc::cname,
    ccity::cityName, ccity::population;
groupByCountry = group generateRecords by cname;
populationByCountry = foreach groupByCountry generate group,
    SUM(generateRecords.population);

下图表明了上述脚本的逻辑计划:

{%}

Pig脚本的逻辑计划和物理计划

3.4.2 物理计划

物理计划使Pig的编译过程开始感知到执行平台。在本阶段中,每一个操作符被转换成执行的物理形态。比如说MapReduce框架,除了少数几个操作符以外,大多数都可以和物理计划一一对应。除逻辑操作符以外,还有几个物理操作符:Local Rearrange(LR)、 Global Rearrange (GR)、和 Package(P)操作符。

GROUPCOGROUP或是JOIN这样的逻辑操作符,会被转换成有序的LR、GR和P操作符,上图中也已经表明了这一点。LR操作符对应洗牌的准备阶段,将数据按键进行分区。GR操作符对应Map和Reduce任务之间真正的洗牌操作。P操作符则是Reduce过程中的分区操作符。

3.4.3 MapReduce计划

Pig编译过程中的最后一个阶段是将物理计划编译成真实的MapReduce作业。物理计划中的LR、GR和P操作符组成的序列至少需要一个Reduce任务。编译器也会寻找机会在合适的地方添加一些Combiner。上图中的脚本转换成MapReduce计划后将会有两个MapReduce作业,一个对应逻辑计划中的JOIN操作符,另一个则对应GROUP操作符。下图表明了上述脚本所对应的MapReduce计划。对应GROUP操作符的MapReduce任务中多了一个Combiner。必须要说明的是,GROUP操作发生在Map任务中。这是因为下图中Reduce1的输出将按键做排序。

3.5 开发和调试助手

有三个重要的命令可以帮助我们开发,调试及优化Pig脚本。

3.5.1 DESCRIBE命令

DESCRIBE命令会显示一个关系的模式(schema)。当你是Pig Latin的初学者,或是想要了解操作符是如何转换数据时,会发现这个命令很有用。上述脚本中的groupByCountry(用来找出每个国家的人口数)所对应的输出是:

groupByCountry: {group: chararray,generateRecords: {(cc::cname:
    chararray,ccity::cityName: chararray,ccity::population: long)}}

DESCRIBE命令的输出同样符合Pig的语法。在前面的例子中,groupByCountry是Bag数据类型,它包含一组元素和另一个bag:generateRecords

3.5.2 EXPLAIN命令

对于一个关系,EXPLAIN命令会显示Pig脚本将如何执行。当我们试着去优化Pig脚本或是调试错误时,它会很有帮助。它会显示一个关系的逻辑计划、物理计划及MapReduce计划。下面的截屏显示的就是当我们对populationByCountry执行EXPLAIN命令时,第二个MapReduce作业(对应GROUP操作符)的MapReduce计划。你可以使用EXPLAIN命令去学习针对各种计划所采取的优化。

3.5.3 ILLUSTRATE命令

ILLUSTRATE命令也许是最重要的开发助手了。当你对一个关系使用ILLUSTRATE命令时,它会给出一些样本数据,然后你可以在这些样本数据上做查询,这可以节省大量的调试时间。样品数据要比实际数据小很多,这样,编码-测试-调试周期就变得非常快。在很多情况下,JOINFILTER操作符并不会产生输出,这时候ILLUSTRATE会制造一些流经这些操作符的记录,并将它们放进样本数据集中。下面的截屏显示的是对populationByCountry关系执行ILLUSTRATE命令后的部分结果:

{%}

3.6 Pig操作符的高级特性

在本节中,我们将研究一些Pig操作符的高级特性及有用的提示。

3.6.1 FOREACH操作符进阶

FOREACH操作符主要用于将输入关系的每一条记录转换成别的记录。表达式列表被用来做这个转换操作。在某些情况下,FOREACH操作符会增加输出数据的记录条数。我们会在后面的部分中讨论这些内容。

1. FLATTEN操作符

FLATTEN关键字是一个操作符,不过它在语法上看起来像一个UDF。它是用来解套(un-nest)那些嵌套的tuple和bag的。然而当解套分别作用在tuple和bag上时,语义却是不同的。

如下面的代码片段所示,当对一个嵌套的tuple运用FLATTEN时,会产生一个单一的tuple。所有的嵌套tuple都会被提升到最高级别。

思考一下下面的数据:

(1, (2, 3, 4))
X = FOREACH A GENERATE $0, FLATTEN($1);

这会产生一个(1,2,3,4)的tuple。

对于bag,情况就更加复杂了。当我们解套一个bag时,会产生新的tuple。假设我们有一个由tuple组成的bag关系({(b,c),(d,e)}),然后我们对它做GENERATE FLATTEN$0)操作,最后我们会得到两个tuple,(b,c)(d,e)。换句话说,FLATTEN做了一次交叉乘积,bag中的每一元素都会产生一行记录。

让我们以worldcitiespop.txt作为一个例子来看一下:

cCity = load 'worldcitiespop.txt' using PigStorage(',') as
    (ccode:chararray, cityName:chararray, cityFullName:chararray,
        region:int, population:long, lat:double, long:double);
groupCcityByCcode = group cCity by ccode;

groupCcityByCcode变量产生了一个包含tuple的嵌套bag,如下面所示,tuple的个数等于group的个数,在这个例子中即为国家的个数:

(ae, {(ae,ae,sharjah,Sharjah,6,543942,25.35731,55.403304),
(ae,ae,dubai,Dubai,3,1137376,25.258172,55.304717)})

FLATTEN操作符可以将它们解套,对于指定的国家,每个城市生成一条记录,代码如下:

unGroupCcityByCcode = foreach groupCcityByCcode generate group,
    FLATTEN(cCity);

嵌套bag中每个元素与国家代码的交叉乘积结果是:

(ae,ae,sharjah,Sharjah,6,543942,25.35731,55.403304)
(ae,ae,dubai,Dubai,3,1137376,25.258172,55.304717)

 对空的嵌套bag和tuple运用FLATTEN操作符不会产生输出。这是因为,在数学上,一个非空的集合与一个空的集合做交叉乘积后会得到一个空的集合。如果你并不希望得到这样的结果,最好的办法是用一个常量bag取代空的bag和tuple。

2. 嵌套FOREACH操作符

关系操作符可以应用在FOREACH操作符的每一条记录上。这被称为嵌套FOREACH或内FOREACH操作符。让我们通过worldcitiespop.txt文件上的一个例子来研究一下。我们想找出每个国家人口最多的那个城市的详细信息。有很多方法可以解决这个问题,但这次我们将使用嵌套FOREACH操作符,代码如下:

ccity = load 'worldcitiespop.txt' using PigStorage(',') as
    (ccode:chararray, cityName:chararray, cityFullName:chararray,
        region:int, population:long, lat:double, long:double);
groupCcityByCcode = group cCity by ccode;
cityWithHighestPopulation = foreach groupCcityByCcode {
    citiesWithPopulation = filter cCity by (population is
        not null AND population > 0);
    orderCitiesWithPopulation = order citiesWithPopulation by
        population desc;
    topPopulousCity = limit orderCitiesWithPopulation 1;
    generate flatten(topPopulousCity); };

具体步骤如下。

(1) 第一步是基于一种模式(schema)加载worldcitiespop.txt文件。

(2) 然后,通过国家编号将数据分组。下一句就是嵌套FOREACH操作符了。花括号({)语法表示嵌套FOREACH操作符。

(3) 在嵌套的部分中,各种关系操作符被应用于分组bag。

  (a) FILTER操作符用来剔除每个国家中那些缺少人口值的城市。另一种更有效的方法是在FOREACH操作符之前先做过滤(参见3.9节)。 在这个例子中,我们在嵌套FOREACH操作符里面做过滤。

  (b) 过滤后的城市bag,通过ORDER操作符,按城市人口的降序进行了排序。现在第一条记录是人口最多的城市了。

  (c) 我们用LIMIT操作符来选择第一条记录。

  (d) 嵌套FOREACH操作符的最后一句语句往往是GENERATE方法。这一步,生成了人口最多城市的记录。FLATTEN操作符用来解套bag。

最终输出的结尾部分内容如下:

(uz,tashkent,Tashkent,13,1978078,41.3166667,69.25)
(vc,kingstown,Kingstown,4,17995,13.1333333,-61.2166667)
(ve,maracaibo,Maracaibo,23,1948269,10.6316667,-71.6405556)
(vg,road town,Road Town,0,8449,18.4166667,-64.6166667)
(vn,ho chi minh city,Ho Chi Minh City,20,3467426,10.75,106.666667)
(vu,vila,Vila,8,35903,-17.7333333,168.3166667)
(wf,alele,Alele,0,901,-13.2333333,-176.15)
(ws,apia,Apia,0,40407,-13.8333333,-171.7333333)
(ye,aden,Aden,2,550744,12.7794444,45.0366667)
(yt,mamoudzou,Mamoudzou,0,54837,-12.7794444,45.2272222)
(za,cape town,Cape Town,11,3433504,-33.925839,18.423218)
(zm,lusaka,Lusaka,9,1267458,-15.4166667,28.2833333)
(zw,harare,Harare,4,2213701,-17.8177778,31.0447222)

 现在,Pig的嵌套FOREACH操作符支持LIMITORDERDISTINCTCROSSFOREACHFILTER这些关系操作符。

3. COGROUP操作符

这个操作符有点类似GROUP操作。它按键聚积n 组输入的记录,而不是只针对一组。GROUP操作符作用于一个单一的输入关系,而COGROUP则能够作用于很多组输入关系。COGROUP可以被认为是连接(join)的第一个阶段——COGROUP操作符后面紧跟一个用于解套bag的FOREACH操作符,这就是一个内连接(inner join)操作。下面的代码演示了worldcitiespop.txt和countrycodes.txt这两个文件之间使用COGROUP实现的连接:

cc = load 'countrycodes.txt' using PigStorage(',') as
   (ccode:chararray, cname:chararray);
ccity = load 'worldcitiespop.txt' using PigStorage(',') as
   (ccode:chararray, cityName:chararray, cityFullName:chararray,
        region:int, population:long, lat:double, long:double);
groupedCity = cogroup cc by ccode, ccity by ccode;
flattendGroupedCity = foreach groupedCity generate flatten(cc),
    flatten (ccity);
filteredGroup = filter flattendGroupedCity by cc::ccode ==
    ccity::ccode;

 COGROUP操作符最多可以将127个关系一起分组。

像半连接(semi-join)这样的操作可以通过COGROUP操作符实现。

 两个关系的半连接指的是第一个关系中的记录,通过连接键可以在第二个关系中找到一条或多条匹配的记录。

4. UNION操作符

UNION操作符被用来连接两个或多个数据集。不同于SQL,Pig中的UNION操作符对这两个数据集的模式并没有限制。如果它们模式相同,则结果也是同样的模式。如果一个模式通过强制转换可以变成另一个模式,那么结果将是这个模式。否则,结果就没有模式。

UNION操作符不保留tuple的顺序,它也不会剔除重复的tuple。UNION操作符有一个叫作ONSCHEMA的关键字,它会给出结果集的模式。这个模式是数据集中所有命名字段的一个并集。ONSCHEMA关键字要求所有的输入关系有同一个模式。

在我们的countrycodes.txt和worldcitiespop.txt文件里,数据集的模式并不一致,于是结果也就没有任何模式。然而,若我们使用UNION操作符时一起使用ONSCHEMA关键字,结果集就会有一个模式。这个模式就是所有关系的模式的一个并集。下面的代码会清楚地说明这一点:

cc = load 'countrycodes.txt' using PigStorage(',') as
   (ccode:chararray, cname:chararray);
ccity = load 'worldcitiespop.txt' using PigStorage(',') as
   (ccode:chararray, cityName:chararray, cityFullName:chararray,
        region:int, population:long, lat:double, long:double);

unionCountryCity = union cc, ccity;
unionOnSchemaCountryCity = union onschema cc, ccity;
describe unionCountryCity;
describe unionOnSchemaCountryCity;

 模式的比较包括字段的名称。对于字段有不同名称的数据集,如果使用UNION操作符的话,结果集将没有模式。这种情况下的解决方法是在UNION语句前,先使用FOREACH操作符将那些字段改成相同的名称。

Describe语句的结果如下:

Schema for unionCountryCity unknown.
unionOnSchemaCountryCity: {ccode: chararray,cname:
    chararray,cityName: chararray,cityFullName: chararray,region:
        int,population: long,lat: double,long: double}

5. CROSS操作符

CROSS操作符对两个关系进行交叉集合的操作。Pig中的CROSS操作符是通过并行地构造一个人工的连接键,然后复制记录来实现的。这使得使用CROSS操作符的开销十分昂贵,特别是在洗牌和排序阶段,这是因为每一个被创建的人工连接键都需要复制所有参与连接的记录。

尽管如此,在有些情况下CROSS操作符仍然是必须的。其中一种情况就是θ连接。内连接是基于相等的连接键,也就是说,在交叉连接几个关系的记录时使用的是相等操作。然而,有时候需要使用不等式来连接记录。这时候,就可以通过CROSS操作符,随后紧跟一个对连接键的FILTER操作来实现θ连接。下面这个假设的例子演示了如何使用CROSS操作符来实现θ连接。只有当a1的值小于b1时,连接才会发生:

A = LOAD 'inputA.txt' AS (a0:chararray, a1:int);
B = LOAD 'inputB.txt' AS (b0:chararray, b1:int);
ACrossB = CROSS A, B;
thetaJoin = FILTER ACrossB BY a1 < b1;

模糊连接(fuzzy join)则是另一种可以用CROSS操作符来实现的连接变种。将worldcitiespop.txt中有相同区域编号的城市做自连接(self-join)就是一个例子。

3.6.2 Pig的特殊连接

Pig支持连接优化。这些优化根据数据集以及连接的特性可以直接使用。这些连接优化提升了Pig脚本的性能,所以强烈推荐。

1. 复制连接

在第2章中,我们实现了Map侧的连接和Reduce侧的连接。当连接的某个输入数据集小到能够全部加载进内存时,Map侧就会将最小的数据集复制到所有的Map任务中,然后执行连接操作。这在Pig中被称为分段复制连接(fragment-replicate join)。这也就是Map侧的连接在Pig中的实现。关于分段复制连接,下面有一些关键点需要记住。

  • 如果小文件不能被加载进内存,Pig会抛出异常并执行失败。如果小文件大于pig.join.replicated.max.bytes属性的设定值 I,Pig也会抛出异常。

  • 分段复制连接可以连接多于2个数据集。然而,除了第一个数据集以外,其他的都会被加载进内存。

  • 输入数据通过分布式缓存被复制到不同的Map任务中去,这一点跟我们在第2章中讨论过的实现很相似。

  • 分段复制连接可以被用于内连接(inner join)或是左外连接(left-outer join),不能用于右外(right-outer)或是全外连接(full-outer join)。这是因为左边的关系会被分段,而右边的关系会被整个复制。当连接处理器从右边的关系中得到一条记录时,由于它只有左边关系的一个本地视图,所以它并不知道在左边关系的别的分片里是否存在着匹配的键。

你可以通过replicated关键字来使用分段复制连接。下面的例子就是使用这种连接将countrycodes.txt和worldcitiespop.txt连接起来。必须注意的是,countrycodes.txt是其中数据量相对较小一点的关系,而且在连接的定义中也是被写在了后面:

cc = load 'countrycodes.txt' using PigStorage(',') as
   (ccode:chararray, cname:chararray);
ccity = load 'worldcitiespop.txt' using PigStorage(',') as
   (ccode:chararray, cityName:chararray, cityFullName:chararray,
        region:int, population:long, lat:double, long:double);
joinCountryCity = join ccity by ccode, cc by ccode using
    'replicated';

2. 倾斜连接

数据倾斜的存在会使某个Reduce任务超载运行,从而影响整个连接的性能。数据倾斜是统计上的一种奇怪问题,恰巧一个或是少量的键却拥有超大量的记录。通过提供数据倾斜连接(skewed join),Pig能帮助缓解这种情况。具体思路就是对连接的输入关系做采样,然后对每个键的记录条数绘制直方图。

随后分析直方图,那些拥有很多记录的键会被分割,然后分派给不同的Reduce任务。通过这种方法,在Reduce端实现了记录的负载均衡。然而,这也需要复制其他的输入关系,以便那些Reduce任务都能够拥有相关的记录,从而顺利连接。

当执行数据倾斜连接时,有一些关键点需要记住。

  • 数据倾斜连接只可以对两个数据集进行操作。如果你有两个以上数据集需要连接,那么开发者的职责就是将它分解成多个双表连接。

  • 当使用这种连接时,由于需要采样和构造直方图,所以会增加一些性能开销。据观察,这个额外开销平均在5%左右。

  • 被采样的数据集是连接的第二个关系。

  • pig.skewedjoin.reduce.memusage参数值用来决定需要多少额外的Reduce任务去处理数据倾斜键。这个属性的默认值是0.5,也就是说JVM堆的50%可分配给Reduce任务去运行这个连接。

通过skewed关键字来使用数据倾斜连接,如下例所示:

cc = load 'countrycodes.txt' using PigStorage(',') as
   (ccode:chararray, cname:chararray);
ccity = load 'worldcitiespop.txt' using PigStorage(',') as
   (ccode:chararray, cityName:chararray, cityFullName:chararray,
        region:int, population:long, lat:double, long:double);
joinCountryCity = join cc by ccode, ccity by ccode using 'skewed';

3. 合并连接

正如我们在第2章中看到的,如果输入的数据都按连接键做了排序,那么可以在Map侧直接做连接。Pig也实现了这种类型的连接,称为排序连接(sort join)或合并连接(merge join)。

这个连接算法把第二个关系放到一边,将第一个关系放入Map任务。被放到一边的关系会对键做采样,然后用MapReduce作业创建索引。索引是键和偏移(offset)之间的映射,其中的键记录了文件的开始位置。一旦索引创建完毕,另一个MapReduce作业就开始了,并将第一个关系作为输入数据。作业读取每一条记录,并且在索引中寻找第二个文件中相应的偏移量。随后第二个关系中的记录也被读取出来,连接也随之完成。

下面的例子用countrycodes.txt和worldcitiespop.txt演示了一个合并连接。同样,我们注意到countrycodes.txt会被创建索引。通过merge关键字来使用合并连接:

cc = load 'countrycodes.txt' using PigStorage(',') as
   (ccode:chararray, cname:chararray);
ccity = load 'worldcitiespop.txt' using PigStorage(',') as
   (ccode:chararray, cityName:chararray, cityFullName:chararray,
        region:int, population:long, lat:double, long:double);
joinCountryCity = join cc by ccode, ccity by ccode using 'merge';

合并连接有一种变体称为合并稀疏连接(merge-sparse join)。当某一个关系中的数据十分稀疏时可以使用这个连接,也就是说,在这个连接中只有很少量的记录被匹配。这个连接类型仍然处于实验中。目前,合并稀疏连接算法仅支持内连接。

3.7 用户定义函数

用户定义函数(User-Defined Functions,UDF),可由开发者自行实现,用以扩展Pig的功能,添加自定义处理。这些函数几乎可以被所有的Pig操作符所调用。最初UDF用Java编写。从Pig 0.8开始,UDF支持Python。在Pig的最新版本中,除Python和Java以外,还可以使用Jython、JavaScript、Ruby和Groovy编写。

除了Java,其他的语言并不支持所有的Pig接口。比如,加载和存储接口就不支持其他的语言。本书中,我们将使用Java来创建和说明UDF的力量。

有一个Java UDF存储库称为piggy bank。这是一个公共的存储库,你可以利用别人写好的UDF,同时也可以贡献你自己的UDF给社区。

在Pig中使用UDF之前,需要在Pig的脚本中先注册这个JAR文件。使用REGISTER命令可以进行注册。除了每个Map或是Reduce任务中的UDF类的实例外,Pig还会在脚本的逻辑计划和物理计划中创建一个实例。这样主要是为了做验证。

 每个Map和Reduce任务都有自己的UDF副本。跨Map和Reduce任务是无法共享状态的,不过,在同一个Map或Reduce任务中是可以共享的。

Pig的UDF大致可以分为以下三种类型:

  • 运算函数(evaluation function)

  • 加载函数(load function)

  • 存储函数(store function)

让我们一个个来看看其中的细节。

3.7.1 运算函数

顾名思义,这些函数都是用于计算的。下面的例子就是一个自定义的UDF以及它在Pig脚本中的使用方法。所有的运算函数都从org.apache.pig.EvalFunc基类派生而来。最重要的重写(override)方法是exec方法。EvalFunc类的返回值是一个泛型,我们需要明确UDF的返回类型。exec方法的输入是一个Tuple类型。使用get()方法来解开这个Tuple,然后exec方法就能处理这些解开的数据项。最简单的UDF只需要重写exec方法即可:

package MasteringHadoop;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import java.io.IOException;

public class UPPER extends EvalFunc<String>{


    @Override
    public String execTuple objects) throws IOException {

        if(objects == null || objects.size) == 0){
            return null;
        }
        try{
            String inputString = String) objects.get0);
            return inputString.toUpperCase);
        }
        catch(Exception ex){

            throw new IOException("Error processing input ", ex);
        }

    }
}
register MasteringHadoop-1.0-SNAPSHOT-jar-with-dependencies.jar;
cc = load 'countrycodes.txt' using PigStorage(',') as
    (ccode:chararray, cname:chararray);
ccCapitalized = foreach cc generate
    MasteringHadoop.UPPER(cc.cname);

1. 聚合函数

这些UDF都是针对组(group)的运算函数。比如内置的SUMCOUNT之类的函数都是这类聚合函数(aggregate function)。聚合UDF接受一个bag的输入,返回一个标量(scalar)。

 整条记录可以用*传递给UDF。当整条记录被传递后,它会被包裹进别的tuple。比如,执行input.get(0).get(1)就可以得到一条记录的第二个元素。第一个get()调用可以从tuple中得到整条记录。

  • Algebraic接口

    如果一个聚合函数实现了可用于本地聚合处理的Algebraic接口,那么Combiner就会被使用。在第2章中,我们研究了Combiner如何帮助减少从Map任务到Reduce任务的数据流,以及它是如何通过减少IO次数来加快查询的。

    任何代数(algebraic)函数都可以分解成三个函数:初始函数(initial function),中间函数( intermediate function),最终函数(final function)。如果这三个函数以级联方式连接,它就被标记为一个代数函数。也就是说,首先数据被分解成分段,然后初始函数对这些分段数据进行处理,随后初始函数的结果又被中间函数处理,最后,中间函数的结果又会被最终函数处理。COUNT函数就是一个代数函数的例子。它的初始函数是count,中间函数和最终函数都是前一个函数执行结果的sum(求和)。

    分布(distributive)函数是一种特殊的代数函数。所有的三个子函数都做同样的计算。SUM就是分布函数的一个例子。

    Pig提供了org.apache.pig.Algebraic接口,实现这个接口可以使UDF具有代数特性。下面的例子显示的就是一个实现Algebraic接口的COUNT聚合UDF。

    代数函数要进行如下的转换。使用Combiner的Map任务将执行InitialIntermediate静态类的exec方法,Reduce任务将执行Final类的exec方法:

    package MasteringHadoop;
     
    import org.apache.pig.Algebraic;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
     
    import java.io.IOException;
    import java.util.Iterator;
     
    public class COUNT extends EvalFunc<Long> implements Algebraic {
     
        protected static Long count(Tuple input) throws
            ExecException{
            DataBag dataBag = (DataBag) input.get(0);
            return dataBag.size();
        }
     
        protected static Long sum(Tuple input) throws ExecException{
     
            long returnSum = 0;
            DataBag dataBag = (DataBag) input.get(0);
            for(Iterator<Tuple> it = dataBag.iterator();
                it.hasNext();){
                Tuple tuple = it.next();
                returnSum += (long)tuple.get(0);
            }
            return returnSum;
        }
     
        static class Initial extends EvalFunc<Long>{
     
            @Override
            public Long exec(Tuple objects) throws IOException {
                return count(objects);
            }
        }
     
        static class Intermediate extends EvalFunc<Long>{
     
            @Override
            public Long exec(Tuple objects) throws IOException {
                return sum(objects);
            }
        }
     
        static class Final extends EvalFunc<Long>{
     
            @Override
            public Long exec(Tuple objects) throws IOException {
                return sum(objects);
            }
        }
     
        @Override
        public Long exec(Tuple objects) throws IOException {
            return count(objects);
        }
     
        @Override
        public String getInitial() {
            return Initial.class.getName();
        }
     
     
        @Override
        public String getIntermed() {
            return Intermediate.class.getName();
        }
     
     
        @Override
        public String getFinal() {
            return Final.class.getName();
        }
    }
    
  • Accumulator接口

    在很多情况下,当使用GROUPCOGROUP操作符时,tuple中的所有bag不能按某个特定键全部加载到内存里。而且,UDF也并不需要一次性访问所有的tuple。Pig允许UDF通过实现Accumulator接口去处理这些情况。Pig并不是一次性传递全部记录,而是通过这个接口,针对某个给定的键,增量地传递记录的子集。

    虽然Algebraic接口通过聚合处理缓解了内存的问题,但仍然有很多函数并不具有代数性质。这些函数仍可以通过累加来实现聚合,而且可能不需要访问整个数据集。

    让我们实现LongMax UDF,通过Accumulator接口找出bag中的最大值。如下面的代码所示,有三个方法需要实现:accumulategetValuecleanup。当一个中间记录集被传递给UDF时会调用accumulate方法,当每个键被处理后会调用cleanup方法:

    package MasteringHadoop;
     
    import org.apache.pig.Accumulator;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
     
    import java.io.IOException;
    import java.util.Iterator;
     
    public class LONGMAX extends EvalFunc<Long> implements
        Accumulator<Long> {
     
        private Long intermediateMax = null;
     
        @Override
        public Long exec(Tuple objects) throws IOException {
            return max(objects);
        }
     
        @Override
        public void accumulate(Tuple objects) throws IOException {
            Long newIntermediateMax = max(objects);
     
            if(newIntermediateMax == null){
                return;
            }
     
            if(intermediateMax == null){
                intermediateMax = Long.MIN_VALUE;
            }
     
            intermediateMax = Math.max(intermediateMax,
                newIntermediateMax);
        }
     
        @Override
        public Long getValue() {
            return intermediateMax;
        }
     
        @Override
        public void cleanup() {
            intermediateMax = null;
        }
     
        protected static Long max(Tuple input) throws ExecException{
            long returnMax = Long.MIN_VALUE;
            DataBag dataBag = (DataBag) input.get(0);
            for(Iterator<Tuple> it = dataBag.iterator();
                it.hasNext();){
                Tuple tuple = it.next();
                Long currentValue = (Long)tuple.get(0);
                if(currentValue > returnMax){
                    returnMax = currentValue;
                }
            }
            return returnMax;
     
        }
    }
    
    

2. 过滤函数

过滤函数(filter function)也是运算函数,只不过它返回的是布尔值(Boolean)。只要是布尔表达式运算的地方就可以使用它们。它们最常被用作FILTER操作符的一部分。它们实现了FilterFunc接口。

3.7.2 加载函数

Pig脚本中的加载函数是用来处理输入数据的。它们实现了LoadFunc抽象类,且随同LOAD语句一起被使用。下例是一个简单的加载CSV文件的UDF。需要重写setLocationgetInputFormatprepareToReadgetNext这些方法。

setLocation函数告知加载的路径,随后加载器(loader)将这个信息通知给InputFormatsetLocation方法可以被Pig多次调用。

prepareToRead方法得到InputFormat类的RecordReader对象。然后在getNext方法中,可以用RecordReader来读取并解析记录。getNext方法将记录解析成Pig的复合数据类型,如下例所示,它读取每一行记录,然后将它们解析成tuple。

getInputFormat方法通过加载器将InputFormat类交给Pig。Pig同样以Hadoop MapReduce作业的方式调用InputFormat。下面的代码段显示了这个加载CSV文件的UDF。

 如果需要递归地读取HDFS文件夹中的文件,则可以使用PigFileInputFormatPigTextInputFormat。你可以在org.apache.pig.backend.hadoop.executionengine.mapReduceLayer包里找到这些Pig所特有的InputFormat类。Hadoop自带的TextInputFormatFileInputFormat只能读取一层目录的文件。

package MasteringHadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine
    .mapReduceLayer.PigSplit;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import java.io.IOException;
import java.util.ArrayList;

public class CsvLoader extends LoadFunc {
    private RecordReader recordReader = null;
    private TupleFactory tupleFactory =
        TupleFactory.getInstance();
    private static byte DELIMITER = (byte)',';
    private ArrayList<Object> tupleArrayList = null;


    @Override
    public void setLocation(String s, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, s);
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        return new TextInputFormat();
    }

    @Override
    public void prepareToRead(RecordReader recordReader, PigSplit
        pigSplit) throws IOException {
        this.recordReader = recordReader;
    }

    @Override
    public Tuple getNext() throws IOException {
        try{

            if(recordReader.nextKeyValue()){

                Text value = (Text)
                    recordReader.getCurrentValue();
                byte[] buffer = value.getBytes();

                tupleArrayList = new ArrayList<Object>();

                int start = 0;
                int i = 0;
                int len = value.getLength();

                while(i < len){

                    if(buffer[i] == DELIMITER){

                        readFields(buffer, start, i);
                        start = i + 1;

                    }
                    i++;
                }

                readFields(buffer, start, len);

                Tuple returnTuple =
                    tupleFactory.newTupleNoCopy(tupleArrayList);
                tupleArrayList = null;

                return returnTuple;


            }

        }
        catch(InterruptedException ex){
            // 错误处理

        }
        return null;


    }


    private void readFields(byte[] buffer, int start, int i){
        if(start == i){
            // Null字段
            tupleArrayList.add(null);
        }
        else{
            // 从start读到i
            tupleArrayList.add(new DataByteArray(buffer, start, i));
        }

    }


}

3.7.3 存储函数

存储UDF和加载UDF类似。它们扩展StoreFunc抽象类,处理Hadoop的OutputFormat相关的类以及RecordWriter。需要重写StoreFunc抽象类中的putNextgetOutputFormatsetStoreLocationprepareToWrite方法。

3.8 Pig的性能优化

在本节中,我们将看到不同的性能参数,以及如何调整它们从而优化Pig脚本的执行。

3.8.1 优化规则

优化规则适用于为Pig脚本而生成的逻辑计划。默认情况下,所有规则都是打开的。pig.optimizer.rules.disabled属性可以关闭这些规则,也可以在执行Pig脚本时通过指定命令行参数-optimizer_off来关闭规则。不过有些规则是强制性的,不能被关闭。参数all可以关闭所有的非强制性规则:

set pig.optimizer.rules.disabled <comma-separated rules list>

或者,你也可以使用以下的命令:

pig -t|-optimizer_off [rule name | all]

 默认情况下,FilterLogicExpressionSimplifier是关闭的。可以通过将属性pig.exec.filterLogicExpressionSimplifier的值设为true来打开它。

我们即将讨论的多数优化规则都很简单,而且都借鉴了数据库的查询优化。

  • PartitionFilterOptimizer:这个规则将所有上游的过滤都下推到加载器。很多加载器都是分区敏感的,并且会被指示用过滤条件加载一个分区。

  • FilterLogicExpressionSimplifier:打开这个规则可以简化过滤语句表达式。以下是一些已经完成的简化处理。

    • 常量预计算:任何计算常量的表达式都会被预先计算。

      X = FILTER A BY $0 > 2*5;会被简化成X = FILTER A BY $0 > 10;

    • 去除否定:过滤表达式中的否定都会被去除,当然逻辑含义不会发生改变。

      X = FILTER A BY NOT(NOT ($0 > 10) OR $0 > 20);会被简化成X = FILTER A BY $0 > 10 AND $0 <= 20;

    • 去除AND中的隐含表达式AND表达式中多余的逻辑条件会被去除。

      X = FILTER A BY $0 > 5 AND $0 > 10;会被简化成X = FILTER A BY $0 > 10;

    • 去除OR中的隐含表达式OR表达式中多余的逻辑条件会被去除。

      X = FILTER A BY $0 > 5 OR $0 > 15;会被简化成X = FILTER A BY $0 > 5;

    • 去除等价:表达式中的等价比较都会被简化。

      X = FILTER A BY $0 != 5 AND $0 > 5;会被简化成X = FILTER A BY $0 > 5;

    • 去除OR互补表达中的过滤:当OR中存在互补表达式时,过滤是不会被执行的。

      这个例子X = FILTER A BY $0 <= 5 OR $0 > 5;中的过滤是不会被执行的。

    • 去除“总是为真”的表达式:结果总是为真的过滤表达式会被去除。

      X = FILTER A BY 1 == 1;

  • SplitFilter:这个优化规则尝试分割过滤语句。这个SplitFilter优化与其他的过滤优化组合使用时,对于提升性能将会非常有效。在下面的例子中,SplitFilter优化将joinCountryFilter关系分割成两个过滤。

    joinCountryFilter1 = filter joinCountry by
        INDEXOF(cc::ccode, 'a', 0) == 0;
    joinCountryFilter = filter joinCountryFilter1 by population > 0;
    
    cc = load 'countrycodes.txt' using PigStorage(',') as
        (ccode:chararray, cname:chararray);
    
    ccity = load 'worldcitiespop.txt' using PigStorage(',') as
        (ccode:chararray, cityName:chararray,
            cityFullName:chararray, region:int,
                population:long, lat:double, long:double);
    joinCountry = join cc by ccode, ccity by ccode;
    store joinCountry into 'country-code-join-pig' using
        PigStorage(',');
    joinCountryFilter = filter joinCountry by
        INDEXOF(cc::ccode, 'a', 0) == 0 and population > 0;
    
  • PushUpFilter:这种优化背后的思想是将数据管道中的过滤语句推往上游。这样做的好处是减少了将要被处理的记录条数。在SplitFilter例子中,一旦过滤被分割,PushUpFilter会移动joinCountryFilter1,并且把joinCountryFilter移到JOIN语句和LOAD语句之间。

  • MergeFilterMergeFilter规则是SplitFilter的补充。SplitFilter应用在PushUpFilter之前,而MergeFilter是应用在PushUpFilter之后。多个相同数据集的过滤被合并成一个单一的过滤。

    X = FILTER A BY $0 > 10;和
    Y = FILTER X BY $1 > 10; 会被合并成
    Y = FILTER A BY ($0 > 10 AND $1 > 10);
    
  • PushDownForEachFlattenFOREACH语句中的FLATTEN操作通常会产生比输入更多的tuple。秉承着“在数据管道中处理最少记录条数”这一原则,PushDownForEachFlatten优化将这些FOREACH语句推往下游。在下面的例子中,FOREACH语句将被移到JOIN语句之后。

    X = FOREACH A GENERATE FLATTEN($0), $1;
    Y = JOIN X BY $1, B BY $1;
    
  • LimitOptimizer:和PushUpFilter类似,这里的思想是将LIMIT操作符语句往上游推动。这样可以减少下游需要处理的记录条数。

  • ColumnMapKeyPrune:这种优化背后的思想是让加载器只加载需要的数据列。如果加载器无法做到这一点,那么就在加载调用之后插入一条FOREACH语句。这个优化可以很好地作用在map键上。

  • AddForEachAddForEach优化用于将脚本不再需要的列尽快裁剪掉。在下面的例子中,column1ORDER语句之后不再被使用。

    A = LOAD 'input.txt' AS (column1, column2);
    X = ORDER A by column1;
    Y = FILTER X by column2 > 0;
    
    

    一个FOREACH操作符会被添加到ORDERFILTER语句的中间:

    X1 = FOREACH X GENERATE column2;
    Y = FILTER X1 by column2 > 0;
    
  • MergeForEach:这个优化将多个FOREACH语句合并成一个FOREACH语句。这样可以不必多次遍历数据集。这个优化只有当下面的三个条件都满足时才生效。

    • FOREACH中不包含FLATTEN操作符。

    • FOREACH语句是连续的。

    • FOREACH语句中没有嵌套。序列中第一个FOREACH语句除外。

  • GroupByConstParallelSetter:在一个执行GROUP ALL的语句中,即使将PARALLEL设置成Reduce任务的数量,仍然只会使用一个Reduce任务。其余的Reduce任务会返回空的结果。这个优化自动将Reduce任务的数量设置为1。

3.8.2 Pig脚本性能的测量

UDF是开发者所写的函数,这些函数可能需要性能分析来识别热点。Pig提供了一些使用了Hadoop计数器的UDF统计功能。可以把pig.udf.profile设为true。一旦这个设置有效以后,Pig会跟踪执行某个特定UDF所花的时间,以及UDF的调用频率。approx._microsecs测量UDF中大致花费的时间,approx._invocations则测量UDF在执行过程中被调用的次数。

 通过设置pig.udf.profile,可以在Hadoop作业执行过程中启用计数器。正如我们在前一章中看到的,计数器是全局的,而且在跟踪Hadoop作业时会增加额外开销。所以此设置应谨慎使用,最好只在测试时设置。

3.8.3 Pig的Combiner

在前一章,我们已经了解Combiner如何减少磁盘I/O,同时减少通过网络从Map任务发送到Reduce任务的数据量。在Pig中,基于脚本的结构,Combiner也可能会被调用。下面是一些调用Combiner的条件。

  • 使用无嵌套的FOREACH语句。

  • 一条FOREACH语句中的所有投影都是分组表达式,或者说所使用的UDF都是代数函数,也就是说它们实现了Algebraic接口。

 当DISTINCT是嵌套中唯一一个操作符时,Combiner也可以被用在嵌套FOREACH语句中。

在以下条件下,Combiner不会被使用。

  • 脚本在执行前面提到的规则时失败。

  • GROUPFOREACH之间存在任何语句;Pig 0.9以后,LIMIT操作符除外。

 逻辑优化器可能会使用PushUpFilter优化将任何紧随FOREACHFILTER操作符推往上游。这可能会阻碍Combiner的使用。

3.8.4 Bag数据类型的内存

Bag是唯一一种当数据不能全部加载到内存时,会被保存到磁盘的复合数据类型。pig.cachedbag.memusage参数决定了分配给bag的内存百分比。默认值是0.2,也就是说应用中的所有bag可以共享20%的的内存。

3.8.5 Pig的reducer数量

不同于原始的MapReduce,Pig会根据输入数据的大小来决定Reduce任务的数量。输入的数据根据pig.exec.reducers.bytes.per.reducer参数的值来进行切分,从而得到Reduce任务的数量。这个参数的默认值是1000000000(1 GB)。不过,Reduce任务的最大数量由pig.exec.reducers.max参数的值决定。它的默认值是999。

实现计算Reduce数量算法的类由pig.exec.reducer.estimator决定。只要实现了org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigReducerEstimator接口,然后将完整的类名写到该配置项中,就可以用自定义算法覆盖它。通过提供一个值给pig.exec.reducer.estimator.arg配置项,就可以传递参数给这个自定义的算法。这个值被作为字符串参数传递给构造函数。

3.8.6 Pig的multiquery模式

默认的情况下,Pig以multiquery模式执行。一个Pig脚本中所有的语句将作为一个Pig作业执行。比如:

#使用-M或-no_multiquery参数来关闭multi-query模式的执行.
pig -M <script> or pig -no_multiquery <script>

 应避免使用DUMP,因为它禁用multiquery执行,而这会导致重新评估关系,使得Pig脚本变得低效。相反,使用STORE不失为一个好办法。交互式命令DUMP会强制Pig编译器避免multiquery执行。

当multiquery执行发生时,对用户来说,一定要区分哪些作业执行成功,哪些执行失败。STORE命令的输出将有不同的路径,然后通过查看这些文件可以知道执行的结果。此外,在执行结束时,Pig会返回一个表明脚本执行状态的返回值。下表显示了不同的返回值及它们所代表的含义:

返回值

含义

0

成功

1

可修复的错误

2

失败(全部)

3

失败(部分)

3.9 最佳实践

上一节中所述的优化规则通过改变Pig脚本的逻辑计划以提高性能。我们知道,这些规则将有助于开发高效的脚本,同时,另外有一些做法也可以加快Pig脚本。但这些最佳的实践并不能称之为规则,因为它们是针对特定的应用程序和数据而言。同时,这些优化规则趋于保守,所以并不能保证一定有效。

3.9.1 明确地使用类型

Pig支持很多类型,既有基本的,也有复杂的。类型的正确使用可以加快你的脚本,有时甚至能达到2倍。比如,在Pig中,所有没有类型声明的数值计算都默认为double计算。Pig中的double类型占用8个字节的存储,而int类型只占4个字节。int的计算速度比double类型的更快。

3.9.2 更早更频繁地使用投影

正如我们之前所看到过的AddForEachColumnMapKeyPrune优化器,只投影(project)下游需要的字段就是一种好的实践。这有助于减少需要传输的数据以及下游要处理的数据。检查脚本,看看其中是否包含没有被使用的字段,这也是一种很好的做法。在每个使用FOREACH语句的操作后,只投影必要的字段从而剔除未使用的字段。更早、更频繁地使用投影是Pig的一项最佳实践。

3.9.3 更早更频繁地使用过滤

类似于投影,更早、更频繁地使用过滤一样有效。同样,过滤减少需要传输的数据以及下游要处理的数据。过滤通过减少记录的条数来减少数据,而投影是通过减少数据集的字段数来减少数据。

 如果过滤去除的是很少量的数据,且过滤操作开销很高的话,那么更早更频繁地使用过滤就不一定有效。在实施这一实践时,一定要了解你的数据。

3.9.4 使用LIMIT操作符

很多时候,我们感兴趣的是取样或是取得结果集中最上面的几条记录。这时你可以用LIMIT操作符来做这个。正如我们在LimitOptimizer规则中看到的,LIMIT操作符将被推往上游,以减少整体的处理时间。

3.9.5 使用DISTINCT操作符

在Pig中有两种方法可以找出某个字段中有多少不同的元素:一种方法是使用GROUP操作符,然后生成分组键,另一种方法是使用DISTINCT操作符。后者比前者更高效。

3.9.6 减少操作

MergeForEachMergeFilter将连续的FOREACHFILTER语句合并成一个单一的FOREACHFILTER语句。设法找出那些可以合并多个操作的机会。在数据管道中减少操作符的数量可以提高Pig脚本的性能。

3.9.7 使用Algebraic UDF

当你要开发UDF,并且处理过程是代数性质,那么让UDF实现Algebraic接口是一个很好的实践。当Algebraic UDF作用在已经被分过组的数据时,将调用Combiner。在MapReduce中,使用Combiner将提高作业的性能。

3.9.8 使用Accumulator UDF

通过将输入数据分块,Accumulator UDF可以减少UDF所需的内存数。

3.9.9 剔除数据中的空记录

对关系做JOINGROUP操作会将所有的空(NULL)键分配到一个单一的Reduce任务。如果使用FLATTEN将分组解开,那么所有的空记录也将被剔除。然而,这个剔除是发生在Reduce任务执行以后。在JOINGROUP/COGROUP操作符之前主动过滤掉空记录,去除那些需要处理空键的Reduce任务,可以显著地提高脚本的性能。

3.9.10 使用特殊连接

普通连接的第二个输入被作为流传输,而不是被加载到内存中。这在Pig中是一种常规的连接优化。当连接不同大小的数据集时,更高效的做法是将数据量大的数据集作为连接的最后一个输入:

C = JOIN small_file BY s, large_file by F;

正如我们在3.6.2节中所看到的,Pig中还可以利用很多其他的连接优化。

3.9.11 压缩中间结果

Pig脚本可能被编译成多个MapReduce作业。每个作业都可能产生中间输出。可以用LZO压缩编码来压缩这些中间输出。这不仅有助于节省HDFS的存储,还可以帮助减少加载时间从而更快地执行作业。

pig.tmpfilecompression属性决定了是否压缩中间文件。默认情况下,该值为falsepig.tmpfilecompression.codec属性的值表示用于压缩的编码器。目前,这个参数的可用值是gzlzo。虽然GZIP压缩编码提供了更好的压缩,但它并不是首选,因为它的执行时间相对比较慢。

3.9.12 合并小文件

在第2章中,我们已经看到小文件带来的问题,以及CombineFileInputFormat的使用方法。Pig现在已经内置支持小文件的合并。这可以减少分片的数量,进而减少Map任务数。

可以将pig.splitCombination属性的值设为true来合并小文件。每个分片的大小由pig.maxCombinedSplitSize属性决定。将这个属性的值设为每个Map任务输入数据的建议大小(单位为字节)。小文件将被合并,直到达到这个限制值。

 自带的PigStorage加载器对于合并小文件很有效。如果你要写一个自定义的加载器,它必须是无状态地调用prepareToRead方法。此外,这个加载器不能实现IndexableLoadFuncOrderedLoadFuncCollectableLoadFunc接口。

3.10 小结

在本章中,我们探讨了Pig的一些高级特性,此外还深入了解了Pig提供的优化功能。本章学到的主要内容如下。

  • 一般说来,尽可能在更多的情况下尝试使用Pig。Pig的抽象、开发助手及其灵活性可以节省你的时间和金钱。在转换成MapReduce作业前伸展Pig的能力。

  • 逻辑计划优化可能会改变语句的执行顺序。广泛使用EXPLAINILLUSTRATE命令来学习Pig脚本。

  • 遵循本章提到的一些准则有助于Pig更快地执行脚本。努力让UDF实现AlgebraicAccumulator接口,两个都实现当然更理想。

  • 了解你正在尝试处理的数据。特殊问题特殊对待,某些类型的数据问题就可以采用专门的支持,比如数据倾斜连接可用于连接倾斜的数据。

下一章,我们会详细探讨Hive(Hadoop MapReduce上一种更高层面的SQL抽象)的一些高级特性。

目录

  • 版权声明
  • 推荐序一
  • 推荐序二
  • 译者序
  • 前言
  • 致谢
  • 第 1 章 Hadoop 2.X
  • 第 2 章 MapReduce进阶
  • 第 3 章 Pig进阶
  • 第 4 章 Hive进阶
  • 第 5 章 序列化和Hadoop I/O
  • 第 6 章 YARN——其他应用模式进入Hadoop的引路人
  • 第 7 章 基于YARN的Storm——Hadoop中的低延时处理
  • 第 8 章 云上的Hadoop
  • 第 9 章 HDFS替代品
  • 第 10 章 HDFS联合
  • 第 11 章 Hadoop安全
  • 第 12 章 使用Hadoop进行数据分析
  • 附录 微软Windows中的Hadoop