Reading files with Spark in Scala, cleaning the data, and loading it into HBase

In day-to-day work the data we face is often a massive volume of file data. How can we quickly import those files into HBase with Spark? I have written a simple example here for reference only; in practice the data needs to be cleaned before it can be loaded into HBase.

Since the content of the data files involves an actual company project, it is not convenient to post it here. This article focuses on implementing the logic of extracting the data with Spark, cleaning it, and writing it into HBase. The Scala code is fairly concise and is shown below.
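
For reference, the code was written against the Spark 1.x / HBase 1.x era client APIs (HTable, put.add and so on). A rough build.sbt sketch of the dependencies follows; the versions are placeholders I am assuming for that era, not the ones from the original project, so match them to your own cluster:

// Dependency sketch only; versions are assumptions, adjust to your cluster.
name := "spark-hbase-import"

scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"   % "1.6.3" % "provided",
  "org.apache.hbase" %  "hbase-client" % "1.2.6",
  "org.apache.hbase" %  "hbase-common" % "1.2.6",
  // TableInputFormat / TableOutputFormat ship in hbase-server for HBase 1.x.
  "org.apache.hbase" %  "hbase-server" % "1.2.6"
)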

 

ParseClient.scala does the main work of loading the file, cleaning the data, and writing it to HBase:

package main.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
object ParseClient {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("ParseClient")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val textRdd = sc.textFile("WW_2016-10-13~2016-10-13.txt")

    // Data cleaning: blank lines separate chat sessions, so accumulate the lines
    // of the current session and emit them as one group when a blank line is hit.
    var smallList = new ListBuffer[String]()
    val arrRdd = textRdd.flatMap { line =>
      val allList = new ListBuffer[ListBuffer[String]]()
      if (line.isEmpty) {
        allList += smallList
        smallList = new ListBuffer[String]()
      } else {
        smallList += line
      }
      allList
    }

    // Keep only records with at least four space-separated fields and keep the
    // first four fields of each record as a tuple.
    val truncArrRdd = arrRdd.map { arr =>
      val lineArr = new ListBuffer[(String, String, String, String)]
      arr.foreach { element =>
        val eleArr = element.split(" ")
        if (eleArr.length >= 4) {
          val tuple = (eleArr(0), eleArr(1), eleArr(2), eleArr(3))
          lineArr += tuple
        }
      }
      lineArr
    }

    // For each session, work out which name is the shop ("官方旗舰店") and which is
    // the customer, remember the date, and concatenate the chat content.
    val resultRdd = truncArrRdd.map(tupleArr => {
      var serviceName: String = ""
      var consumerName: String = ""
      var date: String = ""
      var context: String = ""
      for (tuple <- tupleArr) {
        if (tuple._3.contains("官方旗舰店")) {
          serviceName = tuple._3
        } else {
          consumerName = tuple._3
        }
        date = tuple._1
        context = context + tuple._4 + "\n"
      }
      (consumerName, serviceName, date, context)
    })

    // Get the HBase job object and write the result RDD into the "jeffTest" table;
    // the rowkey is date + shop name + customer name, the chat text goes to cf:content.
    val job = new HBaseCommon().getJob("jeffTest")
    resultRdd.map(tuple => {
      val rowkey = tuple._3 + tuple._2 + tuple._1
      val put = new Put(Bytes.toBytes(rowkey))
      put.add(Bytes.toBytes("cf"), Bytes.toBytes("content"), Bytes.toBytes(tuple._4))
      (new ImmutableBytesWritable, put)
    }).saveAsNewAPIHadoopDataset(job.getConfiguration())

    sc.stop()
  }
}
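
A note on the data format this assumes: the input file uses blank lines to separate chat sessions, and each record line has at least four space-separated fields, the first being the date and the third the sender name. Because the real file cannot be shown, here is a minimal local sketch of just the grouping step with made-up sample lines (hypothetical data, not the project's). It also illustrates two caveats of the logic above: the last session is only emitted if the input ends with a blank line, and the mutable ListBuffer trick only groups correctly within a single partition.

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer

object ParseClientLocalTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ParseClientLocalTest").setMaster("local"))

    // Made-up sample sessions: four space-separated fields per line,
    // blank lines as separators, and a trailing blank line so the
    // last session is also emitted.
    val lines = Seq(
      "2016-10-13 0001 xx官方旗舰店 hello",
      "2016-10-13 0002 buyer01 hi",
      "",
      "2016-10-13 0003 buyer02 question",
      ""
    )

    // Same grouping logic as ParseClient, run on a single partition so the
    // shared mutable buffer sees all lines in order.
    var group = new ListBuffer[String]()
    val sessions = sc.parallelize(lines, 1).flatMap { line =>
      val out = new ListBuffer[ListBuffer[String]]()
      if (line.isEmpty) {
        out += group
        group = new ListBuffer[String]()
      } else {
        group += line
      }
      out
    }

    sessions.collect().foreach(s => println(s.mkString(" | ")))
    sc.stop()
  }
}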

 

Below is the helper class HBaseCommon.scala, which is mainly responsible for the Spark and HBase connection setup, and also includes a method for reading from HBase:

package main.scala

import java.util

import scala.collection.mutable.{ListBuffer, LinkedHashMap}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{KeyValue, HBaseConfiguration}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Get, Scan, HTable, Result, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}

class HBaseCommon {
  /**
   * Build the initial HBase configuration.
   */
  def getConfiguration(): Configuration = {
    // The map is a placeholder for externalised settings; since it is empty, the
    // getOrElse defaults below are used, so put your own ZooKeeper quorum there.
    val map: LinkedHashMap[String, String] = new LinkedHashMap[String, String]()
    val HBASE_CONFIG = new Configuration()
    HBASE_CONFIG.set("hbase.zookeeper.quorum", map.getOrElse("zookeeper_quorum", "host1,host2,host3"))
    HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", map.getOrElse("zookeeper_port", "2181"))
    HBaseConfiguration.create(HBASE_CONFIG)
  }

  /**
   * Build the Hadoop job used for writing to the given HBase table.
   */
  def getJob(tableName: String): Job = {
    val configuration = this.getConfiguration()
    configuration.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    val job = Job.getInstance(configuration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    // The values written by ParseClient are Put objects.
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job
  }

  /**
   * Read the full contents of an HBase table and return the cell values.
   */
  def getTableInfo(tableName: String, sc: SparkContext): util.ArrayList[String] = {
    val configuration = this.getConfiguration()
    configuration.set(TableInputFormat.INPUT_TABLE, tableName)

    // Scan the whole table as an RDD of (rowkey, Result) pairs.
    val hBaseRDD = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    var strinfo = ""
    val count = hBaseRDD.count()
    println("HBase RDD Count:" + count)
    hBaseRDD.cache()

    // Example of a single point read with Get; the row and column here are
    // hard-coded samples and may not exist in your table, hence the null check.
    val table = new HTable(configuration, tableName)
    val g = new Get("row1".getBytes)
    val result = table.get(g)
    val nameValue = result.getValue("basic".getBytes, "name".getBytes)
    if (nameValue != null) {
      println("row1 basic:name = " + Bytes.toString(nameValue))
    }

    println("------------------------scan----------")
    val res = hBaseRDD.take(count.toInt)
    val reslist = new util.ArrayList[String]()
    for (j <- 0 until count.toInt) {
      val rs = res(j)._2
      val kvs = rs.raw
      for (kv <- kvs) {
        strinfo += ("rowkey:" + new String(kv.getRow()) +
          " cf:" + new String(kv.getFamily()) +
          " column:" + new String(kv.getQualifier()) +
          " value:" + new String(kv.getValue()) + "\n")
        reslist.add(new String(kv.getValue()))
      }
    }
    println(strinfo)
    reslist
  }
}
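
Once ParseClient has run, the read path in HBaseCommon.getTableInfo can be used as a quick sanity check. A minimal driver sketch follows; it assumes the "jeffTest" table written above and that the ZooKeeper quorum in getConfiguration points at your cluster:

package main.scala

import org.apache.spark.{SparkConf, SparkContext}

object ReadBackCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ReadBackCheck").setMaster("local"))

    // Scan the table ParseClient wrote to and print every cell value;
    // for this example that is the cf:content chat text of each session.
    val values = new HBaseCommon().getTableInfo("jeffTest", sc)
    for (i <- 0 until values.size()) {
      println(values.get(i))
    }

    sc.stop()
  }
}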