Spark SQL

enter image description here

  • Spark SQL可以从各种结构化数据源(如JSON,Hive,Parquet等)中读取数据
  • Spark SQL不仅支持在程序内进行SQL查询,也支持从类似商业智能软件Tableau等这样的外部工具通过数据库连接器(JDBC/ODBC)进行SQL查询
  • Spark SQL 支持 SQL 与常规的 Python/Java/Scala 代码高度整合,包括连接 RDD 与 SQL 表、公开的自定义 SQL 函数接口等。

为了实现这些功能,Spark SQL提供了一种特殊的RDD,叫做SchemaRDD: - SchemaRDD是存放Row对象的RDD,每个Row对象代表一行记录 - SchemaRDD还包含记录的结构信息(即数据字段)

Spark SQL最强大之处就是可以在Spark应用内使用。这种方式让我们可以轻松读取数据并使用SQL查询,同时还能把这一过程和普通的Python/Java/Scala程序代码结合在一起

Spark SQL是基于已有的SparkContext创建一个HiveContext,HiveContext可以创建出表示结构化数据的SchemalRDD,并且使用SQL或类似map()的普通RDD操作

Spark SQL的应用

scala:

// 导入Spark SQL
import org.apache.spark.sql.hive.HiveContext
// 如果不能使用hive依赖的话
import org.apache.spark.sql.SQLContext

//导入隐式转换支持(隐式转换被用来把带有类型信息的 RDD 转变为专门用于 Spark SQL 查询的 RDD(即SchemaRDD))
// 创建Spark SQL的HiveContext
val hiveCtx = ...
// 导入隐式转换支持
import hiveCtx._

//创建SQL上下文
val sc = new SparkContext(...)
val hiveCtx = new HiveContext(sc)

//查询示例
val input = hiveCtx.jsonFile(inputFile)
// 注册输入的SchemaRDD
input.registerTempTable("tweets")
// 依据retweetCount(转发计数)选出推文
val topTweets = hiveCtx.sql("SELECT text, retweetCount FROM
tweets ORDER BY retweetCount LIMIT 10")

SchemaRDD

从内部机理来看,SchemaRDD 是一个由 Row 对象组成的 RDD,附带包含每列数据类型的结构信息。Row 对象只是对基本数据类型(如整型和字符串型等)的数组的封装。

SchemaRDD中可以存储的数据类型 enter image description here

Row 对象表示 SchemaRDD 中的记录,其本质就是一个定长的字段数组。

缓存

Spark SQL 的缓存机制与 Spark 中的稍有不同。由于我们知道每个列的类型信息,所以 Spark 可以更加高效地存储数据。
当缓存数据表时,Spark SQL 使用一种列式存储格式在内存中表示数据。这些缓存下来的表只会在驱动器程序的生命周期里保留在内存中,所以如果驱动器进程退出,就需要重新缓存数据。和缓存 RDD 时的动机一样,如果想在同样的数据上多次运行任务或查询时,就应把这些数据表缓存起来

读取与存储数据

  • Apache Hive

    当从 Hive 中读取数据时,Spark SQL 支持任何 Hive 支持的存储格式(SerDe),包括文本文件、RCFiles、ORC、Parquet、Avro,以及 Protocol Buffer。

scala:

import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)
val rows = hiveCtx.sql("SELECT key, value FROM mytable")
val keys = rows.map(row => row.getInt(0))
  • Parquet

    Parquet(http://parquet.apache.org/)是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录。

scala:

//读取数据
HiveContext.parquetFile 或者 SQLContext.parquetFile
//存储数据
saveAsParquetFile()
  • JSON

    如果你有一个 JSON 文件,其中的记录遵循同样的结构信息,那么 Spark SQL 就可以通过扫描文件推测出结构信息,并且让你可以使用名字访问对应字段,而无需编写专门的代码来读取不同结构的文件。

scala:

//读取数据
val input = hiveCtx.jsonFile(inputFile)
  • 基于RDD创建SchemaRDD

    带有 case class 的 RDD 可以隐式转换成 SchemaRDD。

scala:

case class HappyPerson(handle: String, favouriteBeverage: String)
...
// 创建了一个人的对象,并且把它转成SchemaRDD
val happyPeopleRDD = sc.parallelize(List(HappyPerson("holden", "coffee")))
// 注意:此处发生了隐式转换
// 该转换等价于sqlCtx.createSchemaRDD(happyPeopleRDD)
happyPeopleRDD.registerTempTable("happy_people")
  • 连接JDBC/ODBC服务器

scala:

//启动JDBC服务器
./sbin/start-thriftserver.sh --master sparkMaster

//使用Spark自带的Beelinek客户端程序连接服务器
holden@hmbp2:~/repos/spark$ ./bin/beeline -u jdbc:hive2://localhost:10000
Spark assembly has been built with Hive, including Datanucleus jars on classpath
scan complete in 1ms
Connecting to jdbc:hive2://localhost:10000
Connected to: Spark SQL (version 1.2.0-SNAPSHOT)
Driver: spark-assembly (version 1.2.0-SNAPSHOT)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 1.2.0-SNAPSHOT by Apache Hive
0: jdbc:hive2://localhost:10000> show tables;
+---------+
| result  |
+---------+
| pokes   |
+---------+
1 row selected (1.182 seconds)
0: jdbc:hive2://localhost:10000>

用户自定义函数UDF

Spark SQL 不仅有自己的 UDF 接口,也支持已有的 Apache Hive UDF。

  • Spark SQL UDF

scala:

registerFunction("strLenScala", (_: String).length)
val tweetLength = hiveCtx.sql("SELECT strLenScala('tweet') FROM tweets LIMIT 10")
  • Hive UDF

标准的 Hive UDF 已经自动包含在了 Spark SQL 中。如果需要支持自定义的 Hive UDF,我们要确保该 UDF 所在的 JAR 包已经包含在了应用中。

scala:

//注册Hive UDF
hiveCtx.sql("CREATE TEMPORARY FUNCTION name AS class.function")。

Spark SQL性能

Spark SQL中的性能选项

enter image description here

  • Beeline配置

scala:

beeline> set spark.sql.codegen=true;
SET spark.sql.codegen=true
spark.sql.codegen=true
Time taken: 1.196 seconds
  • Spark传统配置

scala:

conf.set("spark.sql.codegen", "true")

配置说明:

  • spark.sql.codegen

  • 这个选项可以让 Spark SQL 把每条查询语句在运行前编译为 Java 二进制代码。由于生成了专门运行指定查询的代码,codegen 可以让大型查询或者频繁重复的查询明显变快。
  • 在运行特别快(1 ~ 2 秒)的即时查询语句时,codegen 有可能会增加额外开销,因为 codegen 需要让每条查询走一遍编译的过程。
  • codegen 还是一个试验性的功能,但是我们推荐在所有大型的或者是重复运行的查询中使用 codegen。

  • spark.sql.inMemoryColumnarStorage.batchSize

在缓存 SchemaRDD 时,Spark SQL 会按照这个选项制定的大小(默认值是 1000)把记录分组,然后分批压缩。太小的批处理大小会导致压缩比过低,而批处理大小过大的话,也有可能引发内存问题。如果你表中的记录比较大,你就可能需要调低批处理大小来避免内存不够(OOM)的错误。如果不是在这样的场景下,默认的批处理大小是比较合适的,因为压缩超过 1000 条记录时也基本无法获得更高的压缩比了。