scala实现spark读取文件、清洗、入库base中
In day-to-day work the data we face is often a huge volume of file data. How can we quickly import such files into HBase with Spark? I have written a simple example here for reference only; in practice the data needs to be cleaned before it can be put into HBase.
Since the contents of the data files involve a real company project, I cannot post them here. This article focuses on implementing the logic of extracting the data with Spark, cleaning it, and loading it into HBase. The Scala code is fairly concise; it is as follows:
ParseClient.scala does the file loading, cleaning, and loading-into-HBase work:
package main.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

object ParseClient {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setAppName("ParseClient")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val textRdd = sc.textFile("WW_2016-10-13~2016-10-13.txt")

    // ---- data cleaning: group the lines into records; records are separated by blank lines.
    // Note: this grouping relies on the lines of one record being processed in order inside a
    // single partition (the local-mode, single-file case); a trailing record that is not
    // followed by a blank line is dropped.
    var smallList = new ListBuffer[String]()
    val arrRdd = textRdd.flatMap { line =>
      val allList = new ListBuffer[ListBuffer[String]]()
      if (line.isEmpty) {
        allList += smallList
        smallList = new ListBuffer[String]()
      } else {
        smallList += line
      }
      allList
    }

    // keep only lines with at least four space-separated fields: (date, time, name, message)
    val truncArrRdd = arrRdd.map { arr =>
      val lineArr = new ListBuffer[(String, String, String, String)]
      arr.foreach { element =>
        val eleArr = element.split(" ")
        if (eleArr.length >= 4) {
          val tuple = (eleArr(0), eleArr(1), eleArr(2), eleArr(3))
          lineArr += tuple
        }
      }
      lineArr
    }

    // fold each record into (consumerName, serviceName, date, concatenated messages);
    // a name containing "官方旗舰店" (official flagship store) marks the service side
    val resultRdd = truncArrRdd.map { tupleArr =>
      var serviceName: String = ""
      var consumerName: String = ""
      var date: String = ""
      var context: String = ""
      for (tuple <- tupleArr) {
        if (tuple._3.contains("官方旗舰店")) {
          serviceName = tuple._3
        } else {
          consumerName = tuple._3
        }
        date = tuple._1
        context = context + tuple._4 + "\n"
      }
      (consumerName, serviceName, date, context)
    }

    // ---- get the HBase job object and write the results into HBase
    val job = new HBaseCommon().getJob("jeffTest")
    resultRdd.map { tuple =>
      val rowkey = tuple._3 + tuple._2 + tuple._1
      val put = new Put(Bytes.toBytes(rowkey))
      // on HBase 1.0+ use put.addColumn instead of the deprecated put.add
      put.add(Bytes.toBytes("cf"), Bytes.toBytes("content"), Bytes.toBytes(tuple._4))
      (new ImmutableBytesWritable, put)
    }.saveAsNewAPIHadoopDataset(job.getConfiguration())

    sc.stop()
  }
}
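Because the real log file cannot be posted, here is a minimal sketch of the input format the cleaning code above expects: records separated by blank lines, with each line split on a single space into at least four fields (date, time, name, message). Every field value below is made up for illustration; only the structure and the "官方旗舰店" check come from the code above. The sketch parses one record without Spark, in the same way a record flows through truncArrRdd and resultRdd:

// Standalone sketch: how one blank-line-delimited record is parsed.
// All field values are hypothetical; the real file cannot be shown.
object ParseSketch {
  def main(args: Array[String]): Unit = {
    val record = Seq(
      "2016-10-13 10:01:02 某某官方旗舰店 您好,请问有什么可以帮您?",
      "2016-10-13 10:01:30 CustomerA 我想咨询一下这个订单")

    var serviceName = ""
    var consumerName = ""
    var date = ""
    var context = ""
    for (line <- record) {
      val f = line.split(" ")
      if (f.length >= 4) {
        if (f(2).contains("官方旗舰店")) serviceName = f(2) else consumerName = f(2)
        date = f(0)
        context = context + f(3) + "\n"
      }
    }
    // expected: ("CustomerA", "某某官方旗舰店", "2016-10-13", both messages joined by "\n")
    println((consumerName, serviceName, date, context))
  }
}

Note that split(" ") keeps only the fourth token of each line, so a message that itself contains spaces is truncated to its first token; that is also how the job above behaves.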
The following helper class, HBaseCommon.scala, is mainly responsible for the Spark and HBase connection configuration; it also includes a method for reading from HBase:
package main.scala

import java.util

import scala.collection.mutable.LinkedHashMap

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Get, HTable, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext

class HBaseCommon {

  /**
   * Get the initial HBase configuration (ZooKeeper quorum and client port).
   */
  def getConfiguration(): Configuration = {
    val map: LinkedHashMap[String, String] = new LinkedHashMap[String, String]()
    val HBASE_CONFIG = new Configuration()
    HBASE_CONFIG.set("hbase.zookeeper.quorum", map.getOrElse("zookeeper_quorum", "host1,host2,host3"))
    HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", map.getOrElse("zookeeper_port", "2181"))
    HBaseConfiguration.create(HBASE_CONFIG)
  }

  /**
   * Get the job used to write Puts into the given table.
   */
  def getJob(tableName: String): Job = {
    val configuration = this.getConfiguration()
    configuration.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    val job = Job.getInstance(configuration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job
  }

  /**
   * Read the whole content of an HBase table and return the cell values.
   */
  def getTableInfo(tableName: String, sc: SparkContext): util.ArrayList[String] = {
    val configuration = this.getConfiguration()
    configuration.set(TableInputFormat.INPUT_TABLE, tableName)
    val hBaseRDD = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
    val count = hBaseRDD.count()
    println("HBase RDD Count:" + count)
    hBaseRDD.cache()

    // a single-row Get through the native client API, just as a comparison
    val table = new HTable(configuration, tableName)
    val g = new Get("row1".getBytes)
    val result = table.get(g)
    val value = Bytes.toString(result.getValue("basic".getBytes, "name".getBytes))
    println("row1 basic:name = " + value)

    println("------------------------scan----------")
    var strinfo = ""
    val res = hBaseRDD.take(count.toInt)
    val reslist = new util.ArrayList[String]()
    for ((_, rs) <- res) {
      val kvs = rs.raw
      for (kv <- kvs) {
        strinfo += ("rowkey:" + new String(kv.getRow()) +
          " cf:" + new String(kv.getFamily()) +
          " column:" + new String(kv.getQualifier()) +
          " value:" + new String(kv.getValue()) + "\n")
        reslist.add(new String(kv.getValue()))
      }
    }
    println(strinfo)
    reslist
  }
}
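To tie the two classes together, here is a small usage sketch. It assumes the jeffTest table (with column family cf) already exists in HBase and that ParseClient has already been run; the object name ReadBackSketch and the local master setting are placeholders for illustration only:

package main.scala

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver that reads back what ParseClient wrote, via HBaseCommon.getTableInfo.
object ReadBackSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ReadBackSketch").setMaster("local")
    val sc = new SparkContext(conf)
    // getTableInfo scans the whole "jeffTest" table and returns every cell value
    val values = new HBaseCommon().getTableInfo("jeffTest", sc)
    println("cells read back: " + values.size())
    sc.stop()
  }
}

Keep in mind that the hard-coded "row1" / "basic:name" Get inside getTableInfo is only sample code; for the jeffTest table written above you would adapt it to the actual row keys and the cf:content column.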