Spark SQL官网阅读笔记 分布式SQL引擎

Spark SQL是Spark中用于结构化数据处理的组件。

Spark SQL可以从Hive中读取数据。

执行结果是Dataset/DataFrame。

DataFrame是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还掌握数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上 看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。

DataSet是Spark 1.6之后加入的,同时提供了RDD和Spark SQL执行引擎的优点。可以从jvm对象创建,然后通过transformation算子(mapflatMapfilter, etc)转换得到。

DataFrame被DataSet中的RowS替代

Scala中用DataSet[Row],Java中用DataSet<Row>。

SparkSession

Spark中所有功能的入口点是SparkSession类(Spark 1.x叫SQLContext http://spark.apache.org/docs/2.0.0/api/java/index.html#org.apache.spark.sql.SparkSession

Spark SQL官网阅读笔记
分布式SQL引擎

Spark 2.0内置支持Hive,如使用HiveQL查询,访问Hive UDFs,从Hive获取数据。不需要安装Hive。

 

创建DataFrames

使用SparkSession,可以从已有的RDD,Hive表,或Spark数据源创建DataFrames。

Spark SQL官网阅读笔记
分布式SQL引擎

Dataset 操作(也叫做 DataFrame 操作)

 Spark SQL官网阅读笔记
分布式SQL引擎

API文档: http://spark.apache.org/docs/2.0.0/api/scala/index.html#org.apache.spark.sql.Dataset

 

运行 SQL 查询

查询结果是DataFrame类型。

Spark SQL官网阅读笔记
分布式SQL引擎

创建 Datasets

Spark SQL官网阅读笔记
分布式SQL引擎

与RDD互操作

 两种方式

1.反射

这种基于反射的方法可以得到更简洁的代码,并且在编写Spark应用程序时,当已经知道模式时,它可以很好地工作。

Spark SQL官网阅读笔记
分布式SQL引擎

2.通过编程接口创建

Spark SQL官网阅读笔记
分布式SQL引擎

数据源

load、save

1. 默认数据源

parquet

Spark SQL官网阅读笔记
分布式SQL引擎

2.手动指定

Spark SQL官网阅读笔记
分布式SQL引擎

3.在文件上直接运行SQL

Spark SQL官网阅读笔记
分布式SQL引擎

保存模式

保存操作的时候可以指定一个SaveMode

Spark SQL官网阅读笔记
分布式SQL引擎

 存储到持久化表中

可以使用saveAsTable将DataFrame存储到Hive metastore中。saveAsTable会实例化在Hive metastore中的DataFrame内容,并创建一个指针指向它。持久化表会一直存在,即使重启了Spark,只要保持同一个metastore的连接。

Parquet 文件

Spark SQL支持对Parquet文件的读写。

Spark SQL官网阅读笔记
分布式SQL引擎

分区发现

表分区是Hive等系统中常用的优化方法。

从spark 1.6.0开始,默认情况下,分区发现仅查找给定路径下的分区

Schema 合并

和 ProtocolBuffer, Avro, and Thrift, Parquet 也支持schema变化。可以增加列。

但是代价高。

1.5.0之后默认被关闭了。

Hive metastore 和Parquet table的转换

当从Hive metastore中读写Parquet table时,Spark SQL为了更好的性能,会尝试使用它自己的支持而不是Hive SerDe。这个行为由spark.sql.hive.convertMetastoreParquet配置,默认开启。

Metadata 刷新

Spark SQL缓存了Parquet metadata

// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")

Spark SQL官网阅读笔记
分布式SQL引擎

JSON Datasets

请注意,作为JSON文件提供的文件不是典型的JSON文件。每一行必须包含一个独立的、自包含的有效JSON对象。因此,常规的多行JSON文件通常会失败。

 Spark SQL官网阅读笔记
分布式SQL引擎

Hive Tables

Spark SQL支持读写存储在Hive中的数据

注意hive-site.xml 中的hive.metastore.warehouse.dir 从Spark 2.0.0开始已经过时了,用spark.sql.warehouse.dir

Spark SQL官网阅读笔记
分布式SQL引擎

与不同版本的Hive metastore交互

Spark SQL官网阅读笔记
分布式SQL引擎

使用JDBC和其它数据库交互

最好使用JdbcRDD

性能调优

可以通过调用spark.cacheTable("tableName") 或 dataFrame.cache().使Spark SQL以列格式缓存表。然后spark sql将只扫描所需的列,并自动调整压缩以最小化内存使用和GC压力。

调用spark.uncacheTable("tableName")移除缓存中表。

 通过SparkSession或在SQL中以SET Key = Value形式来设置。

Spark SQL官网阅读笔记
分布式SQL引擎

Spark SQL还可以使用其JDBC/ODBC或命令行接口作为分布式查询引擎。在这种模式下,最终用户或应用程序可以直接与Spark SQL交互以运行SQL查询,而无需编写任何代码。

 运行Thrift JDBC/ODBC服务器

./sbin/start-thriftserver.sh

This script accepts all bin/spark-submit command line options, plus a --hiveconf option to specify Hive properties. You may run ./sbin/start-thriftserver.sh --help for a complete list of all available options. By default, the server listens on localhost:10000. You may override this behaviour via either environment variables, i.e.:

export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh 
  --master <master-uri> 
  ...

or system properties:

./sbin/start-thriftserver.sh 
  --hiveconf hive.server2.thrift.port=<listening-port> 
  --hiveconf hive.server2.thrift.bind.host=<listening-host> 
  --master <master-uri>
  ...

Now you can use beeline to test the Thrift JDBC/ODBC server:

./bin/beeline

Connect to the JDBC/ODBC server in beeline with:

beeline> !connect jdbc:hive2://localhost:10000