Source Code Analysis: The Wrap-up Flow After a Checkpoint
Founder
2025-05-29 13:24:25

Once the OperatorSnapshotFutures held by the StreamTask's Runnable have completed, the task sends an ACK message to the TaskStateManager.

/**
 * Once the whole checkpoint operation has finished, send the ACK message to the TaskStateManager.
 */
private void reportCompletedSnapshotStates(
        TaskStateSnapshot acknowledgedTaskStateSnapshot,
        TaskStateSnapshot localTaskStateSnapshot,
        long asyncDurationMillis) {
    // TaskStateManager is the interface dedicated to reporting and retrieving task state
    TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();
    // Some code omitted...
    // Report the state snapshots of the operator instances running in this task;
    // TaskStateManagerImpl, the implementation of TaskStateManager, provides the concrete logic
    taskStateManager.reportTaskStateSnapshots(
            checkpointMetaData,
            checkpointMetrics,
            hasAckState ? acknowledgedTaskStateSnapshot : null,
            hasLocalState ? localTaskStateSnapshot : null);
    // Some code omitted...
}

After receiving the ACK message, the TaskStateManager passes it on to the CheckpointResponder.

/**
 * The TaskStateManager passes the ACK message on to the CheckpointResponder.
 */
@Override
public void reportTaskStateSnapshots(
        @Nonnull CheckpointMetaData checkpointMetaData,
        @Nonnull CheckpointMetrics checkpointMetrics,
        @Nullable TaskStateSnapshot acknowledgedState,
        @Nullable TaskStateSnapshot localState) {
    long checkpointId = checkpointMetaData.getCheckpointId();
    localStateStore.storeLocalState(checkpointId, localState);
    // Send the ACK out through the CheckpointResponder interface
    // (its implementation RpcCheckpointResponder provides the concrete method)
    checkpointResponder.acknowledgeCheckpoint(
            jobId,
            executionAttemptID,
            checkpointId,
            checkpointMetrics,
            acknowledgedState);
}
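The two hops above (StreamTask → TaskStateManager → CheckpointResponder) can be pictured with a small, self-contained Java model. `AckResponder` and `StateManagerSketch` are hypothetical stand-ins for `CheckpointResponder` and `TaskStateManagerImpl`. Snapshots are plain strings, and checkpoint metadata and metrics are left out. The sketch only shows the order of work: the task-local copy is stored first, then the acknowledged state is forwarded upstream. That state may be `null`, as with `hasAckState ? ... : null`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for CheckpointResponder: forwards an ACK upstream.
interface AckResponder {
    void acknowledgeCheckpoint(long checkpointId, String acknowledgedState);
}

// Hypothetical, simplified model of TaskStateManagerImpl.reportTaskStateSnapshots().
// Snapshots are plain strings; checkpoint metadata and metrics are left out.
final class StateManagerSketch {
    private final Map<Long, String> localStateStore = new HashMap<>();
    private final AckResponder responder;

    StateManagerSketch(AckResponder responder) {
        this.responder = responder;
    }

    void reportTaskStateSnapshots(long checkpointId, String acknowledgedState, String localState) {
        // 1. Keep the task-local copy first (this sketch simply skips a null local state).
        if (localState != null) {
            localStateStore.put(checkpointId, localState);
        }
        // 2. Then forward the ACK; a null acknowledged state is still forwarded as an ACK.
        responder.acknowledgeCheckpoint(checkpointId, acknowledgedState);
    }

    String localStateFor(long checkpointId) {
        return localStateStore.get(checkpointId);
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        StateManagerSketch manager =
                new StateManagerSketch((id, state) -> sent.add(id + ":" + state));
        manager.reportTaskStateSnapshots(7L, "remote-handle", "local-copy");
        manager.reportTaskStateSnapshots(8L, null, null);
        System.out.println(sent);                      // [7:remote-handle, 8:null]
        System.out.println(manager.localStateFor(7L)); // local-copy
    }
}
```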

The CheckpointResponder passes the ACK message to the CheckpointCoordinatorGateway. Its subinterface is JobMasterGateway, and the implementation of that subinterface is JobMaster.

/**
 * The CheckpointResponder passes the ACK message to the CheckpointCoordinatorGateway, i.e. the JobMaster.
 */
@Override
public void acknowledgeCheckpoint(
        JobID jobID,
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        CheckpointMetrics checkpointMetrics,
        TaskStateSnapshot subtaskState) {
    // Send the ACK through the CheckpointCoordinatorGateway interface; its subinterface is
    // JobMasterGateway, whose basic implementation is JobMaster
    checkpointCoordinatorGateway.acknowledgeCheckpoint(
            jobID, executionAttemptID, checkpointId, checkpointMetrics, subtaskState);
}

With that, the ACK message has officially reached the JobMaster.

/**
 * After passing through layer upon layer of interfaces and implementations, the ACK message for the
 * state snapshot produced by the StreamTask's checkpoint finally reaches the JobMaster,
 * which forwards it to the SchedulerNG.
 */
@Override
public void acknowledgeCheckpoint(
        final JobID jobID,
        final ExecutionAttemptID executionAttemptID,
        final long checkpointId,
        final CheckpointMetrics checkpointMetrics,
        final TaskStateSnapshot checkpointState) {
    // The JobMaster hands the ACK message to the SchedulerNG scheduler (the method is implemented in SchedulerBase)
    schedulerNG.acknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
}
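Each hop in this chain knows only the next interface, which is what lets the RPC layer sit transparently in the middle. The sketch below models just that delegation. `CoordinatorGatewayLike` stands in for CheckpointCoordinatorGateway/JobMasterGateway, and `SchedulerLike` stands in for SchedulerNG; both are hypothetical interfaces. No real RPC happens here, and all parameter types are reduced to strings and longs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the ACK hops:
// RpcCheckpointResponder -> CheckpointCoordinatorGateway (JobMaster) -> SchedulerNG.
final class AckChainSketch {

    interface SchedulerLike {
        void acknowledgeCheckpoint(String jobId, long checkpointId, String state);
    }

    interface CoordinatorGatewayLike {
        void acknowledgeCheckpoint(String jobId, long checkpointId, String state);
    }

    // Plays the role of JobMaster: implements the gateway and delegates to the scheduler.
    static final class JobMasterLike implements CoordinatorGatewayLike {
        private final SchedulerLike scheduler;

        JobMasterLike(SchedulerLike scheduler) {
            this.scheduler = scheduler;
        }

        @Override
        public void acknowledgeCheckpoint(String jobId, long checkpointId, String state) {
            scheduler.acknowledgeCheckpoint(jobId, checkpointId, state);
        }
    }

    // Plays the role of RpcCheckpointResponder; in Flink this call crosses an RPC boundary.
    static final class ResponderLike {
        private final CoordinatorGatewayLike gateway;

        ResponderLike(CoordinatorGatewayLike gateway) {
            this.gateway = gateway;
        }

        void acknowledgeCheckpoint(String jobId, long checkpointId, String state) {
            gateway.acknowledgeCheckpoint(jobId, checkpointId, state);
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        SchedulerLike scheduler = (job, id, state) -> received.add(job + "/" + id + "/" + state);
        ResponderLike responder = new ResponderLike(new JobMasterLike(scheduler));
        responder.acknowledgeCheckpoint("job-1", 42L, "snapshot");
        System.out.println(received); // [job-1/42/snapshot]
    }
}
```

Swapping `JobMasterLike` for a remote proxy would not change `ResponderLike` at all, which is the benefit of routing the call through gateway interfaces.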

After receiving the ACK message, the JobMaster passes it to the SchedulerNG. The SchedulerNG wraps it into an AcknowledgeCheckpoint message and hands it to the CheckpointCoordinator component for processing.

/**
 * The JobMaster passes the ACK message for the checkpoint's state snapshot to the SchedulerNG scheduler.
 */
@Override
public void acknowledgeCheckpoint(
        final JobID jobID,
        final ExecutionAttemptID executionAttemptID,
        final long checkpointId,
        final CheckpointMetrics checkpointMetrics,
        final TaskStateSnapshot checkpointState) {
    mainThreadExecutor.assertRunningInMainThread();

    final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    // Wrap the ACK message into an AcknowledgeCheckpoint
    final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(
            jobID,
            executionAttemptID,
            checkpointId,
            checkpointMetrics,
            checkpointState);

    final String taskManagerLocationInfo = retrieveTaskManagerLocation(executionAttemptID);

    if (checkpointCoordinator != null) {
        ioExecutor.execute(() -> {
            try {
                // The wrapped ACK is handed to the CheckpointCoordinator for further processing
                checkpointCoordinator.receiveAcknowledgeMessage(ackMessage, taskManagerLocationInfo);
            } catch (Throwable t) {
                log.warn("Error while processing checkpoint acknowledgement message", t);
            }
        });
    } else {
        String errorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
        if (executionGraph.getState() == JobStatus.RUNNING) {
            log.error(errorMessage, jobGraph.getJobID());
        } else {
            log.debug(errorMessage, jobGraph.getJobID());
        }
    }
}
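Two details in this `acknowledgeCheckpoint()` are worth isolating. First, the ACK is wrapped into an immutable message object. Second, the actual processing is pushed onto `ioExecutor` so the calling (main) thread is not blocked, and failures are only logged. Below is a minimal sketch of that pattern. It assumes a plain `java.util.concurrent.Executor` in place of Flink's `ioExecutor` and a `Consumer` in place of `CheckpointCoordinator`. `SchedulerAckSketch` and `AckMessage` are hypothetical simplifications.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical, simplified model of SchedulerBase.acknowledgeCheckpoint().
final class SchedulerAckSketch {

    // Immutable message object playing the role of AcknowledgeCheckpoint.
    static final class AckMessage {
        final String jobId;
        final long checkpointId;
        final String state;

        AckMessage(String jobId, long checkpointId, String state) {
            this.jobId = jobId;
            this.checkpointId = checkpointId;
            this.state = state;
        }
    }

    private final Executor ioExecutor;
    private final Consumer<AckMessage> coordinator; // null models "no CheckpointCoordinator"
    final AtomicInteger failures = new AtomicInteger();

    SchedulerAckSketch(Executor ioExecutor, Consumer<AckMessage> coordinator) {
        this.ioExecutor = ioExecutor;
        this.coordinator = coordinator;
    }

    /** Returns false when there is no coordinator to hand the message to. */
    boolean acknowledgeCheckpoint(String jobId, long checkpointId, String state) {
        final AckMessage ack = new AckMessage(jobId, checkpointId, state);
        if (coordinator == null) {
            System.err.println("Received ACK for job " + jobId + " with no coordinator");
            return false;
        }
        // Hand the work to the I/O executor so the calling thread is not blocked.
        ioExecutor.execute(() -> {
            try {
                coordinator.accept(ack);
            } catch (Throwable t) {
                // As in the original, a failure is recorded (logged there), not rethrown.
                failures.incrementAndGet();
            }
        });
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService io = Executors.newSingleThreadExecutor();
        AtomicInteger handled = new AtomicInteger();
        SchedulerAckSketch scheduler = new SchedulerAckSketch(io, msg -> handled.incrementAndGet());
        scheduler.acknowledgeCheckpoint("job-1", 1L, "s1");
        scheduler.acknowledgeCheckpoint("job-1", 2L, "s2");
        io.shutdown();
        io.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(handled.get()); // 2
    }
}
```

Passing `Runnable::run` as the executor makes the hand-off synchronous, which is convenient for exercising the logic deterministically in a test.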

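When the CheckpointCoordinator receives an AcknowledgeCheckpoint, it uses the checkpointId to look up the corresponding PendingCheckpoint in the Map that maps checkpointId to PendingCheckpoint, and records this task's acknowledgement. Once every task that has to acknowledge has done so, the PendingCheckpoint is converted into a CompletedCheckpoint.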

/**
 * After the post-checkpoint ACK message has been wrapped into an AcknowledgeCheckpoint,
 * the SchedulerNG sends it to the CheckpointCoordinator for processing.
 */
public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message, String taskManagerLocationInfo)
        throws CheckpointException {
    // Some code omitted...
    final long checkpointId = message.getCheckpointId();

    synchronized (lock) {
        if (shutdown) {
            return false;
        }
        // Look up the PendingCheckpoint for this checkpointId in the "checkpointId -> PendingCheckpoint" Map
        final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);

        if (checkpoint != null && !checkpoint.isDiscarded()) {
            switch (checkpoint.acknowledgeTask(
                    message.getTaskExecutionId(),
                    message.getSubtaskState(),
                    message.getCheckpointMetrics())) {
                case SUCCESS:
                    LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
                            checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
                    // Check whether every task that must acknowledge has now done so
                    if (checkpoint.areTasksFullyAcknowledged()) {
                        // Complete this PendingCheckpoint and remove it from the Map that holds it
                        completePendingCheckpoint(checkpoint);
                    }
                    break;
                // Some code omitted...
            }
            return true;
        }
        // Some code omitted...
    }
}
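The bookkeeping behind `acknowledgeTask()` and `areTasksFullyAcknowledged()` can be pictured as a per-checkpoint set of tasks that still owe an ACK. The sketch below is a simplified, hypothetical model, not Flink's `PendingCheckpoint`. It uses string task ids and states, and its three-value result enum is only loosely inspired by the result of `acknowledgeTask()`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of PendingCheckpoint bookkeeping and
// CheckpointCoordinator.receiveAcknowledgeMessage(). States are plain strings.
final class PendingAckSketch {

    // Loosely modeled on the result of PendingCheckpoint.acknowledgeTask(); simplified.
    enum AckResult { SUCCESS, DUPLICATE, UNKNOWN }

    static final class Pending {
        private final Set<String> notYetAcknowledged;
        final Map<String, String> acknowledgedStates = new HashMap<>();

        Pending(Set<String> tasksToAck) {
            this.notYetAcknowledged = new HashSet<>(tasksToAck);
        }

        AckResult acknowledgeTask(String taskId, String state) {
            if (notYetAcknowledged.remove(taskId)) {
                acknowledgedStates.put(taskId, state);
                return AckResult.SUCCESS;
            }
            return acknowledgedStates.containsKey(taskId) ? AckResult.DUPLICATE : AckResult.UNKNOWN;
        }

        boolean areTasksFullyAcknowledged() {
            return notYetAcknowledged.isEmpty();
        }
    }

    private final Object lock = new Object();
    private final Map<Long, Pending> pendingCheckpoints = new HashMap<>();
    final Map<Long, Pending> completed = new HashMap<>();

    void trigger(long checkpointId, Set<String> tasksToAck) {
        synchronized (lock) {
            pendingCheckpoints.put(checkpointId, new Pending(tasksToAck));
        }
    }

    /** Returns false if no pending checkpoint with this id exists (late or unknown ACK). */
    boolean receiveAcknowledgeMessage(long checkpointId, String taskId, String state) {
        synchronized (lock) {
            Pending checkpoint = pendingCheckpoints.get(checkpointId);
            if (checkpoint == null) {
                return false;
            }
            if (checkpoint.acknowledgeTask(taskId, state) == AckResult.SUCCESS
                    && checkpoint.areTasksFullyAcknowledged()) {
                // Stand-in for completePendingCheckpoint(): move it from pending to completed.
                pendingCheckpoints.remove(checkpointId);
                completed.put(checkpointId, checkpoint);
            }
            return true;
        }
    }
}
```

Consider a checkpoint triggered on tasks `{t1, t2}`. An ACK from `t1` leaves it pending. A second ACK from `t1` is classified as a duplicate and changes nothing. The ACK from `t2` then moves the checkpoint to `completed`.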

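Completing a PendingCheckpoint involves two core operations:

  • 1. Generate the CompletedCheckpoint and add it to the completed-checkpoint store
  • 2. Have every Execution send a "CheckpointComplete" message through the TaskManagerGateway, notifying all Task instances that this checkpoint is over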

/**
 * After the CheckpointCoordinator has received the ACKs from the Task instances, it completes the
 * checkpoint: the PendingCheckpoint is converted into a CompletedCheckpoint.
 */
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
    final long checkpointId = pendingCheckpoint.getCheckpointId();
    final CompletedCheckpoint completedCheckpoint;

    Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
    // SharedStateRegistry acts as a registry; the state is registered here
    sharedStateRegistry.registerAll(operatorStates.values());

    try {
        try {
            // Finish the current PendingCheckpoint and produce a CompletedCheckpoint.
            // On failure, a CheckpointException is thrown
            completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
            failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
        } catch (Exception e1) {
            if (!pendingCheckpoint.isDiscarded()) {
                failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
            }
            throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.',
                    CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
        }

        Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);

        try {
            // Add the CompletedCheckpoint to the store of completed checkpoints
            completedCheckpointStore.addCheckpoint(completedCheckpoint);
        } catch (Exception exception) {
            // If adding it to the completed-checkpoint store fails, clean up asynchronously
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        completedCheckpoint.discardOnFailedStoring();
                    } catch (Throwable t) {
                        LOG.warn("Could not properly discard completed checkpoint {}.",
                                completedCheckpoint.getCheckpointID(), t);
                    }
                }
            });
            throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.',
                    CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
        }
    } finally {
        // Whatever happens, remove this PendingCheckpoint from the pending-checkpoint Map
        pendingCheckpoints.remove(checkpointId);
        // Trigger any checkpoint requests waiting in the queue
        triggerQueuedRequests();
    }

    // Some code omitted...

    /*
     * Iterate over the ExecutionVertex[] array of "tasks that must be notified once the checkpoint is
     * confirmed" and send a "CheckpointComplete" message to every Execution, telling the Task
     * instances that this checkpoint has completed.
     */
    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            /*
             * The Execution sends a "CheckpointComplete" message through the TaskManagerGateway,
             * notifying all Task instances that this checkpoint is over.
             * When the TaskExecutor receives the message, it looks up the corresponding Task
             * instance in the TaskSlotTable and forwards the message to it.
             * Every observer that implements CheckpointListener then does its own processing
             * upon receiving the checkpoint-complete notification.
             */
            ee.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }
}
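The key structural point in `completePendingCheckpoint()` is the nested `try`/`finally`. The PendingCheckpoint is removed and queued trigger requests are served whether or not storing succeeds, while tasks are notified only on success. Below is a stripped-down sketch of that control flow under several assumptions: a hypothetical `Store` interface replaces `CompletedCheckpointStore`, an unchecked `IllegalStateException` replaces `CheckpointException`, and recorded strings replace real cleanup and notifications.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the control flow in completePendingCheckpoint():
// store the checkpoint, always clean up in `finally`, notify tasks only on success.
final class CompleteCheckpointSketch {

    // Stand-in for CompletedCheckpointStore.addCheckpoint(); may fail.
    interface Store {
        void addCheckpoint(long checkpointId) throws Exception;
    }

    final Map<Long, String> pendingCheckpoints = new HashMap<>();
    final Deque<Long> queuedTriggerRequests = new ArrayDeque<>();
    final List<String> events = new ArrayList<>();
    private final Store store;
    private final List<String> tasksToCommitTo;

    CompleteCheckpointSketch(Store store, List<String> tasksToCommitTo) {
        this.store = store;
        this.tasksToCommitTo = tasksToCommitTo;
    }

    void completePendingCheckpoint(long checkpointId) {
        try {
            try {
                store.addCheckpoint(checkpointId);
            } catch (Exception e) {
                // Clean up the partially stored checkpoint, then report the failure.
                events.add("discard " + checkpointId);
                throw new IllegalStateException("Could not complete checkpoint " + checkpointId, e);
            }
        } finally {
            // Runs on success and on failure: the pending entry is always dropped,
            // and a queued trigger request (if any) gets its chance to run.
            pendingCheckpoints.remove(checkpointId);
            Long next = queuedTriggerRequests.poll();
            if (next != null) {
                events.add("trigger queued " + next);
            }
        }
        // Only reached on success: tell every task that must commit.
        for (String task : tasksToCommitTo) {
            events.add("notifyCheckpointComplete " + checkpointId + " -> " + task);
        }
    }
}
```

If `addCheckpoint()` throws, the only event recorded is the cleanup. The `finally` block has still removed the pending entry, and no task receives a completion notice.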

When the TaskExecutor receives the "CheckpointComplete" message, it forwards it to the specified Task instance.

/**
 * Pass the "CheckpointComplete" message on to the TaskExecutor.
 */
public void notifyCheckpointComplete(long checkpointId, long timestamp) {
    final LogicalSlot slot = assignedResource;

    if (slot != null) {
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
        // Use the TaskManagerGateway interface to tell the TaskExecutor that this checkpoint is over
        // for all Task instances. The implementation of TaskManagerGateway is RpcTaskManagerGateway
        taskManagerGateway.notifyCheckpointComplete(attemptId, getVertex().getJobId(), checkpointId, timestamp);
    } else {
        LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
                "no longer running.");
    }
}

/**
 * The CheckpointCoordinator sends the confirmation through the Execution via the TaskManagerGateway
 * interface, and the TaskManagerGateway forwards it further via the TaskExecutorGateway.
 * TaskExecutor is exactly the implementation of TaskExecutorGateway, so the confirmation
 * arrives at the TaskExecutor.
 */
@Override
public void notifyCheckpointComplete(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp) {
    // Forward the confirmation via the TaskExecutorGateway;
    // this is the step that delivers it to the TaskExecutor
    taskExecutorGateway.confirmCheckpoint(executionAttemptID, checkpointId, timestamp);
}

/**
 * After layers of interfaces and RPC calls, the TaskExecutor finally receives the
 * checkpoint-complete confirmation and forwards it to the corresponding Task instance.
 */
@Override
public CompletableFuture<Acknowledge> confirmCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp) {
    log.debug("Confirm checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

    // Look up the corresponding Task instance in the TaskSlotTable
    final Task task = taskSlotTable.getTask(executionAttemptID);

    if (task != null) {
        // Deliver the "checkpoint complete" message to that Task instance.
        // Every component or operator implementing CheckpointListener receives it and performs its own follow-up work
        task.notifyCheckpointComplete(checkpointId);
        return CompletableFuture.completedFuture(Acknowledge.get());
    } else {
        final String message = "TaskManager received a checkpoint confirmation for unknown task " + executionAttemptID + '.';
        log.debug(message);
        return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
    }
}
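On the TaskExecutor side, the essential behavior of `confirmCheckpoint()` is a lookup by execution attempt id. It answers with a completed future on success and an exceptionally completed future for an unknown task, rather than throwing. In the minimal sketch below, a `Map` stands in for `TaskSlotTable` and a `LongConsumer` stands in for `Task.notifyCheckpointComplete()`. `ConfirmCheckpointSketch` and its methods are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongConsumer;

// Hypothetical, simplified model of TaskExecutor.confirmCheckpoint().
final class ConfirmCheckpointSketch {

    // Stand-in for TaskSlotTable: execution attempt id -> the task's notification callback.
    private final Map<String, LongConsumer> taskSlotTable = new ConcurrentHashMap<>();

    void registerTask(String executionAttemptId, LongConsumer notifyCheckpointComplete) {
        taskSlotTable.put(executionAttemptId, notifyCheckpointComplete);
    }

    CompletableFuture<String> confirmCheckpoint(String executionAttemptId, long checkpointId) {
        LongConsumer task = taskSlotTable.get(executionAttemptId);
        if (task != null) {
            task.accept(checkpointId); // plays the role of task.notifyCheckpointComplete(checkpointId)
            return CompletableFuture.completedFuture("ACK");
        }
        // Unknown task: answer with an exceptionally completed future instead of throwing.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException(
                "TaskManager received a checkpoint confirmation for unknown task " + executionAttemptId + '.'));
        return failed;
    }
}
```

Returning a failed future keeps the RPC reply well-formed. The caller sees the failure in the response without the TaskExecutor's handler throwing.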

When the Task receives the "checkpoint complete" message, it passes it on to the StreamTask.

/**
 * The TaskExecutor sends the "checkpoint complete" message to the specified Task instance,
 * and the Task instance notifies the operators in its StreamTask.
 */
@Override
public void notifyCheckpointComplete(final long checkpointID) {
    final AbstractInvokable invokable = this.invokable;

    if (executionState == ExecutionState.RUNNING && invokable != null) {
        try {
            // Send the "checkpoint complete" message to the StreamOperators in the StreamTask.
            // StreamTask, a subclass of the abstract class AbstractInvokable, provides the concrete logic
            invokable.notifyCheckpointCompleteAsync(checkpointID);
        } catch (RejectedExecutionException ex) {
            LOG.debug("Notify checkpoint complete {} for {} ({}) was rejected by the mailbox",
                    checkpointID, taskNameWithSubtask, executionId);
        } catch (Throwable t) {
            if (getExecutionState() == ExecutionState.RUNNING) {
                failExternally(new RuntimeException("Error while confirming checkpoint", t));
            }
        }
    } else {
        LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask);
    }
}

/**
 * The Task instance sends the "checkpoint complete" message to the StreamTask. The StreamTask
 * submits notifyCheckpointComplete() to its mailbox with the highest mailbox priority
 * (TaskMailbox.MAX_PRIORITY), so that it runs in the MailboxProcessor.
 */
@Override
public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
    // Wrap the body of notifyCheckpointComplete() into a RunnableWithException and run it in the MailboxProcessor.
    // The mail is submitted with the highest mailbox priority
    return mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).submit(
            // Core logic: deliver the "checkpoint complete" message to the operators inside the StreamTask
            () -> notifyCheckpointComplete(checkpointId),
            "checkpoint %d complete", checkpointId);
}
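The mailbox hand-off can be illustrated with a small single-threaded model. It assumes two rules, as a simplification of Flink's `TaskMailboxImpl` that should be checked against your Flink version. First, a mail is eligible for a take at threshold `p` only if its own priority is at least `p`. Second, eligible mails are taken in FIFO order. Under these rules, a `MAX_PRIORITY` mail such as the "checkpoint %d complete" mail above is eligible at every threshold. `MailboxSketch`, its constants, and its methods are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical, single-threaded model of a priority-aware task mailbox.
final class MailboxSketch {
    // Assumed constants for this sketch.
    static final int MIN_PRIORITY = -1;
    static final int MAX_PRIORITY = Integer.MAX_VALUE;

    private static final class Mail {
        final int priority;
        final String description;
        final Runnable action;

        Mail(int priority, String description, Runnable action) {
            this.priority = priority;
            this.description = description;
            this.action = action;
        }
    }

    private final Deque<Mail> queue = new ArrayDeque<>();

    void submit(int priority, Runnable action, String descriptionFormat, Object... args) {
        queue.addLast(new Mail(priority, String.format(descriptionFormat, args), action));
    }

    // Takes the oldest mail whose priority is at least the given threshold, or returns null.
    private Mail tryTake(int priority) {
        for (Iterator<Mail> it = queue.iterator(); it.hasNext(); ) {
            Mail mail = it.next();
            if (mail.priority >= priority) {
                it.remove();
                return mail;
            }
        }
        return null;
    }

    // Runs every mail eligible at the given threshold; returns their descriptions in run order.
    List<String> drain(int priority) {
        List<String> ran = new ArrayList<>();
        Mail mail;
        while ((mail = tryTake(priority)) != null) {
            mail.action.run();
            ran.add(mail.description);
        }
        return ran;
    }
}
```

Suppose a priority-0 "process record" mail is queued before a `MAX_PRIORITY` checkpoint mail. `drain(1)` runs only the checkpoint mail, and `drain(MIN_PRIORITY)` then runs the record mail.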

Finally, the StreamTask is responsible for notifying each StreamOperator in the OperatorChain.

/**
 * Deliver the "checkpoint complete" message to every StreamOperator in the StreamTask's OperatorChain.
 */
private void notifyCheckpointComplete(long checkpointId) {
    try {
        boolean success = actionExecutor.call(() -> {
            if (isRunning) {
                LOG.debug("Notification of complete checkpoint for task {}", getName());

                // Iterate over all operators in the OperatorChain
                for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
                    if (operator != null) {
                        // Deliver the "checkpoint complete" message to each operator
                        operator.notifyCheckpointComplete(checkpointId);
                    }
                }
                return true;
            } else {
                LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
                return true;
            }
        });
        // Notify the TaskStateManager as well
        getEnvironment().getTaskStateManager().notifyCheckpointComplete(checkpointId);
        // For a synchronous savepoint, finish the current task right away and terminate the Task instance
        if (success && isSynchronousSavepointId(checkpointId)) {
            finishTask();
            // Reset to "notify" the internal synchronous savepoint mailbox loop.
            // i.e. clear syncSavepointId
            resetSynchronousSavepointId();
        }
    } catch (Exception e) {
        handleException(new RuntimeException("Error while confirming checkpoint", e));
    }
}
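The shape of `StreamTask.notifyCheckpointComplete()` fits in a few lines. It fans out to every non-null operator, then finishes the task only if the completed checkpoint is the pending synchronous savepoint. In this sketch, `OperatorLike`, the `finished` flag, and the constructor are hypothetical. The TaskStateManager call and the mailbox action executor are omitted, and an operator failure is simply rethrown where the original passes it to `handleException()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of StreamTask.notifyCheckpointComplete().
final class OperatorChainNotifySketch {

    // Stand-in for StreamOperator's notifyCheckpointComplete() hook.
    interface OperatorLike {
        void notifyCheckpointComplete(long checkpointId) throws Exception;
    }

    private final List<OperatorLike> allOperators;
    private final boolean isRunning;
    private Long syncSavepointId; // null when no synchronous savepoint is in progress
    boolean finished;

    OperatorChainNotifySketch(List<OperatorLike> allOperators, boolean isRunning, Long syncSavepointId) {
        this.allOperators = allOperators;
        this.isRunning = isRunning;
        this.syncSavepointId = syncSavepointId;
    }

    void notifyCheckpointComplete(long checkpointId) {
        try {
            if (isRunning) {
                for (OperatorLike operator : allOperators) {
                    if (operator != null) {
                        operator.notifyCheckpointComplete(checkpointId);
                    }
                }
            }
            // (The TaskStateManager notification is left out of this sketch.)
            if (syncSavepointId != null && syncSavepointId == checkpointId) {
                finished = true;        // stand-in for finishTask()
                syncSavepointId = null; // stand-in for resetSynchronousSavepointId()
            }
        } catch (Exception e) {
            // Surfaces the failure; the original hands it to handleException() instead.
            throw new RuntimeException("Error while confirming checkpoint", e);
        }
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        OperatorLike map = id -> calls.add("map@" + id);
        OperatorLike sink = id -> calls.add("sink@" + id);
        OperatorChainNotifySketch task =
                new OperatorChainNotifySketch(Arrays.asList(map, null, sink), true, 9L);
        task.notifyCheckpointComplete(8L);
        task.notifyCheckpointComplete(9L);
        System.out.println(calls);         // [map@8, sink@8, map@9, sink@9]
        System.out.println(task.finished); // true
    }
}
```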

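When a StreamOperator receives the "checkpoint complete" message, it performs whatever follow-up processing it needs. Take AbstractUdfStreamOperator as an example. On receiving the message, it checks whether the user-defined Function also implements the CheckpointListener interface. If it does, the operator forwards the message to the Function and lets the Function handle it.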

/**
 * When AbstractUdfStreamOperator receives the "checkpoint complete" message, it checks whether the
 * user-defined Function implements CheckpointListener. If it does, the message is forwarded to the
 * Function, which handles it itself. For example, FlinkKafkaConsumerBase commits offsets to the Kafka cluster.
 */
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    super.notifyCheckpointComplete(checkpointId);

    // If the user function held by this StreamOperator also implements CheckpointListener, notify it too.
    // For instance, FlinkKafkaConsumerBase receives this message and commits its offsets to the Kafka
    // cluster, confirming that the consumed data has been fully processed
    if (userFunction instanceof CheckpointListener) {
        ((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
    }
}
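Here is a hypothetical model of why a source wants this callback, following the pattern described above for FlinkKafkaConsumerBase. Offsets are remembered per checkpoint at snapshot time and committed only once that checkpoint completes, after which older pending entries are dropped. `CheckpointListenerLike`, `UdfOperator`, and `OffsetCommittingSource` are stand-ins, not Flink or Kafka classes, and the "commit" only records a string.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of AbstractUdfStreamOperator forwarding to a
// Kafka-consumer-like user function. No Flink or Kafka classes are used.
final class UdfNotifySketch {

    // Stand-in for Flink's CheckpointListener interface.
    interface CheckpointListenerLike {
        void notifyCheckpointComplete(long checkpointId) throws Exception;
    }

    // Stand-in for AbstractUdfStreamOperator: forwards only if the UDF opts in.
    static final class UdfOperator {
        private final Object userFunction;

        UdfOperator(Object userFunction) {
            this.userFunction = userFunction;
        }

        void notifyCheckpointComplete(long checkpointId) throws Exception {
            if (userFunction instanceof CheckpointListenerLike) {
                ((CheckpointListenerLike) userFunction).notifyCheckpointComplete(checkpointId);
            }
        }
    }

    // Source-like UDF: remembers offsets per checkpoint and "commits" them only after completion.
    static final class OffsetCommittingSource implements CheckpointListenerLike {
        final TreeMap<Long, Map<String, Long>> pendingOffsetsToCommit = new TreeMap<>();
        final List<String> committed = new ArrayList<>();

        void snapshotState(long checkpointId, Map<String, Long> currentOffsets) {
            pendingOffsetsToCommit.put(checkpointId, new HashMap<>(currentOffsets));
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            Map<String, Long> offsets = pendingOffsetsToCommit.get(checkpointId);
            if (offsets == null) {
                return; // unknown, or already subsumed by a later completed checkpoint
            }
            // Stand-in for the commit to the broker; sorted for a stable string form.
            committed.add(checkpointId + ":" + new TreeMap<>(offsets));
            // This checkpoint and all older pending ones are now done.
            pendingOffsetsToCommit.headMap(checkpointId, true).clear();
        }
    }
}
```

In this pattern, committing only after completion keeps the externally visible offsets from running ahead of a completed checkpoint. Recovery itself would use the checkpointed state, not the committed offsets.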
