- 1. Introduction
- 2. start-cluster.sh
- 2.1. Loading the global configuration functions: config.sh
- 2.2. Starting the JobManager
- 2.3. Starting the TaskManager instances
- 2.4. Full script
- 3. jobmanager.sh
- 4. taskmanager.sh
- 5. flink-daemon.sh
- 5.1. JobManager launch command
- 5.2. TaskManager launch command
- 6. yarn-session.sh
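1. Introduction
This post looks at how Flink's commonly used startup scripts call one another, and which Java entry class each service finally runs.
Service | Entry class | Notes |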
---|---|---|
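taskexecutor | org.apache.flink.runtime.taskexecutor.TaskManagerRunner | TaskManager process, started via taskmanager.sh |
standalonesession | org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint | JobManager of a standalone session cluster, started via jobmanager.sh |
standalonejob | org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint | JobManager of a standalone application cluster, started via standalone-job.sh |
historyserver | org.apache.flink.runtime.webmonitor.history.HistoryServer | Flink HistoryServer, started via historyserver.sh |
zookeeper | org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer | ZooKeeper quorum peer shipped with Flink, started via zookeeper.sh |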
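2. start-cluster.sh
start-cluster.sh is the script that starts a Flink standalone cluster, so let's see what it does.
It does three things:
- It first sources config.sh, which loads the functions and configuration used later.
- It starts the JobManager instance(s).
- It starts the TaskManager instance(s).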
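2.1. Loading the global configuration functions: config.sh
config.sh mainly defines environment variables and a set of shared helper functions.
- Environment variables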
### Exported environment variables ###
export FLINK_CONF_DIR
export FLINK_BIN_DIR
export FLINK_PLUGINS_DIR
# export /lib dir to access it during deployment of the Yarn staging files
export FLINK_LIB_DIR
# export /opt dir to access it for the SQL client
export FLINK_OPT_DIR
- readMasters: reads the conf/masters file to find the hosts the masters run on, plus an optional web UI port per host
readMasters() {
MASTERS_FILE="${FLINK_CONF_DIR}/masters"
if [[ ! -f "${MASTERS_FILE}" ]]; then
echo "No masters file. Please specify masters in 'conf/masters'."
exit 1
fi
MASTERS=()
WEBUIPORTS=()
MASTERS_ALL_LOCALHOST=true
GOON=true
while $GOON; do
read line || GOON=false
HOSTWEBUIPORT=$( extractHostName $line)
if [ -n "$HOSTWEBUIPORT" ]; then
HOST=$(echo $HOSTWEBUIPORT | cut -f1 -d:)
WEBUIPORT=$(echo $HOSTWEBUIPORT | cut -s -f2 -d:)
MASTERS+=(${HOST})
if [ -z "$WEBUIPORT" ]; then
WEBUIPORTS+=(0)
else
WEBUIPORTS+=(${WEBUIPORT})
fi
if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
MASTERS_ALL_LOCALHOST=false
fi
fi
done < "$MASTERS_FILE"
}
- readWorkers: reads the conf/workers file to find the hosts the workers (TaskManagers) run on
readWorkers() {
WORKERS_FILE="${FLINK_CONF_DIR}/workers"
if [[ ! -f "$WORKERS_FILE" ]]; then
echo "No workers file. Please specify workers in 'conf/workers'."
exit 1
fi
WORKERS=()
WORKERS_ALL_LOCALHOST=true
GOON=true
while $GOON; do
read line || GOON=false
HOST=$( extractHostName $line)
if [ -n "$HOST" ] ; then
WORKERS+=(${HOST})
if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
WORKERS_ALL_LOCALHOST=false
fi
fi
done < "$WORKERS_FILE"
}
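To make the parsing in readMasters concrete, here is a minimal, self-contained sketch of how one conf/masters line is split into a host and an optional web UI port, using the same `cut` calls as the real function. The function name `parse_master_line` is hypothetical, and its comment-stripping step is only a simplified stand-in for config.sh's `extractHostName` helper, whose body is not shown in this post.

```shell
#!/usr/bin/env bash
# Hypothetical helper modeled on readMasters: turn one conf/masters line into
# "host port", using 0 for the port when none is given (as readMasters does).
parse_master_line() {
  local line="$1" hostport host port
  # Simplified stand-in for config.sh's extractHostName:
  # drop everything after '#', then strip whitespace.
  hostport="${line%%#*}"
  hostport="${hostport//[[:space:]]/}"
  if [ -z "$hostport" ]; then
    return 1                       # blank line or comment-only line
  fi
  host=$(echo "$hostport" | cut -f1 -d:)
  # -s suppresses lines without ':', so a bare host yields an empty port
  port=$(echo "$hostport" | cut -s -f2 -d:)
  echo "$host ${port:-0}"
}

parse_master_line "localhost:8081"     # prints: localhost 8081
parse_master_line "node-2  # standby"  # prints: node-2 0
```

readWorkers follows the same pattern without the port part.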
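2.2. Starting the JobManager
In HA mode (HIGH_AVAILABILITY set to zookeeper), start-cluster.sh loops over the entries read from conf/masters and starts one JobManager per entry; without HA it simply starts a single JobManager on the local machine.
If every master is localhost or 127.0.0.1, it calls jobmanager.sh directly; otherwise (as soon as any master is remote) it runs jobmanager.sh on each master host over ssh.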
for ((i=0;i<${#MASTERS[@]};++i)); do
master=${MASTERS[i]}
webuiport=${WEBUIPORTS[i]}
if [ ${MASTERS_ALL_LOCALHOST} = true ] ; then
"${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}"
else
ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start ${master} ${webuiport} &"
fi
done
- Contents of the masters file:
localhost:8081
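The local-versus-ssh decision above can be sketched as a dry run that prints, rather than executes, the command for each master entry. This is an illustration only: `plan_jobmanager_starts` is a hypothetical name, `FLINK_BIN_DIR` is set to an example path (config.sh normally sets it), and `FLINK_SSH_OPTS` is left out for brevity.

```shell
#!/usr/bin/env bash
# Hypothetical dry-run helper: print the command start-cluster.sh would run for
# each "host[:port]" master entry instead of running it.
# FLINK_BIN_DIR is an example value; in Flink it is set by config.sh.
FLINK_BIN_DIR="/opt/flink/bin"

plan_jobmanager_starts() {
  local all_local=true m host port
  # Same rule as readMasters: one non-local host makes the whole set non-local.
  for m in "$@"; do
    host="${m%%:*}"
    if [ "$host" != "localhost" ] && [ "$host" != "127.0.0.1" ]; then
      all_local=false
    fi
  done
  for m in "$@"; do
    host="${m%%:*}"
    port=0                          # readMasters stores 0 when no port is given
    if [[ "$m" == *:* ]]; then port="${m#*:}"; fi
    if [ "$all_local" = true ]; then
      echo "${FLINK_BIN_DIR}/jobmanager.sh start ${host} ${port}"
    else
      echo "ssh -n ${host} -- nohup /bin/bash -l ${FLINK_BIN_DIR}/jobmanager.sh start ${host} ${port} &"
    fi
  done
}

plan_jobmanager_starts localhost:8081
plan_jobmanager_starts localhost:8081 node-2:8082
```

Note the second call: because one master is remote, even the localhost entry is started through ssh.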
2.3. Starting the TaskManager instances
In start-cluster.sh, starting the TaskManagers takes a single line:
TMWorkers start
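The TMWorkers function it calls is defined in config.sh.
If all workers are local, it runs taskmanager.sh directly, once per entry in conf/workers; otherwise it starts taskmanager.sh on each remote host, using pdsh (Parallel Distributed Shell) when it is installed and falling back to one ssh call per worker.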
# starts or stops TMs on all workers
# TMWorkers start|stop
TMWorkers() {
CMD=$1
readWorkers
if [ ${WORKERS_ALL_LOCALHOST} = true ] ; then
# all-local setup
for worker in ${WORKERS[@]}; do
"${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}"
done
else
# non-local setup
# start/stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
command -v pdsh >/dev/null 2>&1
if [[ $? -ne 0 ]]; then
for worker in ${WORKERS[@]}; do
ssh -n $FLINK_SSH_OPTS $worker -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
done
else
PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${WORKERS[*]}") \
"nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\""
fi
fi
}
- Contents of the workers file:
localhost
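Two small shell idioms carry the non-local branch of TMWorkers: joining the worker array with commas for `pdsh -w`, and probing for pdsh with `command -v`. The sketch below isolates both; `join_hosts` and `pick_launcher` are hypothetical names, not functions from config.sh.

```shell
#!/usr/bin/env bash
# Join worker hosts with commas, as TMWorkers does for `pdsh -w`.
# The subshell keeps the IFS change local; "$*" joins on IFS's first character.
join_hosts() {
  (IFS=, ; echo "$*")
}

# Hypothetical helper: report which launcher TMWorkers would pick on this machine.
pick_launcher() {
  if command -v pdsh >/dev/null 2>&1; then
    echo "pdsh"
  else
    echo "ssh"
  fi
}

WORKERS=(node-1 node-2 node-3)
join_hosts "${WORKERS[@]}"   # prints: node-1,node-2,node-3
pick_launcher                # prints "pdsh" or "ssh" depending on what is installed
```

With pdsh, all workers are contacted in parallel by one command; the ssh fallback issues one backgrounded `nohup` call per worker.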
2.4. Full script
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
# 1. Load shared configuration properties and functions
. "$bin"/config.sh
# 2. Start the JobManager instance(s)
# Make the [[ == ]] string comparison below case-insensitive
shopt -s nocasematch
if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
# HA Mode
# Read conf/masters to get the master hosts and web UI ports
readMasters
echo "Starting HA cluster with ${#MASTERS[@]} masters."
for ((i=0;i<${#MASTERS[@]};++i)); do
master=${MASTERS[i]}
webuiport=${WEBUIPORTS[i]}
if [ ${MASTERS_ALL_LOCALHOST} = true ] ; then
"${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}"
else
ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start ${master} ${webuiport} &"
fi
done
else
echo "Starting cluster."
# Start single JobManager on this machine
"$FLINK_BIN_DIR"/jobmanager.sh start
fi
shopt -u nocasematch
# 3. Start the TaskManager instance(s)
TMWorkers start
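The `shopt -s nocasematch` / `shopt -u nocasematch` pair around the HA check means the value of HIGH_AVAILABILITY is matched case-insensitively. A minimal sketch of that check in isolation (the function name `is_zookeeper_ha` is hypothetical):

```shell
#!/usr/bin/env bash
# Sketch of the HA-mode check in start-cluster.sh: with nocasematch on,
# [[ == ]] compares case-insensitively, so "ZooKeeper" also selects HA mode.
is_zookeeper_ha() {
  local mode="$1" rc=1
  shopt -s nocasematch
  [[ $mode == "zookeeper" ]] && rc=0
  shopt -u nocasematch      # restore the default, as start-cluster.sh does
  return $rc
}

for mode in zookeeper ZooKeeper NONE; do
  if is_zookeeper_ha "$mode"; then
    echo "$mode -> HA, one JobManager per conf/masters entry"
  else
    echo "$mode -> single local JobManager"
  fi
done
```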
3. jobmanager.sh
jobmanager.sh starts and stops the JobManager service.
Arguments:
Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all
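- Like start-cluster.sh, it first sources config.sh to load the functions and configuration it needs.
- It then assembles the startup arguments and finally calls flink-daemon.sh (flink-console.sh when run with start-foreground).
- Example of the resulting command: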
${FLINK_HOME}/bin/flink-daemon.sh
start
standalonesession
--configDir /opt/tools/flink-1.12.2/conf
--executionMode cluster
-D jobmanager.memory.off-heap.size=134217728b
-D jobmanager.memory.jvm-overhead.min=201326592b
-D jobmanager.memory.jvm-metaspace.size=268435456b
-D jobmanager.memory.heap.size=1073741824b
-D jobmanager.memory.jvm-overhead.max=201326592b
Full script:
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Start/stop a Flink JobManager.
USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all"
STARTSTOP=$1
HOST=$2 # optional when starting multiple instances
WEBUIPORT=$3 # optional when starting multiple instances
if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
echo $USAGE
exit 1
fi
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/config.sh
ENTRYPOINT=standalonesession
if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
# Add JobManager-specific JVM options
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
parseJmArgsAndExportLogs "${ARGS[@]}"
args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")
if [ ! -z $HOST ]; then
args+=("--host")
args+=("${HOST}")
fi
if [ ! -z $WEBUIPORT ]; then
args+=("--webui-port")
args+=("${WEBUIPORT}")
fi
if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
args+=(${DYNAMIC_PARAMETERS[@]})
fi
fi
if [[ $STARTSTOP == "start-foreground" ]]; then
exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
else
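# Modified for this walkthrough: print the flink-daemon.sh command instead of running it.
# The original invocation is kept, commented out, on the next line.
echo "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
# "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"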
fi
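The argument assembly in jobmanager.sh boils down to a fixed prefix plus optional `--host` and `--webui-port` pairs. The sketch below reproduces just that part; `build_jm_args` is a hypothetical name and `FLINK_CONF_DIR` is an example path.

```shell
#!/usr/bin/env bash
# Minimal sketch of how jobmanager.sh assembles the entrypoint arguments:
# --host and --webui-port are appended only when non-empty.
FLINK_CONF_DIR="/opt/flink/conf"   # example value; config.sh sets the real one

build_jm_args() {
  local host="$1" webui_port="$2"
  local args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")
  if [ -n "$host" ]; then
    args+=("--host" "$host")
  fi
  if [ -n "$webui_port" ]; then
    args+=("--webui-port" "$webui_port")
  fi
  echo "${args[*]}"
}

build_jm_args                   # non-HA start-cluster.sh: no host or port
build_jm_args localhost 8081    # HA start-cluster.sh: host and port from conf/masters
```

Because start-cluster.sh passes 0 when a conf/masters line has no port, "0" is a non-empty string and `--webui-port 0` is still appended in HA mode.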
4. taskmanager.sh
taskmanager.sh starts and stops the TaskManager service.
Arguments:
Usage: taskmanager.sh (start|start-foreground|stop|stop-all)
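- Like start-cluster.sh, it first sources config.sh to load the functions and configuration it needs.
- It then computes the JVM parameters and memory configuration through BashJavaUtils (runBashJavaUtilsCmd GET_TM_RESOURCE_PARAMS) and finally calls flink-daemon.sh (flink-console.sh when run with start-foreground).
- Example of the resulting command: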
${FLINK_HOME}/bin/flink-daemon.sh
start taskexecutor
--configDir /opt/tools/flink-1.12.2/conf
-D taskmanager.memory.framework.off-heap.size=134217728b
-D taskmanager.memory.network.max=134217730b
-D taskmanager.memory.network.min=134217730b
-D taskmanager.memory.framework.heap.size=134217728b
-D taskmanager.memory.managed.size=536870920b
-D taskmanager.cpu.cores=1.0
-D taskmanager.memory.task.heap.size=402653174b
-D taskmanager.memory.task.off-heap.size=0b
-D taskmanager.memory.jvm-metaspace.size=268435456b
-D taskmanager.memory.jvm-overhead.max=201326592b
-D taskmanager.memory.jvm-overhead.min=201326592b
- Full script:
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Start/stop a Flink TaskManager.
USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"
STARTSTOP=$1
ARGS=("${@:2}")
if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
echo $USAGE
exit 1
fi
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/config.sh
ENTRYPOINT=taskexecutor
if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
# if no other JVM options are set, set the GC to G1
if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
fi
# Add TaskManager-specific JVM options
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
# Startup parameters
java_utils_output=$(runBashJavaUtilsCmd GET_TM_RESOURCE_PARAMS "${FLINK_CONF_DIR}" "$FLINK_BIN_DIR/bash-java-utils.jar:$(findFlinkDistJar)" "${ARGS[@]}")
logging_output=$(extractLoggingOutputs "${java_utils_output}")
params_output=$(extractExecutionResults "${java_utils_output}" 2)
if [[ $? -ne 0 ]]; then
echo "[ERROR] Could not get JVM parameters and dynamic configurations properly."
echo "[ERROR] Raw output from BashJavaUtils:"
echo "$java_utils_output"
exit 1
fi
jvm_params=$(echo "${params_output}" | head -n 1)
export JVM_ARGS="${JVM_ARGS} ${jvm_params}"
IFS=$" " dynamic_configs=$(echo "${params_output}" | tail -n 1)
ARGS=("--configDir" "${FLINK_CONF_DIR}" ${dynamic_configs[@]} "${ARGS[@]}")
export FLINK_INHERITED_LOGS="
$FLINK_INHERITED_LOGS
TM_RESOURCE_PARAMS extraction logs:
jvm_params: $jvm_params
dynamic_configs: $dynamic_configs
logs: $logging_output
"
fi
if [[ $STARTSTOP == "start-foreground" ]]; then
exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
else
if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
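# Start a single TaskManager
# Modified for this walkthrough: print the flink-daemon.sh command instead of running it.
# The original invocation is kept, commented out, on the next line.
echo "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
# "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"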
else
# Example output from `numactl --show` on an AWS c4.8xlarge:
# policy: default
# preferred node: current
# physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
# cpubind: 0 1
# nodebind: 0 1
# membind: 0 1
read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
for NODE_ID in "${NODE_LIST[@]:1}"; do
# Start a TaskManager for each NUMA node
numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
done
fi
fi
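The BashJavaUtils step is what produces the `-D taskmanager.memory.*` options in the example command: taskmanager.sh takes the first line of the extracted result as JVM flags and the last line as dynamic configs. The sketch below replays only that split on a made-up, abbreviated two-line string; it is not real BashJavaUtils output.

```shell
#!/usr/bin/env bash
# Sketch of how taskmanager.sh splits the two-line result extracted from
# BashJavaUtils: line 1 = JVM flags, last line = dynamic -D configs.
# The sample text is abbreviated from the example command, not real tool output.
params_output='-Xmx536870902 -Xms536870902 -XX:MaxDirectMemorySize=268435458
-D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.off-heap.size=0b'

jvm_params=$(echo "${params_output}" | head -n 1)
dynamic_configs=$(echo "${params_output}" | tail -n 1)

# The unquoted expansion word-splits the configs into separate arguments,
# which taskmanager.sh relies on when it rebuilds ARGS.
ARGS=("--configDir" "/opt/flink/conf" ${dynamic_configs})
echo "JVM_ARGS: ${jvm_params}"
echo "ARGS count: ${#ARGS[@]}"
```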
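5. flink-daemon.sh
flink-daemon.sh is the common launcher behind taskexecutor, zookeeper, historyserver, standalonesession and standalonejob: the service name passed to it selects the Java entry class listed in the table in Section 1.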
Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]
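The service-name-to-class mapping can be written as a `case` lookup. This is a hypothetical sketch (`daemon_class` is not a Flink function) that only mirrors the entry classes from the table; it is not a copy of flink-daemon.sh.

```shell
#!/usr/bin/env bash
# Hypothetical lookup mirroring the service/entry-class table: given the
# service name passed to flink-daemon.sh, print the Java class to run.
daemon_class() {
  case "$1" in
    taskexecutor)      echo "org.apache.flink.runtime.taskexecutor.TaskManagerRunner" ;;
    zookeeper)         echo "org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer" ;;
    historyserver)     echo "org.apache.flink.runtime.webmonitor.history.HistoryServer" ;;
    standalonesession) echo "org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint" ;;
    standalonejob)     echo "org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint" ;;
    *) echo "Unknown daemon '$1'" >&2; return 1 ;;
  esac
}

daemon_class standalonesession   # the class run by jobmanager.sh start
```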
5.1. JobManager launch command
${JAVA_HOME}/bin/java
-Xmx1073741824
-Xms1073741824
-XX:MaxMetaspaceSize=268435456
# Logging
-Dlog.file=${FLINK_HOME}/log/flink-sysadmin-standalonesession-1-BoYi-Pro.local.log
-Dlog4j.configuration=file:${FLINK_HOME}/conf/log4j.properties
-Dlog4j.configurationFile=file:${FLINK_HOME}/conf/log4j.properties
-Dlogback.configurationFile=file:${FLINK_HOME}/conf/logback.xml
# Classpath
-classpath ${FLINK_HOME}/lib/flink-csv-1.12.2.jar:${FLINK_HOME}/lib/flink-json-1.12.2.jar:${FLINK_HOME}/lib/flink-shaded-zookeeper-3.4.14.jar:${FLINK_HOME}/lib/flink-table-blink_2.12-1.12.2.jar:${FLINK_HOME}/lib/flink-table_2.12-1.12.2.jar:${FLINK_HOME}/lib/log4j-1.2-api-2.12.1.jar:${FLINK_HOME}/lib/log4j-api-2.12.1.jar:${FLINK_HOME}/lib/log4j-core-2.12.1.jar:${FLINK_HOME}/lib/log4j-slf4j-impl-2.12.1.jar:${FLINK_HOME}/lib/flink-dist_2.12-1.12.2.jar::/opt/tools/hadoop-3.2.1/etc/hadoop::/opt/tools/hbase-2.0.2/conf
# Entry class and its arguments [the key part]
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
--configDir ${FLINK_HOME}/conf
--executionMode cluster
-D jobmanager.memory.off-heap.size=134217728b
-D jobmanager.memory.jvm-overhead.min=201326592b
-D jobmanager.memory.jvm-metaspace.size=268435456b
-D jobmanager.memory.heap.size=1073741824b
-D jobmanager.memory.jvm-overhead.max=201326592b
# Redirect stdout/stderr to the .out file
> ${FLINK_HOME}/log/flink-sysadmin-standalonesession-1-BoYi-Pro.local.out 200<&- 2>&1 < /dev/null
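The log and .out file names in the command above follow a visible pattern: flink-<user>-<service>-<instance id>-<hostname>. The sketch below rebuilds them; the pattern is inferred from the example names in this post, and `log_prefix` with its argument order is a hypothetical helper, not a function from flink-daemon.sh.

```shell
#!/usr/bin/env bash
# Sketch of the log/out file naming seen above, inferred from the example names:
#   <log dir>/flink-<user>-<service>-<instance id>-<hostname>.{log,out}
log_prefix() {
  local log_dir="$1" user="$2" service="$3" id="$4" host="$5"
  echo "${log_dir}/flink-${user}-${service}-${id}-${host}"
}

prefix=$(log_prefix /opt/flink/log sysadmin standalonesession 1 BoYi-Pro.local)
echo "${prefix}.log"   # passed to the JVM as -Dlog.file
echo "${prefix}.out"   # target of the stdout/stderr redirect
```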
5.2. TaskManager launch command
${JAVA_HOME}/bin/java
-XX:+UseG1GC
-Xmx536870902
-Xms536870902
-XX:MaxDirectMemorySize=268435458
-XX:MaxMetaspaceSize=268435456
# Logging
-Dlog.file=${FLINK_HOME}/log/flink-sysadmin-taskexecutor-1-BoYi-Pro.local.log
-Dlog4j.configuration=file:${FLINK_HOME}/conf/log4j.properties
-Dlog4j.configurationFile=file:${FLINK_HOME}/conf/log4j.properties
-Dlogback.configurationFile=file:${FLINK_HOME}/conf/logback.xml
# Classpath
-classpath ${FLINK_HOME}/lib/flink-csv-1.12.2.jar:${FLINK_HOME}/lib/flink-json-1.12.2.jar:${FLINK_HOME}/lib/flink-shaded-zookeeper-3.4.14.jar:${FLINK_HOME}/lib/flink-table-blink_2.12-1.12.2.jar:${FLINK_HOME}/lib/flink-table_2.12-1.12.2.jar:${FLINK_HOME}/lib/log4j-1.2-api-2.12.1.jar:${FLINK_HOME}/lib/log4j-api-2.12.1.jar:${FLINK_HOME}/lib/log4j-core-2.12.1.jar:${FLINK_HOME}/lib/log4j-slf4j-impl-2.12.1.jar:${FLINK_HOME}/lib/flink-dist_2.12-1.12.2.jar::/opt/tools/hadoop-3.2.1/etc/hadoop::/opt/tools/hbase-2.0.2/conf
# Entry class and its arguments [the key part]
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
--configDir ${FLINK_HOME}/conf
-D taskmanager.memory.framework.off-heap.size=134217728b
-D taskmanager.memory.network.max=134217730b
-D taskmanager.memory.network.min=134217730b
-D taskmanager.memory.framework.heap.size=134217728b
-D taskmanager.memory.managed.size=536870920b
-D taskmanager.cpu.cores=1.0
-D taskmanager.memory.task.heap.size=402653174b
-D taskmanager.memory.task.off-heap.size=0b
-D taskmanager.memory.jvm-metaspace.size=268435456b
-D taskmanager.memory.jvm-overhead.max=201326592b
-D taskmanager.memory.jvm-overhead.min=201326592b
# Redirect stdout/stderr to the .out file
> ${FLINK_HOME}/log/flink-sysadmin-taskexecutor-1-BoYi-Pro.local.out 200<&- 2>&1 < /dev/null
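The JVM flags in the two commands can be checked against the `-D` memory settings with plain arithmetic. For the TaskManager, -Xmx equals framework heap plus task heap, and -XX:MaxDirectMemorySize equals framework off-heap plus task off-heap plus network. Adding all components gives 1728 MiB for the TaskManager and 1600 MiB for the JobManager, which appears to match the default `taskmanager.memory.process.size: 1728m` and `jobmanager.memory.process.size: 1600m` in Flink 1.12's flink-conf.yaml. All values below are copied from the example commands.

```shell
#!/usr/bin/env bash
# Worked arithmetic: how the JVM flags relate to the -D memory settings (bytes).
fw_heap=134217728; task_heap=402653174
fw_offheap=134217728; task_offheap=0; network=134217730
managed=536870920; tm_metaspace=268435456; tm_overhead=201326592

tm_xmx=$(( fw_heap + task_heap ))                     # -Xmx / -Xms
tm_direct=$(( fw_offheap + task_offheap + network ))  # -XX:MaxDirectMemorySize
tm_total=$(( tm_xmx + tm_direct + managed + tm_metaspace + tm_overhead ))

jm_heap=1073741824; jm_offheap=134217728; jm_metaspace=268435456; jm_overhead=201326592
jm_total=$(( jm_heap + jm_offheap + jm_metaspace + jm_overhead ))   # JM -Xmx = jm_heap

echo "TM -Xmx=${tm_xmx} MaxDirect=${tm_direct} total=$(( tm_total / 1048576 ))MiB"
echo "JM -Xmx=${jm_heap} total=$(( jm_total / 1048576 ))MiB"
```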
6. yarn-session.sh
Entry class: org.apache.flink.yarn.cli.FlinkYarnSessionCli
This will be covered in more detail in a later post.