import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
 * Author: xiaoshubiao
 * Time: 2020/5/13 14:13
 *
 * Demonstrates Spark's Java API for mapPartitions and mapPartitionsWithIndex.
 **/
public class spark_function {
public static void main(String[] args) {
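// Run Spark locally, using all available cores.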
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("spark_java_function");
JavaSparkContext sc = new JavaSparkContext(conf);
List<String> list = Arrays.asList("a","b","c","d");
JavaRDD<String> rdd = sc.parallelize(list, 3); // distribute the list across 3 partitions
rdd
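// mapPartitions: the function is invoked once per partition with an
// iterator over that partition's elements, rather than once per element
// as with map.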
.mapPartitions(
new FlatMapFunction<Iterator<String>, String>() {
@Override
public Iterator<String> call(Iterator<String> stringIterator) throws Exception {
// Buffer each element, logging it as it passes through.
List<String> buffered = new ArrayList<>();
while (stringIterator.hasNext()) {
String next = stringIterator.next();
System.out.println("mapPartitions output: " + next);
buffered.add(next);
}
return buffered.iterator();
}
}
)
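// mapPartitionsWithIndex: like mapPartitions, but the function also
// receives the partition index as its first argument; the boolean flag
// indicates whether partitioning should be preserved.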
.mapPartitionsWithIndex(
new Function2<Integer, Iterator<String>, Iterator<String>>() {
@Override
public Iterator<String> call(Integer integer, Iterator<String> stringIterator) throws Exception {
List<String> result = new ArrayList<>();
while (stringIterator.hasNext()) {
String next = stringIterator.next();
System.out.println("partition id: " + integer + " -- value: " + next);
result.add(next + integer); // tag each value with its partition index
}
return result.iterator();
}
}
, false)
.collect(); // collect() triggers evaluation of the lazy pipeline
sc.close(); // release Spark resources
}
}
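/*
 * For reference, the same pipeline can be written with Java 8 lambdas -- a
 * minimal sketch, assuming Spark 2.x+ (where FlatMapFunction#call returns an
 * Iterator); "rdd" is the RDD built above:
 *
 * rdd.mapPartitions(it -> {
 *     List<String> out = new ArrayList<>();
 *     it.forEachRemaining(out::add); // buffer the partition's elements
 *     return out.iterator();
 * }).mapPartitionsWithIndex((idx, it) -> {
 *     List<String> out = new ArrayList<>();
 *     it.forEachRemaining(s -> out.add(s + idx)); // append the partition index
 *     return out.iterator();
 * }, false).collect();
 */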