spark core源码分析17 RDD相干API

spark core源码分析17 RDD相关API

博客地址: http://blog.****.net/yueqian_zhu/


一、RDD创建的操作(SparkContext.scala)

1、从内存集合中创建RDD,RDD中包含的是类型为T的集合

def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T]

def makeRDD[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T]

2、从文件中读取(path/to/file;hdfs://ip:port/path/to/file;file://path/to/file),没有前缀的默认从hdfs读取。hdfs的配置从sparkConf中hadoop相关的配置项创建

path:

(1) 一个文件路径,这时候只装载指定的文件

(2) 一个目录路径,这时候只装载指定目录下面的所有文件(不包括子目录下面的文件)

(3) 通过通配符的形式加载多个文件或者加载多个目录下面的所有文件

def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String]
例子:
val sc = new SparkContext(new SparkConf().setAppName("Spark Test"))

    //textFile,返回行数
    val textFileRDD = sc.textFile("file:///Users/zhengze/Downloads/spark-1.4.1-bin-hadoop2.3/README.md")
    println(textFileRDD.count)  //输出行数
3、老的hadoopRdd接口
def hadoopRDD[K, V](
    conf: JobConf,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
4、新的hadoopFile接口 path:待读取的文件;conf:hadoop 配置文件,同textFile

(1)没有参数conf的接口,默认取sparkContext中读取hadoop相关的配置

(2)也可以自己指定conf,见下面例子

def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
    path: String,
    fClass: Class[F],
    kClass: Class[K],
    vClass: Class[V],
    conf: Configuration = hadoopConfiguration): RDD[(K, V)]
未指定path,需要额外通过setName接口设置
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
    conf: Configuration = hadoopConfiguration,
    fClass: Class[F],
    kClass: Class[K],
    vClass: Class[V]): RDD[(K, V)]
用到隐式转换,如V为Text,vm就隐式转换为ClassTag[Text],vm.runtimeClass.asInstanceOf[Class[V]],就等同调用了上面的接口,非常方便哦
//eg. val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
    (path: String)
    (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)]
例子:
val sc = new SparkContext(new SparkConf().setAppName("Spark Test"))
    //newAPIHadoopFile
    import org.apache.hadoop.conf.Configuration
    val conf = new Configuration(false)
    conf.addResource("xxx-core-site.xml")
    val hadoopFileRDD = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("hdfs://ip:port/path/to/file")
    println(hadoopFileRDD.count)  //输出行数
5、RDD中元素合并(不去重)
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T]
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T]
例子:
val sc = new SparkContext(new SparkConf().setAppName("Spark Test"))
    //union
    val rdd1 = sc.parallelize(Seq(1,2,3))
    val rdd2 = sc.parallelize(Seq(3,4,5))
    sc.union(Seq(rdd1,rdd2)).foreach(println) //输出123345
二、RDD基本转换操作(RDD.scala)
1、存储相关,不解释
def persist(newLevel: StorageLevel): this.type
def cache(): this.type 
def unpersist(blocking: Boolean = true): this.type
2、map
参数f从原始RDD中的类型T一对一的转化为类型U的MapPartitionsRDD
/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U]
例子:
val sc = new SparkContext(new SparkConf().setAppName("Spark Test"))
    //map
    val m = sc.parallelize(Seq(1,2,3)).map(x => x+1).foreach(println)  //输出234
3、flatMap
参数f从原始RDD中的类型T一对一的转换为一个集合[U],之后再将所有小的集合合并成一个大集合
/**
 *  Return a new RDD by first applying a function to all elements of this
 *  RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
例子:
val sc = new SparkContext(new SparkConf().setAppName("Spark Test"))
    //flatmap
    val fm = sc.parallelize(Seq(1,2,3)).flatMap(x => Seq(x+2)).foreach(println)  //输出345
4、filter
参数f从原始RDD中类型T转换为一个Boolean进行元素过滤操作
def filter(f: T => Boolean): RDD[T]
5、distinct
将原始RDD中的元素去重,这里的ord没用到,即无法排序
/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}

/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(): RDD[T] = withScope {
  distinct(partitions.length)
}
例子:
val sc = new SparkContext(new SparkConf().setAppName("Spark Test"))
    //distinct  不管设置多少分区,最终会将所有数据去重
    val dis = sc.parallelize(Seq(1,2,3,4,5,4,3)).distinct().foreach(println) //输出41352
6、重新分区,repartition是coalesce接口的shuffle为true的简易实现
假设N个分区重新划分为M个分区:
N<M:shuffle需要设置为true
N>M且相差不多:可以shuffle设置为false,此时为窄依赖
N>>>M:shuffle如果为false,会影响性能
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
    : RDD[T]
7、union / ++
合并RDD,可能存在重复元素
/**
   * Return the union of this RDD and another one. Any identical elements will appear multiple
   * times (use `.distinct()` to eliminate them).
   */
  def union(other: RDD[T]): RDD[T]
