RDD键值对操作

创建pair RDD

scala:

val pairs = lines.map(x => (x.split(" ")(0), x))

转化操作

Pair RDD的转化操作(以键值对集合{(1, 2), (3, 4), (3, 6)}为例) enter image description here enter image description here

针对两个pair RDD的转化操作(rdd = {(1, 2), (3, 4), (3, 6)}other = {(3, 9)}) enter image description here

scala:

pairs.filter{case (key, value) => value.length < 20}

1. 聚合操作(转化操作!)

类似于转化操作,reducebykey(),foldbykey(),combinebykey()
scala:

rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

scala:

val result = input.combineByKey((v) => (v, 1),(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
.map{ case (key, value) => (key, value._1 / value._2.toFloat) }
result.collectAsMap().map(println(_))

mapvalues(func)可以将rdd转成pairrdd

  • 并行度调优

Method1: 通过分组或聚合的动作

scala:

val data = Seq(("a", 3), ("b", 4), ("a", 1))  
sc.parallelize(data).reduceByKey((x, y) => x + y)    // 默认并行度
sc.parallelize(data).reduceByKey((x, y) => x + y, 10)    // 自定义并行度

Method2: 分区操作

  • repartition()

    它会把数据通过网络进行混洗,并创建出新的分区集合。切记,对数据进行重新分区是代价相对比较大的操作

  • coalesce()

    repartition()的优化版本

2.数据分组

  • groupByKey()(对单个RDD)

    对于一个由类型 K 的键和类型 V 的值组成的 RDD,所得到的结果 RDD 类型会是 [K, Iterable[V]]。

  • cogroup()(对多个RDD)

    cogroup() 不仅可以用于实现连接操作,还可以用来求键的交集。cogroup()对两个键的类型均为 K 而值的类型分别为 V 和 W 的 RDD 进行 cogroup() 时,得到的结果 RDD 类型为 [(K, (Iterable[V], Iterable[W]))]

3.连接

  • join()

scala:

val storeAddress = sc.parallelize(Seq(
   (Store("Ritual"), "1026 Valencia St"), (Store("Philz"), "748 Van Ness Ave"),
   (Store("Philz"),"3101 24th St"), (Store("Starbucks"), "Seattle")))
val storeRating = sc.parallelize(Seq(
   (Store("Ritual"), 4.9), (Store("Philz"), 4.8)))
storeAddress.join(storeRating)

join(内连接),leftOuterJoin(other),rightOuterJoin(other)

4.排序

  • sortbykey()

scala:

val input: RDD[(Int, Venue)] = ...
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(a: Int, b: Int) = a.toString.compare(b.toString)
}
rdd.sortByKey()

排序参数,assending(默认)
当把数据排好序后,后续对数据进行 collect() 或 save() 等操作都会得到有序的数据。

Pair RDD的行动操作

Pair RDD的行动操作(以键值对集合{(1, 2), (3, 4), (3, 6)}为例) enter image description here

数据分区

在分布式程序中,通信的代价是很大的,因此控制数据分布以获得最少的网络传输可以极大地提升整体性能,spark可以通过控制RDD的分区来优化通信开销

enter image description here

如上图所示,userData表 join events表

Q: 默认情况下,连接操作会将两个数据集中的所有键的哈希值都求出来,将该哈希值相同的记录通过网络传到同一台机器上,然后在那台机器上对所有键相同的记录进行连接操作
A: 在程序开始时,对 userData 表使用 partitionBy() 转化操作,将这张表转为哈希分区。可以通过向 partitionBy 传递一个 spark.HashPartitioner 对象来实现该操作

scala:

val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
             .partitionBy(new HashPartitioner(100))   // 构造100个分区
             .persist()

1.partitionBy() 是一个转化操作,因此它的返回值总是一个新的 RDD,但它不会改变原来的 RDD。RDD 一旦创建就无法修改。因此应该对 partitionBy() 的结果进行持久化,并保存为 userData,而不是原来的 sequenceFile() 的输出
2.传给 partitionBy() 的 100 表示分区数目,它会控制之后对这个 RDD 进行进一步操作(比如连接操作)时有多少任务会并行执行。总的来说,这个值至少应该和集群中的总核心数一样。
3. 如果没有将 partitionBy() 转化操作的结果持久化,那么后面每次用到这个 RDD 时都会重复地对数据进行分区操作。不进行持久化会导致整个 RDD 谱系图重新求值。那样的话,partitionBy() 带来的好处就会被抵消,导致重复对数据进行分区以及跨节点的混洗,和没有指定分区方式时发生的情况十分相似。

  • 自定义分区(partitioner())

scala:

class DomainNamePartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = {
  val domain = new Java.net.URL(key.toString).getHost()
  val code = (domain.hashCode % numPartitions)
  if(code < 0) {
    code + numPartitions // 使其非负
  }else{
    code
  }
}
// 用来让Spark区分分区函数对象的Java equals方法
override def equals(other: Any): Boolean = other match {
  case dnp: DomainNamePartitioner =>
    dnp.numPartitions == numPartitions
  case _ =>
    false
  }
}

numPartitions: Int:返回创建出来的分区数。
getPartition(key: Any): Int:返回给定键的分区编号(0 到 numPartitions-1)。
equals():Java 判断相等性的标准方法。这个方法的实现非常重要,Spark 需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样 Spark 才可以判断两个 RDD 的分区方式是否相同。