Cancelling an Apache Flink job from code
I am in a situation where I want to stop/cancel a Flink job from code. This is in my integration test, where I submit a task to my Flink job and check the result. Since the job runs asynchronously, it does not stop even when the test fails/passes. I want the job to stop after the test is over.
I tried a few things, which I am listing below:
- Get the JobManager actor
- Get the running jobs
- For each running job, send a cancel request to the JobManager
This, of course, is not working, but I am not sure whether the JobManager ActorRef is wrong or something else is missing.
The error I get is : [flink-akka.actor.default-dispatcher-5] [akka://flink/user/jobmanager_1] Message [org.apache.flink.runtime.messages.JobManagerMessages$RequestRunningJobsStatus$] from Actor[akka://flink/temp/$a] to Actor[akka://flink/user/jobmanager_1] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'
The code looks like this:
import java.util.concurrent.TimeUnit

import akka.actor.ActorSystem
import akka.pattern.Patterns
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import org.apache.flink.runtime.messages.JobManagerMessages
import org.apache.flink.runtime.messages.JobManagerMessages.{CancelJob, RunningJobsStatus}

import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration

val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka")) //I debugged to get this config
val jobManager = system.actorSelection("/user/jobmanager_1") //also got this akka path by debugging and getting the jobmanager akka url

// Ask the JobManager for the currently running jobs
val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus, new Timeout(new FiniteDuration(10000, TimeUnit.MILLISECONDS)))
try {
  val result = Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS))
  if (result.isInstanceOf[RunningJobsStatus]) {
    val runningJobs = result.asInstanceOf[RunningJobsStatus].getStatusMessages()
    val itr = runningJobs.iterator()
    while (itr.hasNext) {
      val jobId = itr.next().getJobId
      // Send a CancelJob message for every running job
      val killResponse = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS)))
      try {
        Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS))
      } catch {
        case e: Exception => println("Canceling the job with ID " + jobId + " failed. " + e)
      }
    }
  }
} catch {
  case e: Exception => println("Could not retrieve running jobs from the JobManager. " + e)
}
Can someone check if this is the correct approach?
EDIT: To completely stop the job, it is necessary to stop both the TaskManager and the JobManager, in that order: the TaskManager first and then the JobManager.
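A minimal sketch of that shutdown ordering, using plain Akka rather than any Flink-specific API; taskManager and jobManager are hypothetical, already-resolved ActorRefs of the test-local TaskManager and JobManager actors:

import akka.actor.ActorRef
import akka.pattern.gracefulStop
import scala.concurrent.Await
import scala.concurrent.duration._

// Sketch only: both ActorRefs are assumed to have been obtained elsewhere in the test.
def stopFlinkActors(taskManager: ActorRef, jobManager: ActorRef): Unit = {
  // Stop the TaskManager first ...
  Await.result(gracefulStop(taskManager, 5.seconds), 6.seconds)
  // ... and only then the JobManager.
  Await.result(gracefulStop(jobManager, 5.seconds), 6.seconds)
}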
You're creating a new ActorSystem and then trying to find an actor with the name /user/jobmanager_1 in that same actor system. This won't work, since the actual job manager runs in a different ActorSystem.
If you want to obtain an ActorRef to the real job manager, you either have to use the same ActorSystem for the selection (then you can use a local address), or you have to find out the remote address of the job manager actor. The remote address has the format akka.tcp://flink@[address_of_actor_system]/user/jobmanager_[instance_number]. If you have access to the FlinkMiniCluster, then you can use the leaderGateway promise to obtain the current leader's ActorGateway.
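A rough sketch of both options, reusing the system value from the question's code and assuming the test also owns a FlinkMiniCluster instance called cluster; the host and port in the remote address are placeholders and depend on the actual cluster configuration:

import org.apache.flink.runtime.messages.JobManagerMessages
import scala.concurrent.Await
import scala.concurrent.duration._

// Option 1: select the job manager via its remote Akka address
// (localhost:6123 is only an assumption here).
val remoteJobManager = system.actorSelection(
  "akka.tcp://flink@localhost:6123/user/jobmanager_1")

// Option 2: with access to the FlinkMiniCluster, resolve the current
// leader's ActorGateway from the leaderGateway promise and talk to it.
val gateway = Await.result(cluster.leaderGateway.future, 10.seconds)
val runningJobs = Await.result(
  gateway.ask(JobManagerMessages.getRequestRunningJobsStatus, 10.seconds),
  10.seconds)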