spark教程(九)-sparkSQL 和 RDD-DF-DS 关系

spark教程(九)-sparkSQL 和 RDD-DF-DS 关系

sparkSQL 的由来

我们知道最初的计算框架叫 mapreduce,他的缺点是计算速度慢,还有一个就是代码比较麻烦,所以有了 hive;

hive 是把类 sql 的语句转换成 mapreduce,解决了开发难的问题,但是 hive 的底层还是 mapreduce,仍然是慢;

spark 也看到了 hive 的优势,以 hive 为中心的一套框架 shark 营运而生,它是 spark 的前身,h 就是 hive 的意思;

但是 为了 提高 shark 的效率,spark 自己开发了一套算法,替代了之前 hive 的思路,这套算法就是 sparkSQL

sparkSQL 简介

sparkSQL 是 spark 专门处理结构化数据的一个模块,也就是像数据表一样的数据,处理方式就是像 sql 一样;

换句话说,sparkSQL 使用 sql 的方式代替了之前数据处理的方式。

sparkSQL 提供了两个编程抽象:DataFrame 和 DataSet,起到了分布式 SQL 查询引擎的作用;

sparkSQL 把 sql 语句 和 dataFrame、dataSet 转换成了 RDD,执行效率非常快;

也就是说 dataFrame、dataSet 的底层仍然 是 RDD,并且可以互相转换

sparkSQL 的特点

官方解释

易整合

兼容 hive

统一的数据访问方式:用同样的方式读取各类文件

标准的数据库连接:可以通过 JDBC 或者 ODBC 连接标准数据库

后面会详细解释。

DataFrame (df)

与 RDD 类似,df 也是分布式的数据容器,不同的是,df 更像一个 二维数据表,除了数据本身外,还包含了数据的结构信息,即 schema;

df 的 API 提供了更高层的关系操作,比函数式的 RDD API 更加友好;

df 的底层仍是 RDD,所以 df 也是惰性执行的,但值得注意的是,它比 RDD 性能更高

问题来了:为什么底层实现是 RDD,却比 RDD 更快,不合常理啊

其实是这样的,因为 df 是由 spark 自己转换成 RDD 的,那么 spark 自然会用最合适的、最优化的方式转换成 RDD,因为它比任何人都清楚怎么才能更高效,

对比我们自己操作 RDD 去实现各种功能,大部分情况下我们的作法可能不是最优,自己玩不如作者玩,所以说 df 性能高于 RDD

举个简单例子:

data1 = sc.parallelize([('1','a'), ('2', 'b'), ('3', 'c')])
data2 = sc.parallelize([('1','1'), ('2', '2'), ('3', '3')])
### 找到两个list中 key 为 1 的对应值的集合

## 自己写可能这么写
data1.join(data2).collect() # [('1', ('a', '1')), ('3', ('c', '3')), ('2', ('b', '2'))]
data1.join(data2).filter(lambda x: x[0] == '1').collect()   # [('1', ('a', '1'))]

## spark 可能这么写
data1.filter(lambda x: x[0] == '1').join(data2.filter(lambda x: x[0] == '1')).collect() # [('1', ('a', '1'))]

为什么 spark 这么写快呢?这里简单解释下

join 是把 两个元素做 笛卡尔內积,生成了 3x3=9 个元素,然后 shuffle,每个分区分别比较 key 是否相同,如果相同,合并,然后合并分区结果;

我们自己写的就是这样,shuffle 了 9 个元素;

而 spark 是先 filter,每个 list 变成了 一个元素,然后 join,join 的结果直接就是所需,不用 shuffle;

shuffle 本身是耗时的,而 filter 无需 shuffle,所以效率高      【join 是个 低效方法的原因】

小结

1. df 也是一个查询优化的手段

2. df 允许我们像操作数据库一样操作它

DataSet

DataSet 是 DataFrame 的扩展,是 spark 最新的数据抽象;

dataSet 像个对象,允许我们像操作类一样操作它,通过属性查看数据;

实际上 DataSet 是在 df 的基础上增加了数据类型(这样表述或许不太准确,可以理解为 类或者对象)的概念

python 目前不支持 dataSet,所以后续支持了再说 

rdd-df-dataset 

发展历程

RDD(spark1.0)  ===>  DataFrame(spark1.3)  ===> DataSet(spark1.6)

转换逻辑

rdd + 表结构 = df

rdd + 表结构 + 数据类型 = ds

df + 数据类型 = ds