def ++(other: RDD[T]): RDD[T]
8、sortBy
首先调用keyBy,作用是通过参数f,将原始RDD中的T元素转换为(K,T)类型,即K由T经f方法产生
然后调用sortByKey,对key进行按partition分组排序,最后只取排序后的所有T元素
/**
 * Return this RDD sorted by the given key function.
 */
def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
  this.keyBy[K](f)
      .sortByKey(ascending, numPartitions)
      .values
}
9、取交集且交集中不含相同元素
def intersection(other: RDD[T]): RDD[T]
def intersection(
    other: RDD[T],
    partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
10、首先通过参数f将原始RDD中的T元素转换为(k,T)类型,即K由T经f方法产生
然后调用groupByKey对RDD进行按key聚合,最终产生(k,Iterable[T])类型的RDD
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def groupBy[K](
    f: T => K,
    numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
    : RDD[(K, Iterable[T])]
11、mapPartitions与map类似,只不过f参数由RDD中的每个元素变成了RDD中的每一个分区的迭代器。
这样可以各个分区共享同一个外部对象,而不是每个元素一个对象。比如创建jdbc连接之类的,不需要每个元素都建立一个连接吧。
参数preservesPartitioning表示是否保留父RDD的partitioner分区信息。
mapPartitionsWithIndex方法参数f中多了一个分区号为参数
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U]
def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U]
12、zip将两个RDD组合成key/value形式的RDD,两个RDD的partition数量和元素数量都需要相同
zipPartitions是将多个RDD按照partition组合成新的RDD,RDD需要相同的分区数,但是每个分区中的元素数量是没有要求的
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
def zipPartitions[B: ClassTag, V: ClassTag]
    (rdd2: RDD[B], preservesPartitioning: Boolean)
    (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, V: ClassTag]
    (rdd2: RDD[B])
    (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
  zipPartitions(rdd2, preservesPartitioning = false)(f)
}
def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
    (rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
    (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
    (rdd2: RDD[B], rdd3: RDD[C])
    (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = withScope {
  zipPartitions(rdd2, rdd3, preservesPartitioning = false)(f)
}
def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
    (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)
    (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
    (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
    (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = withScope {
  zipPartitions(rdd2, rdd3, rdd4, preservesPartitioning = false)(f)
}
三、K/V类型RDD转换操作
1、partitionBy
类似基本转换中的repartition中的功能,根据partition参数将原RDD重新分区转化成ShuffledRDD
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
2、mapValues/flatMapValues
针对RDD[k,v]中的V值进行map操作和flatMap操作
def mapValues[U](f: V => U): RDD[(K, U)]
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
3、组合操作
createCombiner:创建组合器函数,将V类型转换为C类型
mergeValue:合并器函数,将一个V类型和一个C类型合并成一个C类型
mergeCombiners:将两个C类型合并成一个C类型
partitioner:分区函数
mapSideCombine:是否需要在map端进行combine操作
def combineByKey[C](createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
  : RDD[(K, C)]
下面的操作最终都会归结为对combineByKey的调用
def foldByKey(
    zeroValue: V,
    partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
4、连接操作
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

四、RDD行动操作
1、简单方法
def foreach(f: T => Unit): Unit
def foreachPartition(f: Iterator[T] => Unit): Unit
def collect(): Array[T]
private[spark] def collectPartitions(): Array[Array[T]]
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]
def count(): Long
def take(num: Int): Array[T]
def first(): T 
def top(num: Int)(implicit ord: Ordering[T]): Array[T]    //默认取最大的num个元素
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]    //默认取最小的num个元素
def max()(implicit ord: Ordering[T]): T
def min()(implicit ord: Ordering[T]): T 
def isEmpty(): Boolean
2、toLocalIterator
返回这个RDD所有元素的迭代器,计算将消耗RDD最大分区的内存量。调用的RDD有可能是shuffledRDD,为避免重复计算,最好之前能cache住
def toLocalIterator: Iterator[T]
3、reduce
对RDD中的元素进行二元计算,返回计算结果
def reduce(f: (T, T) => T): T
4、aggregate操作:
zeroValue:初始值
seqOp:将RDD中每个分区的数据聚合成类型为U的类型的值
combOp:将各个分区聚合起来的值与初始值合并得到最终的U类型的值
fold方法是aggregate的便利接口,op既是seqOp操作,也是combOp操作,且最终类型的值与原始RDD中的类型T一致
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
def fold(zeroValue: T)(op: (T, T) => T): T
5、countByValue
将RDD中所有的值分别计数,返回一个本地的map<value, count>对
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]
6、存储相关行动操作
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
    path: String)(implicit fm: ClassTag[F]): Unit
def saveAsNewAPIHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
    conf: Configuration = self.context.hadoopConfiguration): Unit




版权声明:本文为博主原创文章,未经博主允许不得转载。