Spark: Delaying SparkContext Initialization
In some applications you may want to run a standalone Java program on the driver first, and only then initialize the SparkContext and use cluster mode to operate on that program's return value. This avoids creating the SparkContext object (and thus allocating cluster resources) too early, which would leave those resources idle for a long time.
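The skeleton of that pattern is shown in the minimal sketch below (the class name and the placeholder driver-side work are illustrative only; the full test case appears later in this section):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class DelayedContextExample {
    public static void main(String[] args) throws Exception {
        // Phase 1: plain driver-side Java work; no cluster resources are held yet.
        // (Placeholder for whatever single-process preprocessing is needed.)
        Thread.sleep(60 * 1000);

        // Phase 2: only now create the SparkContext and start using the cluster.
        SparkConf conf = new SparkConf().setAppName("DelayedContextExample");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        try {
            // ... distributed work on the result of phase 1 ...
        } finally {
            jsc.stop();
        }
    }
}
```

The catch is that in yarn-cluster mode the ApplicationMaster and the AM-related Spark timeouts must tolerate this long pre-SparkContext phase, which is what the parameters below address.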
Two YARN parameters are involved here:
```xml
<property>
  <name>yarn.am.liveness-monitor.expiry-interval-ms</name>
  <value>6000000</value>
</property>
<property>
  <name>yarn.resourcemanager.am.max-retries</name>
  <value>10</value>
</property>
```
YARN periodically iterates over all ApplicationMasters. If an ApplicationMaster has not reported a heartbeat within a certain time (configurable via yarn.am.liveness-monitor.expiry-interval-ms, default 10 minutes), it is considered dead, and all Containers running under it are marked as failed (the RM does not rerun these Containers itself; it only informs the corresponding AM via the heartbeat mechanism, and the AM decides whether to rerun them, re-requesting resources from the RM if needed). The AM itself is then re-launched on another node; the administrator can set the number of attempts per ApplicationMaster via yarn.resourcemanager.am.max-retries (default 1).
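With the values above, the expiry interval becomes 6,000,000 ms ≈ 100 minutes instead of the default 10 minutes, so the ResourceManager will not expire an AM that reports no heartbeat during the 30-minute driver-side phase of the test case below; yarn.resourcemanager.am.max-retries is likewise raised from the default 1 to 10.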
Two Spark parameters are also needed:
```xml
<property>
  <name>spark.yarn.am.waitTime</name>
  <value>6000000</value>
</property>
<property>
  <name>spark.yarn.applicationMaster.waitTries</name>
  <value>200</value>
</property>
```
The relevant Spark on YARN properties (cluster management):
Property | Default | Meaning |
---|---|---|
spark.yarn.scheduler.heartbeat.interval-ms | 5000 | Interval (ms) at which the Spark ApplicationMaster sends heartbeats to the YARN ResourceManager |
spark.yarn.am.waitTime | 100000 | How long (ms) to wait at startup |
spark.yarn.applicationMaster.waitTries | 10 | Number of tries to wait for the Spark ApplicationMaster to start, i.e. for the SparkContext to be initialized; if exceeded, startup fails |
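For comparison with these defaults: spark.yarn.am.waitTime defaults to 100,000 ms (about 100 seconds), far less than the 30-minute driver-side phase, which is why the configuration above raises it to 6,000,000 ms (≈ 100 minutes); the spark-submit command in the test case below passes 1,800,000 ms (exactly 30 minutes) instead. spark.yarn.applicationMaster.waitTries is raised from 10 to 200 for the same reason.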
Below is a test case: the driver first prints messages on its own for 30 minutes, and only then initializes the SparkContext.
```java
package iie.udps.example.spark;

import iie.udps.common.hcatalog.SerHCatInputFormat;
import iie.udps.common.hcatalog.SerHCatOutputFormat;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
import org.apache.spark.SerializableWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

/**
 * Purpose: first print data on the driver (single process) for 30 minutes, then initialize the
 * SparkContext to switch to cluster mode. Uses Spark + HCatalog to read a Hive table, implements
 * GroupByAge, writes the result to a Hive output table, and also writes an XML status file to HDFS.
 *
 * spark-submit --class iie.udps.example.spark.SparkTest --master yarn-cluster
 *   --num-executors 2 --executor-memory 1g --executor-cores 1 --driver-memory 1g
 *   --conf spark.yarn.applicationMaster.waitTries=200 --conf spark.yarn.am.waitTime=1800000
 *   --jars /home/xdf/udps-sdk-0.3.jar,/home/xdf/udps-sdk-0.3.jar
 *   /home/xdf/sparktest.jar -c /user/hdfs/TestStdin2.xml
 */
public class SparkTest {

    @SuppressWarnings("rawtypes")
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: <-c> <stdin.xml>");
            System.exit(1);
        }
        String stdinXml = args[1];
        OperatorParamXml operXML = new OperatorParamXml();
        List<java.util.Map> stdinList = operXML.parseStdinXml(stdinXml); // parameter list

        // Read the input parameters
        String inputDBName = stdinList.get(0).get("inputDBName").toString();
        String inputTabName = stdinList.get(0).get("inputTabName").toString();
        String outputDBName = stdinList.get(0).get("outputDBName").toString();
        String outputTabName = stdinList.get(0).get("outputTabName").toString();
        String tempHdfsBasePath = stdinList.get(0).get("tempHdfsBasePath").toString();
        String jobinstanceid = stdinList.get(0).get("jobinstanceid").toString();
        System.out.println(inputDBName + ": " + inputTabName + ": " + outputDBName + ": "
                + outputTabName + ": " + tempHdfsBasePath + ": " + jobinstanceid);

        // Driver-side phase: print for roughly 30 minutes (600 iterations x 3 s)
        // before the SparkContext is created.
        long begin = System.currentTimeMillis();
        int count = 600; // number of lines to print
        for (int i = 0; i < count; i++) {
            System.out.println("aaaaaaaaaaaaaaa" + i);
            Thread.sleep(3000);
        }
        long end = System.currentTimeMillis();
        System.out.println("Driver-side print loop elapsed: " + (end - begin) + "ms");

        if (inputDBName.isEmpty() || inputTabName.isEmpty() || jobinstanceid.isEmpty()
                || outputDBName.isEmpty() || outputTabName.isEmpty()
                || tempHdfsBasePath.isEmpty()) {
            // Build the error-output parameters
            java.util.Map<String, String> stderrMap = new HashMap<String, String>();
            String errorMessage = "Some operating parameters is empty!!!";
            String errotCode = "80001";
            stderrMap.put("errorMessage", errorMessage);
            stderrMap.put("errotCode", errotCode);
            stderrMap.put("jobinstanceid", jobinstanceid);
            String fileName = "";
            if (tempHdfsBasePath.endsWith("/")) {
                fileName = tempHdfsBasePath + "stderr.xml";
            } else {
                fileName = tempHdfsBasePath + "/stderr.xml";
            }
            // Write the error-output file
            operXML.genStderrXml(fileName, stderrMap);
        } else {
            // Get the input table's schema (the output table has the same structure)
            HCatSchema schema = operXML.getHCatSchema(inputDBName, inputTabName);
            // The first thing a Spark program does is create a JavaSparkContext,
            // which tells Spark how to connect to the cluster.
            SparkConf sparkConf = new SparkConf().setAppName("SparkExample");
            JavaSparkContext jsc = new JavaSparkContext(sparkConf);
            // Read and process the Hive table data, returning the processed RDD
            JavaRDD<SerializableWritable<HCatRecord>> lastRDD = getProcessedData(
                    jsc, inputDBName, inputTabName, schema);
            // Store the processed data into the Hive output table
            storeToTable(lastRDD, outputDBName, outputTabName);
            jsc.stop();
            // Build the normal-output parameters
            java.util.Map<String, String> stdoutMap = new HashMap<String, String>();
            stdoutMap.put("outputDBName", outputDBName);
            stdoutMap.put("outputTabName", outputTabName);
            stdoutMap.put("jobinstanceid", jobinstanceid);
            String fileName = "";
            if (tempHdfsBasePath.endsWith("/")) {
                fileName = tempHdfsBasePath + "stdout.xml";
            } else {
                fileName = tempHdfsBasePath + "/stdout.xml";
            }
            // Write the normal-output file
            operXML.genStdoutXml(fileName, stdoutMap);
        }
        System.out.println(inputDBName + ": " + inputTabName + ": " + outputDBName + ": "
                + outputTabName + ": " + tempHdfsBasePath + ": " + jobinstanceid);
        System.exit(0);
    }

    /**
     * Reads the input Hive table via HCatalog and counts records per age
     * (the second column of each record).
     *
     * @param jsc        the JavaSparkContext
     * @param dbName     input database name
     * @param inputTable input table name
     * @param schema     schema of the input table
     * @return the (age, count) results wrapped as HCatRecords
     * @throws IOException
     */
    @SuppressWarnings("rawtypes")
    public static JavaRDD<SerializableWritable<HCatRecord>> getProcessedData(
            JavaSparkContext jsc, String dbName, String inputTable,
            final HCatSchema schema) throws IOException {
        // Read the Hive table data
        Configuration inputConf = new Configuration();
        Job job = Job.getInstance(inputConf);
        SerHCatInputFormat.setInput(job.getConfiguration(), dbName, inputTable);
        JavaPairRDD<WritableComparable, SerializableWritable> rdd = jsc.newAPIHadoopRDD(
                job.getConfiguration(), SerHCatInputFormat.class,
                WritableComparable.class, SerializableWritable.class);
        // Map each record to an (age, 1) pair
        JavaPairRDD<Integer, Integer> pairs = rdd
                .mapToPair(new PairFunction<Tuple2<WritableComparable, SerializableWritable>, Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @SuppressWarnings("unchecked")
                    @Override
                    public Tuple2<Integer, Integer> call(
                            Tuple2<WritableComparable, SerializableWritable> value)
                            throws Exception {
                        HCatRecord record = (HCatRecord) value._2.value();
                        return new Tuple2((Integer) record.get(1), 1);
                    }
                });
        // Sum the counts per age
        JavaPairRDD<Integer, Integer> counts = pairs
                .reduceByKey(new Function2<Integer, Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });
        // Wrap each (age, count) pair as a two-field HCatRecord
        JavaRDD<SerializableWritable<HCatRecord>> messageRDD = counts
                .map(new Function<Tuple2<Integer, Integer>, SerializableWritable<HCatRecord>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public SerializableWritable<HCatRecord> call(
                            Tuple2<Integer, Integer> arg0) throws Exception {
                        HCatRecord record = new DefaultHCatRecord(2);
                        record.set(0, arg0._1);
                        record.set(1, arg0._2);
                        return new SerializableWritable<HCatRecord>(record);
                    }
                });
        // Return the processed data
        return messageRDD;
    }

    /**
     * Stores the processed data into the output table.
     *
     * @param rdd     the processed records
     * @param dbName  output database name
     * @param tblName output table name
     */
    @SuppressWarnings("rawtypes")
    public static void storeToTable(
            JavaRDD<SerializableWritable<HCatRecord>> rdd, String dbName,
            String tblName) {
        Job outputJob = null;
        try {
            outputJob = Job.getInstance();
            outputJob.setJobName("SparkExample");
            outputJob.setOutputFormatClass(SerHCatOutputFormat.class);
            outputJob.setOutputKeyClass(WritableComparable.class);
            outputJob.setOutputValueClass(SerializableWritable.class);
            SerHCatOutputFormat.setOutput(outputJob,
                    OutputJobInfo.create(dbName, tblName, null));
            HCatSchema schema = SerHCatOutputFormat
                    .getTableSchemaWithPart(outputJob.getConfiguration());
            SerHCatOutputFormat.setSchema(outputJob, schema);
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Save the RDD into the target table
        rdd.mapToPair(
                new PairFunction<SerializableWritable<HCatRecord>, WritableComparable, SerializableWritable<HCatRecord>>() {
                    private static final long serialVersionUID = -4658431554556766962L;

                    @Override
                    public Tuple2<WritableComparable, SerializableWritable<HCatRecord>> call(
                            SerializableWritable<HCatRecord> record) throws Exception {
                        return new Tuple2<WritableComparable, SerializableWritable<HCatRecord>>(
                                NullWritable.get(), record);
                    }
                }).saveAsNewAPIHadoopDataset(outputJob.getConfiguration());
    }
}
```
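To summarize the cluster-side logic: each input record is mapped to the pair (record.get(1), 1), i.e. keyed by its second column (the age), reduceByKey sums the ones, and every resulting (age, count) pair is written back as a two-field HCatRecord, so the output table contains one row per distinct age.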
Input table data:

```text
hive> select * from test_in;
OK
1	20
2	20
3	21
4	20
5	21
6	20
7	21
8	19
9	19
10	21
```
Output table data:
```text
hive> select * from test_out;
OK
19	2
21	4
20	4
```
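Reading test_in as (id, age) rows, age 20 appears four times (ids 1, 2, 4, 6), age 21 four times (ids 3, 5, 7, 10), and age 19 twice (ids 8, 9), which matches the (age, count) rows of test_out.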