Spark学习--SparkCore02 Action算子 RDD 的 Shuffle 和分区 缓存 Checkpoint

Action function 解释
reduce((T,T)=>U) 对整个结果集规约, 最终生成一条数据, 是整个数据集的汇总
count() 返回元素个数
collect() 以数组形式返回数据集中所有元素
first() 返回第一个元素
take(N) 返回前N个元素
countByKey() Key对应Key出现的次数
foreach(T=>...) 遍历
saveAsTextFile(path) 保存文件
takeSample(withReplacement,num) 类似于sample

RDD 对数字型数据的额外支持

算子 含义

count

个数

mean

均值

sum

求和

max

最大值

min

最小值

variance

方差

sampleVariance

从采样中计算方差

stdev

标准差

sampleStdev

采样的标准差

RDD 的 Shuffle 和分区

分区的作用

RDD 使用分区来分布式并行处理数据, 并且要做到尽量少的在不同的 Executor 之间使用网络交换数据, 所以当使用 RDD 读取数据的时候, 会尽量的在物理上靠近数据源, 比如说在读取 Cassandra 或者 HDFS 中数据的时候, 会尽量的保持 RDD 的分区和数据源的分区数, 分区模式等一一对应

分区和 Shuffle 的关系

分区的主要作用是用来实现并行计算, 本质上和 Shuffle 没什么关系, 但是往往在进行数据处理的时候, 例如 reduceByKey,groupByKey等聚合操作, 需要把 Key 相同的 Value 拉取到一起进行计算, 这个时候因为这些 Key 相同的 Value 可能会坐落于不同的分区, 于是理解分区才能理解 Shuffle 的根本原理

