Why does using cache on a streaming Dataset fail with "AnalysisException: Queries with streaming sources must be executed with writeStream.start()"?

Problem description:

SparkSession
  .builder
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "C:/tmp/spark")
  .config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint")
  .appName("my-test")
  .getOrCreate
  .readStream
  .schema(schema)
  .json("src/test/data")
  .cache
  .writeStream
  .start
  .awaitTermination

While executing this sample in Spark 2.1.0 I got an error. Without the .cache option it worked as intended, but with the .cache option I got:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[src/test/data]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:196)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:58)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:69)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:67)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:102)
at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:65)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:89)
at org.apache.spark.sql.Dataset.persist(Dataset.scala:2479)
at org.apache.spark.sql.Dataset.cache(Dataset.scala:2489)
at org.me.App$.main(App.scala:23)
at org.me.App.main(App.scala)

Any ideas?

Your (very interesting) case boils down to the following line (that you can execute in spark-shell):

scala> :type spark
org.apache.spark.sql.SparkSession

scala> spark.readStream.text("files").cache
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[files]
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
  at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
  at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:104)
  at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:68)
  at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:92)
  at org.apache.spark.sql.Dataset.persist(Dataset.scala:2603)
  at org.apache.spark.sql.Dataset.cache(Dataset.scala:2613)
  ... 48 elided

The reason for this turned out to be quite simple to explain (no pun to Spark SQL's explain intended).

spark.readStream.text("files") creates a so-called streaming Dataset.

scala> val files = spark.readStream.text("files")
files: org.apache.spark.sql.DataFrame = [value: string]

scala> files.isStreaming
res2: Boolean = true

Streaming Datasets are the foundation of Spark SQL's Structured Streaming.

As you may have read in Structured Streaming's Quick Example:

And then start the streaming computation using start().

Quoting the scaladoc of DataStreamWriter's start:

start(): StreamingQuery Starts the execution of the streaming query, which will continually output results to the given path as new data arrives.

So, you have to use start (or foreach) to start the execution of the streaming query. You knew it already.
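
For comparison, here is a minimal sketch of the same pipeline with the query actually started (the console sink is my own choice for illustration and is not part of the original code):

val query = spark.readStream
  .schema(schema)          // same schema as in the original snippet
  .json("src/test/data")   // streaming file source
  .writeStream
  .format("console")       // assumed sink, just to have somewhere to write
  .start()                 // start() returns a StreamingQuery

query.awaitTermination()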

But...there are Unsupported Operations in Structured Streaming:

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset.

If you try any of these operations, you will see an AnalysisException like "operation XYZ is not supported with streaming DataFrames/Datasets".
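
The same family of errors shows up for other eager actions on a streaming Dataset, e.g. show (a spark-shell sketch; the exact wording may vary between Spark versions):

scala> val files = spark.readStream.text("files")
files: org.apache.spark.sql.DataFrame = [value: string]

scala> files.show
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
...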

Does that look familiar?

cache is not in the list of the unsupported operations, but that's because it has simply been overlooked (I reported SPARK-20927 to fix it).

cache should have been in the list as it does execute a query before the query gets registered in Spark SQL's CacheManager.

Let's go deeper into the depths of Spark SQL...hold your breath...

cache is just persist, and persist requests the current CacheManager to cache the query:

sparkSession.sharedState.cacheManager.cacheQuery(this)

While caching a query, the CacheManager does execute it:

sparkSession.sessionState.executePlan(planToCache).executedPlan

which we know is not allowed for a streaming query, since it is start (or foreach) that is supposed to do that.
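
The check that actually throws the exception is UnsupportedOperationChecker.checkForBatch (visible in the stack traces above). Roughly, in Spark 2.x it boils down to the following (a simplified sketch, not the verbatim source):

// Simplified sketch of UnsupportedOperationChecker.checkForBatch (Spark 2.x):
// executing a batch query whose plan still contains a streaming operator throws the error.
def checkForBatch(plan: LogicalPlan): Unit = {
  plan.foreachUp {
    case p if p.isStreaming =>
      throwError("Queries with streaming sources must be executed with writeStream.start()")(p)
    case _ => // batch-only operators pass the check
  }
}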

Problem solved!