泛型参数上的 Flink Scala API 函数

问题描述:

这是关于Flink Scala API参数不足"的后续问题一>.

我希望能够传递 Flink 的 DataSet 并用它做一些事情,但是数据集的参数是通用的.

I'd like to be able to pass Flink's DataSets around and do something with it, but the parameters to the dataset are generic.

这是我现在遇到的问题:

Here's the problem I have now:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import scala.reflect.ClassTag

object TestFlink {

  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val split = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
    id(split).print()

    env.execute()
  }

  def id[K: ClassTag](ds: DataSet[K]): DataSet[K] = ds.map(r => r)
}

ds.map(r => r) 出现此错误:

Multiple markers at this line
    - not enough arguments for method map: (implicit evidence$256: org.apache.flink.api.common.typeinfo.TypeInformation[K], implicit 
     evidence$257: scala.reflect.ClassTag[K])org.apache.flink.api.scala.DataSet[K]. Unspecified value parameters evidence$256, evidence$257.
    - not enough arguments for method map: (implicit evidence$4: org.apache.flink.api.common.typeinfo.TypeInformation[K], implicit evidence
     $5: scala.reflect.ClassTag[K])org.apache.flink.api.scala.DataSet[K]. Unspecified value parameters evidence$4, evidence$5.
    - could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[K]

当然,这里的 id 函数只是一个例子,我希望能够用它做一些更复杂的事情.

Of course, the id function here is just an example, and I'd like to be able to do something more complex with it.

如何解决?

您还需要将 TypeInformation 作为上下文绑定在 K 参数上,因此:

you also need to have TypeInformation as a context bound on the K parameter, so:

def id[K: ClassTag: TypeInformation](ds: DataSet[K]): DataSet[K] = ds.map(r => r)

原因是,Flink 会分析您在程序中使用的类型,并为您使用的每种类型创建一个 TypeInformation 实例.如果要创建通用操作,则需要通过添加上下文绑定来确保该类型的 TypeInformation 可用.这样,Scala 编译器将确保在泛型函数的调用点有一个实例可用.

The reason is, that Flink analyses the types that you use in your program and creates a TypeInformation instance for each type you use. If you want to create generic operations then you need to make sure a TypeInformation of that type is available by adding a context bound. This way, the Scala compiler will make sure an instance is available at the call site of the generic function.