当前位置:首页 » 《随便一记》 » 正文

【Flink源码】再谈Flink程序提交流程(中)

8 人参与  2022年11月07日 15:13  分类 : 《随便一记》  评论

点击全文阅读


书接上回,【Flink源码】再谈 Flink 程序提交流程(上) 一文中我们已经将程序从客户端提交给了 ResourceManager
接下来我们就去 ResourceManager 中一探究竟


创建 Dispatcher、ResourceManager

YarnJobClusterEntrypoint 类是 Yarn per-job 集群的入口,包含了我们想看的 main 方法

YarnJobClusterEntrypoint.java

public static void main(String[] args) {    LOG.warn(            "Job Clusters are deprecated since Flink 1.15. Please use an Application Cluster/Application Mode instead.");    // startup checks and logging    EnvironmentInformation.logEnvironmentInfo(            LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args);    SignalHandler.register(LOG);    JvmShutdownSafeguard.installAsShutdownHook(LOG);    Map<String, String> env = System.getenv();    final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());    Preconditions.checkArgument(            workingDirectory != null,            "Working directory variable (%s) not set",            ApplicationConstants.Environment.PWD.key());    try {        YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);    } catch (IOException e) {        LOG.warn("Could not log YARN environment information.", e);    }    final Configuration dynamicParameters =            ClusterEntrypointUtils.parseParametersOrExit(                    args,                    new DynamicParametersConfigurationParserFactory(),                    YarnJobClusterEntrypoint.class);    final Configuration configuration =            YarnEntrypointUtils.loadConfiguration(workingDirectory, dynamicParameters, env);    YarnJobClusterEntrypoint yarnJobClusterEntrypoint =            new YarnJobClusterEntrypoint(configuration);    ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);}

main 方法中通过 ClusterEntrypoint.runClusterEntrypoint 方法以 YarnJobClusterEntrypoint 对象为参数加载运行入口

ClusterEntrypoint.java

public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {    final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();    try {        clusterEntrypoint.startCluster();    } catch (ClusterEntrypointException e) {        LOG.error(                String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),                e);        System.exit(STARTUP_FAILURE_RETURN_CODE);    }    int returnCode;    Throwable throwable = null;    try {        returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();    } catch (Throwable e) {        throwable = ExceptionUtils.stripExecutionException(e);        returnCode = RUNTIME_FAILURE_RETURN_CODE;    }    LOG.info(            "Terminating cluster entrypoint process {} with exit code {}.",            clusterEntrypointName,            returnCode,            throwable);    System.exit(returnCode);}public void startCluster() throws ClusterEntrypointException {    LOG.info("Starting {}.", getClass().getSimpleName());    try {        FlinkSecurityManager.setFromConfiguration(configuration);        PluginManager pluginManager =                PluginUtils.createPluginManagerFromRootFolder(configuration);        configureFileSystems(configuration, pluginManager);        SecurityContext securityContext = installSecurityContext(configuration);        ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);        securityContext.runSecured(                (Callable<Void>)                        () -> {                            runCluster(configuration, pluginManager);                            return null;                        });    } catch (Throwable t) {        final Throwable strippedThrowable =                ExceptionUtils.stripException(t, UndeclaredThrowableException.class);        try {            // clean up any partial state            shutDownAsync(                            ApplicationStatus.FAILED,                            ShutdownBehaviour.GRACEFUL_SHUTDOWN,                            ExceptionUtils.stringifyException(strippedThrowable),                            false)                    .get(                            INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(),                            TimeUnit.MILLISECONDS);        } catch (InterruptedException | ExecutionException | TimeoutException e) {            strippedThrowable.addSuppressed(e);        }        throw new ClusterEntrypointException(                String.format(                        "Failed to initialize the cluster entrypoint %s.",                        getClass().getSimpleName()),                strippedThrowable);    }}private void runCluster(Configuration configuration, PluginManager pluginManager)        throws Exception {    synchronized (lock) {        initializeServices(configuration, pluginManager);        // write host information into configuration        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());        // 创建 dispatcher、ResourceManager 对象的工厂类        // 其中有从本地重新构建 JobGraph 的过程        final DispatcherResourceManagerComponentFactory                dispatcherResourceManagerComponentFactory =                        createDispatcherResourceManagerComponentFactory(configuration);        // 通过工厂类创建 dispatcher、ResourceManager 对象        // Entry 启动 RpcService、HAService、BlobServer、HeartbeatService、MetricRegistry、ExecutionGraphStore 等        clusterComponent =                dispatcherResourceManagerComponentFactory.create(                        configuration,                        resourceId.unwrap(),                        ioExecutor,                        commonRpcService,                        haServices,                        blobServer,                        heartbeatServices,                        delegationTokenManager,                        metricRegistry,                        executionGraphInfoStore,                        new RpcMetricQueryServiceRetriever(                                metricRegistry.getMetricQueryServiceRpcService()),                        this);        clusterComponent                .getShutDownFuture()                .whenComplete(                        (ApplicationStatus applicationStatus, Throwable throwable) -> {                            if (throwable != null) {                                shutDownAsync(                                        ApplicationStatus.UNKNOWN,                                        ShutdownBehaviour.GRACEFUL_SHUTDOWN,                                        ExceptionUtils.stringifyException(throwable),                                        false);                            } else {                                // This is the general shutdown path. If a separate more                                // specific shutdown was                                // already triggered, this will do nothing                                shutDownAsync(                                        applicationStatus,                                        ShutdownBehaviour.GRACEFUL_SHUTDOWN,                                        null,                                        true);                            }                        });    }}

我们继续看创建过程
找到 DispatcherResourceManagerComponentFactory 接口的实现类 DefaultDispatcherResourceManagerComponentFactory

DefaultDispatcherResourceManagerComponentFactory.java

public DispatcherResourceManagerComponent create(        Configuration configuration,        ResourceID resourceId,        Executor ioExecutor,        RpcService rpcService,        HighAvailabilityServices highAvailabilityServices,        BlobServer blobServer,        HeartbeatServices heartbeatServices,        DelegationTokenManager delegationTokenManager,        MetricRegistry metricRegistry,        ExecutionGraphInfoStore executionGraphInfoStore,        MetricQueryServiceRetriever metricQueryServiceRetriever,        FatalErrorHandler fatalErrorHandler)        throws Exception {    LeaderRetrievalService dispatcherLeaderRetrievalService = null;    LeaderRetrievalService resourceManagerRetrievalService = null;    WebMonitorEndpoint<?> webMonitorEndpoint = null;    ResourceManagerService resourceManagerService = null;    DispatcherRunner dispatcherRunner = null;    try {        dispatcherLeaderRetrievalService =                highAvailabilityServices.getDispatcherLeaderRetriever();        resourceManagerRetrievalService =                highAvailabilityServices.getResourceManagerLeaderRetriever();        final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =                new RpcGatewayRetriever<>(                        rpcService,                        DispatcherGateway.class,                        DispatcherId::fromUuid,                        new ExponentialBackoffRetryStrategy(                                12, Duration.ofMillis(10), Duration.ofMillis(50)));        final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever =                new RpcGatewayRetriever<>(                        rpcService,                        ResourceManagerGateway.class,                        ResourceManagerId::fromUuid,                        new ExponentialBackoffRetryStrategy(                                12, Duration.ofMillis(10), Duration.ofMillis(50)));        final ScheduledExecutorService executor =                WebMonitorEndpoint.createExecutorService(                        configuration.getInteger(RestOptions.SERVER_NUM_THREADS),                        configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),                        "DispatcherRestEndpoint");        final long updateInterval =                configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);        final MetricFetcher metricFetcher =                updateInterval == 0                        ? VoidMetricFetcher.INSTANCE                        : MetricFetcherImpl.fromConfiguration(                                configuration,                                metricQueryServiceRetriever,                                dispatcherGatewayRetriever,                                executor);                // 创建接收前端 Rest 请求的节点        webMonitorEndpoint =                restEndpointFactory.createRestEndpoint(                        configuration,                        dispatcherGatewayRetriever,                        resourceManagerGatewayRetriever,                        blobServer,                        executor,                        metricFetcher,                        highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),                        fatalErrorHandler);        log.debug("Starting Dispatcher REST endpoint.");        webMonitorEndpoint.start();        final String hostname = RpcUtils.getHostname(rpcService);                // 创建 ResourceManager 对象,返回的是 new YarnResourceManager        // 调度过程:AbstractDispatcherResourceManagerComponentFactory        //                  -> ActiveResourceManagerFactory        //                  -> YarnResourceManagerFactory        resourceManagerService =                ResourceManagerServiceImpl.create(                        resourceManagerFactory,                        configuration,                        resourceId,                        rpcService,                        highAvailabilityServices,                        heartbeatServices,                        delegationTokenManager,                        fatalErrorHandler,                        new ClusterInformation(hostname, blobServer.getPort()),                        webMonitorEndpoint.getRestBaseUrl(),                        metricRegistry,                        hostname,                        ioExecutor);        final HistoryServerArchivist historyServerArchivist =                HistoryServerArchivist.createHistoryServerArchivist(                        configuration, webMonitorEndpoint, ioExecutor);        final DispatcherOperationCaches dispatcherOperationCaches =                new DispatcherOperationCaches(                        configuration.get(RestOptions.ASYNC_OPERATION_STORE_DURATION));        final PartialDispatcherServices partialDispatcherServices =                new PartialDispatcherServices(                        configuration,                        highAvailabilityServices,                        resourceManagerGatewayRetriever,                        blobServer,                        heartbeatServices,                        () ->                                JobManagerMetricGroup.createJobManagerMetricGroup(                                        metricRegistry, hostname),                        executionGraphInfoStore,                        fatalErrorHandler,                        historyServerArchivist,                        metricRegistry.getMetricQueryServiceGatewayRpcAddress(),                        ioExecutor,                        dispatcherOperationCaches);                // 创建 dispatcherRunner 对象并启动        log.debug("Starting Dispatcher.");        dispatcherRunner =                dispatcherRunnerFactory.createDispatcherRunner(                        highAvailabilityServices.getDispatcherLeaderElectionService(),                        fatalErrorHandler,                        new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),                        ioExecutor,                        rpcService,                        partialDispatcherServices);                // 启动 ResourceManager        log.debug("Starting ResourceManagerService.");        resourceManagerService.start();        resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);        dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);        return new DispatcherResourceManagerComponent(                dispatcherRunner,                resourceManagerService,                dispatcherLeaderRetrievalService,                resourceManagerRetrievalService,                webMonitorEndpoint,                fatalErrorHandler,                dispatcherOperationCaches);    } catch (Exception exception) {        // clean up all started components        if (dispatcherLeaderRetrievalService != null) {            try {                dispatcherLeaderRetrievalService.stop();            } catch (Exception e) {                exception = ExceptionUtils.firstOrSuppressed(e, exception);            }        }        if (resourceManagerRetrievalService != null) {            try {                resourceManagerRetrievalService.stop();            } catch (Exception e) {                exception = ExceptionUtils.firstOrSuppressed(e, exception);            }        }        final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);        if (webMonitorEndpoint != null) {            terminationFutures.add(webMonitorEndpoint.closeAsync());        }        if (resourceManagerService != null) {            terminationFutures.add(resourceManagerService.closeAsync());        }        if (dispatcherRunner != null) {            terminationFutures.add(dispatcherRunner.closeAsync());        }        final FutureUtils.ConjunctFuture<Void> terminationFuture =                FutureUtils.completeAll(terminationFutures);        try {            terminationFuture.get();        } catch (Exception e) {            exception = ExceptionUtils.firstOrSuppressed(e, exception);        }        throw new FlinkException(                "Could not create the DispatcherResourceManagerComponent.", exception);    }}

至此,我们找到了 dispatcher、ResouceManager 创建和启动方法,接下来我们有必要深入看看具体的过程

创建 YarnResourceManager

首先我们看 YarnResourceManager 创建过程

ResourceManagerFactory.java

public ResourceManager<T> createResourceManager(        ResourceManagerProcessContext context, UUID leaderSessionId) throws Exception {    final ResourceManagerRuntimeServices resourceManagerRuntimeServices =            createResourceManagerRuntimeServices(                    context.getRmRuntimeServicesConfig(),                    context.getRpcService(),                    context.getHighAvailabilityServices(),                    SlotManagerMetricGroup.create(                            context.getMetricRegistry(), context.getHostname()));    return createResourceManager(            context.getRmConfig(),            context.getResourceId(),            context.getRpcService(),            leaderSessionId,            context.getHeartbeatServices(),            context.getDelegationTokenManager(),            context.getFatalErrorHandler(),            context.getClusterInformation(),            context.getWebInterfaceUrl(),            ResourceManagerMetricGroup.create(                    context.getMetricRegistry(), context.getHostname()),            resourceManagerRuntimeServices,            context.getIoExecutor());}

这里的 createResourceManager 是一个抽象方法,我们找到 ResourceManagerFactory 的 Yarn 实现类 YarnResourceManagerFactory

YarnResourceManagerFactory.java

public ResourceManager<YarnWorkerNode> createResourceManager( Configuration configuration,        ResourceID resourceId,        RpcService rpcService,        HighAvailabilityServices highAvailabilityServices,        HeartbeatServices heartbeatServices,        FatalErrorHandler fatalErrorHandler,        ClusterInformation clusterInformation,        @Nullable String webInterfaceUrl,        ResourceManagerMetricGroup resourceManagerMetricGroup, ResourceManagerRuntimeServices resourceManagerRuntimeServices) {        return new YarnResourceManager( rpcService,        resourceId,        configuration,        System.getenv(),        highAvailabilityServices,        heartbeatServices, resourceManagerRuntimeServices.getSlotManager(), ResourceManagerPartitionTrackerImpl::new, resourceManagerRuntimeServices.getJobLeaderIdService(), clusterInformation,        fatalErrorHandler, webInterfaceUrl, resourceManagerMetricGroup);}

创建 YarnResourceManager 时,创建了 SlotManager
我们再继续看一下 SlotManager 是如何创建的

ResourceManagerFactory.java

private ResourceManagerRuntimeServices createResourceManagerRuntimeServices(        ResourceManagerRuntimeServicesConfiguration rmRuntimeServicesConfig,        RpcService rpcService,        HighAvailabilityServices highAvailabilityServices,        SlotManagerMetricGroup slotManagerMetricGroup) {        return ResourceManagerRuntimeServices.fromConfiguration(                rmRuntimeServicesConfig,                highAvailabilityServices,                rpcService.getScheduledExecutor(),                slotManagerMetricGroup);}

ResourceManagerRuntimeServices.java

public static ResourceManagerRuntimeServices fromConfiguration(            ResourceManagerRuntimeServicesConfiguration configuration,            HighAvailabilityServices highAvailabilityServices,            ScheduledExecutor scheduledExecutor,            SlotManagerMetricGroup slotManagerMetricGroup) {        final SlotManager slotManager =                createSlotManager(configuration, scheduledExecutor, slotManagerMetricGroup);        final JobLeaderIdService jobLeaderIdService =                new DefaultJobLeaderIdService(                        highAvailabilityServices, scheduledExecutor, configuration.getJobTimeout());        return new ResourceManagerRuntimeServices(slotManager, jobLeaderIdService);}

到这里,我们找到了创建 YarnResouceManager 的方法 createSlotManager

创建并启动 Dispatcher

接下来我们看 Dispatcher 的创建和启动过程
找到接口 DispatcherRunnerFactory 的实现类 DefaultDispatcherRunnerFactory

DefaultDispatcherRunnerFactory.java

public DispatcherRunner createDispatcherRunner(        LeaderElectionService leaderElectionService,        FatalErrorHandler fatalErrorHandler,        JobPersistenceComponentFactory jobPersistenceComponentFactory,        Executor ioExecutor,        RpcService rpcService,        PartialDispatcherServices partialDispatcherServices)        throws Exception {        final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory =                dispatcherLeaderProcessFactoryFactory.createFactory(                        jobPersistenceComponentFactory,                        ioExecutor,                        rpcService,                        partialDispatcherServices,                        fatalErrorHandler);        return DefaultDispatcherRunner.create(                leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);}

再看 DefaultDispatcherRunner 类

DefaultDispatcherRunner.java

public static DispatcherRunner create(            LeaderElectionService leaderElectionService,            FatalErrorHandler fatalErrorHandler,            DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory)            throws Exception {        final DefaultDispatcherRunner dispatcherRunner =                new DefaultDispatcherRunner(                        leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);        return DispatcherRunnerLeaderElectionLifecycleManager.createFor(                dispatcherRunner, leaderElectionService);}

DispatcherRunnerLeaderElectionLifecycleManager.java

public static <T extends DispatcherRunner & LeaderContender> DispatcherRunner createFor(            T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {        return new DispatcherRunnerLeaderElectionLifecycleManager<>(                dispatcherRunner, leaderElectionService);}private DispatcherRunnerLeaderElectionLifecycleManager(            T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {        this.dispatcherRunner = dispatcherRunner;        this.leaderElectionService = leaderElectionService;                // 启动 dispatcher 的 leader 选举        leaderElectionService.start(dispatcherRunner);}

找到 LeaderElectionService 接口的实现类 StandaloneLeaderElectionService

StandaloneLeaderElectionService.java

public void start(LeaderContender newContender) throws Exception {        if (contender != null) {            // Service was already started            throw new IllegalArgumentException(                    "Leader election service cannot be started multiple times.");        }        contender = Preconditions.checkNotNull(newContender);        // directly grant leadership to the given contender        contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);}

DefaultDispatcherRunner.java

public void grantLeadership(UUID leaderSessionID) {        runActionIfRunning(                () -> {                    LOG.info(                            "{} was granted leadership with leader id {}. Creating new {}.",                            getClass().getSimpleName(),                            leaderSessionID,                            DispatcherLeaderProcess.class.getSimpleName());                    startNewDispatcherLeaderProcess(leaderSessionID);                });}private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {        stopDispatcherLeaderProcess();        dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);        final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;        FutureUtils.assertNoException(                previousDispatcherLeaderProcessTerminationFuture.thenRun(                        newDispatcherLeaderProcess::start));}

AbstractDispatcherLeaderProcess.java

public final void start() {        runIfStateIs(State.CREATED, this::startInternal);}private void startInternal() {        log.info("Start {}.", getClass().getSimpleName());        state = State.RUNNING;        onStart();}

再往下找实现了 onStart 方法的实现类

JobDispatcherLeaderProcess.java

protected void onStart() {        final DispatcherGatewayService dispatcherService =                dispatcherGatewayServiceFactory.create(                        DispatcherId.fromUuid(getLeaderSessionId()),                        CollectionUtil.ofNullable(jobGraph),                        CollectionUtil.ofNullable(recoveredDirtyJobResult),                        ThrowingJobGraphWriter.INSTANCE,                        jobResultStore);        completeDispatcherSetup(dispatcherService);}

DefaultDispatcherGatewayServiceFactory.java

public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(            DispatcherId fencingToken,            Collection<JobGraph> recoveredJobs,            Collection<JobResult> recoveredDirtyJobResults,            JobGraphWriter jobGraphWriter,            JobResultStore jobResultStore) {        final Dispatcher dispatcher;        try {            dispatcher =                    dispatcherFactory.createDispatcher(                            rpcService,                            fencingToken,                            recoveredJobs,                            recoveredDirtyJobResults,                            (dispatcherGateway, scheduledExecutor, errorHandler) ->                                    new NoOpDispatcherBootstrap(),                            PartialDispatcherServicesWithJobPersistenceComponents.from(                                    partialDispatcherServices, jobGraphWriter, jobResultStore));        } catch (Exception e) {            throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);        }        // 启动 dispatcher        dispatcher.start();        return DefaultDispatcherGatewayService.from(dispatcher);}

至此,dispatcher 启动完毕

启动 ResourceManager

下面我们来看 ResourceManager 启动过程

ResourceManager.java

public final void onStart() throws Exception {        try {            log.info("Starting the resource manager.");            startResourceManagerServices();            startedFuture.complete(null);        } catch (Throwable t) {            final ResourceManagerException exception =                    new ResourceManagerException(                            String.format("Could not start the ResourceManager %s", getAddress()),                            t);            onFatalError(exception);            throw exception;        }}private void startResourceManagerServices() throws Exception {        try {            jobLeaderIdService.start(new JobLeaderIdActionsImpl());            registerMetrics();            startHeartbeatServices();            slotManager.start(                    getFencingToken(),                    getMainThreadExecutor(),                    new ResourceActionsImpl(),                    blocklistHandler::isBlockedTaskManager);            delegationTokenManager.start();            initialize();        } catch (Exception e) {            handleStartResourceManagerServicesException(e);        }}

在 startResourceManagerServices 方法中,包含了初始化、心跳开启、slotManager 开启等操作
到这里,我们总算探究完成了 dispatcher 和 ResourceManager 的创建和启动过程
现在我们回到最开始,继续看 Flink 程序提交流程的下一个步骤

Dispatcher 启动 JobManager

在启动了 dispatcher 和 ResourceManager 后,Dispatcher 启动了 JobManager
要一探究竟首先我们先进入 dispatcher 的实现类 Dispatcher

Dispatcher.java

public void onStart() throws Exception {        try {            // 启动 Dispatcher            startDispatcherServices();        } catch (Throwable t) {            final DispatcherException exception =                    new DispatcherException(                            String.format("Could not start the Dispatcher %s", getAddress()), t);            onFatalError(exception);            throw exception;        }        startCleanupRetries();        // 启动 Job        startRecoveredJobs();        this.dispatcherBootstrap =                this.dispatcherBootstrapFactory.create(                        getSelfGateway(DispatcherGateway.class),                        this.getRpcService().getScheduledExecutor(),                        this::onFatalError);}private void startRecoveredJobs() {        for (JobGraph recoveredJob : recoveredJobs) {            runRecoveredJob(recoveredJob);        }        recoveredJobs.clear();}private void runRecoveredJob(final JobGraph recoveredJob) {        checkNotNull(recoveredJob);        try {            runJob(createJobMasterRunner(recoveredJob), ExecutionType.RECOVERY);        } catch (Throwable throwable) {            onFatalError(                    new DispatcherException(                            String.format(                                    "Could not start recovered job %s.", recoveredJob.getJobID()),                            throwable));        }}private void runJob(JobGraph jobGraph, ExecutionType executionType) {         ... ...        CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph, initializationTimestamp);        ... ... }CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph, longinitializationTimestamp) {        final RpcService rpcService = getRpcService(); return CompletableFuture.supplyAsync(        () -> {                try {                JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner( jobGraph,                configuration,                rpcService,                highAvailabilityServices,                heartbeatServices,                jobManagerSharedServices,                new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup), fatalErrorHandler,                initializationTimestamp);        // 启动 JobManagerRunner         runner.start();        return runner;        }        ... ...}

JobManagerRunnerImpl.java

public void start() throws Exception {         try {                leaderElectionService.start(this);         } catch (Exception e) {                log.error("Could not start the JobManager because the leader election service did not start.", e);                throw new Exception("Could not start the leader election service.", e);         }}

StandaloneLeaderElectionService.java

public void start(LeaderContender newContender) throws Exception {        ... ...        contender.grantLeadership(HighAvaliabilityServices.DEFAULT_LEADER_ID);}

JobManagerRunnerImpl.java

public void grantLeadership(final UUID leaderSessionID) {         synchronized (lock) {                if (shutdown) {                        log.debug("JobManagerRunner cannot be granted leadership because it is already shut                        down.");                        return;                }                leadershipOperation = leadershipOperation.thenCompose( (ignored) -> {                        synchronized (lock) {                                // 校验作业的调度状态然后启动作业管理器                                return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);                         }                });                handleException(leadershipOperation, "Could not start the job manager.");         }}private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {         final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture =        getJobSchedulingStatus();        return jobSchedulingStatusFuture.thenCompose(                jobSchedulingStatus -> {                        if (jobSchedulingStatus == JobSchedulingStatus.DONE) {                                return jobAlreadyDone();                         } else {                                return startJobMaster(leaderSessionId);                         }        }); }private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {        ... ...        startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));        ... ...}

JobMaster.java

public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {         // make sure we receive RPC and async calls        start();        return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);}private void startJobExecution() throws Exception {        validateRunsInMainThread();        JobShuffleContext context = new JobShuffleContextImpl(jobGraph.getJobID(), this);        shuffleMaster.registerJob(context);                // 启动 JobMaster        startJobMasterServices();        log.info(                "Starting execution of job '{}' ({}) under job master id {}.",                jobGraph.getName(),                jobGraph.getJobID(),                getFencingToken());        // 开始调度        startScheduling();}

最终,由 Dispatcher 类经过层层调用找到 JobMaster 类调用了其启动方法。

ResourceManager 启动 SlotManager

在创建了 ResourceManager 和 Dispatcher 之后,Dispatcher 启动了 JobManager,而 ResourceManager 则启动了 SlotManager
下面我们就具体来看这一过程
故事还要从 ResouceManager 类的 onStart 方法说起

ResourceManager.java

public final void onStart() throws Exception {        try {            log.info("Starting the resource manager.");            startResourceManagerServices();            startedFuture.complete(null);        } catch (Throwable t) {            final ResourceManagerException exception =                    new ResourceManagerException(                            String.format("Could not start the ResourceManager %s", getAddress()),                            t);            onFatalError(exception);            throw exception;        }}// 开启 ResourceManager 服务    private void startResourceManagerServices() throws Exception {        try {            jobLeaderIdService.start(new JobLeaderIdActionsImpl());            registerMetrics();            startHeartbeatServices();                        // 开启 SlotManager            slotManager.start(                    getFencingToken(),                    getMainThreadExecutor(),                    new ResourceActionsImpl(),                    blocklistHandler::isBlockedTaskManager);            delegationTokenManager.start();            // 初始化 ResourceManager            initialize();        } catch (Exception e) {            handleStartResourceManagerServicesException(e);        }}

由源码可知,该过程分为了两个重要步骤:开启 SlotManager 和初始化 ResourceManager,即创建 Yarn 的 ResourceManager 和 NodeManager 客户端
start 为 SlotManager 接口的方法,找到该接口的实现类 FineGrainedSlotManager,该类中的 start 方法根据给定的 leader id 和 ResourceManager 行为来实现开启 SlotManager

FineGrainedSlotManager.java

public void start(            ResourceManagerId newResourceManagerId,            Executor newMainThreadExecutor,            ResourceActions newResourceActions,            BlockedTaskManagerChecker newBlockedTaskManagerChecker) {        LOG.info("Starting the slot manager.");        resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);        mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);        resourceActions = Preconditions.checkNotNull(newResourceActions);        // slot 状态同步        slotStatusSyncer.initialize(                taskManagerTracker, resourceTracker, resourceManagerId, mainThreadExecutor);        blockedTaskManagerChecker = Preconditions.checkNotNull(newBlockedTaskManagerChecker);        started = true;        // TaskManager 超时检查        taskManagerTimeoutsCheck =                scheduledExecutor.scheduleWithFixedDelay(                        () -> mainThreadExecutor.execute(this::checkTaskManagerTimeouts),                        0L,                        taskManagerTimeout.toMilliseconds(),                        TimeUnit.MILLISECONDS);        registerSlotManagerMetrics();}

步骤已经很明显了

JobManager 申请 Slot

在创建了 JobManager 和 SlotManager 之后,下一步 JobManager 申请了 slot

启动 SlotPool

在 JobMaster 启动之时,同时启动了 SlotPool,向 ResourceManager 注册

JobMaster.java

private void startJobMasterServices() throws Exception {        try {            // 启动 TaskManager 心跳服务            this.taskManagerHeartbeatManager = createTaskManagerHeartbeatManager(heartbeatServices);            // 启动 ResourceManager 心跳服务            this.resourceManagerHeartbeatManager =                    createResourceManagerHeartbeatManager(heartbeatServices);            // start the slot pool make sure the slot pool now accepts messages for this leader            // 启动 slotPool            slotPoolService.start(getFencingToken(), getAddress(), getMainThreadExecutor());            // job is ready to go, try to establish connection with resource manager            //   - activate leader retrieval for the resource manager            //   - on notification of the leader, the connection will be established and            //     the slot pool will start requesting slots            // 启动后 slot pool 开始向 slot manager 请求 slot            resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());        } catch (Exception e) {            handleStartJobMasterServicesError(e);        }}

向 ResourceManager 注册

经过下面层层调用:
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
-> notifyOfNewResourceManagerLeader()
-> reconnectToResourceManagerLeader()
-> tryConnectToResourceManager()
-> connectToResourceManager()

private void connectToResourceManager() {        ... ...        resourceManagerConnection = new ResourceManagerConnection(                log,                jobGraph.getJobID(),                resourceId,                getAddress(),                getFencingToken(),                resourceManagerAddress.getAddress(),                resourceManagerAddress.getResourceManagerId(),                scheduledExecutorService        );        resourceManagerConnection.start();}

RegisteredRpcConnection.java

public void start() {        checkState(!closed, "The RPC connection is already closed");        checkState(                !isConnected() && pendingRegistration == null,                "The RPC connection is already started");        final RetryingRegistration<F, G, S, R> newRegistration = createNewRegistration();        if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {            newRegistration.startRegistration();        } else {            // concurrent start operation            newRegistration.cancel();        }}private RetryingRegistration<F, G, S, R> createNewRegistration() {        RetryingRegistration<F, G, S, R> newRegistration = checkNotNull(generateRegistration());        CompletableFuture<RetryingRegistration.RetryingRegistrationResult<G, S, R>> future =                newRegistration.getFuture();        future.whenCompleteAsync(                (RetryingRegistration.RetryingRegistrationResult<G, S, R> result,                        Throwable failure) -> {                    if (failure != null) {                        if (failure instanceof CancellationException) {                            // we ignore cancellation exceptions because they originate from                            // cancelling                            // the RetryingRegistration                            log.debug(                                    "Retrying registration towards {} was cancelled.",                                    targetAddress);                        } else {                            // this future should only ever fail if there is a bug, not if the                            // registration is declined                            onRegistrationFailure(failure);                        }                    } else {                        if (result.isSuccess()) {                            targetGateway = result.getGateway();                            onRegistrationSuccess(result.getSuccess());                        } else if (result.isRejection()) {                            onRegistrationRejection(result.getRejection());                        } else {                            throw new IllegalArgumentException(                                    String.format(                                            "Unknown retrying registration response: %s.", result));                        }                    }                },                executor);        return newRegistration;}

TaskExecutorToResourceManagerConnection.java

protected RetryingRegistration<                    ResourceManagerId,                    ResourceManagerGateway,                    TaskExecutorRegistrationSuccess,                    TaskExecutorRegistrationRejection>            generateRegistration() {        return new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(                log,                rpcService,                getTargetAddress(),                getTargetLeaderId(),                retryingRegistrationConfiguration,                taskExecutorRegistration);}ResourceManagerRegistration(                Logger log,                RpcService rpcService,                String targetAddress,                ResourceManagerId resourceManagerId,                RetryingRegistrationConfiguration retryingRegistrationConfiguration,                TaskExecutorRegistration taskExecutorRegistration) {            super(                    log,                    rpcService,                    "ResourceManager",                    ResourceManagerGateway.class,                    targetAddress,                    resourceManagerId,                    retryingRegistrationConfiguration);            this.taskExecutorRegistration = taskExecutorRegistration;}

SlotPool 申请 slot

注册成功调用 onRegistrationSuccess(),向 ResourceManager 进行 slot 的申请

JobMaster.java 的内部类 ResourceManagerConnection

protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {            runAsync(                    () -> {                        // filter out outdated connections                        //noinspection ObjectEquality                        if (this == resourceManagerConnection) {                            establishResourceManagerConnection(success);                        }                    });}private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {        final ResourceManagerId resourceManagerId = success.getResourceManagerId();        // verify the response with current connection        if (resourceManagerConnection != null                && Objects.equals(                        resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) {            log.info(                    "JobManager successfully registered at ResourceManager, leader id: {}.",                    resourceManagerId);            final ResourceManagerGateway resourceManagerGateway =                    resourceManagerConnection.getTargetGateway();            final ResourceID resourceManagerResourceId = success.getResourceManagerResourceId();            establishedResourceManagerConnection =                    new EstablishedResourceManagerConnection(                            resourceManagerGateway, resourceManagerResourceId);            blocklistHandler.registerBlocklistListener(resourceManagerGateway);            // 连接到 ResourceManager            slotPoolService.connectToResourceManager(resourceManagerGateway);            partitionTracker.connectToResourceManager(resourceManagerGateway);            resourceManagerHeartbeatManager.monitorTarget(                    resourceManagerResourceId,                    new ResourceManagerHeartbeatReceiver(resourceManagerGateway));        } else {            log.debug(                    "Ignoring resource manager connection to {} because it's duplicated or outdated.",                    resourceManagerId);        }}

DeclarativeSlotPoolService.java

public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {        this.resourceManagerGateway = checkNotNull(resourceManagerGateway);        // work on all slots waiting for this connection        for (PendingRequest pendingRequest : waitingForResourceManager.values()) {        // 向 ResourceManager 申请 slot        requestSlotFromResourceManager(resourceManagerGateway, pendingRequest); }        // all sent off        waitingForResourceManager.clear();}private void requestSlotFromResourceManager(final ResourceManagerGateway resourceManagerGateway, final PendingRequest pendingRequest) {        ... ...        CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(                jobMasterId,                new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),                rpcTimeout);        ... ...         }

ResourceManager.java:由 ResourceManager 里的 SlotManager 处理请求

public CompletableFuture<Acknowledge> requestSlot( JobMasterId jobMasterId,SlotRequest slotRequest, final Time timeout) {        ... ...        try {                // SlotManager 处理 slot 请求                 slotManager.registerSlotRequest(slotRequest);        }        ... ...}
public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {         checkInit();        ... ...        PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);         pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);        try {                internalRequestSlot(pendingSlotRequest);         }        ... ... }private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throwsResourceManagerException {        final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();        OptionalConsumer.of(findMatchingSlot(resourceProfile))                .ifPresent(taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest)) .ifNotPresent(() ->                fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest)); }private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {        ... ...        if(!pendingTaskManagerSlotOptional.isPresent()) {                pendingTaskManagerSlotOptional = allocateResource(resourceProfile);        }        ... ...}

点击全文阅读


本文链接:http://zhangshiyu.com/post/47275.html

<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1