为什么在流数据集上使用缓存失败并显示"AnalysisException:必须使用writeStream.start()执行带有流源的查询"?
SparkSession
.builder
.master("local[*]")
.config("spark.sql.warehouse.dir", "C:/tmp/spark")
.config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint")
.appName("my-test")
.getOrCreate
.readStream
.schema(schema)
.json("src/test/data")
.cache
.writeStream
.start
.awaitTermination
在Spark 2.1.0中执行此示例时,出现错误.
没有.cache
选项,它可以按预期工作,但是有了.cache
选项,我得到了:
While executing this sample in Spark 2.1.0 I got error.
Without the .cache
option it worked as intended but with .cache
option i got:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[src/test/data]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:196)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:58)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:69)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:67)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:102)
at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:65)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:89)
at org.apache.spark.sql.Dataset.persist(Dataset.scala:2479)
at org.apache.spark.sql.Dataset.cache(Dataset.scala:2489)
at org.me.App$.main(App.scala:23)
at org.me.App.main(App.scala)
有什么主意吗?
您的(非常有趣的)案例归结为以下行(您可以在spark-shell
中执行):
Your (very interesting) case boils down to the following line (that you can execute in spark-shell
):
scala> :type spark
org.apache.spark.sql.SparkSession
scala> spark.readStream.text("files").cache
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[files]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:104)
at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:68)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:92)
at org.apache.spark.sql.Dataset.persist(Dataset.scala:2603)
at org.apache.spark.sql.Dataset.cache(Dataset.scala:2613)
... 48 elided
事实证明,原因很简单(对Spark SQL的explain
毫无疑问).
The reason for this turned out quite simple to explain (no pun to Spark SQL's explain
intended).
spark.readStream.text("files")
创建一个所谓的流数据集.
scala> val files = spark.readStream.text("files")
files: org.apache.spark.sql.DataFrame = [value: string]
scala> files.isStreaming
res2: Boolean = true
流数据集是Spark SQL 结构化流的基础. a>.
Streaming Datasets are the foundation of Spark SQL's Structured Streaming.
您可能已经阅读了结构化流媒体的快速示例:
As you may have read in Structured Streaming's Quick Example:
然后使用
start()
开始流计算.
Quoting the scaladoc of DataStreamWriter's start:
start():StreamingQuery 开始执行流查询,当新数据到达时,它将继续将结果输出到给定的路径.
start(): StreamingQuery Starts the execution of the streaming query, which will continually output results to the given path as new data arrives.
因此,您必须使用start
(或foreach
)启动流查询的执行.你已经知道了.
So, you have to use start
(or foreach
) to start the execution of the streaming query. You knew it already.
但是...有不受支持的操作在结构化流中:
But...there are Unsupported Operations in Structured Streaming:
此外,有些数据集方法不适用于流数据集.它们是将立即运行查询并返回结果的操作,这对于流数据集没有意义.
In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset.
如果您尝试这些操作中的任何一个,您将看到一个AnalysisException,例如流数据帧/数据集不支持操作XYZ".
If you try any of these operations, you will see an AnalysisException like "operation XYZ is not supported with streaming DataFrames/Datasets".
看起来很熟悉,不是吗?
That looks familiar, doesn't it?
cache
在不受支持的操作列表中不,但这是因为它只是被忽略了(我报告了
cache
is not in the list of the unsupported operations, but that's because it has simply been overlooked (I reported SPARK-20927 to fix it).
cache
应该已经在列表中,因为它在查询被注册到Spark SQL的CacheManager之前会执行 .
cache
should have been in the list as it does execute a query before the query gets registered in Spark SQL's CacheManager.
让我们深入到Spark SQL的深处... 屏住呼吸 ...
Let's go deeper into the depths of Spark SQL...hold your breath...
我们知道,因为这样做是 which we know is not allowed since it is 问题已解决! cache
是 persist
,而persist
在缓存查询时CacheManager
确实
不允许start
(或foreach
).start
(or foreach
) to do so.