Spark + Json4s serialization issue

Problem description:

I am using Json4s classes inside a Spark 2.2.0 closure. The "workaround" for the failure to serialize DefaultFormats is to include its definition inside every Spark closure that needs it. I believe I have done more than necessary below, but I still get the serialization failure.

Using Spark 2.2.0, Scala 2.11, and Json4s 3.2.x (whatever ships with Spark); I also tried Json4s 3.5.3 by pulling it into my job with sbt. In all cases I used the workaround shown below.

Does anyone know what I'm doing wrong?

logger.info(s"Creating an RDD for $actionName")
implicit val formats = DefaultFormats
val itemProps = df.rdd.map[(ItemID, ItemProps)](row => { <--- error points to this line
  implicit val formats = DefaultFormats
  val itemId = row.getString(0)
  val correlators = row.getSeq[String](1).toList
  (itemId, Map(actionName -> JArray(correlators.map { t =>
    implicit val formats = DefaultFormats
    JsonAST.JString(t)
  })))
})

I have also tried another suggestion, which is to set the DefaultFormats implicit in the class constructor area rather than in the closure; no luck anywhere.
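For concreteness, the class-level placement looked roughly like this (a simplified sketch; the real URAlgorithm has more going on):

import org.json4s.DefaultFormats

class URAlgorithm extends Serializable {
  // Declared once at class level instead of inside each closure. As the stack
  // trace below shows, this makes formats a field of URAlgorithm, and the
  // org.json4s.DefaultFormats$ singleton is not java.io.Serializable, so any
  // closure that captures `this` (via $outer) drags the field along with it.
  implicit val formats = DefaultFormats

  // ... methods whose closures pick up the implicit above
}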

The JVM error trace is from Spark complaining that the task is not serializable, pointing to the marked line above (the last line in my code, anyway); the root cause is then explained with:

Serialization stack:
- object not serializable (class: org.json4s.DefaultFormats$, value: org.json4s.DefaultFormats$@7fdd29f3)
- field (class: com.actionml.URAlgorithm, name: formats, type: class org.json4s.DefaultFormats$)
- object (class com.actionml.URAlgorithm, com.actionml.URAlgorithm@2dbfa972)
- field (class: com.actionml.URAlgorithm$$anonfun$udfLLR$1, name: $outer, type: class com.actionml.URAlgorithm)
- object (class com.actionml.URAlgorithm$$anonfun$udfLLR$1, <function3>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$4, name: func$4, type: interface scala.Function3)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$4, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(input[2, bigint, false], input[3, bigint, false], input[5, bigint, false]))
- element of array (index: 1)
- array (class [Ljava.lang.Object;, size 3)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 128 more

Interesting. One typical problem is running into serialization issues with the implicit val formats, but since you define it inside your closure this should be OK.

I know that this is a bit hacky, but you could try the following:

  1. use a @transient implicit val
  2. maybe do a minimal test of whether JsonAST.JString(t) is serializable (both ideas are sketched below)
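A minimal sketch of both ideas (the test harness and the lazy modifier are my additions, not from your code):

import org.json4s.{DefaultFormats, JsonAST}

class URAlgorithm extends Serializable {
  // 1. @transient tells Java serialization to skip this field, so the
  //    non-serializable DefaultFormats$ singleton never enters the serialized
  //    closure; lazy re-creates it on the executor on first use.
  @transient implicit lazy val formats: DefaultFormats.type = DefaultFormats
}

// 2. Round-trip a value through Java serialization to test it in isolation.
def javaSerializable(obj: AnyRef): Boolean =
  try {
    new java.io.ObjectOutputStream(new java.io.ByteArrayOutputStream).writeObject(obj)
    true
  } catch { case _: java.io.NotSerializableException => false }

println(javaSerializable(JsonAST.JString("test"))) // JString is a case class, so this should print true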