Error when submitting a Flink WordCount written in Scala to the server

Problem description:

Error message:

Caused by: java.io.InvalidClassException: org.apache.flink.streaming.api.scala.DataStream$$anon$6; local class incompatible: stream classdesc serialVersionUID = -6084935300065560467, local class serialVersionUID = -7748427108057891683
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
    ... 7 more

Code:

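// Brings StreamExecutionEnvironment and the implicit TypeInformation into scope
import org.apache.flink.streaming.api.scala._

def main(args: Array[String]): Unit = {
  // Execution environment
  val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
  // Source: read lines of text from a socket
  val lineDS = streamEnv.socketTextStream("hadoop011", 15177)
  // Transformation: split lines into words, then count per word
  val resultDS = lineDS.flatMap(_.split(" "))
    .map(x => (x, 1))
    .keyBy(0) // key by tuple field 0 (the word)
    .sum(1)   // sum tuple field 1 (the count)

  // Sink: print each record, prefixed with the label "结果" ("result")
  resultDS.print("结果")

  // Start the job
  streamEnv.execute()
}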



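This happens when the Hadoop jars and the jars in your own Flink project are different versions. The InvalidClassException itself says the class that serialized the operator and the class that deserialized it have different serialVersionUID values, so they come from two different builds.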

Exclude the Hadoop-related jars from your own project, so that they are left out when you package it.
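
One way to keep cluster-provided libraries out of the packaged jar is to declare them with "provided" scope. The fragment below is only a sketch and assumes the project is built with sbt; the thread does not say which build tool is used, and in Maven the equivalent is <scope>provided</scope>. The version numbers are placeholders and must be replaced with whatever is actually installed on the server.

```scala
// build.sbt (sketch; every version below is a placeholder)

// Assumption: must match the Scala version the cluster's Flink was built for
scalaVersion := "2.11.12"

// Assumption: replace with the Flink version installed on the server
val flinkVersion = "1.10.0"

libraryDependencies ++= Seq(
  // "provided": on the classpath when compiling, but not bundled into the jar,
  // so the cluster's own copies of these classes are the only ones at runtime
  "org.apache.flink" %% "flink-scala"           % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
)
```

With the dependencies scoped this way, `sbt package` produces a jar containing only the project's own classes.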

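Check whether it runs locally first. If it does, build a minimal jar and run that on the server. Judging by this program's dependencies, nothing needs to be bundled into the jar; just package it as is.
Errors like this one, which look strange and have nothing to do with your own code, are mostly caused by jar conflicts.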
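
To confirm a suspected jar conflict, it can help to print where a class was loaded from and which serialVersionUID it has, once locally and once on the server, and compare the two. The helper below is a hypothetical sketch (the name `WhichJar` is made up for illustration) that uses only JDK classes; it has to be run with the same classpath as the job for the result to be meaningful.

```scala
import java.io.ObjectStreamClass

// Hypothetical diagnostic helper, not part of Flink
object WhichJar {
  def describe(className: String): String = {
    val cls = Class.forName(className)
    // getCodeSource is null for classes loaded by the bootstrap class loader
    val source = Option(cls.getProtectionDomain.getCodeSource)
      .map(_.getLocation.toString)
      .getOrElse("<bootstrap>")
    // lookup returns null when the class is not serializable
    val uid = Option(ObjectStreamClass.lookup(cls))
      .map(_.getSerialVersionUID.toString)
      .getOrElse("not serializable")
    s"$className\n  loaded from: $source\n  serialVersionUID: $uid"
  }

  def main(args: Array[String]): Unit =
    args.foreach(name => println(describe(name)))
}
```

For example, passing `'org.apache.flink.streaming.api.scala.DataStream$$anon$6'` (the class named in the exception, quoted so the shell leaves the `$` characters alone) shows which jar supplies it in each environment. If the two locations or UIDs differ, the Flink classes in the job jar and on the cluster come from different builds.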