Spark Operators: RDD Transformations
map
map(func) | Return a new distributed dataset formed by passing each element of the source through a function func. |
Returns a new distributed dataset formed by passing each element of the source through the function func; in other words, applying the function to every element produces a new distributed dataset.
val rdd = session.sparkContext.parallelize(1 to 10)
rdd.foreach(println)
println("=========================")
rdd.map(x => (x, 1)).foreach(println)
Result:
6 7 8 9 10 1 2 3 4 5
=========================
(6,1) (7,1) (8,1) (9,1) (10,1) (1,1) (2,1) (3,1) (4,1) (5,1)
filter
filter(func) | Return a new dataset formed by selecting those elements of the source on which func returns true. |
Filters elements with a user-defined predicate; only elements for which the function returns true are kept.
val rdd = session.sparkContext.parallelize(1 to 10)
rdd.foreach(print)
val rdd2 = rdd.filter(_ > 6)
println("=========================")
rdd2.foreach(print)
Result:
67891012345
=========================
78910
flatMap
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). |
Maps each element of the RDD to zero or more elements with a user-defined function and flattens the results into a single collection.
val ds = session.sparkContext.textFile("D:/公司/test.txt")
ds.foreach(println)
val ds2 = ds.flatMap(x => x.split(":"))
println("===================")
ds2.foreach(println)
Result:
{ "DEVICENAME": "����4", "LID": 170501310, "ADDRESS": "xxxx", "ID": 230001160 }
===================
{ "DEVICENAME"
"����4", "LID"
170501310, "ADDRESS"
"xxxx", "ID"
230001160 }
mapPartitions
mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T. |
Similar to map, but operates on each partition rather than on each element; think of it as a batched version of map.
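A minimal sketch of the idea, assuming the same `session` SparkSession used in the other examples (the partition count of 2 is just illustrative): the function receives an iterator over a whole partition, so any per-partition setup runs once per partition instead of once per element.
val rdd = session.sparkContext.parallelize(1 to 10, 2)   // 2 partitions
val rdd2 = rdd.mapPartitions(iter => {
  // per-partition setup (e.g. creating a connection) would go here, once per partition
  iter.map(x => (x, 1))
})
rdd2.foreach(println)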
mapPartitionsWithIndex
mapPartitionsWithIndex(func) | Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T. |
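Similar to mapPartitions, but the function also receives the index of the partition it is processing. A minimal sketch, again assuming the `session` SparkSession from the other examples (3 partitions chosen arbitrarily), tags each element with its partition index:
val rdd = session.sparkContext.parallelize(1 to 10, 3)   // 3 partitions
// the function is (partition index, iterator over that partition) => iterator
val rdd2 = rdd.mapPartitionsWithIndex((idx, iter) => iter.map(x => (idx, x)))
rdd2.foreach(println)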
sample
sample(withReplacement, fraction, seed) | Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed. |
Takes a random sample from an RDD.
It has three parameters:
withReplacement: Boolean; whether elements can be drawn more than once (sampling with replacement).
fraction: the expected fraction of elements to return, between 0 and 1. For example, if the source RDD has 10 elements and fraction = 0.5, roughly 5 elements are returned (the size is not guaranteed to be exact).
seed: the seed for the random number generator; if omitted, a random Long seed is generated. With a fixed seed, repeated calls return the same sample, as the sketch below shows.
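A minimal sketch, assuming the same `session` SparkSession as the other examples (the seed value 42 is arbitrary): with a fixed seed the two samples are identical; drop the seed argument and each run produces a different sample.
val rdd = session.sparkContext.parallelize(1 to 10)
// without replacement, expect roughly half of the elements back
val s1 = rdd.sample(false, 0.5, 42)
val s2 = rdd.sample(false, 0.5, 42)
s1.foreach(print)
println("==============")
s2.foreach(print)   // identical to s1 because the seed is fixed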
union
union(otherDataset) | Return a new dataset that contains the union of the elements in the source dataset and the argument. |
Merges two RDDs into one; duplicates are not removed (see the sketch below).
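A minimal sketch, assuming the same `session` SparkSession as the other examples: 4 and 5 occur in both inputs and therefore appear twice in the result, because union does not deduplicate.
val rdd1 = session.sparkContext.parallelize(1 to 5)
val rdd2 = session.sparkContext.parallelize(4 to 8)
// union keeps duplicates: 4 and 5 show up twice
rdd1.union(rdd2).foreach(print)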
intersection
intersection(otherDataset) | Return a new RDD that contains the intersection of elements in the source dataset and the argument. |
Returns the intersection of two RDDs, with duplicates removed.
val rdd = session.sparkContext.parallelize(1 to 10)
rdd.foreach(println)
val rdd2 = rdd.sample(true, 0.5)
println("==============")
rdd2.foreach(println)
val rdd3 = rdd.intersection(rdd2)
println("==============")
rdd3.foreach(println)
Result:
==============
8 9 9 5 5
==============
9 5 8
distinct
distinct([numTasks]) | Return a new dataset that contains the distinct elements of the source dataset. |
Removes duplicate elements from an RDD; the optional parameter is the number of tasks (partitions).
Internally it maps each element to an (element, null) pair, reduces by key while keeping the first value, and then takes the keys, as the source below shows.
/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
val rdd = session.sparkContext.parallelize(1 to 10)
val rdd2 = rdd.sample(true, 0.5)
rdd2.foreach(print)
println("====================")
val rdd3 = rdd2.distinct(10)
rdd3.foreach(print)
Result:
7792224
==============
4279
groupByKey
Groups the values of a (K, V)-pair dataset by key and returns, for each key, a collection of its values. This operator requires an RDD of (K, V) pairs.
The official recommendation is that, if you are grouping in order to aggregate, you should use reduceByKey or aggregateByKey instead.
It corresponds to SQL's GROUP BY and does not accept a user-defined aggregation function; if you need to compute something like a count on top of the grouping, use reduceByKey or aggregateByKey.
groupBy
groupBy differs slightly from groupByKey. 1: groupBy lets you compute the key with a custom function. 2: In the return value, groupBy returns [key, {(key, value1), (key, value2)}] (the complete original elements), while groupByKey returns [key, {value1, value2}] (only the values).
import scala.util.Random

val seq = Seq[String]("spark", "hadoop", "spark")
val rdd = session.sparkContext.parallelize(seq)
// group by the pair's first element: same grouping as groupByKey, but the return value differs
val rdd2 = rdd.map(x => (x, 1)).groupBy(_._1)
rdd2.foreach(println)
println("==============")
// the key can be computed by any function
val rdd4 = rdd.map(x => (x, 1)).groupBy(x => x._1 + new Random().nextInt(100))
rdd4.foreach(println)
println("==============")
// groups by the pair's existing key
val rdd3 = rdd.map(x => (x, 1)).groupByKey()
rdd3.foreach(println)
Result:
(spark,CompactBuffer((spark,1), (spark,1)))
(hadoop,CompactBuffer((hadoop,1)))
==============
(spark92,CompactBuffer((spark,1)))
(hadoop72,CompactBuffer((hadoop,1)))
(spark46,CompactBuffer((spark,1)))
==============
(spark,CompactBuffer(1, 1))
(hadoop,CompactBuffer(1))
reduceByKey
For an RDD of (K, V) pairs, returns an RDD with a single entry per distinct key, where the values of each key are merged by the supplied reduce function; what the merged value looks like depends on that function (the first parameter).
val seq = Seq[String]("spark", "hadoop", "spark")
val rdd = session.sparkContext.parallelize(seq)
val rdd2 = rdd.map(x => (x, 1)).reduceByKey(_ + _)
rdd2.foreach(println)
Result:
(spark,2)
(hadoop,1)
Summary: groupBy vs. groupByKey vs. reduceByKey
1: groupBy lets you compute the key with a custom function; in the return value, groupBy returns [key, {(key, value1), (key, value2)}], while groupByKey returns [key, {value1, value2}].
2: The first parameter of reduceByKey(func, [numTasks]) is a user-defined function that merges the values, so the result can be processed further. groupBy([numTasks]) and groupByKey([numTasks]) take no such function, so implementing something like word count on top of them needs an extra operator or custom code (see the aggregateByKey sketch after this list).
3: reduceByKey and groupByKey differ in how they work internally, as the official comments below make clear: reduceByKey merges values on each node before the shuffle, similarly to a "combiner" in MapReduce, so less data is transferred.
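aggregateByKey is recommended above but not demonstrated, so here is a minimal word-count sketch using it, assuming the same `session` SparkSession as the other examples. The zero value 0 is the per-key starting accumulator; the first function folds a value into the accumulator within a partition, and the second merges accumulators across partitions.
val seq = Seq[String]("spark", "hadoop", "spark")
val rdd = session.sparkContext.parallelize(seq)
// zeroValue = 0; seqOp adds each 1 into the per-partition accumulator;
// combOp sums the partial accumulators across partitions
val counts = rdd.map(x => (x, 1)).aggregateByKey(0)(_ + _, _ + _)
counts.foreach(println)   // (spark,2) and (hadoop,1)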
reduceByKey source:
/**
 * Merge the values for each key using an associative and commutative reduce function. This will
 * also perform the merging locally on each mapper before sending results to a reducer, similarly
 * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
 * parallelism level.
 */
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  reduceByKey(defaultPartitioner(self), func)
}
groupByKey source:
/**
 * Group the values for each key in the RDD into a single sequence. Allows controlling the
 * partitioning of the resulting key-value pair RDD by passing a Partitioner.
 * The ordering of elements within each group is not guaranteed, and may even differ
 * each time the resulting RDD is evaluated.
 *
 * @note This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
 * or `PairRDDFunctions.reduceByKey` will provide much better performance.
 *
 * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
 * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
 */
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}