数据读取与保存

  • 文件格式与文件系统

对于存储在本地文件系统或分布式文件系统(比如 NFS、HDFS、Amazon S3 等)中的数据,Spark 可以访问很多种不同的文件格式,包括文本文件、JSON、SequenceFile,以及 protocol buffer。我们会展示几种常见格式的用法,以及 Spark 针对不同文件系统的配置和压缩选项。

  • Spark SQL中的结构化数据源

通过Spark SQL 模块,针对包括 JSON 和 Apache Hive 在内的结构化数据源,提供了一套更加简洁高效的 API。

  • 数据库与键值存储

    Spark 自带的库和一些第三方库,它们可以用来连接 Cassandra、HBase、Elasticsearch 以及 JDBC 源。

支持的文本格式

enter image description here

文本文件

  • 读取文件

    只需要使用文件路径作为参数调用 SparkContext 中的 textFile() 函数,就可以读取一个文本文件
    如果要控制分区数的话,可以指定 minPartitions

scala:

val input = sc.textFile("file:///home/holden/repos/spark/README.md")

有时候我们希望同时处理多个文件,除上述方法外,也可以用SparkContext.wholeTextFiles() 方法,该方法会返回一个 pair RDD,其中键是输入文件的文件名。
scala:

val input = sc.wholeTextFiles("file://home/holden/salesFiles")
val result = input.mapValues{y =>
val nums = y.split(" ").map(x => x.toDouble)
nums.sum / nums.size.toDouble}
  • 保存文件

scala:

result.saveAsTextFile(outputFile)

JSON

  • 读取文件

    将数据作为文本文件读取,然后对 JSON 数据进行解析,这样的方法可以在所有支持的编程语言中使用。这种方法假设文件中的每一行都是一条 JSON 记录。如果你有跨行的 JSON 数据,你就只能读入整个文件,然后对每个文件进行解析。如果在你使用的语言中构建一个 JSON 解析器的开销较大,你可以使用 mapPartitions() 来重用解析器。

scala:

import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature

case class Person(name: String, lovesPandas: Boolean) // 必须是顶级类
// 将其解析为特定的case class。使用flatMap,通过在遇到问题时返回空列表(None)
// 来处理错误,而在没有问题时返回包含一个元素的列表(Some(_))
val result = input.flatMap(record => {
    try {
    Some(mapper.readValue(record, classOf[Person]))
    } catch {
case e: Exception => None
}})
  • 保存文件

scala:

//筛选后写入文件
result.filter(p => P.lovesPandas).map(mapper.writeValueAsString(_)).saveAsTextFile(outputFile)

CSV(逗号分隔)和 TSV(制表符分隔)

  • 读取文件

scala:

import Java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
val input = sc.textFile(inputFile)
val result = input.map{ line =>
val reader = new CSVReader(new StringReader(line));
    reader.readNext();
}

如果在字段中嵌有换行符,就需要完整读入每个文件,然后解析各段
scala:

case class Person(name: String, favoriteAnimal: String)

val input = sc.wholeTextFiles(inputFile)
val result = input.flatMap{ case (_, txt) =>
val reader = new CSVReader(new StringReader(txt));
    reader.readAll().map(x => Person(x(0), x(1)))
}
  • 保存文件

scala:

pandaLovers.map(person => List(person.name, person.favoriteAnimal).toArray)
.mapPartitions{people =>
val stringWriter = new StringWriter();
val csvWriter = new CSVWriter(stringWriter);
csvWriter.writeAll(people.toList)
Iterator(stringWriter.toString)
}.saveAsTextFile(outFile)

SequenceFile

SequenceFile 是由没有相对关系结构的键值对文件组成的常用 Hadoop 格式。SequenceFile 文件有同步标记,Spark可以用它来定位到文件中的某个点,然后再与记录的边界对齐。这可以让 Spark 使用多个节点高效地并行读取 SequenceFile 文件。
SequenceFile 也是 Hadoop MapReduce 作业中常用的输入输出格式,所以如果你在使用一个已有的 Hadoop 系统,数据很有可能是以 SequenceFile 的格式供你使用的。

由于 Hadoop 使用了一套自定义的序列化框架,因此 SequenceFile 是由实现 Hadoop 的 Writable 接口的元素组成。

enter image description here

  • 读取SequenceFile

    可以用sequenceFile(path, keyClass, valueClass, minPartitions),其中keyClass 和 valueClass 参数都必须使用正确的 Writable 类

scala:

