Spark Core Source Code Analysis 17: RDD-Related APIs
Blog address: http://blog.****.net/yueqian_zhu/
I. RDD creation operations (SparkContext.scala)
1. Create an RDD from an in-memory collection; the resulting RDD contains elements of type T.
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
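A minimal sketch of these two calls (the collection and partition count are illustrative, not from the original post; makeRDD simply delegates to parallelize):
// Illustrative: create an RDD from a local Seq with 2 partitions.
val sc = new SparkContext(new SparkConf().setAppName("Spark Test"))
val numsRDD = sc.parallelize(Seq(1, 2, 3, 4), 2)
println(numsRDD.partitions.length) // prints 2
println(numsRDD.count)             // prints 4
val sameRDD = sc.makeRDD(Seq(1, 2, 3, 4), 2) // equivalent to parallelize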
2. Read from a file (path/to/file; hdfs://ip:port/path/to/file; file:///path/to/file). A path without a scheme prefix is read from HDFS by default; the HDFS configuration is built from the Hadoop-related settings in SparkConf.
path:
(1) A single file path: only the specified file is loaded.
(2) A directory path: all files directly under that directory are loaded (files in subdirectories are not included).
(3) A wildcard pattern: load multiple files, or all files under multiple directories (see the sketch below).
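A minimal sketch of the three path forms (the paths are illustrative only, reusing the sc created above):
// Illustrative paths; substitute real locations.
val oneFile = sc.textFile("hdfs://ip:port/data/logs/2015-08-01.log") // (1) a single file
val oneDir  = sc.textFile("hdfs://ip:port/data/logs/")               // (2) every file directly under the directory
val globbed = sc.textFile("hdfs://ip:port/data/logs/2015-08-*.log")  // (3) multiple files via a wildcard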
The textFile signature:
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
Example:
val sc = new SparkContext(new SparkConf().setAppName("Spark Test"))
// textFile, returns the file as an RDD of lines
val textFileRDD = sc.textFile("file:///Users/zhengze/Downloads/spark-1.4.1-bin-hadoop2.3/README.md")
println(textFileRDD.count) // prints the number of lines

3. The old hadoopRDD interface
def hadoopRDD[K, V](
    conf: JobConf,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
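A minimal sketch of calling this old mapred API (the path is illustrative, reusing the sc from the example above):
// Illustrative: read a text file as (byte offset, line) pairs with the old API.
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
val jobConf = new JobConf()
FileInputFormat.setInputPaths(jobConf, "hdfs://ip:port/path/to/file")
val oldApiRDD = sc.hadoopRDD(jobConf, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
println(oldApiRDD.count) // prints the number of lines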
4. The new-API interfaces (newAPIHadoopFile / newAPIHadoopRDD). path: the file to read; conf: the Hadoop configuration, as with textFile.
(1) The overload without a conf parameter defaults to the Hadoop-related configuration held by the SparkContext.
(2) You can also supply your own conf; see the example below.
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
    path: String,
    fClass: Class[F],
    kClass: Class[K],
    vClass: Class[V],
    conf: Configuration = hadoopConfiguration): RDD[(K, V)]
No path parameter here (the input path comes from conf), so the RDD name has to be set separately via the setName interface:
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
    conf: Configuration = hadoopConfiguration,
    fClass: Class[F],
    kClass: Class[K],
    vClass: Class[V]): RDD[(K, V)]
The overload below relies on implicit conversions: if V is Text, then vm is implicitly a ClassTag[Text], and vm.runtimeClass.asInstanceOf[Class[V]] recovers the class object, so the call ends up equivalent to the interface above, which is very convenient.
// e.g. val file = sparkContext.newAPIHadoopFile[LongWritable, Text, TextInputFormat](path)
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
    (path: String)
    (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)]
Example:
val sc = new SparkContext(new SparkConf().setAppName("Spark Test"))
// newAPIHadoopFile with a custom Hadoop configuration; note that to actually pass
// the custom conf you must use the overload that takes the class objects explicitly
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
val conf = new Configuration(false)
conf.addResource("xxx-core-site.xml")
val hadoopFileRDD = sc.newAPIHadoopFile("hdfs://ip:port/path/to/file",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
println(hadoopFileRDD.count) // prints the number of lines

5. Merge the elements of several RDDs (without removing duplicates)
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T]
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T]
Example:
val sc = new SparkContext(new SparkConf().setAppName("Spark Test"))
// union
val rdd1 = sc.parallelize(Seq(1, 2, 3))
val rdd2 = sc.parallelize(Seq(3, 4, 5))
sc.union(Seq(rdd1, rdd2)).foreach(println) // prints 1 2 3 3 4 5

II. Basic RDD transformations (RDD.scala)

1. Storage-related methods, which need no explanation
def persist(newLevel: StorageLevel): this.type
def cache(): this.type
def unpersist(blocking: Boolean = true): this.type

2. map
The function f maps each element of type T in the original RDD one-to-one to an element of type U, producing a MapPartitionsRDD.
/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U]
Example:
val sc = new SparkContext(new SparkConf().setAppName("Spark Test"))
// map
val m = sc.parallelize(Seq(1, 2, 3)).map(x => x + 1).foreach(println) // prints 2 3 4

3. flatMap
The function f maps each element of type T in the original RDD to a collection of U values; all these small collections are then flattened into one.
/**
 * Return a new RDD by first applying a function to all elements of this
 * RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
Example:
val sc = new SparkContext(new SparkConf().setAppName("Spark Test"))
// flatMap
val fm = sc.parallelize(Seq(1, 2, 3)).flatMap(x => Seq(x + 2)).foreach(println) // prints 3 4 5

4. filter
The function f maps each element of type T to a Boolean, keeping only the elements for which it returns true.
def filter(f: T => Boolean): RDD[T]

5. distinct
Removes duplicate elements from the original RDD. The implicit ord is not used here, so the result is not sorted.
/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(): RDD[T] = withScope {
  distinct(partitions.length)
}
Example:
val sc = new SparkContext(new SparkConf().setAppName("Spark Test"))
// distinct: regardless of the number of partitions, all data is deduplicated
val dis = sc.parallelize(Seq(1, 2, 3, 4, 5, 4, 3)).distinct().foreach(println) // prints 4 1 3 5 2

6. Repartitioning: repartition is simply coalesce with shuffle set to true (see the sketch below).
Suppose N partitions are repartitioned into M partitions:
N < M: shuffle must be set to true.
N > M and the two are close: shuffle can be left as false, giving a narrow dependency.
N >>> M: if shuffle is false, performance will suffer.
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
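A minimal sketch of shrinking and then growing the partition count (the numbers are illustrative):
// Illustrative: shrink 8 partitions to 2 without a shuffle, then grow 2 back to 8 with one.
val wide = sc.parallelize(1 to 100, 8)
val narrowed = wide.coalesce(2)       // N > M, shuffle = false: narrow dependency
println(narrowed.partitions.length)   // prints 2
val regrown = narrowed.repartition(8) // N < M: repartition shuffles (coalesce with shuffle = true)
println(regrown.partitions.length)    // prints 8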
7. union / ++
Merges two RDDs; duplicate elements may remain.
/**
 * Return the union of this RDD and another one. Any identical elements will appear multiple
 * times (use `.distinct()` to eliminate them).
 */
def union(other: RDD[T]): RDD[T]
def ++(other: RDD[T]): RDD[T]

8. sortBy
First calls keyBy, which uses the function f to turn each T element of the original RDD into a (K, T) pair, i.e. K is produced from T by f.
Then calls sortByKey, which sorts by key on a per-partition basis, and finally keeps only the sorted T values via .values.
/**
 * Return this RDD sorted by the given key function.
 */
def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
  this.keyBy[K](f)
    .sortByKey(ascending, numPartitions)
    .values
}

9. Intersection; the result contains no duplicate elements
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]

10. groupBy
First uses the function f to turn each T element of the original RDD into a (K, T) pair, i.e. K is produced from T by f.
Then calls groupByKey to aggregate the RDD by key, finally producing an RDD of (K, Iterable[T]).
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null): RDD[(K, Iterable[T])]

11. mapPartitions
Similar to map, except that the parameter f receives an iterator over a whole partition instead of a single element. This lets all the elements of a partition share one external object rather than creating one per element (for example a JDBC connection: you certainly do not need to open a connection for every element).
The preservesPartitioning parameter indicates whether the partitioner of the parent RDD is preserved.
mapPartitionsWithIndex is the same except that f also receives the partition index as a parameter.
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U]
def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U]

12. zip
zip combines two RDDs into a key/value-style RDD; the two RDDs must have the same number of partitions and the same number of elements.
zipPartitions combines several RDDs partition by partition into a new RDD; the RDDs must have the same number of partitions, but there is no requirement on the number of elements per partition (a sketch follows after the signatures below).
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
def zipPartitions[B: ClassTag, V: ClassTag]
    (rdd2: RDD[B], preservesPartitioning: Boolean)
    (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, V: ClassTag]
    (rdd2: RDD[B])
    (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
  zipPartitions(rdd2, preservesPartitioning = false)(f)
}
def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
    (rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
    (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
    (rdd2: RDD[B], rdd3: RDD[C])
    (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = withScope {
  zipPartitions(rdd2, rdd3, preservesPartitioning = false)(f)
}
def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
    (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)
    (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
    (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
    (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = withScope {
  zipPartitions(rdd2, rdd3, rdd4, preservesPartitioning = false)(f)
}
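A minimal sketch of zip and zipPartitions (the data is illustrative; both RDDs have 2 partitions):
// zip also requires equal element counts per partition.
val nums  = sc.parallelize(Seq(1, 2, 3, 4), 2)
val chars = sc.parallelize(Seq("a", "b", "c", "d"), 2)
nums.zip(chars).foreach(println)   // prints (1,a) (2,b) (3,c) (4,d)
// zipPartitions only requires the same number of partitions; here each element
// is paired with the size of the other RDD's corresponding partition.
val sizes = nums.zipPartitions(chars) { (it1, it2) =>
  val letters = it2.toSeq
  it1.map(n => (n, letters.size))
}
sizes.foreach(println)             // e.g. (1,2) (2,2) (3,2) (4,2)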
III. Transformations on K/V-type RDDs

1. partitionBy
Similar in purpose to repartition among the basic transformations: re-partitions the original RDD according to the partitioner argument, producing a ShuffledRDD.
def partitionBy(partitioner: Partitioner): RDD[(K, V)]

2. mapValues / flatMapValues
Apply map and flatMap, respectively, to the V values of an RDD[(K, V)]; the keys are left unchanged.
def mapValues[U](f: V => U): RDD[(K, U)]
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]

3. Combine operations (a sketch follows after the signatures below)
createCombiner: the combiner-creation function, turning a V into a C.
mergeValue: the merge function, combining a C and a V into a C.
mergeCombiners: combines two Cs into one C.
partitioner: the partitioning function.
mapSideCombine: whether to combine on the map side.
def combineByKey[C](createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)]
The operations below all ultimately come down to a call to combineByKey:
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
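To close item 3, here is a minimal sketch of combineByKey computing a per-key average (the data and the (sum, count) accumulator type are illustrative):
// C is (sum, count): createCombiner starts it from a single V, mergeValue folds in
// another V within a partition, mergeCombiners merges partial results across partitions.
val scores = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 4.0)))
val sumCount = scores.combineByKey(
  (v: Double) => (v, 1),                                                    // createCombiner
  (c: (Double, Int), v: Double) => (c._1 + v, c._2 + 1),                    // mergeValue
  (c1: (Double, Int), c2: (Double, Int)) => (c1._1 + c2._1, c1._2 + c2._2)) // mergeCombiners
val avg = sumCount.mapValues { case (sum, count) => sum / count }
avg.foreach(println) // prints (a,2.0) (b,4.0)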
4. Join operations
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

IV. RDD actions

1. Simple methods
def foreach(f: T => Unit): Unit
def foreachPartition(f: Iterator[T] => Unit): Unit
def collect(): Array[T]
private[spark] def collectPartitions(): Array[Array[T]]
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]
def count(): Long
def take(num: Int): Array[T]
def first(): T
def top(num: Int)(implicit ord: Ordering[T]): Array[T]         // by default returns the num largest elements
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] // by default returns the num smallest elements
def max()(implicit ord: Ordering[T]): T
def min()(implicit ord: Ordering[T]): T
def isEmpty(): Boolean

2. toLocalIterator
Returns an iterator over all elements of this RDD; the computation consumes as much memory as the largest partition of the RDD. The RDD it is called on may be a ShuffledRDD, so to avoid recomputation it is best to cache it beforehand.
def toLocalIterator: Iterator[T]

3. reduce
Applies a binary operation to the elements of the RDD and returns the result.
def reduce(f: (T, T) => T): T

4. aggregate
zeroValue: the initial value.
seqOp: aggregates the data of each partition into a value of type U.
combOp: merges the per-partition aggregates, together with the initial value, into the final value of type U.
fold is a convenience interface over aggregate: op serves as both seqOp and combOp, and the result type is the same as the RDD's element type T. (A sketch follows at the end of this section.)
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
def fold(zeroValue: T)(op: (T, T) => T): T

5. countByValue
Counts each distinct value in the RDD and returns a local map of (value, count) pairs.
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]

6. Storage-related actions
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit
def saveAsNewAPIHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
    conf: Configuration = self.context.hadoopConfiguration): Unit
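To close item 4 above, a minimal sketch of aggregate and fold (the data is illustrative): aggregate builds a (sum, count) pair in one pass, while fold must return the element type itself.
// U = (Int, Int) here, inferred from zeroValue; T = Int is the RDD's element type.
val nums = sc.parallelize(Seq(1, 2, 3, 4), 2)
val (sum, count) = nums.aggregate((0, 0))(
  (acc, n) => (acc._1 + n, acc._2 + 1),   // seqOp: fold one element into a partition's accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2))   // combOp: merge the per-partition accumulators
println(s"sum=$sum count=$count")         // prints sum=10 count=4
println(nums.fold(0)(_ + _))              // prints 10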