ds - 数据类型 - 表结构 = rdd

ds - 数据类型 = df

df - 表结构 = rdd

转换方法

具体如何转换,我后续会专门写一篇博客

SparkSession

在老版本中,sparkSQL 提供了两种 SQL 查询的起始点:

SQLContext,用于 spark 自己提供的 SQL 查询;

HiveContext,用于连接 hive 的查询

sparkSession 是新版的 SQL 查询起始点,实质上是组合了 SQLContext 和 HiveContext;

sparkSession 只是封装了 sparkContext,sparkContext 包含 SQLContext 和 HiveContext;

所以 sparkSession 实际上还是 依靠 sparkContext 实现了 SQLContext 和 HiveContext,故老版本用法也适用新版本。

sparkSession 直接生成 df

DataFrame 的创建

sparkSession 自动生成 df

df 的创建有 3 种方式

从 spark 的数据源创建:读取 spark 支持的文件

从内部 RDD 创建:RDD 转换成 df

从 hive 创建:hive 查询

spark 数据源创建df

spark 支持的文件格式都有统一的入口

>>> dir(spark.read)
[ 'csv', 'format', 'jdbc', 'json', 'load', 'option', 'options', 'orc', 'parquet', 'schema', 'table', 'text']

json

文件读取方式都一样,所以仅以 json 为例

##  json 文件如下
# {'age': '10','name': 'zhangsan'}
# {'age': '20','name': 'lisi'}

df1 = spark.read.json('data.json')    # 相对路径
# >>> df1     # DataFrame[age: string, name: string] 可以看到 df 具备了字段名和字段属性

df1.show()
# +---+--------+
# |age|    name|
# +---+--------+
# | 10|zhangsan|
# | 20|    lisi|
# +---+--------+

df2 = spark.read.json('file:///usr/lib/spark/data.json')  # 绝对路径

RDD 创建 df

待续...

hive 创建 df

待续...

sparkSQL 操作 df

sparkSQL 有两种风格,这里只介绍简单用法,详细用法后续会一一介绍。

SQL 风格操作 df

df 生成后,怎么写 sql 呢?没表啊

所以还需创建表,方法可能不止一种,比如

### 创建临时视图
df1.createTempView('student')
# df1.createOrReplaceTempView('student')    # ok
spark.sql('select * from student').show()
# +---+--------+
# |age|    name|
# +---+--------+
# | 10|zhangsan|
# | 20|    lisi|
# +---+--------+

spark.sql('select age from student').show()
spark.sql('select avg(age) from student').show()
# +------------------------+
# |avg(CAST(age AS DOUBLE))|
# +------------------------+
# |                    15.0|
# +------------------------+

到这里有个问题,不知道大家想到没?

这张表在哪呢?能不能重复操作它呢?退出 session 还能操作吗?或者换个 session 还能操作吗?

session

这里穿插讲下 session 的概念;

session 的本意是会话,我们在多个场合都见过 session,如 web,如 tensorflow,但是在 web 中貌似不是 会话啊;

其实是这样的,session 有广义和狭义之分

广义 session:就是我们说的会话

狭义 session:它是一个存储位置,和 cookie 相对,cookie 是把某个信息存在客户度,session 是把 某个信息存在服务器上

全局表 

临时表是在 session 范围内的,session 关闭后,临时表失效,如果想应用范围内有效,可以使用全局表,

全局表需要全路径访问

### 为了在应用范围内使用数据表,创建全局表
df1.createGlobalTempView('people')
## 查询
spark.sql('select * from global_temp.people').show()    # global_temp.people 全路径访问表


## 在另一个 session 中查询该表
spark.newSession().sql('select * from global_temp.people').show()

DSL 风格操作 df

不常用

df1.printSchema() # 打印表结构
# root
#  |-- age: string (nullable = true)
#  |-- name: string (nullable = true)

df1.select('name').show()   # 查询name字段
df1.select("name", df1.age + 1).show() # age 字段的值都 加1,scala 中是用 $'age' 代替 df.age
# +--------+---------+
# |    name|(age + 1)|
# +--------+---------+
# |zhangsan|     11.0|
# |    lisi|     21.0|
# +--------+---------+

df1.filter(df1.age > 15).show()  # 查看 age 大于 15
# +---+----+
# |age|name|
# +---+----+
# | 20|lisi|
# +---+----+

总结

sparkSQL 已逐渐成为主流,代替了 RDD 操作,所以必须掌握哦