val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable]).
map{case (x, y) => (x.toString, y.get())}
  • 保存SequenceFile

    因为 SequenceFile 存储的是键值对,所以需要创建一个由可以写出到 SequenceFile 的类型构成的 PairRDD。
    我们已经进行了将许多 Scala 的原生类型转为 Hadoop Writable 的隐式转换,所以如果你要写出的是 Scala 的原生类型,可以直接调用 saveSequenceFile(path) 保存你的 PairRDD,它会帮你写出数据。
    如果键和值不能自动转为 Writable 类型,或者想使用变长类型(比如 VIntWritable),就可以对数据进行映射操作,在保存之前进行类型转换

scala:

val data = sc.parallelize(List(("Panda", 3), ("Kay", 6), ("Snail", 2)))
data.saveAsSequenceFile(outputFile)

对象文件

对象文件看起来就像是对 SequenceFile 的简单封装,它允许存储只包含值的 RDD。和 SequenceFile 不一样的是,对象文件是使用 Java 序列化写出的。

  • 读取
    用 SparkContext 中的 objectFile() 函数接收一个路径,返回对应的 RDD。

  • 保存
    要保存对象文件,只需在 RDD 上调用 saveAsObjectFile 就行了

Hadoop输入输出格式

  • 读取其他Hadoop输入格式

hadoopFile()用于旧的API实现Hadoop输入格式
每一行都会被独立处理,键和值之间用制表符隔开。这个格式存在于 Hadoop 中,所以无需向工程中添加额外的依赖就能使用它。

scala:

val input = sc.hadoopFile[Text, Text, KeyValueTextInputFormat](inputFile)
.map{
    case (x, y) => (x.toString, y.toString)
}

newAPIHadoopFile() 接收一个路径以及三个类。第一个类是“格式”类,代表输入格式。

scala:

//在 Scala 中使用 Elephant Bird 读取 LZO 算法压缩的 JSON 文件
val input = sc.newAPIHadoopFile(inputFile, classOf[LzoJsonInputFormat],
classOf[LongWritable], classOf[MapWritable], conf)
// "输入"中的每个MapWritable代表一个JSON对象
  • 保存Hadoop输出格式

    旧接口用saveAsHadoopFile(),新接口用saveAsNewAPIHadoopFile()
    非文件系统数据源可以使用hadoopDataset/saveAsHadoopDataSetnewAPIHadoopDataset/saveAsNewAPIHadoopDataset 来访问 Hadoop 所支持的非文件系统的存储格式。

scala:

val job = new Job()
val conf = job.getConfiguration
LzoProtobufBlockOutputFormat.setClassConf(classOf[Places.Venue], conf);
val dnaLounge = Places.Venue.newBuilder()
dnaLounge.setId(1);
dnaLounge.setName("DNA Lounge")
dnaLounge.setType(Places.Venue.VenueType.CLUB)
val data = sc.parallelize(List(dnaLounge.build()))
val outputData = data.map{ pb =>
    val protoWritable = ProtobufWritable.newInstance(classOf[Places.Venue]);
    protoWritable.set(pb)
    (null, protoWritable)
}
outputData.saveAsNewAPIHadoopFile(outputFile, classOf[Text],
    classOf[ProtobufWritable[Places.Venue]],
    classOf[LzoProtobufBlockOutputFormat[ProtobufWritable[Places.Venue]]], conf)

文件压缩

在大数据工作中,我们经常需要对数据进行压缩以节省存储空间和网络传输开销。对于大多数 Hadoop 输出格式来说,我们可以指定一种压缩编解码器来压缩数据

textFile()sequenceFile()读取文件,最好不要考虑使用spark的封装,使用newAPIHadoopFile() 或者 hadoopFile(),并指定正确的压缩编解码器。

文件系统

  • 本地/“常规”文件系统

    本地文件要求在集群中所有节点的相同路径下都可以找到,路径形式:file:// 路径

  • Amazon S3

    路径形式:s3n:// 开头的路径以 s3n://bucket/path-within-bucket。s3支持通配符:s3n://bucket/my-Files/*.txt

  • HDFS

    Hadoop 分布式文件系统(HDFS)是一种广泛使用的文件系统,HDFS被设计为可以在硬件上工作,有弹性地应对节点失败且提供高吞吐量。路径形式:hdfs://master:port/path

Spark SQL中的结构化数据

在各种情况下,我们把一条 SQL 查询给 Spark SQL,让它对一个数据源执行查询(选出一些字段或者对字段使用一些函数),然后得到由 Row 对象组成的 RDD,每个 Row 对象表示一条记录。在 Java 和 Scala 中,Row 对象的访问是基于下标的。每个 Row 都有一个 get() 方法,会返回一个一般类型让我们可以进行类型转换。另外还有针对常见基本类型的专用 get() 方法(例如 getFloat()、getInt()、getLong()、getString()、getShort()、getBoolean() 等)

