Using a DataFrame to write back to the database in Spark Streaming makes the whole job slow
Problem description:
This is my first Spark Streaming program. The basic logic: Spark reads each batch of Kafka topic-partition data into an array, a custom class parses every message and assembles the results into an ArrayBuffer, which is then turned into an RDD via sc.parallelize, converted to a DataFrame, and registered as a temporary view.
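Condensed, the per-batch flow I just described looks like this (a minimal sketch, not the real code, which is pasted in full further down; parseToMap and drainToBuffer stand in for my parsing class and the accumulator handling):

source.foreachRDD { rdd =>
  //executors parse each record and ship the result back through a collection accumulator
  rdd.foreach(record => accum.add(parseToMap(record.value())))
  //the driver drains the accumulator into a buffer of case-class rows and redistributes it
  val rows = drainToBuffer(accum.value)
  sc.parallelize(rows).toDF().createOrReplaceTempView("e_bxr_web_event_log_real")
}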
The problem: as soon as I call write on the DataFrame to push the data back to the database, everything becomes very slow.
This is the Spark UI without the DataFrame write-back to the database:
Stage 15 is the line that parses the messages.
This is the Spark UI with the DataFrame write-back to the database:
The rows in the red box below are the JDBC writes.
I found that adding the df write-back after all the processing makes the earlier parsing code slow as well; batches pile up and the streaming job eventually dies. Without the write, parsing into a DataFrame by itself is fast.
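One Spark behavior that seems relevant here: transformations and createOrReplaceTempView are lazy, while write.jdbc is an action, so adding the write is what forces the DataFrame lineage to actually execute. A minimal illustration (placeholder rows/url/props, not my real code):

val df = sc.parallelize(rows).toDF()  // lazy: nothing runs yet
df.createOrReplaceTempView("t")       // still lazy: just registers a name
df.write.mode(SaveMode.Append).jdbc(url, "sink_table", props)  // action: the whole lineage runs here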
Submit command:
spark2-submit --class com.kafka.FastUtilTest \
  --driver-class-path /spark/test/postgresql-42.1.1.jar \
  --driver-memory 2G --num-executors 10 --executor-cores 2 --executor-memory 1G \
  --master yarn --deploy-mode client \
  --jars /spark/test/fastutil-8.1.0.jar \
  /spark/test/parse-jdbc.jar 1>/spark/parse-jdbc.log 2>&1 &
I've tried for a long time and can't figure out where the problem is. Any help appreciated. Urgent!!
Here is the code.
Main class:
object KafkaReadJson extends Serializable {
def main(args: Array[String]): Unit = {
//1. Create the StreamingContext
val conf = new SparkConf()
//.setMaster("local[*]")
.setAppName("kafka").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc: SparkContext = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(20))
ssc.sparkContext.setLogLevel("warn")
//Accumulators: collection accumulators that ship the parsed maps / user ids from the executors back to the driver
val accum = sc.collectionAccumulator[Object2ObjectOpenHashMap[String, String]]("My Accumulator")
val accum1 = sc.collectionAccumulator[String]("My Accumulator1")
//Driver-side buffer for the parsed rows, reused across batches
var arrayBuffer = ArrayBuffer[TableJson1]()
//Create the SparkSession
val spark: SparkSession = SparkSession.builder()
.appName("etl")
.config("spark.sql.shuffle.partitions", ConfigUtils.SPARK_SQL_SHUFFLE_PARTITIONS)
.config("spark.sql.autoBroadcastJoinThreshold", ConfigUtils.SPARK_SQL_AUTOBROADCASTJOINTHRESHOLD)
.config("spark.shuffle.compress", ConfigUtils.SPARK_SHUFFLE_COMPRESS)
.config("spark.shuffle.io.maxRetries", ConfigUtils.SPARK_SHUFFLE_IO_MAXRETRIES)
.config("spark.shuffle.io.retryWait", ConfigUtils.SPARK_SHUFFLE_IO_RETRYWAIT)
.config("spark.broadcast.compress", ConfigUtils.SPARK_BROADCAST_COMPRESS)
.config("spark.serializer", ConfigUtils.SPARK_SERIALIZER)
.config("spark.memory.fraction", ConfigUtils.SPARK_MEMORY_FRACTION)
.config("spark.memory.storageFraction", ConfigUtils.SPARK_MEMORY_STORAGEFRACTION)
.config("spark.default.parallelism", ConfigUtils.SPARK_DEFAULT_PARALLELISM)
.config("spark.speculation", ConfigUtils.SPARK_SPECULATION)
.config("spark.speculation.multiplier", ConfigUtils.SPARK_SPECULATION_MULTIPLIER)
.getOrCreate()
import spark.implicits._
//2. Consume the Kafka data
//Topic(s) to consume
val topics = Array("web_event_log")
//Kafka consumer configuration
val kafkaParams = Map[String, Object](
//Kafka broker list
"bootstrap.servers" -> "111.111.11.111:1111",
//Deserializer for the Kafka message key
"key.deserializer" -> classOf[StringDeserializer],
//Deserializer for the Kafka message value
"value.deserializer" -> classOf[StringDeserializer],
//Consumer group
"group.id" -> "hymtest5",
//Where to start consuming: latest = newest offset, earliest = smallest offset
"auto.offset.reset" -> "latest",
//Whether to auto-commit offsets
"enable.auto.commit" -> (false: java.lang.Boolean)
)
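//Note: with enable.auto.commit=false, offsets are only persisted by the manual
//commitAsync call at the end of each batch below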
//Returns the current time as a formatted string
def NowDate(): String = {
val now: Date = new Date()
val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val date = dateFormat.format(now)
date
}
def createTableJson100(jsonMap: util.List[Object2ObjectOpenHashMap[String, String]], arrayBuffer: ArrayBuffer[TableJson1]): ArrayBuffer[TableJson1] = {
val array = arrayBuffer
for (i <- 0 until jsonMap.size()) {
try {
val map = jsonMap.get(i)
val tsj = TableJson1(map("s_user_id"), map("product_type"), map("id"), map("app_key"), map("os_version"),
map("os_type"), map("screem_size"), map("s_user_type"), map("to_url"), map("event_time"),
map("type"), map("from_url"), map("browser"), map("event_id"), map("etl_date"),
map("outer_channel"), map("open_channel"), map("ip"), map("to_url_json"),
map("from_url_json"),
map("pre_object_id"), map("properties"), map("partition"), map("offset"))
array += tsj
} catch {
case e: Exception => {
println("异常值为:" + jsonMap.get(i).toString)
e.printStackTrace()
}
}
}
array
}
val source: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
//3. Process the data
//Per-batch processing and offset committing
source.foreachRDD(rdd => {
val nowDate = NowDate()
//Only enter the cleaning flow when this batch received at least one message
//(count() triggers a Spark job, so compute it once instead of twice)
val batchCount = rdd.count()
if (batchCount > 0) {
println("Number of records received in this batch: " + batchCount)
println("Batch processing start time: " + NowDate())
//Keep the string if it is all digits; otherwise substitute the fallback id "111111"
def isIntByRegex(s: String): String = {
val pattern = """^(\d+)$""".r
s match {
case pattern(_*) => {
//println("true")
s
}
case _ => {
//println("false")
"111111"
}
}
}
//Parse the user id, falling back to 111111 for anything non-numeric
def parseInt(suserid: String): Int = {
try {
val intId = isIntByRegex(suserid).toInt
intId
} catch {
case e: Exception => {
//println("not int!!")
111111
}
}
}
val array: RDD[String] = rdd.map(_.value())
//This for-comprehension desugars to array.foreach(...), so the body runs on the
//executors and results reach the driver only through the accumulators
for (value <- array) {
try {
//Parse the message with the custom class and extract the fields we need
val data: Object2ObjectOpenHashMap[String, String] = new Object2ObjectOpenHashMap[String, String]
val jsonMap: Object2ObjectOpenHashMap[String, String] = KafkaJsonParse1.parseJson(value, data)
var suserid: String = KafkaJsonParse1.getUserID(value, data)
suserid = parseInt(suserid).toString
accum.add(jsonMap)
accum1.add(suserid)
} catch {
case e: Exception => {
println("Malformed record: " + value)
}
}
}
println("该批次处理的结束时间为:" + NowDate())
val properties = new Properties()
properties.setProperty("user", "111")
properties.setProperty("password", "111")
println("accum.value.length " + accum.value.toArray().length)
println("sc.parallelize start:" + NowDate())
val arr1: util.List[Object2ObjectOpenHashMap[String, String]] = accum.value
val table = createTableJson100(arr1, arrayBuffer)
sc.parallelize(table).toDF().createOrReplaceTempView("e_bxr_web_event_log_real")
//累加器清空
accum.reset()
accum1.reset()
arrayBuffer = ArrayBuffer[TableJson1]()
println("sc.parallelize end:" + NowDate())
//写数据回数据库 使程序变得缓慢
sc.parallelize(table).toDF().write.mode(SaveMode.Append).jdbc("jdbc:postgresql://111.111.111.111:1111/edw", "edwreal.e_bxr_web_event_log_real_20200707_bigdata", properties)
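//Note: sc.parallelize(table).toDF() is built a second time here (the temp view above
//has its own copy), and this .jdbc(...) call is the action that actually materializes it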
//Get the offset ranges consumed by this batch
val offset = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//Commit the offsets
source.asInstanceOf[CanCommitOffsets].commitAsync(offset)
println("Offsets committed")
}
}
)
//4. Start the streaming job
ssc.start()
//5. Block the main thread
ssc.awaitTermination()
}
}
This is the KafkaJsonParse1.parseJson method used by the main class:
def parseJson(word: String, data: Object2ObjectOpenHashMap[String, String]): Object2ObjectOpenHashMap[String, String] = {
val parseData: Object2ObjectOpenHashMap[String, String] = data
try {
//Initialize the fields
var time = ""
var beat = new JSONObject()
var input_type = ""
var message = ""
var offset = 0
var source = ""
var msgtype = ""
//pgJdbcUtils.insert(word)
val jSONObject: JSONObject = new JSONObject(word)
if (jSONObject.has("@timestamp")) {
time = jSONObject.getString("@timestamp")
}
if (jSONObject.has("beat")) {
beat = jSONObject.getJSONObject("beat")
}
if (jSONObject.has("input_type")) {
input_type = jSONObject.getString("input_type")
}
if (jSONObject.has("message")) {
message = jSONObject.getString("message")
}
if (jSONObject.has("offset")) {
offset = jSONObject.getInt("offset")
}
if (jSONObject.has("source")) {
source = jSONObject.getString("source")
}
if (jSONObject.has("type")) {
msgtype = jSONObject.getString("type")
}
message = message.trim() //strip leading/trailing whitespace
.replaceAll("\\?inner_channel", "&inner_channel")
.replaceAll("\\?random", "&random")
.replaceAll("User-agent=XRK", "User-agent-XRK") //handle UAs that contain '=', especially Huawei's
val type_reg: String = seperate_type(message)
//Strip the type value out of the message
if (!type_reg.isEmpty) {
message = message.replace(type_reg, "")
}
//println("matched type " + type_reg)
val utm_detail = seperate_utm(message)
//Strip the utm value out of the message
if (!utm_detail.isEmpty) {
message = message.replace(utm_detail, "")
}
//println("matched utm_detail " + utm_detail)
//Build the key/value map from the message
val msg_para_map: Map[String, String] = get_para_map(message)
// println("--dump map--")
// for ((key, value) <- msg_para_map) {
// println(key + ":" + value)
// }
// println("--dump map--")
//Extract the field values
var s_user_id: String = msg_para_map.getOrElse("s_user_id", "111111")
def isIntByRegex(s: String) = {
val pattern = """^(\d+)$""".r
s match {
case pattern(_*) => true
case _ => false
}
}
if (!isIntByRegex(s_user_id)) {
//println("error" + s_user_id)
s_user_id = 111111.toString
}
var product_type = msg_para_map.getOrElse("product_type", "")
val id = str_decode(msg_para_map.getOrElse("id", ""))
val app_key = msg_para_map.getOrElse("app_key", "")
val os_version = msg_para_map.getOrElse("os_version", "")
val os_type = msg_para_map.getOrElse("os_type", "")
val screem_size = msg_para_map.getOrElse("screem_size", "")
val s_user_type = msg_para_map.getOrElse("s_user_type", "")
val to = msg_para_map.getOrElse("to", "")
var to_url = URLDecoder.decode(to, "UTF8") //keep the Chinese characters in to_url
to_url = URLDecoder.decode(to_url, "UTF8") // add by zhangf -- 2020-07-10
var event_time = msg_para_map.getOrElse("server_time", "").replaceAll("T", " ") //replace the ISO 'T' separator with a space
if (event_time.isEmpty) {
event_time = msg_para_map.getOrElse("event_timer", "")
}
var etl_date = event_time.substring(0, 10)
var type1 = msg_para_map.getOrElse("type", "")
if (type1.isEmpty) {
type1 = type_reg
type1 = URLDecoder.decode(type1, "UTF8")
}
//println("type1 " + type1)
val from = msg_para_map.getOrElse("from", "")
var from_url: String = URLDecoder.decode(from, "UTF8") //keep the Chinese characters in from_url
from_url = URLDecoder.decode(from_url, "UTF8") // add by zhangf -- 2020-07-10
//println("from_url " + from_url)
val browser = msg_para_map.getOrElse("browser", "")
val event_id: String = str_decode(msg_para_map.getOrElse("event_id", ""))
val outer_channel = msg_para_map.getOrElse("outer_channel", "")
val open_channel = msg_para_map.getOrElse("open_channel", "")
val ip = msg_para_map.getOrElse("ip", "")
val to_url_json = get_json_params(to_url)
//println("to_url_json " + to_url_json)
val from_url_json = get_json_params(from_url)
//println("from_url_json " + from_url_json)
val pre_object_id = msg_para_map.getOrElse("pre_object_id", "") //py296
//Remove the fields that have already been extracted
var del_para_map: Map[String, String] = delete_exist_param(msg_para_map) //py313
if (event_id == "pv" || event_id == "app-share_click" || event_id == "weixin-share_result" || event_id == "invite-share_scan"
|| event_id == "invite-share_friend" || event_id == "invite-share_million") {
val str: String = get_url_id(to_url)
//println("event_id " + event_id + " to_url_id: " + str)
del_para_map += (("url_id", str))
} else {
val str: String = get_url_id(from_url)
//println("event_id " + event_id + " from_url_id: " + str)
del_para_map += (("url_id", str))
}
if (utm_detail != "") {
del_para_map += (("utm", utm_detail))
}
var propertise = ""
if (del_para_map.nonEmpty) {
propertise += "{"
for ((key, value) <- del_para_map) {
propertise += s"""\"${key}\":\"${value}\",""".stripMargin
}
if (offset.toString.nonEmpty) {
val strOffset = offset.toString
propertise += s"""\"offset\":\"${strOffset}\",""".stripMargin
}
if (propertise.lastIndexOf(",") >= 0) {
propertise = propertise.substring(0, propertise.lastIndexOf(","))
}
propertise += "}"
//println(propertise)
}
etl_date = event_time.substring(0, 10) //py341
parseData.put("s_user_id", s_user_id)
parseData.put("product_type", product_type)
parseData.put("id", id)
parseData.put("app_key", app_key)
parseData.put("os_version", os_version)
parseData.put("os_type", os_type)
parseData.put("screem_size", screem_size)
parseData.put("s_user_type", str_decode(s_user_type))
parseData.put("to_url", to_url)
parseData.put("event_time", event_time)
parseData.put("type", str_decode(type1))
parseData.put("from_url", from_url)
parseData.put("browser", str_decode(browser))
parseData.put("event_id", event_id)
parseData.put("etl_date", etl_date)
parseData.put("outer_channel", outer_channel)
parseData.put("open_channel", open_channel)
parseData.put("ip", ip)
parseData.put("to_url_json", to_url_json)
parseData.put("from_url_json", from_url_json)
parseData.put("pre_object_id", pre_object_id)
parseData.put("properties", propertise)
parseData.put("partition", "")
parseData.put("offset", offset.toString)
parseData
} catch {
case e: Exception =>
println("json解析异常")
//println(word)
//e.printStackTrace()
parseData
}
}
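A side note on the properties string above: it is assembled by hand, so quotes inside values are not escaped. Since org.json is already on the classpath (parseJson uses its JSONObject), the same block could be built roughly like this (sketch only, same del_para_map and offset as above):

val propsJson = new JSONObject()
for ((key, value) <- del_para_map) propsJson.put(key, value) //JSONObject handles escaping
propsJson.put("offset", offset.toString)
val propertise = propsJson.toString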