Why does using cache on a streaming Dataset fail with "AnalysisException: Queries with streaming sources must be executed with writeStream.start()"?

Problem description:

import org.apache.spark.sql.SparkSession

SparkSession
  .builder
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "C:/tmp/spark")
  .config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint")
  .appName("my-test")
  .getOrCreate
  .readStream
  .schema(schema) // schema: a StructType describing the JSON files, defined elsewhere
  .json("src/test/data")
  .cache
  .writeStream
  .start
  .awaitTermination

While executing this sample in Spark 2.1.0 I got an error. Without the .cache option it worked as intended, but with the .cache option I got:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[src/test/data]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:196)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:58)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:69)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:67)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:102)
at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:65)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:89)
at org.apache.spark.sql.Dataset.persist(Dataset.scala:2479)
at org.apache.spark.sql.Dataset.cache(Dataset.scala:2489)
at org.me.App$.main(App.scala:23)
at org.me.App.main(App.scala)

Any ideas?

Your (very interesting) case boils down to the following line (that you can execute in spark-shell):

scala> :type spark
org.apache.spark.sql.SparkSession

scala> spark.readStream.text("files").cache
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[files]
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
  at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
  at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:104)
  at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:68)
  at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:92)
  at org.apache.spark.sql.Dataset.persist(Dataset.scala:2603)
  at org.apache.spark.sql.Dataset.cache(Dataset.scala:2613)
  ... 48 elided

The reason for this turned out quite simple to explain (no pun to Spark SQL's explain intended).

spark.readStream.text("files") creates a so-called streaming Dataset:

scala> val files = spark.readStream.text("files")
files: org.apache.spark.sql.DataFrame = [value: string]

scala> files.isStreaming
res2: Boolean = true

Streaming Datasets are the foundation of Spark SQL's Structured Streaming.

As you may have read in Structured Streaming's Quick Example:

Then start the streaming computation using start().
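
For reference, the Quick Example ends along these lines (paraphrased from the Spark 2.1 documentation; the socket source, host, port and console sink are the example's own choices, not anything from the question):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("quick-example").getOrCreate()
import spark.implicits._

// Lines arriving on a socket form an unbounded, streaming Dataset
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// A running word count over the stream
val wordCounts = lines.as[String].flatMap(_.split(" ")).groupBy("value").count()

// Nothing actually runs until start() kicks off the streaming query
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()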

Quoting the scaladoc of DataStreamWriter's start:

start(): StreamingQuery Starts the execution of the streaming query, which will continually output results to the given path as new data arrives.

So, you have to use start (or foreach) to start the execution of the streaming query. You knew it already.
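
Applied to the pipeline from the question, that means dropping cache and letting start run the query. A minimal sketch, where the schema and the console sink are placeholders rather than the asker's exact setup:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder
  .master("local[*]")
  .appName("my-test")
  .getOrCreate()

// Hypothetical schema for the JSON files; use whatever matches your data
val schema = StructType(Seq(StructField("value", StringType)))

spark.readStream
  .schema(schema)
  .json("src/test/data")
  .writeStream
  .format("console")
  .option("checkpointLocation", "C:/tmp/spark/spark-checkpoint")
  .start()            // this is what actually executes the streaming query
  .awaitTermination()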

But...there are Unsupported Operations in Structured Streaming:

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset.

If you try any of these operations, you will see an AnalysisException like "operation XYZ is not supported with streaming DataFrames/Datasets".

That looks familiar, doesn't it?

cache is not in the list of the unsupported operations, but that's because it has simply been overlooked (I reported SPARK-20927 to fix it).

cache should have been in the list as it does execute a query before the query gets registered in Spark SQL's CacheManager.

Let's go deeper into the depths of Spark SQL...hold your breath...

cache is persist, and persist asks Spark SQL's CacheManager to cache the query. While caching the query, the CacheManager does execute it, which we know is not allowed since it is start (or foreach) that is supposed to do so.
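
You can see that directly by calling persist yourself; it hits the very same check before anything gets cached (spark-shell, output abbreviated):

scala> spark.readStream.text("files").persist
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[files]
...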

Problem solved!