Spark 中的 Shuffle 操作的特点

  • 只有 Key-Value`型的 RDD 才会有 Shuffle 操作, 特例: repartition 算子可以对任何数据类型 Shuffle
  • 早期版本 Spark 的 Shuffle 算法是 Hash base shuffle, 后来改为 Sort base shuffle, 更适合大吞吐量的场景

RDD 的分区操作

  1. 查看分区数:rdd.partitions.size
  2. 创建分区:创建 RDD 时指定分区数、coalesce 算子指定、repartition 算子指定

RDD 的 Shuffle 是什么

Spark学习--SparkCore02
Action算子
RDD 的 Shuffle 和分区
缓存
Checkpoint

Spark学习--SparkCore02
Action算子
RDD 的 Shuffle 和分区
缓存
Checkpoint

reduceByKey 这个算子本质上就是先按照 Key 分组, 后对每一组数据进行 reduce, 所面临的挑战就是 Key 相同的所有数据可能分布在不同的 Partition 分区中, 甚至可能在不同的节点中, 但是它们必须被共同计算.

为了让来自相同 Key 的所有数据都在 reduceByKey 的同一个 reduce 中处理, 需要执行一个 all-to-all 的操作, 需要在不同的节点(不同的分区)之间拷贝数据, 必须跨分区聚集相同 Key 的所有数据, 这个过程叫做 Shuffle.

RDD 的 Shuffle 原理

Hash base shuffle

Spark学习--SparkCore02
Action算子
RDD 的 Shuffle 和分区
缓存
Checkpoint

大致的原理是分桶, 假设 Reducer 的个数为 R, 那么每个 Mapper 有 R 个桶, 按照 Key 的 Hash 将数据映射到不同的桶中, Reduce 找到每一个 Mapper 中对应自己的桶拉取数据.

假设 Mapper 的个数为 M, 整个集群的文件数量是 M * R, 如果有 1,000 个 Mapper 和 Reducer, 则会生成 1,000,000 个文件, 这个量非常大了.

过多的文件会导致文件系统打开过多的文件描述符, 占用系统资源. 所以这种方式并不适合大规模数据的处理, 只适合中等规模和小规模的数据处理, 在 Spark 1.2 版本中废弃了这种方式.

Sort base shuffle

Spark学习--SparkCore02
Action算子
RDD 的 Shuffle 和分区
缓存
Checkpoint

对于 Sort base shuffle 来说, 每个 Map 侧的分区只有一个输出文件, Reduce 侧的 Task 来拉取, 大致流程如下

  1. Map 侧将数据全部放入一个叫做 AppendOnlyMap 的组件中, 同时可以在这个特殊的数据结构中做聚合操作

  2. 然后通过一个类似于 MergeSort 的排序算法 TimSort 对 AppendOnlyMap 底层的 Array 排序

    • 先按照 Partition ID 排序, 后按照 Key 的 HashCode 排序

  3. 最终每个 Map Task 生成一个 输出文件, Reduce Task 来拉取自己对应的数据

从上面可以得到结论, Sort base shuffle 确实可以大幅度减少所产生的中间文件, 从而能够更好的应对大吞吐量的场景, 在 Spark 1.2 以后, 已经默认采用这种方式.

缓存

使用缓存的意义?

使用缓存不仅可以容错,还能够帮助开发者在进行一些昂贵操作后,如 一个RDD 需要重复多次利用, 将其结果保存下来, 以便下次使用无需再次执行, 提升性能.

缓存相关API

.cache()

.persist(StorageLevel.XXX):可以指定缓存级别

缓存级别 userDisk 是否使用磁盘 useMemory 是否使用内存 useOffHeap 是否使用堆外内存 deserialized 是否以反序列化形式存储 replication 副本数

NONE

false

false

false

false

1

DISK_ONLY

true

false

false

false

1

DISK_ONLY_2

true

false

false

false

2

MEMORY_ONLY

false

true

false

true

1

MEMORY_ONLY_2

false

true

false

true

2

MEMORY_ONLY_SER

false

true

false

false

1

MEMORY_ONLY_SER_2

false

true

false

false

2

MEMORY_AND_DISK

true

true

false

true

1

MEMORY_AND_DISK

true

true

false

true

2

MEMORY_AND_DISK_SER

true

true

false

false

1

MEMORY_AND_DISK_SER_2

true

true

false

false

2

OFF_HEAP

true

true

true

false

1

.unpersist():清除缓存

Checkpoint

作用

Checkpoint 的主要作用是斩断 RDD 的依赖链, 并且将数据存储在可靠的存储引擎中, 例如支持分布式存储和副本机制的 HDFS.

Checkpoint和Cache的区别

Cache 可以把 RDD 计算出来然后放在内存中, 但是 RDD 的依赖链(相当于 NameNode 中的 Edits 日志)是不能丢掉的, 因为这种缓存是不可靠的, 如果出现了一些错误(例如 Executor 宕机), 这个 RDD 的容错就只能通过回溯依赖链, 重放计算出来.

但是 Checkpoint 把结果保存在 HDFS 这类存储中, 就是可靠的了, 所以可以斩断依赖, 如果出错了, 则通过复制 HDFS 中的文件来实现容错.

所以他们的区别主要在以下两点

  • Checkpoint 可以保存数据到 HDFS 这类可靠的存储上, Persist 和 Cache 只能保存在本地的磁盘和内存中

  • Checkpoint 可以斩断 RDD 的依赖链, 而 Persist 和 Cache 不行

  • 因为 CheckpointRDD 没有向上的依赖链, 所以程序结束后依然存在, 不会被删除. 而 Cache 和 Persist 会在程序结束后立刻被清除.

使用

// 设置保存 checkpoint 的目录, 也可以设置为 HDFS 上的目录
    sc.setCheckpointDir("checkpoint")
    // RDD 的处理部分
    ......
    // 如果调用 checkpoint, 则会重新计算一下 RDD, 然后把结果存在 HDFS 或者本地目录中.所以, 应该在 Checkpoint 之前, 进行一次 Cache
    aggRDD = aggRDD.cache()
    aggRDD.checkpoint()