1.问题描述
flink同步任务,长期任务过多,某个任务停止保存checkpoint或者savepoint后,修改代码,使用命令行读取检查点重新启动需要人工去hdfs上找寻检查点保存位置。任务过多管理起来很不方便。
鉴于此,使用脚本编写了一套启停代码,可以自动的停止flink任务并保存savepoint,读取检查点启动,也可以实现批量的启停
废话不多说,脚本如下
2.问题解决
vim flink_task_start_and_stop.sh
#!/bin/bash
# HDFS配置
HDFS_URI=hdfs://mycluster
# 读取最新的checkpoint文件夹名
function get_latest_checkpoint_dir {
# 获取所有的checkpoint文件夹名
dirs=($(hdfs dfs -ls $CHECKPOINT_DIR | sort -r -k6,7 | awk '{print $NF}'))
# 遍历文件夹名
for dir in "${dirs[@]}"; do
# 获取最新的以chk开头的savepoint文件夹名
chk_dir=$(hdfs dfs -ls $dir | awk '{print $NF}' | grep -w '.*/savepoint-.*' | sort -r | head -n 1)
if [ ! -z "$chk_dir" ]; then
echo "$chk_dir"
return
fi
# 获取最新的以chk开头的checkpoint文件夹名
chk_dir=$(hdfs dfs -ls $dir | awk '{print $NF}' | grep -w '.*/chk-.*' | sort -r | head -n 1)
if [ ! -z "$chk_dir" ]; then
echo "$chk_dir"
return
fi
done
}
# 启动命令
function flink_start {
# 获取最新的savepoint或者checkpoint文件夹名
savepoint_dir=$(get_latest_checkpoint_dir)
if [ ! -z "$savepoint_dir" ]; then
# 使用最新的savepoint启动
command=" $FLINK_HOME/bin/flink run -s $HDFS_URI$savepoint_dir $START_COMMAND "
echo "Starting job with command: $command"
$command
else
# 没有可用的savepoint,则直接启动
echo "启动失败..."
fi
}
# 停止命令
function flink_stop {
application_id=$(yarn application -list | grep $JOB_NAME | awk '{print $1}')
SAVEPOINT_DIR1=$HDFS_URI$CHECKPOINT_DIR
#查看yarn中的job
raw=$(curl -k -i --negotiate -u : 'http://hadoop1:8088/proxy/'$application_id'/jobs')
st_line=$(echo "$raw" | tail -1)
flink_id=$(echo $st_line | jq -r '.jobs[].id')
#echo /bin/flink1 cancel -s $SAVEPOINT_DIR $flink_id -yid $application_id
echo flink1111 cancel -s $SAVEPOINT_DIR1 $flink_id -yid $application_id
SAVEPOINT_PATH=$($FLINK_HOME/bin/flink cancel -s $SAVEPOINT_DIR1 $flink_id -yid $application_id)
if [ -z "$SAVEPOINT_PATH" ]; then
echo "Failed to save savepoint"
#exit 1
else
echo "Savepoint saved to $SAVEPOINT_PATH"
fi
}
function flink_all {
case $1 in
"task_name1")
FLINK_HOME=flink_address
JOB_NAME=job_name
CHECKPOINT_DIR=HDFS_ck_address
START_COMMAND="
-t yarn-per-job \
-Dyarn.application.queue=queue_name \
-p 1 \
-d \
-Dyarn.application.name=job_name \
-Dyarn.application-attempts=3 \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=1 \
-Dtaskmanager.memory.managed.size=256mb \
-Dtaskmanager.memory.task.heap.size=917mb \
-c class_name \
jar_address \
"
case "$2" in
start)
flink_start
;;
stop)
flink_stop
;;
*)
echo "Usage: {start|stop}"
exit 1
;;
esac
;;
"task_name2")
FLINK_HOME=flink_address
JOB_NAME=job_name
CHECKPOINT_DIR=HDFS_ck_address
START_COMMAND="
-t yarn-per-job \
-Dyarn.application.queue=queue_name \
-p 1 \
-d \
-Dyarn.application.name=job_name \
-Dyarn.application-attempts=3 \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=1 \
-Dtaskmanager.memory.managed.size=256mb \
-Dtaskmanager.memory.task.heap.size=917mb \
-c class_name \
jar_address \
"
case "$2" in
start)
flink_start
;;
stop)
flink_stop
;;
*)
echo "Usage: {start|stop}"
exit 1
;;
esac
;;
esac
}
case $1 in
#批量启停
"all")
for i in task_name1 task_name2
do
echo ================== $i $2日期为 $do_date ==================
flink_all $i $2
done
;;
*)
flink_all $1 $2
;;
esac
3.脚本使用
#启动某个flink任务
./flink_task_start_and_stop.sh task_name2 start
#停止某个flink任务并保存savepoint
./flink_task_start_and_stop.sh task_name2 stop
#启停所有flink任务
./flink_task_start_and_stop.sh all start
./flink_task_start_and_stop.sh all stop