Apache Hive

Apache Hive 是 Hadoop 上的一种常见的结构化数据源。Hive 可以在 HDFS 内或者在其他存储系统上存储多种格式的表。这些格式从普通文本到列式存储格式,应有尽有。Spark SQL 可以读取 Hive 支持的任何表。

scala:

import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
val rows = hiveCtx.sql("SELECT name, age FROM users")
val firstRow = rows.first()
println(firstRow.getString(0)) // 字段0是name字段

JSON

如果你有记录间结构一致的 JSON 数据,Spark SQL 也可以自动推断出它们的结构信息,并将这些数据读取为记录,这样就可以使得提取字段的操作变得很简单。

scala:

val tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
val results = hiveCtx.sql("SELECT user.name, text FROM tweets")

Spark连接数据库

Java数据库连接

Spark 可以从任何支持 Java 数据库连接(JDBC)的关系型数据库中读取数据,包括 MySQL、Postgre 等系统。要访问这些数据,需要构建一个 org.apache.spark.rdd.JdbcRDD,将 SparkContext 和其他参数一起传给它。

scala:

//用于对数据库创建连接的函数
def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver").newInstance();
    DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden");
}

//将输出结果从 java.sql.ResultSet转为对操作数据有用的格式的函数
def extractValues(r: ResultSet) = {
    (r.getInt(1), r.getString(2))
}

//读取一定范围内数据的查询,以及查询参数中 lowerBound 和 upperBound 的值
val data = new JdbcRDD(sc,
    createConnection, "SELECT * FROM panda WHERE ? <= id AND id <= ?",
    lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues)
println(data.collect().toList)

Cassandra

随着 DataStax 开源其用于 Spark 的 Cassandra 连接器

Maven 依赖:

<dependency> <!-- Cassandra -->
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector</artifactId>
    <version>1.0.0-rc5</version>
</dependency>
<dependency> <!-- Cassandra -->
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java</artifactId>
    <version>1.0.0-rc5</version>
</dependency>

scala:

//配置Cassandra属性
val conf = new SparkConf(true)
    .set("spark.cassandra.connection.host", "hostname")

val sc = new SparkContext(conf)

//对整张键值对表读取为RDD
// 为SparkContext和RDD提供附加函数的隐式转换
import com.datastax.spark.connector._

// 将整张表读为一个RDD。假设你的表test的创建语句为
// CREATE TABLE test.kv(key text PRIMARY KEY, value int);
val data = sc.cassandraTable("test" , "kv")
// 打印出value字段的一些基本统计。
data.map(row => row.getInt("value")).stats()

在scala中保存数据到Cassandra

val rdd = sc.parallelize(List(Seq("moremagic", 1)))
rdd.saveToCassandra("test" , "kv", SomeColumns("key", "value"))

HBase

scala:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tablename") // 扫描哪张表

val rdd = sc.newAPIHadoopRDD(
conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable],classOf[Result])

Elasticsearch

Spark 可以使用 Elasticsearch-Hadoop,从 Elasticsearch 中读写数据。Elasticsearch 是一个开源的、基于 Lucene 的搜索系统。

在 Scala 中使用 Elasticsearch 输出

val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("mapred.output.format.class", "org.elasticsearch.hadoop.
mr.EsOutputFormat")
jobConf.setOutputCommitter(classOf[FileOutputCommitter])
jobConf.set(ConfigurationOptions.ES_RESOURCE_WRITE, "twitter/tweets")
jobConf.set(ConfigurationOptions.ES_NODES, "localhost")
FileOutputFormat.setOutputPath(jobConf, new Path("-"))
output.saveAsHadoopDataset(jobConf)

在 Scala 中使用 Elasticsearch 输入

def mapWritableToInput(in: MapWritable): Map[String, String] = {
    in.map{case (k, v) => (k.toString, v.toString)}.toMap
}

val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set(ConfigurationOptions.ES_RESOURCE_READ, args(1))
jobConf.set(ConfigurationOptions.ES_NODES, args(2))
val currentTweets = sc.hadoopRDD(jobConf,
    classOf[EsInputFormat[Object, MapWritable]], classOf[Object],
    classOf[MapWritable])
// 仅提取map
// 将MapWritable[Text, Text]转为Map[String, String]
val tweets = currentTweets.map{ case (key, value) => mapWritableToInput(value) }