Spark Streaming: writing a DataFrame back to the database makes the job slow

Problem description:

This is my first Spark Streaming program. The basic logic: Spark reads the Kafka topic partition data into an array, parses each message with a custom class, assembles the results into an ArrayBuffer, turns that into an RDD via sc.parallelize, converts the RDD to a DataFrame, and registers it as a temporary view.

The problem: as soon as I call write on the DataFrame to push the data back to the database, the whole job becomes very slow.

This is the Spark UI without the DataFrame write back to the database:

Stage 15 corresponds to the line that parses the messages.

This is the Spark UI with the DataFrame write back to the database:

[screenshots]

The part in the red box below is the JDBC write:

[screenshot]

I found that once I append the DataFrame write at the end of the processing, the parsing code before it becomes slow as well; eventually the batches pile up and the streaming job dies. Without the write, parsing into a DataFrame on its own is fast.
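
For context on the write step itself: Spark's JDBC writer opens one database connection per partition of the DataFrame and inserts rows in batches, with the batch size controlled by the "batchsize" write option (default 1000 in Spark 2.x). Below is a minimal sketch of the same write with those knobs made explicit; df stands for the DataFrame built in the code further down, and the numbers are illustrative, not a recommendation.

val props = new java.util.Properties()
props.setProperty("user", "111")
props.setProperty("password", "111")

df.coalesce(4) //fewer, larger partitions: fewer connections, bigger batches
  .write
  .mode(SaveMode.Append)
  .option("batchsize", "10000") //rows per JDBC batch, default 1000
  .jdbc("jdbc:postgresql://111.111.111.111:1111/edw",
    "edwreal.e_bxr_web_event_log_real_20200707_bigdata", props)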

These are the submit parameters:

spark2-submit --class com.kafka.FastUtilTest  --driver-class-path /spark/test/postgresql-42.1.1.jar --driver-memory 2G --num-executors 10 --executor-cores 2 --executor-memory 1G  --master yarn --deploy-mode client --jars /spark/test/fastutil-8.1.0.jar  /spark/test/parse-jdbc.jar  1>/spark/parse-jdbc.log 2>&1 &

I've tried for a long time and can't figure out where the problem is. Any help appreciated. Urgent!!

Here is the code.

Main class:

object KafkaReadJson extends Serializable {

  def main(args: Array[String]): Unit = {

    //1. Create the StreamingContext
    val conf = new SparkConf()
      //.setMaster("local[*]")
      .setAppName("kafka").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc: SparkContext = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(20))

    ssc.sparkContext.setLogLevel("warn")

    //accumulators used to collect the parsed results on the driver
    val accum = sc.collectionAccumulator[Object2ObjectOpenHashMap[String, String]]("My Accumulator")
    val accum1 = sc.collectionAccumulator[String]("My Accumulator1")
    //buffer for the parsed rows; reset after every batch
    var arrayBuffer = ArrayBuffer[TableJson1]()

    //create the SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("etl")
      .config("spark.sql.shuffle.partitions", ConfigUtils.SPARK_SQL_SHUFFLE_PARTITIONS)
      .config("spark.sql.autoBroadcastJoinThreshold", ConfigUtils.SPARK_SQL_AUTOBROADCASTJOINTHRESHOLD)
      .config("spark.shuffle.compress", ConfigUtils.SPARK_SHUFFLE_COMPRESS)
      .config("spark.shuffle.io.maxRetries", ConfigUtils.SPARK_SHUFFLE_IO_MAXRETRIES)
      .config("spark.shuffle.io.retryWait", ConfigUtils.SPARK_SHUFFLE_IO_RETRYWAIT)
      .config("spark.broadcast.compress", ConfigUtils.SPARK_BROADCAST_COMPRESS)
      .config("spark.serializer", ConfigUtils.SPARK_SERIALIZER)
      .config("spark.memory.fraction", ConfigUtils.SPARK_MEMORY_FRACTION)
      .config("spark.memory.storageFraction", ConfigUtils.SPARK_MEMORY_STORAGEFRACTION)
      .config("spark.default.parallelism", ConfigUtils.SPARK_DEFAULT_PARALLELISM)
      .config("spark.speculation", ConfigUtils.SPARK_SPECULATION)
      .config("spark.speculation.multiplier", ConfigUtils.SPARK_SPECULATION_MULTIPLIER)
      .getOrCreate()

    import spark.implicits._
    //2. Consume the Kafka data
    //topics to consume
    val topics = Array("web_event_log")

    //Kafka consumer configuration
    val kafkaParams = Map[String, Object](
      //Kafka broker list
      "bootstrap.servers" -> "111.111.11.111:1111",
      //deserializer for message keys
      "key.deserializer" -> classOf[StringDeserializer],
      //deserializer for message values
      "value.deserializer" -> classOf[StringDeserializer],
      //consumer group
      "group.id" -> "hymtest5",
      //where to start consuming: "latest" = newest offset, "earliest" = oldest offset
      "auto.offset.reset" -> "latest",
      //whether to auto-commit offsets
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    //get the current time as a formatted string
    def NowDate(): String = {
      val now: Date = new Date()
      val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val date = dateFormat.format(now)
      date
    }

    //assemble TableJson1 rows from the parsed maps
    def createTableJson100(jsonMap: util.List[Object2ObjectOpenHashMap[String, String]], arrayBuffer: ArrayBuffer[TableJson1]): ArrayBuffer[TableJson1] = {
      val array = arrayBuffer
      for (i <- 0 until jsonMap.size()) {
        try {
          val map = jsonMap.get(i)
          val tsj = TableJson1(map.get("s_user_id"), map.get("product_type"), map.get("id"), map.get("app_key"), map.get("os_version"),
            map.get("os_type"), map.get("screem_size"), map.get("s_user_type"), map.get("to_url"), map.get("event_time"),
            map.get("type"), map.get("from_url"), map.get("browser"), map.get("event_id"), map.get("etl_date"),
            map.get("outer_channel"), map.get("open_channel"), map.get("ip"), map.get("to_url_json"),
            map.get("from_url_json"),
            map.get("pre_object_id"), map.get("properties"), map.get("partition"), map.get("offset"))
          array += tsj
        } catch {
          case e: Exception => {
            println("Bad record: " + jsonMap.get(i).toString)
            e.printStackTrace()
          }
        }
      }
      array
    }

    val source: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
    //3. Process the data
    //per-batch processing and offset commit
    source.foreachRDD(rdd => {


      //enter the cleaning flow only if this batch received at least one message
      if (rdd.count() > 0) {

        println("Records received in this batch: " + rdd.count())
        println("Batch processing start time: " + NowDate())


        def isIntByRegex(s: String): String = {
          val pattern = """^(\d+)$""".r
          s match {
            case pattern(_*) => {
              //println("true")
              s
            }
            case _ => {
              //println("false")
              "111111"
            }
          }
        }

        def parseInt(suserid: String): Int = {
          try {
            val intId = isIntByRegex(suserid).toInt
            intId
          } catch {
            case e: Exception => {
              //println("not int!!")
              111111
            }
          }
        }

        val array: RDD[String] = rdd.map(_.value())
        //this loop runs on the executors; the parsed maps travel back to the driver through the accumulators
        for (value <- array) {
          try {
            //parse the message with the custom class to extract the needed fields
            val data: Object2ObjectOpenHashMap[String, String] = new Object2ObjectOpenHashMap[String, String]
            val jsonMap: Object2ObjectOpenHashMap[String, String] = KafkaJsonParse1.parseJson(value, data)
            var suserid: String = KafkaJsonParse1.getUserID(value, data)

            suserid = parseInt(suserid).toString

            accum.add(jsonMap)
            accum1.add(suserid)

          } catch {
            case e: Exception => {
              println("异常值为:" + value)

            }
          }

        }

        println("该批次处理的结束时间为:" + NowDate())

        val properties = new Properties()
        properties.setProperty("user", "111")
        properties.setProperty("password", "111")

        println("accum.value.length  " + accum.value.toArray().length)
        println("sc.parallelize start:" + NowDate())
        val arr1: util.List[Object2ObjectOpenHashMap[String, String]] = accum.value
        val table = createTableJson100(arr1, arrayBuffer)


        //register the parsed batch as a temp view (not referenced again below)
        sc.parallelize(table).toDF().createOrReplaceTempView("e_bxr_web_event_log_real")

        //reset the accumulators and the row buffer
        accum.reset()
        accum1.reset()
        arrayBuffer = ArrayBuffer[TableJson1]()

        println("sc.parallelize end:" + NowDate())

        //writing the data back to the database: this is the step that makes the job slow
        sc.parallelize(table).toDF().write.mode(SaveMode.Append).jdbc("jdbc:postgresql://111.111.111.111:1111/edw", "edwreal.e_bxr_web_event_log_real_20200707_bigdata", properties)


        //get the offsets consumed in this batch
        val offset = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        //commit the offsets
        source.asInstanceOf[CanCommitOffsets].commitAsync(offset)
        println("已提交偏移量")

      }
    }
    )
    //4. Start the streaming context
    ssc.start()
    //5. Block the main thread
    ssc.awaitTermination()
  }
}
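
A note on the overall shape: collectionAccumulator is intended for small side data, and here every parsed record travels executor to driver (through the accumulator) and then driver back to executors (through sc.parallelize) before the JDBC write. A rough sketch of the more common shape, which keeps parsing on the executors, is below; toRow is a hypothetical helper doing the per-record field mapping that createTableJson100 does, and everything else is taken from the code above.

source.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    //parse on the executors: no accumulators, no driver round-trip, no sc.parallelize
    val parsed: RDD[TableJson1] = rdd.mapPartitions { records =>
      records.flatMap { record =>
        try {
          val data = new Object2ObjectOpenHashMap[String, String]
          val jsonMap = KafkaJsonParse1.parseJson(record.value(), data)
          Some(toRow(jsonMap)) //hypothetical: Object2ObjectOpenHashMap -> TableJson1
        } catch {
          case _: Exception => None //drop records that fail to parse
        }
      }
    }
    parsed.toDF().write.mode(SaveMode.Append) //properties: the JDBC Properties defined as above
      .jdbc("jdbc:postgresql://111.111.111.111:1111/edw",
        "edwreal.e_bxr_web_event_log_real_20200707_bigdata", properties)
    source.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
  }
}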

This is the KafkaJsonParse1.parseJson method called from the main class:

def parseJson(word: String, data: Object2ObjectOpenHashMap[String, String]): Object2ObjectOpenHashMap[String, String] = {
    val parseData: Object2ObjectOpenHashMap[String, String] = data
    try {
      //initialize fields with defaults
      var time = ""
      var beat = new JSONObject()
      var input_type = ""
      var message = ""
      var offset = 0
      var source = ""
      var msgtype = ""
      //pgJdbcUtils.insert(word)
      val jSONObject: JSONObject = new JSONObject(word)
      if (jSONObject.has("@timestamp")) {
        time = jSONObject.getString("@timestamp")
      }
      if (jSONObject.has("beat")) {
        beat = jSONObject.getJSONObject("beat")
      }
      if (jSONObject.has("input_type")) {
        input_type = jSONObject.getString("input_type")
      }
      if (jSONObject.has("message")) {
        message = jSONObject.getString("message")
      }
      if (jSONObject.has("offset")) {
        offset = jSONObject.getInt("offset")
      }
      if (jSONObject.has("source")) {
        source = jSONObject.getString("source")
      }
      if (jSONObject.has("type")) {
        msgtype = jSONObject.getString("type")
      }

      message = message.trim() //strip leading/trailing whitespace
        .replaceAll("\\?inner_channel", "&inner_channel")
        .replaceAll("\\?random", "&random")
        .replaceAll("User-agent=XRK", "User-agent-XRK") //handle '=' inside the UA string, notably Huawei UAs


      val type_reg: String = seperate_type(message)
      //strip the type value out of the message
      if (!type_reg.isEmpty) {
        message = message.replace(type_reg, "")
      }
      //println("匹配到的type  " + type_reg)

      val utm_detail = seperate_utm(message)
      //strip the utm value out of the message
      if (!utm_detail.isEmpty) {
        message = message.replace(utm_detail, "")
      }
      //println("匹配到的utm_detail  " + utm_detail)

      //extract the key/value pairs in the message into a map
      val msg_para_map: Map[String, String] = get_para_map(message)
      //      println("--dump map--")
      //      for ((key, value) <- msg_para_map) {
      //        println(key + ":" + value)
      //      }
      //      println("--dump map--")

      //extract individual fields
      var s_user_id: String = msg_para_map.getOrElse("s_user_id", "111111")


      def isIntByRegex(s: String) = {
        val pattern = """^(\d+)$""".r
        s match {
          case pattern(_*) => true
          case _ => false
        }
      }

      if (!isIntByRegex(s_user_id)) {
        //println("error" + s_user_id)
        s_user_id = "111111"
      }


      var product_type = msg_para_map.getOrElse("product_type", "")
      val id = str_decode(msg_para_map.getOrElse("id", ""))
      val app_key = msg_para_map.getOrElse("app_key", "")
      val os_version = msg_para_map.getOrElse("os_version", "")
      val os_type = msg_para_map.getOrElse("os_type", "")
      val screem_size = msg_para_map.getOrElse("screem_size", "")
      val s_user_type = msg_para_map.getOrElse("s_user_type", "")

      val to = msg_para_map.getOrElse("to", "")
      var to_url = URLDecoder.decode(to, "UTF8") //keep the Chinese characters in to_url
      to_url = URLDecoder.decode(to_url, "UTF8") //decode twice; add by zhangf -- 2020-07-10

      var event_time = msg_para_map.getOrElse("server_time", "").replaceAll("T", " ") //replace the ISO 'T' separator with a space
      if (event_time.isEmpty) {
        event_time = msg_para_map.getOrElse("event_timer", "")
      }

      var etl_date = event_time.substring(0, 10)
      var type1 = msg_para_map.getOrElse("type", "")
      if (type1.isEmpty) {
        type1 = type_reg
        type1 = URLDecoder.decode(type1, "UTF8")
      }
      //println("type1  " + type1)

      val from = msg_para_map.getOrElse("from", "")
      var from_url: String = URLDecoder.decode(from, "UTF8") //keep the Chinese characters in from_url
      from_url = URLDecoder.decode(from_url, "UTF8") //decode twice; add by zhangf -- 2020-07-10
      //println("from_url " + from_url)

      val browser = msg_para_map.getOrElse("browser", "")
      val event_id: String = str_decode(msg_para_map.getOrElse("event_id", ""))
      val outer_channel = msg_para_map.getOrElse("outer_channel", "")
      val open_channel = msg_para_map.getOrElse("open_channel", "")
      val ip = msg_para_map.getOrElse("ip", "")

      val to_url_json = get_json_params(to_url)
      //println("to_url_json  " + to_url_json)
      val from_url_json = get_json_params(from_url)
      //println("from_url_json  " + from_url_json)

      val pre_object_id = msg_para_map.getOrElse("pre_object_id", "") //py296

      //remove the fields that have already been extracted
      var del_para_map: Map[String, String] = delete_exist_param(msg_para_map) //py313

      if (event_id == "pv" || event_id == "app-share_click" || event_id == "weixin-share_result" || event_id == "invite-share_scan"
        || event_id == "invite-share_friend" || event_id == "invite-share_million") {
        val str: String = get_url_id(to_url)
        //println("event_id  " + event_id + "  to_url_id:  " + str)
        del_para_map += (("url_id", str))
      } else {
        val str: String = get_url_id(from_url)
        //println("event_id  " + event_id + "  from_url_id:  " + str)
        del_para_map += (("url_id", str))
      }

      if (utm_detail != "") {
        del_para_map += (("utm", utm_detail))
      }

      var propertise = ""
      if (del_para_map.nonEmpty) {
        propertise += "{"
        for ((key, value) <- del_para_map) {
          propertise += s"""\"${key}\":\"${value}\",""".stripMargin
        }
        if (offset.toString.nonEmpty) {
          val strOffset = offset.toString
          propertise += s"""\"offset\":\"${strOffset}\",""".stripMargin
        }
        if (propertise.lastIndexOf(",") >= 0) {
          propertise = propertise.substring(0, propertise.lastIndexOf(","))
        }
        propertise += "}"
        //println(propertise)

      }

      etl_date = event_time.substring(0, 10) //py341

      parseData.put("s_user_id", s_user_id)
      parseData.put("product_type", product_type)
      parseData.put("id", id)
      parseData.put("app_key", app_key)
      parseData.put("os_version", os_version)

      parseData.put("os_type", os_type)
      parseData.put("screem_size", screem_size)
      parseData.put("s_user_type", str_decode(s_user_type))
      parseData.put("to_url", to_url)
      parseData.put("event_time", event_time)

      parseData.put("type", str_decode(type1))
      parseData.put("from_url", from_url)
      parseData.put("browser", str_decode(browser))
      parseData.put("event_id", event_id)
      parseData.put("etl_date", etl_date)

      parseData.put("outer_channel", outer_channel)
      parseData.put("open_channel", open_channel)
      parseData.put("ip", ip)
      parseData.put("to_url_json", to_url_json)
      parseData.put("from_url_json", from_url_json)

      parseData.put("pre_object_id", pre_object_id)
      parseData.put("properties", propertise)
      parseData.put("partition", "")
      parseData.put("offset", offset.toString)

      parseData
    } catch {
      case e: Exception =>
        println("json解析异常")
        //println(word)
        //e.printStackTrace()
        parseData
    }
  }
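
For reference, the envelope this method expects matches Filebeat-style JSON output; the fields probed above are @timestamp, beat, input_type, message, offset, source and type. Below is a made-up record of that shape, purely for illustration; what actually ends up in the map depends on get_para_map and the other helpers, which are not shown in the post.

val sample =
  """{"@timestamp":"2020-07-10T12:00:00.000Z",
    |"beat":{"hostname":"web01"},
    |"input_type":"log",
    |"message":"s_user_id=123&event_id=pv&to=http%3A%2F%2Fexample.com&server_time=2020-07-10T12:00:00",
    |"offset":42,
    |"source":"/var/log/web/event.log",
    |"type":"web_event_log"}""".stripMargin

val parsed = KafkaJsonParse1.parseJson(sample, new Object2ObjectOpenHashMap[String, String])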