Once the OperatorSnapshotFutures executed by the Runnable in StreamTask have completed, the ACK message has to be sent to the TaskStateManager.
/**
 * Once the whole checkpoint operation has finished, send the ACK message to the TaskStateManager.
 */
private void reportCompletedSnapshotStates(
        TaskStateSnapshot acknowledgedTaskStateSnapshot,
        TaskStateSnapshot localTaskStateSnapshot,
        long asyncDurationMillis) {

    // TaskStateManager is the interface dedicated to reporting and retrieving task state
    TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();

    // ... (code omitted) ...

    // Report the state snapshots of the operator instances running in this Task;
    // the concrete logic is provided by the implementation class TaskStateManagerImpl
    taskStateManager.reportTaskStateSnapshots(
        checkpointMetaData,
        checkpointMetrics,
        hasAckState ? acknowledgedTaskStateSnapshot : null,
        hasLocalState ? localTaskStateSnapshot : null);

    // ... (code omitted) ...
}
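When the TaskStateManager receives the ACK, it first stores the local state snapshot in its local state store and then passes the ACK on to the CheckpointResponder.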
/**
 * The TaskStateManager passes the ACK message on to the CheckpointResponder.
 */
@Override
public void reportTaskStateSnapshots(
        @Nonnull CheckpointMetaData checkpointMetaData,
        @Nonnull CheckpointMetrics checkpointMetrics,
        @Nullable TaskStateSnapshot acknowledgedState,
        @Nullable TaskStateSnapshot localState) {

    long checkpointId = checkpointMetaData.getCheckpointId();

    // Keep the task-local copy of the state under its checkpoint id
    localStateStore.storeLocalState(checkpointId, localState);

    // Send the ACK through the CheckpointResponder interface
    // (the implementation class RpcCheckpointResponder provides the concrete method)
    checkpointResponder.acknowledgeCheckpoint(
        jobId,
        executionAttemptID,
        checkpointId,
        checkpointMetrics,
        acknowledgedState);
}
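The two steps in reportTaskStateSnapshots (store the local copy, then forward the ACK) can be modeled in isolation. A minimal sketch, assuming a hypothetical `Responder` interface and an in-memory map in place of Flink's CheckpointResponder and local state store, with state handles reduced to strings:

```java
import java.util.HashMap;
import java.util.Map;

public class TaskStateManagerSketch {

    // Hypothetical, simplified stand-in for CheckpointResponder (states are plain Strings here).
    public interface Responder {
        void acknowledgeCheckpoint(String jobId, String attemptId, long checkpointId, String acknowledgedState);
    }

    // Hypothetical stand-in for the task-local state store: checkpointId -> local state.
    private final Map<Long, String> localStateStore = new HashMap<>();
    private final String jobId;
    private final String attemptId;
    private final Responder responder;

    public TaskStateManagerSketch(String jobId, String attemptId, Responder responder) {
        this.jobId = jobId;
        this.attemptId = attemptId;
        this.responder = responder;
    }

    public void reportTaskStateSnapshots(long checkpointId, String acknowledgedState, String localState) {
        // 1. Keep the local copy (if any) under its checkpoint id.
        if (localState != null) {
            localStateStore.put(checkpointId, localState);
        }
        // 2. Forward the ACK; the task's identity (job id, attempt id) is added here,
        //    so the caller only supplies checkpoint-specific data.
        responder.acknowledgeCheckpoint(jobId, attemptId, checkpointId, acknowledgedState);
    }

    public String localStateFor(long checkpointId) {
        return localStateStore.get(checkpointId);
    }
}
```

Because the manager already knows which job and attempt it belongs to, the StreamTask side never has to pass those identifiers along with the snapshot.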
The CheckpointResponder passes the ACK on to the CheckpointCoordinatorGateway. JobMasterGateway is a sub-interface of CheckpointCoordinatorGateway, and JobMaster implements JobMasterGateway.
/**
 * The CheckpointResponder passes the ACK message on to the CheckpointCoordinatorGateway, i.e. the JobMaster.
 */
@Override
public void acknowledgeCheckpoint(
        JobID jobID,
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        CheckpointMetrics checkpointMetrics,
        TaskStateSnapshot subtaskState) {

    // Send the ACK through the CheckpointCoordinatorGateway interface; its sub-interface is
    // JobMasterGateway, whose implementation is JobMaster
    checkpointCoordinatorGateway.acknowledgeCheckpoint(
        jobID,
        executionAttemptID,
        checkpointId,
        checkpointMetrics,
        subtaskState);
}
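The interface layering means the TaskExecutor side only depends on the narrow, checkpoint-related slice of the JobMaster. A minimal sketch of that shape, assuming hypothetical, simplified signatures (String attempt ids, no metrics or state) and a direct object reference where Flink would use an RPC proxy:

```java
import java.util.ArrayList;
import java.util.List;

public class GatewaySketch {

    // Narrow interface: only the checkpoint-related call (hypothetical, simplified signature).
    public interface CheckpointCoordinatorGateway {
        void acknowledgeCheckpoint(String attemptId, long checkpointId);
    }

    // Broader interface that extends the narrow one, as JobMasterGateway does in Flink.
    public interface JobMasterGateway extends CheckpointCoordinatorGateway {
        String getAddress();
    }

    // Plays the role of JobMaster: the concrete endpoint behind both interfaces.
    public static class FakeJobMaster implements JobMasterGateway {
        public final List<String> received = new ArrayList<>();

        @Override
        public void acknowledgeCheckpoint(String attemptId, long checkpointId) {
            received.add(attemptId + "@" + checkpointId);
        }

        @Override
        public String getAddress() {
            return "local://jobmaster";
        }
    }

    // Plays the role of RpcCheckpointResponder: it depends only on the narrow interface.
    public static class Responder {
        private final CheckpointCoordinatorGateway gateway;

        public Responder(CheckpointCoordinatorGateway gateway) {
            this.gateway = gateway;
        }

        public void acknowledgeCheckpoint(String attemptId, long checkpointId) {
            gateway.acknowledgeCheckpoint(attemptId, checkpointId);
        }
    }
}
```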
And so the ACK finally arrives, through the proper channels, at the JobMaster.
/**
 * Through layers of interfaces and implementation classes, the ACK for the State Snapshot produced
 * by StreamTask's checkpoint finally reaches the JobMaster, which notifies SchedulerNG.
 */
@Override
public void acknowledgeCheckpoint(
        final JobID jobID,
        final ExecutionAttemptID executionAttemptID,
        final long checkpointId,
        final CheckpointMetrics checkpointMetrics,
        final TaskStateSnapshot checkpointState) {

    // The JobMaster passes the ACK on to the SchedulerNG scheduler (the method is implemented in SchedulerBase)
    schedulerNG.acknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
}
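After receiving the ACK, the JobMaster passes it to SchedulerNG. SchedulerNG wraps it into an AcknowledgeCheckpoint message and hands it to the CheckpointCoordinator for processing, running that call on the ioExecutor.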
/**
 * SchedulerNG receives from the JobMaster the ACK for the State Snapshot produced by the checkpoint.
 */
@Override
public void acknowledgeCheckpoint(
        final JobID jobID,
        final ExecutionAttemptID executionAttemptID,
        final long checkpointId,
        final CheckpointMetrics checkpointMetrics,
        final TaskStateSnapshot checkpointState) {

    mainThreadExecutor.assertRunningInMainThread();

    final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    // Wrap the ACK into an AcknowledgeCheckpoint message
    final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(
        jobID,
        executionAttemptID,
        checkpointId,
        checkpointMetrics,
        checkpointState);

    final String taskManagerLocationInfo = retrieveTaskManagerLocation(executionAttemptID);

    if (checkpointCoordinator != null) {
        ioExecutor.execute(() -> {
            try {
                // The wrapped ACK is handed to the CheckpointCoordinator for further processing
                checkpointCoordinator.receiveAcknowledgeMessage(ackMessage, taskManagerLocationInfo);
            } catch (Throwable t) {
                log.warn("Error while processing checkpoint acknowledgement message", t);
            }
        });
    } else {
        String errorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
        if (executionGraph.getState() == JobStatus.RUNNING) {
            log.error(errorMessage, jobGraph.getJobID());
        } else {
            log.debug(errorMessage, jobGraph.getJobID());
        }
    }
}
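The hand-off in the scheduler follows a common pattern: do the cheap work (wrapping the message) on the main thread, push the processing onto an I/O executor, and make sure exceptions thrown there are logged rather than escaping. A minimal sketch, assuming a plain `Consumer` in place of CheckpointCoordinator and `System.err` in place of the logger; `dispatch` and `AckMessage` are hypothetical names:

```java
import java.util.concurrent.Executor;
import java.util.function.Consumer;

public class AckDispatchSketch {

    // Hypothetical, simplified stand-in for AcknowledgeCheckpoint.
    public static final class AckMessage {
        public final String attemptId;
        public final long checkpointId;

        public AckMessage(String attemptId, long checkpointId) {
            this.attemptId = attemptId;
            this.checkpointId = checkpointId;
        }
    }

    /**
     * Hands the message to the coordinator on the given executor. Returns which branch was taken,
     * so the behavior can be checked without a logging framework.
     */
    public static String dispatch(Consumer<AckMessage> coordinator, boolean jobRunning,
                                  Executor ioExecutor, AckMessage message) {
        if (coordinator != null) {
            ioExecutor.execute(() -> {
                try {
                    coordinator.accept(message);
                } catch (Throwable t) {
                    // Failures are logged, never propagated back to the submitting thread.
                    System.err.println("Error while processing checkpoint acknowledgement message: " + t);
                }
            });
            return "dispatched";
        }
        // No coordinator: only suspicious (error level) if the job is supposed to be running.
        return jobRunning ? "error" : "debug";
    }
}
```

Passing a direct executor (`Runnable::run`) makes the hand-off synchronous, which is convenient for testing; with a real thread pool the caller returns before the coordinator has seen the message.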
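When the CheckpointCoordinator receives the AcknowledgeCheckpoint, it uses the checkpointId to look up the corresponding PendingCheckpoint in the pendingCheckpoints map (checkpointId → PendingCheckpoint) and records the acknowledgement for the sending task. Once all tasks have acknowledged, the PendingCheckpoint is converted into a CompletedCheckpoint.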
/**
 * After the ACK has been wrapped into an AcknowledgeCheckpoint, SchedulerNG sends it to the
 * CheckpointCoordinator for processing.
 */
public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message, String taskManagerLocationInfo) throws CheckpointException {
    // ... (code omitted) ...
    final long checkpointId = message.getCheckpointId();

    synchronized (lock) {
        if (shutdown) {
            return false;
        }

        // Look up the PendingCheckpoint for this checkpointId in the "checkpointId -> PendingCheckpoint" map
        final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);

        if (checkpoint != null && !checkpoint.isDiscarded()) {
            switch (checkpoint.acknowledgeTask(
                    message.getTaskExecutionId(),
                    message.getSubtaskState(),
                    message.getCheckpointMetrics())) {
                case SUCCESS:
                    LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
                        checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);

                    // Every task has acknowledged this checkpoint
                    if (checkpoint.areTasksFullyAcknowledged()) {
                        // Complete the PendingCheckpoint and remove it from the map that holds it
                        completePendingCheckpoint(checkpoint);
                    }
                    break;
                // ... (code omitted) ...
            }

            return true;
        }
        // ... (code omitted) ...
    }
}
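The bookkeeping in receiveAcknowledgeMessage boils down to one pending entry per checkpointId, each tracking which tasks still owe an ACK; the checkpoint completes exactly when the last expected task acknowledges. A minimal sketch under those assumptions: `AckResult` loosely mirrors the per-task outcomes of PendingCheckpoint.acknowledgeTask, while the class names, String states, and the plain `false` returned for unknown checkpoints are simplifications, not Flink's exact behavior:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PendingAckSketch {

    // Simplified mirror of the per-task acknowledgement outcomes.
    public enum AckResult { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

    public static final class Pending {
        private final Set<String> notYetAcknowledged;
        private final Map<String, String> acknowledgedStates = new HashMap<>();
        private boolean discarded;

        Pending(Set<String> tasksToAck) {
            this.notYetAcknowledged = new HashSet<>(tasksToAck);
        }

        AckResult acknowledgeTask(String attemptId, String state) {
            if (discarded) {
                return AckResult.DISCARDED;
            }
            if (!notYetAcknowledged.remove(attemptId)) {
                return acknowledgedStates.containsKey(attemptId) ? AckResult.DUPLICATE : AckResult.UNKNOWN;
            }
            acknowledgedStates.put(attemptId, state);
            return AckResult.SUCCESS;
        }

        boolean areTasksFullyAcknowledged() {
            return notYetAcknowledged.isEmpty() && !discarded;
        }
    }

    private final Object lock = new Object();
    private final Map<Long, Pending> pendingCheckpoints = new HashMap<>();
    private final Set<Long> completedIds = new HashSet<>();

    public void trigger(long checkpointId, Set<String> tasksToAck) {
        synchronized (lock) {
            pendingCheckpoints.put(checkpointId, new Pending(tasksToAck));
        }
    }

    /** Returns false if the checkpoint is not (or no longer) pending. */
    public boolean receiveAcknowledgeMessage(long checkpointId, String attemptId, String state) {
        synchronized (lock) {
            Pending checkpoint = pendingCheckpoints.get(checkpointId);
            if (checkpoint == null || checkpoint.discarded) {
                return false;
            }
            if (checkpoint.acknowledgeTask(attemptId, state) == AckResult.SUCCESS
                    && checkpoint.areTasksFullyAcknowledged()) {
                // Stand-in for completePendingCheckpoint(): leave the pending map, become "completed".
                pendingCheckpoints.remove(checkpointId);
                completedIds.add(checkpointId);
            }
            return true;
        }
    }

    public boolean isCompleted(long checkpointId) {
        synchronized (lock) {
            return completedIds.contains(checkpointId);
        }
    }
}
```

A duplicate ACK from a task that already acknowledged neither completes nor breaks the checkpoint; only the first ACK per task counts.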
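Completing a PendingCheckpoint involves two core steps: (1) finalizing it into a CompletedCheckpoint and adding that to the CompletedCheckpointStore, and (2) sending the CheckpointComplete notification to every task in tasksToCommitTo: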
/**
 * After receiving the acks from the Task instances, the CheckpointCoordinator completes the
 * checkpoint: the PendingCheckpoint is converted into a CompletedCheckpoint.
 */
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
    final long checkpointId = pendingCheckpoint.getCheckpointId();
    final CompletedCheckpoint completedCheckpoint;

    Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
    // SharedStateRegistry acts as a registry; the state is registered here
    sharedStateRegistry.registerAll(operatorStates.values());

    try {
        try {
            // Finalize the current PendingCheckpoint and produce a CompletedCheckpoint.
            // On failure, a CheckpointException is thrown below.
            completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
            failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
        } catch (Exception e1) {
            if (!pendingCheckpoint.isDiscarded()) {
                failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
            }
            throw new CheckpointException(
                "Could not finalize the pending checkpoint " + checkpointId + '.',
                CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
        }

        // After finalization, the PendingCheckpoint must be discarded and the CompletedCheckpoint must exist
        Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);

        try {
            // Add the CompletedCheckpoint to the store of completed checkpoints
            completedCheckpointStore.addCheckpoint(completedCheckpoint);
        } catch (Exception exception) {
            // If adding it to the completed-checkpoint store fails, clean up asynchronously
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        completedCheckpoint.discardOnFailedStoring();
                    } catch (Throwable t) {
                        LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
                    }
                }
            });

            throw new CheckpointException(
                "Could not complete the pending checkpoint " + checkpointId + '.',
                CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
        }
    } finally {
        // In every case, remove this PendingCheckpoint from the pendingCheckpoints map
        pendingCheckpoints.remove(checkpointId);
        // Trigger any queued checkpoint requests
        triggerQueuedRequests();
    }

    // ... (code omitted; the `timestamp` used below is defined in this part) ...

    /*
     * Iterate over tasksToCommitTo (the ExecutionVertex[] of tasks that must be notified once the
     * checkpoint is confirmed) and send a "CheckpointComplete" message to each current Execution,
     * telling the Task instances that this checkpoint has completed.
     */
    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            /*
             * Via the Execution, send the "CheckpointComplete" message to the TaskManagerGateway,
             * notifying all Task instances that this checkpoint is finished. On receipt, the
             * TaskExecutor looks up the Task instance in its TaskSlotTable and forwards the message
             * to it. Every observer implementing CheckpointListener then does its own processing.
             */
            ee.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }
}
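The control flow of completePendingCheckpoint (finalize, store, always remove from the pending map, and only then notify the tasks) can be isolated as follows. A minimal sketch, assuming hypothetical `Store`/`TaskNotifier` interfaces and an unchecked `CheckpointFailure` in place of Flink's checked CheckpointException; "finalizing" is reduced to looking up the trigger timestamp:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

public class CompleteSketch {

    // Hypothetical stand-ins for CompletedCheckpointStore, the task notification, and CheckpointException.
    public interface Store { void addCheckpoint(long checkpointId) throws Exception; }
    public interface TaskNotifier { void notifyCheckpointComplete(long checkpointId, long timestamp); }
    public static final class CheckpointFailure extends RuntimeException {
        CheckpointFailure(String message, Throwable cause) { super(message, cause); }
    }

    private final Map<Long, Long> pendingCheckpoints = new HashMap<>(); // checkpointId -> timestamp
    private final Store store;
    private final Executor executor;
    private final List<TaskNotifier> tasksToCommitTo;
    public final List<Long> discarded = new ArrayList<>();

    public CompleteSketch(Store store, Executor executor, List<TaskNotifier> tasksToCommitTo) {
        this.store = store;
        this.executor = executor;
        this.tasksToCommitTo = tasksToCommitTo;
    }

    public void trigger(long checkpointId, long timestamp) { pendingCheckpoints.put(checkpointId, timestamp); }

    public boolean isPending(long checkpointId) { return pendingCheckpoints.containsKey(checkpointId); }

    public void completePendingCheckpoint(long checkpointId) {
        final long timestamp;
        try {
            // Step 1a: "finalize" (here: just look up the trigger timestamp).
            Long ts = pendingCheckpoints.get(checkpointId);
            if (ts == null) {
                throw new CheckpointFailure("Could not finalize the pending checkpoint " + checkpointId + '.', null);
            }
            timestamp = ts;
            // Step 1b: persist in the store; on failure clean up asynchronously and report.
            try {
                store.addCheckpoint(checkpointId);
            } catch (Exception e) {
                executor.execute(() -> discarded.add(checkpointId));
                throw new CheckpointFailure("Could not complete the pending checkpoint " + checkpointId + '.', e);
            }
        } finally {
            // Whatever happened above, the checkpoint is no longer pending.
            pendingCheckpoints.remove(checkpointId);
        }
        // Step 2: only reached on success; notify every task that must commit.
        for (TaskNotifier task : tasksToCommitTo) {
            task.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }
}
```

The `finally` block guarantees that a failed checkpoint never lingers in the pending map, and because the notification loop sits after the `try`, tasks are only told about checkpoints that were actually stored.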
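The "CheckpointComplete" message travels from the Execution through the TaskManagerGateway to the TaskExecutor, which forwards it to the target Task instance.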
/**
 * Pass the "CheckpointComplete" message on to the TaskExecutor.
 */
public void notifyCheckpointComplete(long checkpointId, long timestamp) {
    final LogicalSlot slot = assignedResource;

    if (slot != null) {
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

        // Use the TaskManagerGateway interface to send the message to the TaskExecutor,
        // telling all Task instances that this checkpoint is finished.
        // The implementation class of TaskManagerGateway is RpcTaskManagerGateway
        taskManagerGateway.notifyCheckpointComplete(attemptId, getVertex().getJobId(), checkpointId, timestamp);
    } else {
        LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
            "no longer running.");
    }
}

/**
 * The CheckpointCoordinator sends the confirmation through the Execution via the TaskManagerGateway
 * interface, and the TaskManagerGateway forwards it via the TaskExecutorGateway. TaskExecutor
 * implements TaskExecutorGateway, so the message ends up at the TaskExecutor.
 */
@Override
public void notifyCheckpointComplete(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp) {
    // Forward the message via TaskExecutorGateway;
    // this is the step that delivers it to the TaskExecutor
    taskExecutorGateway.confirmCheckpoint(executionAttemptID, checkpointId, timestamp);
}

/**
 * After layers of interfaces and RPC calls, the TaskExecutor finally receives the "checkpoint
 * completed" message and forwards it to the corresponding Task instance.
 */
@Override
public CompletableFuture<Acknowledge> confirmCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp) {
    log.debug("Confirm checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

    // Look up the Task instance in the TaskSlotTable
    final Task task = taskSlotTable.getTask(executionAttemptID);

    if (task != null) {
        // Send the "checkpoint completed" message to that Task instance.
        // Every component or operator implementing CheckpointListener receives it and
        // performs its own follow-up work
        task.notifyCheckpointComplete(checkpointId);

        return CompletableFuture.completedFuture(Acknowledge.get());
    } else {
        final String message = "TaskManager received a checkpoint confirmation for unknown task " + executionAttemptID + '.';

        log.debug(message);
        return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
    }
}
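On the TaskExecutor side this is a lookup plus an asynchronous reply: a known task gets the notification and the caller gets a completed future, while an unknown attempt yields an exceptionally completed future instead of an exception thrown across the RPC boundary. A minimal sketch, assuming String ids and a String reply in place of ExecutionAttemptID and Acknowledge; `register` is a hypothetical helper:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class ConfirmSketch {

    // Hypothetical stand-in for the Task side of the notification.
    public interface Task { void notifyCheckpointComplete(long checkpointId); }

    // Hypothetical stand-in for TaskSlotTable: executionAttemptId -> Task.
    private final Map<String, Task> taskSlotTable = new ConcurrentHashMap<>();

    public void register(String executionAttemptId, Task task) {
        taskSlotTable.put(executionAttemptId, task);
    }

    public CompletableFuture<String> confirmCheckpoint(String executionAttemptId, long checkpointId) {
        Task task = taskSlotTable.get(executionAttemptId);
        if (task != null) {
            task.notifyCheckpointComplete(checkpointId);
            // The reply only confirms that the task was found and notified.
            return CompletableFuture.completedFuture("ack");
        }
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException(
            "TaskManager received a checkpoint confirmation for unknown task " + executionAttemptId + '.'));
        return failed;
    }
}
```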
When the Task receives the "checkpoint completed" message, it passes it on to the StreamTask, provided the Task is RUNNING.
/**
 * The TaskExecutor sends the "checkpoint completed" message to the target Task instance,
 * which in turn notifies the operators in its StreamTask.
 */
@Override
public void notifyCheckpointComplete(final long checkpointID) {
    final AbstractInvokable invokable = this.invokable;

    if (executionState == ExecutionState.RUNNING && invokable != null) {
        try {
            // Send the "checkpoint completed" message to the StreamOperators in the StreamTask.
            // StreamTask, a subclass of the abstract class AbstractInvokable, provides the concrete logic
            invokable.notifyCheckpointCompleteAsync(checkpointID);
        } catch (RejectedExecutionException ex) {
            LOG.debug("Notify checkpoint complete {} for {} ({}) was rejected by the mailbox",
                checkpointID, taskNameWithSubtask, executionId);
        } catch (Throwable t) {
            if (getExecutionState() == ExecutionState.RUNNING) {
                failExternally(new RuntimeException("Error while confirming checkpoint", t));
            }
        }
    } else {
        LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask);
    }
}

/**
 * The Task instance hands the "checkpoint completed" message to the StreamTask. StreamTask submits
 * notifyCheckpointComplete() to the Mailbox at the highest priority, so that it runs in the MailboxProcessor.
 */
@Override
public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
    // Wrap the body of notifyCheckpointComplete() as a RunnableWithException mail and submit it
    // to the MailboxProcessor at the highest priority (TaskMailbox.MAX_PRIORITY)
    return mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).submit(
        // Core logic: deliver the "checkpoint completed" message to the operators inside the StreamTask
        () -> notifyCheckpointComplete(checkpointId),
        "checkpoint %d complete", checkpointId);
}
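Submitting to the mailbox means the notification runs in the task's own mailbox thread, like record processing, so operators do not need extra synchronization for it. A simplified model of priority-aware mail selection, assuming (as an illustration of the idea, not a copy of Flink's TaskMailbox implementation) that each mail carries a priority and `tryTake(minPriority)` returns the oldest mail whose priority is at least `minPriority`; all names and constants here are hypothetical:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Optional;

public class MailboxSketch {

    // Hypothetical priority bounds for this sketch.
    public static final int MIN_PRIORITY = -1;
    public static final int MAX_PRIORITY = Integer.MAX_VALUE;

    public static final class Mail {
        public final int priority;
        public final Runnable action;
        public final String description;

        Mail(int priority, Runnable action, String description) {
            this.priority = priority;
            this.action = action;
            this.description = description;
        }
    }

    private final Deque<Mail> queue = new ArrayDeque<>();

    public synchronized void put(int priority, Runnable action, String descriptionFormat, Object... args) {
        queue.addLast(new Mail(priority, action, String.format(descriptionFormat, args)));
    }

    /** Removes and returns the oldest mail whose priority is >= minPriority, if there is one. */
    public synchronized Optional<Mail> tryTake(int minPriority) {
        Iterator<Mail> it = queue.iterator();
        while (it.hasNext()) {
            Mail mail = it.next();
            if (mail.priority >= minPriority) {
                it.remove();
                return Optional.of(mail);
            }
        }
        return Optional.empty();
    }
}
```

In this model a caller that only accepts MAX_PRIORITY mails still picks up the checkpoint notification, even though it was enqueued after ordinary mail.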
Finally, the StreamTask notifies every StreamOperator in its OperatorChain, and then the TaskStateManager.
/**
 * Send the "checkpoint completed" message to every StreamOperator in the StreamTask's OperatorChain.
 */
private void notifyCheckpointComplete(long checkpointId) {
    try {
        boolean success = actionExecutor.call(() -> {
            if (isRunning) {
                LOG.debug("Notification of complete checkpoint for task {}", getName());

                // Iterate over all operators in the OperatorChain
                for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
                    if (operator != null) {
                        // Send the "checkpoint completed" message to each operator
                        operator.notifyCheckpointComplete(checkpointId);
                    }
                }
                return true;
            } else {
                LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
                return true;
            }
        });

        // Notify the TaskStateManager as well
        getEnvironment().getTaskStateManager().notifyCheckpointComplete(checkpointId);

        // For a synchronous savepoint, finish the current Task right away and terminate the Task instance
        if (success && isSynchronousSavepointId(checkpointId)) {
            finishTask();
            // Reset to "notify" the internal synchronous savepoint mailbox loop.
            // (clears syncSavepointId)
            resetSynchronousSavepointId();
        }
    } catch (Exception e) {
        handleException(new RuntimeException("Error while confirming checkpoint", e));
    }
}
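The StreamTask logic reduces to: notify each non-null operator of the chain (only while running), always tell the state manager, and finish the task if the completed checkpoint is the pending synchronous savepoint. A minimal sketch, assuming a hypothetical `Operator` interface, operators visited in whatever order the list gives, and a plain RuntimeException in place of handleException:

```java
import java.util.ArrayList;
import java.util.List;

public class ChainNotifySketch {

    // Hypothetical stand-in for StreamOperator's checkpoint-complete hook.
    public interface Operator { void notifyCheckpointComplete(long checkpointId) throws Exception; }

    private final List<Operator> chain;
    private final boolean running;
    private Long syncSavepointId;           // null when no synchronous savepoint is in progress
    private boolean finished;
    public final List<Long> stateManagerNotified = new ArrayList<>();

    public ChainNotifySketch(List<Operator> chain, boolean running, Long syncSavepointId) {
        this.chain = chain;
        this.running = running;
        this.syncSavepointId = syncSavepointId;
    }

    public void notifyCheckpointComplete(long checkpointId) {
        try {
            if (running) {
                for (Operator operator : chain) {
                    if (operator != null) {
                        operator.notifyCheckpointComplete(checkpointId);
                    }
                }
            }
            // The state manager is told whether or not the task was running.
            stateManagerNotified.add(checkpointId);
            if (syncSavepointId != null && syncSavepointId == checkpointId) {
                finished = true;        // stands in for finishTask()
                syncSavepointId = null; // stands in for resetSynchronousSavepointId()
            }
        } catch (Exception e) {
            throw new RuntimeException("Error while confirming checkpoint", e);
        }
    }

    public boolean isFinished() {
        return finished;
    }
}
```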
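What a StreamOperator does with the "checkpoint completed" message depends on the operator. AbstractUdfStreamOperator, for example, checks whether its user-defined Function also implements the CheckpointListener interface; if it does, the operator forwards the message to the Function and lets it handle it.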
/**
 * When AbstractUdfStreamOperator receives the "checkpoint completed" message, it checks whether the
 * user-defined Function implements CheckpointListener. If it does, the message is forwarded to the
 * Function to handle; for example, FlinkKafkaConsumerBase commits its offsets to the Kafka cluster.
 */
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    super.notifyCheckpointComplete(checkpointId);

    // If the user function held by this StreamOperator also implements CheckpointListener, notify it too.
    // For example, FlinkKafkaConsumerBase receives this message and commits the offsets to the Kafka
    // cluster, so the committed offsets only cover data included in a completed checkpoint
    if (userFunction instanceof CheckpointListener) {
        ((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
    }
}
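The pattern FlinkKafkaConsumerBase relies on can be shown with an in-memory stand-in: remember the offsets captured by each checkpoint, and commit a checkpoint's offsets only when notifyCheckpointComplete arrives for it. A minimal sketch, assuming a hypothetical `OffsetCommittingSource` (a Map instead of a Kafka client) and a simplified `CheckpointListener` without the checked exception; it is not FlinkKafkaConsumerBase's actual code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class UdfListenerSketch {

    // Simplified CheckpointListener (the checked exception of the real interface is omitted).
    public interface CheckpointListener { void notifyCheckpointComplete(long checkpointId); }

    // Hypothetical source function: commits offsets only after the checkpoint containing them completes.
    public static final class OffsetCommittingSource implements CheckpointListener {
        private final Map<Integer, Long> currentOffsets = new HashMap<>();  // partition -> offset
        private final TreeMap<Long, Map<Integer, Long>> pendingOffsetsToCommit = new TreeMap<>();
        public final Map<Integer, Long> committed = new HashMap<>();        // stand-in for Kafka

        public void consume(int partition, long offset) {
            currentOffsets.put(partition, offset);
        }

        public void snapshotState(long checkpointId) {
            pendingOffsetsToCommit.put(checkpointId, new HashMap<>(currentOffsets));
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            Map<Integer, Long> offsets = pendingOffsetsToCommit.get(checkpointId);
            if (offsets == null) {
                return; // unknown or already-subsumed checkpoint
            }
            committed.putAll(offsets);
            // Drop this snapshot and all older ones; they are subsumed by this commit.
            pendingOffsetsToCommit.headMap(checkpointId, true).clear();
        }

        public int pendingCount() {
            return pendingOffsetsToCommit.size();
        }
    }

    // Plays the role of AbstractUdfStreamOperator: forwards only if the function is a listener.
    public static final class UdfOperator<F> {
        private final F userFunction;

        public UdfOperator(F userFunction) {
            this.userFunction = userFunction;
        }

        public void notifyCheckpointComplete(long checkpointId) {
            if (userFunction instanceof CheckpointListener) {
                ((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
            }
        }
    }
}
```

Because a later notification subsumes earlier snapshots, a missed notification for checkpoint 1 is harmless once checkpoint 2 completes; a function that does not implement the listener is simply skipped.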