Source Code Analysis: The Wrap-up Flow After a Checkpoint
Founder
2025-05-29 13:24:25

Once the OperatorSnapshotFutures in the StreamTask's Runnable task have completed, an ACK message has to be sent to the TaskStateManager.

/**
 * After the whole checkpoint operation has finished, send the ACK message to the TaskStateManager.
 */
private void reportCompletedSnapshotStates(
        TaskStateSnapshot acknowledgedTaskStateSnapshot,
        TaskStateSnapshot localTaskStateSnapshot,
        long asyncDurationMillis) {
    // TaskStateManager is the interface dedicated to reporting and retrieving task state
    TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();
    // Some code omitted...
    // Report the state snapshots of the operator instances running in this task;
    // TaskStateManagerImpl, the implementation of TaskStateManager, provides the concrete logic
    taskStateManager.reportTaskStateSnapshots(
        checkpointMetaData,
        checkpointMetrics,
        hasAckState ? acknowledgedTaskStateSnapshot : null,
        hasLocalState ? localTaskStateSnapshot : null);
    // Some code omitted...
}
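The hasAckState ? ... : null ternaries mean that an empty snapshot is reported as null rather than as an empty object. The sketch below illustrates that rule with hypothetical stand-in classes (TaskSnapshotSketch and SnapshotReportSketch are not Flink types). It assumes the elided code derives each flag from whether the snapshot holds any state, and that reporting local state without primary state is treated as an error; both are assumptions about the omitted lines.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TaskStateSnapshot: operator ID -> state handle description.
final class TaskSnapshotSketch {
    private final Map<String, String> subtaskStates = new HashMap<>();

    TaskSnapshotSketch put(String operatorId, String handle) {
        subtaskStates.put(operatorId, handle);
        return this;
    }

    boolean hasState() {
        return !subtaskStates.isEmpty();
    }
}

// Hypothetical reporter: records what would be handed to reportTaskStateSnapshots.
final class SnapshotReportSketch {
    TaskSnapshotSketch reportedAck;
    TaskSnapshotSketch reportedLocal;

    void reportCompletedSnapshotStates(TaskSnapshotSketch ack, TaskSnapshotSketch local) {
        // Assumption: the elided code derives both flags from whether each snapshot holds any state.
        boolean hasAckState = ack.hasState();
        boolean hasLocalState = local.hasState();
        // Assumed invariant: a local copy without a primary copy signals a bug.
        if (!hasAckState && hasLocalState) {
            throw new IllegalStateException("Local state reported without primary state");
        }
        // Empty snapshots are passed on as null, mirroring the ternaries in the article's code.
        reportedAck = hasAckState ? ack : null;
        reportedLocal = hasLocalState ? local : null;
    }
}
```

The practical consequence is that code further down the chain only needs a null check to know whether a task contributed any state.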

When the TaskStateManager receives the ACK message, it stores any local state copy and then passes the ACK on to the CheckpointResponder.

/**
 * TaskStateManager passes the ACK message on to the CheckpointResponder.
 */
@Override
public void reportTaskStateSnapshots(
        @Nonnull CheckpointMetaData checkpointMetaData,
        @Nonnull CheckpointMetrics checkpointMetrics,
        @Nullable TaskStateSnapshot acknowledgedState,
        @Nullable TaskStateSnapshot localState) {
    long checkpointId = checkpointMetaData.getCheckpointId();
    localStateStore.storeLocalState(checkpointId, localState);
    // Send the ACK out through the CheckpointResponder interface
    // (its implementation, RpcCheckpointResponder, provides the concrete method)
    checkpointResponder.acknowledgeCheckpoint(
        jobId,
        executionAttemptID,
        checkpointId,
        checkpointMetrics,
        acknowledgedState);
}
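TaskStateManagerImpl does two things in a fixed order: keep the local copy on the TaskManager, then forward the primary copy toward the JobMaster. A minimal sketch of that ordering, using hypothetical types (ResponderSketch stands in for CheckpointResponder, and a HashMap stands in for the TaskLocalStateStore). How the real store treats a null local state is not shown in the article, so this sketch simply does not store it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for CheckpointResponder: the next hop toward the JobMaster.
interface ResponderSketch {
    void acknowledgeCheckpoint(long checkpointId, String acknowledgedState);
}

// Hypothetical, simplified TaskStateManager: stores local state, then forwards the ACK.
final class TaskStateManagerSketch {
    // checkpointId -> local state; stands in for the TaskLocalStateStore
    private final Map<Long, String> localStateStore = new HashMap<>();
    private final ResponderSketch responder;

    TaskStateManagerSketch(ResponderSketch responder) {
        this.responder = responder;
    }

    void reportTaskStateSnapshots(long checkpointId, String acknowledgedState, String localState) {
        // Step 1: keep the local copy on this TaskManager (this sketch simply skips null).
        if (localState != null) {
            localStateStore.put(checkpointId, localState);
        }
        // Step 2: forward the primary copy toward the JobMaster.
        responder.acknowledgeCheckpoint(checkpointId, acknowledgedState);
    }

    String localStateFor(long checkpointId) {
        return localStateStore.get(checkpointId);
    }
}
```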

The CheckpointResponder passes the ACK message to CheckpointCoordinatorGateway. Its sub-interface is JobMasterGateway, and that sub-interface is implemented by JobMaster.

/**
 * CheckpointResponder passes the ACK message to CheckpointCoordinatorGateway, i.e. the JobMaster.
 */
@Override
public void acknowledgeCheckpoint(
        JobID jobID,
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        CheckpointMetrics checkpointMetrics,
        TaskStateSnapshot subtaskState) {
    // Send the ACK through the CheckpointCoordinatorGateway interface; its sub-interface is
    // JobMasterGateway, whose implementation is JobMaster
    checkpointCoordinatorGateway.acknowledgeCheckpoint(
        jobID,
        executionAttemptID,
        checkpointId,
        checkpointMetrics,
        subtaskState);
}

And so the ACK message arrives, by the proper route, at the JobMaster.
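Every hop so far only forwards the same arguments to the next interface. The sketch below models the whole path as a chain of one-method interfaces in which each layer records its name and delegates. The hop names are the classes from the article, but AckHop and AckChainSketch are hypothetical, and the real hop from RpcCheckpointResponder to the JobMaster is a remote RPC call, which the sketch collapses into a plain method call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical one-method interface standing in for every hop on the ACK path.
interface AckHop {
    void acknowledge(long checkpointId);
}

final class AckChainSketch {
    final List<String> trace = new ArrayList<>();

    // Each layer records its name and forwards the unchanged checkpoint ID to the next hop.
    private AckHop layer(String name, AckHop next) {
        return checkpointId -> {
            trace.add(name);
            next.acknowledge(checkpointId);
        };
    }

    // Builds the chain back to front, ending at the coordinator.
    AckHop build() {
        AckHop coordinator = checkpointId -> trace.add("CheckpointCoordinator#" + checkpointId);
        AckHop scheduler = layer("SchedulerNG", coordinator);
        // Reached via CheckpointCoordinatorGateway; an RPC in the real system.
        AckHop jobMaster = layer("JobMaster", scheduler);
        AckHop responder = layer("RpcCheckpointResponder", jobMaster);
        return layer("TaskStateManagerImpl", responder);
    }
}
```

Because each layer depends only on the interface of the next, any hop (for example the RPC gateway) can be swapped without touching the others, which is why the real code goes through so many interfaces.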

/**
 * After passing through layers of interfaces and implementations, the ACK for the state snapshot
 * that StreamTask produced during the checkpoint finally reaches the JobMaster,
 * which forwards it to SchedulerNG.
 */
@Override
public void acknowledgeCheckpoint(
        final JobID jobID,
        final ExecutionAttemptID executionAttemptID,
        final long checkpointId,
        final CheckpointMetrics checkpointMetrics,
        final TaskStateSnapshot checkpointState) {
    // JobMaster passes the ACK to the SchedulerNG scheduler (acknowledgeCheckpoint is implemented in SchedulerBase)
    schedulerNG.acknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
}

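After receiving the ACK message, the JobMaster forwards it to SchedulerNG. SchedulerNG wraps the ACK in an AcknowledgeCheckpoint message and hands it to the CheckpointCoordinator, which processes it on the ioExecutor.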

/**
 * JobMaster passes the ACK for the checkpoint's state snapshot to the SchedulerNG scheduler.
 */
@Override
public void acknowledgeCheckpoint(
        final JobID jobID,
        final ExecutionAttemptID executionAttemptID,
        final long checkpointId,
        final CheckpointMetrics checkpointMetrics,
        final TaskStateSnapshot checkpointState) {
    mainThreadExecutor.assertRunningInMainThread();

    final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    // Wrap the ACK in an AcknowledgeCheckpoint message
    final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(
        jobID,
        executionAttemptID,
        checkpointId,
        checkpointMetrics,
        checkpointState);

    final String taskManagerLocationInfo = retrieveTaskManagerLocation(executionAttemptID);

    if (checkpointCoordinator != null) {
        ioExecutor.execute(() -> {
            try {
                // The wrapped ACK is handed to the CheckpointCoordinator for further processing
                checkpointCoordinator.receiveAcknowledgeMessage(ackMessage, taskManagerLocationInfo);
            } catch (Throwable t) {
                log.warn("Error while processing checkpoint acknowledgement message", t);
            }
        });
    } else {
        String errorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
        if (executionGraph.getState() == JobStatus.RUNNING) {
            log.error(errorMessage, jobGraph.getJobID());
        } else {
            log.debug(errorMessage, jobGraph.getJobID());
        }
    }
}
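Two details in this method matter: the ACK is frozen into a single AcknowledgeCheckpoint object before it crosses threads, and the coordinator work runs on ioExecutor with every Throwable caught and logged. A hypothetical sketch of that pattern follows; AckMessageSketch, CoordinatorSketch and SchedulerSketch are illustrative names, and the log calls are replaced by fields that the caller can inspect.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical immutable message, standing in for AcknowledgeCheckpoint.
final class AckMessageSketch {
    final String jobId;
    final String executionAttemptId;
    final long checkpointId;

    AckMessageSketch(String jobId, String executionAttemptId, long checkpointId) {
        this.jobId = jobId;
        this.executionAttemptId = executionAttemptId;
        this.checkpointId = checkpointId;
    }
}

// Hypothetical stand-in for CheckpointCoordinator.receiveAcknowledgeMessage.
interface CoordinatorSketch {
    void receiveAcknowledgeMessage(AckMessageSketch message) throws Exception;
}

final class SchedulerSketch {
    private final CoordinatorSketch coordinator; // null when the job has no checkpoint coordinator
    private final Executor ioExecutor;
    private final boolean jobRunning;
    final AtomicInteger failedAcks = new AtomicInteger();   // replaces log.warn(...)
    volatile String lastMissingCoordinatorLogLevel;         // replaces log.error / log.debug

    SchedulerSketch(CoordinatorSketch coordinator, Executor ioExecutor, boolean jobRunning) {
        this.coordinator = coordinator;
        this.ioExecutor = ioExecutor;
        this.jobRunning = jobRunning;
    }

    void acknowledgeCheckpoint(String jobId, String executionAttemptId, long checkpointId) {
        // Freeze the arguments into one immutable message before it crosses threads.
        final AckMessageSketch ackMessage = new AckMessageSketch(jobId, executionAttemptId, checkpointId);
        if (coordinator != null) {
            ioExecutor.execute(() -> {
                try {
                    coordinator.receiveAcknowledgeMessage(ackMessage);
                } catch (Throwable t) {
                    // Catch everything: a failing ACK is only recorded and never escapes into the executor thread.
                    failedAcks.incrementAndGet();
                }
            });
        } else {
            // Missing coordinator: loud only while the job is running.
            lastMissingCoordinatorLogLevel = jobRunning ? "ERROR" : "DEBUG";
        }
    }
}
```

Passing a direct executor such as Runnable::run makes the hand-off synchronous, which is convenient for exercising the logic; the real code uses a separate I/O thread pool so the JobMaster main thread is not blocked.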

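When the CheckpointCoordinator receives the AcknowledgeCheckpoint, it uses the checkpointId to look up the matching PendingCheckpoint in the "checkpointId -> PendingCheckpoint" map and records the task's acknowledgement. Once every task that must acknowledge has done so, it converts the PendingCheckpoint into a CompletedCheckpoint.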

/**
 * After the post-checkpoint ACK has been wrapped in an AcknowledgeCheckpoint,
 * SchedulerNG sends it to the CheckpointCoordinator for processing.
 */
public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message, String taskManagerLocationInfo) throws CheckpointException {
    // Some code omitted...
    final long checkpointId = message.getCheckpointId();

    synchronized (lock) {
        if (shutdown) {
            return false;
        }

        // Look up the PendingCheckpoint for this checkpointId in the "checkpointId -> PendingCheckpoint" map
        final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);

        if (checkpoint != null && !checkpoint.isDiscarded()) {
            switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
                case SUCCESS:
                    LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
                        checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);

                    // Every task has now acknowledged this checkpoint
                    if (checkpoint.areTasksFullyAcknowledged()) {
                        // Complete the PendingCheckpoint and remove it from the map that holds it
                        completePendingCheckpoint(checkpoint);
                    }
                    break;
                // Some code omitted...
            }

            return true;
        }
        // Some code omitted...
    }
}
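The bookkeeping behind acknowledgeTask and areTasksFullyAcknowledged can be pictured as a set of tasks that still owe an ACK. The following is a simplified sketch under that assumption: the enum values echo the kinds of cases a switch like the one above would handle, but the whole model is hypothetical, and moving the checkpoint into a completed map stands in for completePendingCheckpoint.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical acknowledgement result.
enum AckResultSketch { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

// Hypothetical PendingCheckpoint: tracks which tasks still owe an ACK.
final class PendingCheckpointSketch {
    private final Set<String> notYetAcknowledged;
    private final Set<String> acknowledged = new HashSet<>();
    private boolean discarded;

    PendingCheckpointSketch(Set<String> tasksToAcknowledge) {
        this.notYetAcknowledged = new HashSet<>(tasksToAcknowledge);
    }

    AckResultSketch acknowledgeTask(String executionId) {
        if (discarded) {
            return AckResultSketch.DISCARDED;
        }
        if (notYetAcknowledged.remove(executionId)) {
            acknowledged.add(executionId);
            return AckResultSketch.SUCCESS;
        }
        return acknowledged.contains(executionId) ? AckResultSketch.DUPLICATE : AckResultSketch.UNKNOWN;
    }

    boolean areTasksFullyAcknowledged() {
        return notYetAcknowledged.isEmpty() && !discarded;
    }

    void discard() {
        discarded = true;
    }
}

// Hypothetical coordinator: only the lookup, acknowledge and complete path.
final class CoordinatorAckSketch {
    private final Object lock = new Object();
    final Map<Long, PendingCheckpointSketch> pendingCheckpoints = new HashMap<>();
    final Map<Long, PendingCheckpointSketch> completedCheckpoints = new HashMap<>();

    boolean receiveAcknowledgeMessage(long checkpointId, String executionId) {
        synchronized (lock) {
            PendingCheckpointSketch checkpoint = pendingCheckpoints.get(checkpointId);
            if (checkpoint == null) {
                return false; // unknown or already completed checkpoint
            }
            if (checkpoint.acknowledgeTask(executionId) == AckResultSketch.SUCCESS
                    && checkpoint.areTasksFullyAcknowledged()) {
                // Stand-in for completePendingCheckpoint: move from pending to completed.
                pendingCheckpoints.remove(checkpointId);
                completedCheckpoints.put(checkpointId, checkpoint);
            }
            return true;
        }
    }
}
```

Only a SUCCESS result can complete a checkpoint, so a duplicate ACK from a task that already reported never triggers completion a second time.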

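Completing the PendingCheckpoint involves two core steps:

  • 1. Create the CompletedCheckpoint and add it to the completed-checkpoint store.
  • 2. Have each Execution send a "CheckpointComplete" message through the TaskManagerGateway, notifying all Task instances that this checkpoint is finished.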
/**
 * After receiving the ACKs from the Task instances, the CheckpointCoordinator completes the checkpoint:
 * it converts the PendingCheckpoint into a CompletedCheckpoint.
 */
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
    final long checkpointId = pendingCheckpoint.getCheckpointId();
    final CompletedCheckpoint completedCheckpoint;

    Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
    // SharedStateRegistry acts as a registry; the states are registered here
    sharedStateRegistry.registerAll(operatorStates.values());

    try {
        try {
            /*
             * Complete the current PendingCheckpoint and produce a CompletedCheckpoint.
             * If anything goes wrong, a CheckpointException is thrown.
             */
            completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
            failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
        } catch (Exception e1) {
            if (!pendingCheckpoint.isDiscarded()) {
                failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
            }
            throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.',
                CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
        }

        Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);

        try {
            // Add the CompletedCheckpoint to the store of completed checkpoints
            completedCheckpointStore.addCheckpoint(completedCheckpoint);
        } catch (Exception exception) {
            // If adding it to the completed-checkpoint store fails, clean up
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        completedCheckpoint.discardOnFailedStoring();
                    } catch (Throwable t) {
                        LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
                    }
                }
            });

            throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.',
                CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
        }
    } finally {
        // In every case, remove the corresponding PendingCheckpoint from the pending map
        pendingCheckpoints.remove(checkpointId);
        // Trigger any queued checkpoint requests
        triggerQueuedRequests();
    }

    // Some code omitted...

    /*
     * Iterate over the ExecutionVertex[] array of "tasks to notify when the checkpoint is confirmed"
     * and send a CheckpointComplete message to every Execution, telling the Task instances that
     * this checkpoint has completed.
     */
    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            /*
             * Send the CheckpointComplete message through the Execution to the TaskManagerGateway,
             * telling all Task instances that this checkpoint is over.
             * When the TaskExecutor receives it, it looks up the Task instance in its TaskSlotTable
             * and forwards the message to it.
             * Every observer that implements CheckpointListener then does its own post-processing.
             */
            ee.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }
}
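The control flow of completePendingCheckpoint is the interesting part: cleanup after a store failure is pushed to an executor, the finally block always removes the pending entry and triggers queued requests, and tasks are notified only if nothing threw. Here is a compressed sketch with hypothetical names (CheckpointStoreSketch, CompletionSketch). The finalization step and the SharedStateRegistry are left out, and an unchecked IllegalStateException stands in for CheckpointException.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

// Hypothetical stand-in for CompletedCheckpointStore.addCheckpoint.
interface CheckpointStoreSketch {
    void addCheckpoint(long checkpointId) throws Exception;
}

final class CompletionSketch {
    final Map<Long, String> pendingCheckpoints = new HashMap<>();
    final List<String> events = new ArrayList<>();
    private final CheckpointStoreSketch store;
    private final Executor executor;
    private final List<String> tasksToCommitTo;

    CompletionSketch(CheckpointStoreSketch store, Executor executor, List<String> tasksToCommitTo) {
        this.store = store;
        this.executor = executor;
        this.tasksToCommitTo = tasksToCommitTo;
    }

    void completePendingCheckpoint(long checkpointId) {
        try {
            try {
                store.addCheckpoint(checkpointId);
            } catch (Exception e) {
                // Cleanup of the already-written checkpoint is handed to another executor.
                executor.execute(() -> events.add("discard " + checkpointId));
                throw new IllegalStateException("Could not complete checkpoint " + checkpointId, e);
            }
        } finally {
            // Runs on success and on failure alike.
            pendingCheckpoints.remove(checkpointId);
            events.add("triggerQueuedRequests");
        }
        // Reached only if nothing above threw: notify every task that should commit.
        for (String task : tasksToCommitTo) {
            events.add("notifyCheckpointComplete " + task + "@" + checkpointId);
        }
    }
}
```

The nested try/finally is what guarantees that a failed checkpoint never lingers in the pending map and never blocks the next queued trigger request.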

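The "CheckpointComplete" message travels from each Execution through the TaskManagerGateway to the TaskExecutor, which then hands it to the target Task instance.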

/**
 * Pass the "CheckpointComplete" message on to the TaskExecutor.
 */
public void notifyCheckpointComplete(long checkpointId, long timestamp) {
    final LogicalSlot slot = assignedResource;

    if (slot != null) {
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

        // Use the TaskManagerGateway interface to tell the TaskExecutor that this checkpoint is over
        // for all Task instances; the implementation of TaskManagerGateway is RpcTaskManagerGateway
        taskManagerGateway.notifyCheckpointComplete(attemptId, getVertex().getJobId(), checkpointId, timestamp);
    } else {
        LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
            "no longer running.");
    }
}

/**
 * The CheckpointCoordinator sends the confirmation through the Execution using the TaskManagerGateway
 * interface, and TaskManagerGateway forwards it further via TaskExecutorGateway.
 * TaskExecutor implements TaskExecutorGateway, so the message ends up at the TaskExecutor.
 */
@Override
public void notifyCheckpointComplete(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp) {
    // Forward via TaskExecutorGateway; this call delivers the message to the TaskExecutor
    taskExecutorGateway.confirmCheckpoint(executionAttemptID, checkpointId, timestamp);
}

/**
 * After layers of interfaces and RPC calls, the TaskExecutor finally receives the
 * "checkpoint complete" message and forwards it to the matching Task instance.
 */
@Override
public CompletableFuture<Acknowledge> confirmCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp) {
    log.debug("Confirm checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

    // Look up the Task instance in the TaskSlotTable
    final Task task = taskSlotTable.getTask(executionAttemptID);

    if (task != null) {
        // Deliver the "checkpoint complete" message to this Task instance.
        // Every component or operator that implements CheckpointListener receives it and does its own follow-up work
        task.notifyCheckpointComplete(checkpointId);

        return CompletableFuture.completedFuture(Acknowledge.get());
    } else {
        final String message = "TaskManager received a checkpoint confirmation for unknown task " + executionAttemptID + '.';

        log.debug(message);
        return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
    }
}
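On the TaskExecutor side, confirmCheckpoint boils down to a lookup followed by either a completed or a failed future. The sketch below uses a HashMap standing in for the TaskSlotTable and a String standing in for Acknowledge; ConfirmableTask and TaskExecutorSketch are hypothetical. FutureUtils.completedExceptionally is a Flink utility, so the sketch builds the failed future by hand, which works on Java 8 (Java 9 added CompletableFuture.failedFuture for the same purpose).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a Task that listens for checkpoint completion.
interface ConfirmableTask {
    void notifyCheckpointComplete(long checkpointId);
}

final class TaskExecutorSketch {
    // Stand-in for TaskSlotTable: execution attempt ID -> running task.
    final Map<String, ConfirmableTask> taskSlotTable = new HashMap<>();

    CompletableFuture<String> confirmCheckpoint(String executionAttemptId, long checkpointId) {
        ConfirmableTask task = taskSlotTable.get(executionAttemptId);
        if (task != null) {
            task.notifyCheckpointComplete(checkpointId);
            return CompletableFuture.completedFuture("ACK");
        }
        // Java 8-compatible way to return an already-failed future.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException(
            "TaskManager received a checkpoint confirmation for unknown task " + executionAttemptId + '.'));
        return failed;
    }
}
```

Returning a failed future instead of throwing keeps the RPC contract uniform: the caller always gets a future and decides how to react to the failure.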

When the Task receives the "checkpoint complete" message, it passes it on to the StreamTask.

/**
 * The TaskExecutor delivers the "checkpoint complete" message to the Task instance,
 * and the Task instance passes it on to the operators in its StreamTask.
 */
@Override
public void notifyCheckpointComplete(final long checkpointID) {
    final AbstractInvokable invokable = this.invokable;

    if (executionState == ExecutionState.RUNNING && invokable != null) {
        try {
            // Send the "checkpoint complete" message to the StreamOperators in the StreamTask.
            // StreamTask, a subclass of the abstract class AbstractInvokable, provides the concrete logic
            invokable.notifyCheckpointCompleteAsync(checkpointID);
        } catch (RejectedExecutionException ex) {
            LOG.debug("Notify checkpoint complete {} for {} ({}) was rejected by the mailbox",
                checkpointID, taskNameWithSubtask, executionId);
        } catch (Throwable t) {
            if (getExecutionState() == ExecutionState.RUNNING) {
                failExternally(new RuntimeException("Error while confirming checkpoint", t));
            }
        }
    } else {
        LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask);
    }
}

/**
 * The Task instance hands the "checkpoint complete" message to the StreamTask, which submits
 * notifyCheckpointComplete() to its mailbox at the highest priority so that it runs in the MailboxProcessor.
 */
@Override
public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
    // Wrap the notifyCheckpointComplete() call as a RunnableWithException and submit it to the MailboxProcessor,
    // using the mailbox executor for TaskMailbox.MAX_PRIORITY (the highest priority)
    return mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).submit(
        // Core logic: deliver the "checkpoint complete" message to the operators inside the StreamTask
        () -> notifyCheckpointComplete(checkpointId),
        "checkpoint %d complete", checkpointId);
}
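MAX_PRIORITY is easiest to reason about with a toy mailbox. The sketch below rests on an assumption (our reading of Flink's TaskMailbox, not its code): a mailbox hands out the oldest mail whose priority is at least the priority the caller asks for. Under that model a MAX_PRIORITY mail is eligible at every priority level, so it can run even while the task thread is only draining mails above some threshold. MailSketch and MailboxSketch are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical mail: an action tagged with a priority and a description.
final class MailSketch {
    final int priority;
    final Runnable action;
    final String description;

    MailSketch(int priority, Runnable action, String description) {
        this.priority = priority;
        this.action = action;
        this.description = description;
    }
}

// Hypothetical single-threaded mailbox; priority acts as a minimum threshold when taking mail.
final class MailboxSketch {
    static final int MIN_PRIORITY = 0;
    static final int MAX_PRIORITY = Integer.MAX_VALUE;

    private final Deque<MailSketch> queue = new ArrayDeque<>();

    void put(MailSketch mail) {
        queue.addLast(mail);
    }

    // Removes and returns the oldest mail whose priority is >= minPriority, or null if none qualifies.
    MailSketch tryTake(int minPriority) {
        Iterator<MailSketch> it = queue.iterator();
        while (it.hasNext()) {
            MailSketch mail = it.next();
            if (mail.priority >= minPriority) {
                it.remove();
                return mail;
            }
        }
        return null;
    }
}
```

For example, if a low-priority timer mail sits in front of a MAX_PRIORITY checkpoint mail, tryTake(2) skips the timer and returns the checkpoint mail, while tryTake(MIN_PRIORITY) behaves like a plain FIFO queue.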

Finally, the StreamTask notifies each StreamOperator in its OperatorChain.

/**
 * Deliver the "checkpoint complete" message to every StreamOperator in the StreamTask's OperatorChain.
 */
private void notifyCheckpointComplete(long checkpointId) {
    try {
        boolean success = actionExecutor.call(() -> {
            if (isRunning) {
                LOG.debug("Notification of complete checkpoint for task {}", getName());

                // Iterate over all operators in the OperatorChain
                for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
                    if (operator != null) {
                        // Deliver the "checkpoint complete" message to each operator
                        operator.notifyCheckpointComplete(checkpointId);
                    }
                }
                return true;
            } else {
                LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
                return true;
            }
        });
        // Notify the TaskStateManager as well
        getEnvironment().getTaskStateManager().notifyCheckpointComplete(checkpointId);
        // For a synchronous savepoint, finish the current Task and terminate the Task instance
        if (success && isSynchronousSavepointId(checkpointId)) {
            finishTask();
            // Reset to "notify" the internal synchronous savepoint mailbox loop.
            // That is, clear syncSavepointId
            resetSynchronousSavepointId();
        }
    } catch (Exception e) {
        handleException(new RuntimeException("Error while confirming checkpoint", e));
    }
}
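Three ordering rules are visible here: the loop over the chain skips null entries, the TaskStateManager is informed after the operators, and a synchronous savepoint additionally finishes the task. A hypothetical sketch of that ordering follows; StreamTaskSketch and OperatorSketch are illustrative, a nullable field replaces isSynchronousSavepointId, and an events list replaces the real side effects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StreamOperator's notification hook.
interface OperatorSketch {
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}

final class StreamTaskSketch {
    private final List<OperatorSketch> operatorChain;
    final List<String> events = new ArrayList<>();
    boolean isRunning = true;
    Long syncSavepointId; // hypothetical: non-null while a synchronous savepoint is in flight

    StreamTaskSketch(List<OperatorSketch> operatorChain) {
        this.operatorChain = operatorChain;
    }

    void notifyCheckpointComplete(long checkpointId) {
        try {
            if (isRunning) {
                for (OperatorSketch operator : operatorChain) {
                    if (operator != null) { // mirrors the null check in the original loop
                        operator.notifyCheckpointComplete(checkpointId);
                    }
                }
            }
            // The TaskStateManager is informed after the operators, whether or not the task is running.
            events.add("taskStateManager:" + checkpointId);
            if (syncSavepointId != null && syncSavepointId.longValue() == checkpointId) {
                events.add("finishTask");
                syncSavepointId = null; // stands in for resetSynchronousSavepointId()
            }
        } catch (Exception e) {
            events.add("handleException:" + e.getMessage());
        }
    }
}
```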

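When a StreamOperator receives the "checkpoint complete" message, it does whatever follow-up work it needs. Take AbstractUdfStreamOperator as an example: it checks whether the user-defined Function also implements the CheckpointListener interface, and if so, forwards the "checkpoint complete" message to the Function to handle.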

/**
 * When AbstractUdfStreamOperator receives the "checkpoint complete" message, it checks whether the
 * user-defined Function implements CheckpointListener. If it does, the message is forwarded to the
 * Function to handle. For example, FlinkKafkaConsumerBase commits offsets to the Kafka cluster.
 */
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    super.notifyCheckpointComplete(checkpointId);

    // If the user function held by this StreamOperator also implements CheckpointListener, notify it too.
    // For example, FlinkKafkaConsumerBase receives this message and commits offsets to the Kafka cluster,
    // ensuring that the consumed data has been fully processed
    if (userFunction instanceof CheckpointListener) {
        ((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
    }
}
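The idea the article attributes to FlinkKafkaConsumerBase is to remember per-checkpoint offsets when the snapshot is taken and to commit them only when notifyCheckpointComplete arrives. A minimal sketch of that idea with hypothetical classes: CheckpointListenerSketch stands in for Flink's CheckpointListener (without its throws Exception clause), UdfOperatorSketch mirrors the instanceof check above, and a map plays the external Kafka cluster. Dropping pending entries at or below the completed checkpoint ID is an assumption about how subsumed checkpoints are handled; this is not the connector's actual code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Local stand-in for Flink's CheckpointListener (the real method declares "throws Exception").
interface CheckpointListenerSketch {
    void notifyCheckpointComplete(long checkpointId);
}

// Hypothetical source function: remembers offsets per checkpoint, commits them only on completion.
final class OffsetCommittingSourceSketch implements CheckpointListenerSketch {
    private final Map<Integer, Long> currentOffsets = new HashMap<>(); // partition -> next offset to read
    private final TreeMap<Long, Map<Integer, Long>> pendingOffsetsToCommit = new TreeMap<>();
    final Map<Integer, Long> committedOffsets = new HashMap<>();     // plays the external Kafka cluster

    void consume(int partition, long offset) {
        currentOffsets.put(partition, offset + 1);
    }

    // Called when the checkpoint is taken (stand-in for snapshotState).
    void snapshotState(long checkpointId) {
        pendingOffsetsToCommit.put(checkpointId, new HashMap<>(currentOffsets));
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        Map<Integer, Long> offsets = pendingOffsetsToCommit.get(checkpointId);
        if (offsets == null) {
            return; // unknown, or already dropped by a later completed checkpoint
        }
        // Assumption: a completed checkpoint subsumes older ones, so drop them together with this one.
        pendingOffsetsToCommit.headMap(checkpointId, true).clear();
        committedOffsets.putAll(offsets);
    }
}

// Mirrors the instanceof check in AbstractUdfStreamOperator.notifyCheckpointComplete.
final class UdfOperatorSketch {
    private final Object userFunction;

    UdfOperatorSketch(Object userFunction) {
        this.userFunction = userFunction;
    }

    void notifyCheckpointComplete(long checkpointId) {
        if (userFunction instanceof CheckpointListenerSketch) {
            ((CheckpointListenerSketch) userFunction).notifyCheckpointComplete(checkpointId);
        }
    }
}
```

Committing only on completion means the external system never records an offset whose records are not yet covered by a successful checkpoint; a function that does not implement the listener interface is simply skipped.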
