一背景
我们的一些Topic的数据特别多,基于大数据的服务也不是特别稳定,有时出现问题不能及时感知到,导致过了很长时间才能发现大数据Flink任务出现了问题。而这些问题一般都会伴随着Kafka订阅消费消息的积压,所以增加了对kafka的Group消费积压清空做监控,从而反向发现问题并通过钉钉机器人报警。
如果你们公司有Grafana+Prometheus监控,建议不要采用这种使用脚本的笨方法,在后期,我们已经废弃了这种人工维护的脚本,都是通过Grafana来监控积压并主动报警的。
二 监控原理与脚本
Kafka有自带的kafka-consumer-groups.sh
查询出所有group的情况,其中会包含Lag(即滞后消费的数量),查询单个的的例子如下:1
2
3
4
5
6/usr/local/kafka/kafka_2.13-2.8.0/bin/kafka-consumer-groups.sh --bootstrap-server 10.0.43.250:9092 --describe --group xxx-test111111-dataGroup
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
xxx-test111111-dataGroup xxx-test111111-data 0 10714753219 10714753246 27 consumer-3-90fde6a3-5952-4632-8e8e-25be77e51839 /10.0.42.20 consumer-3
xxx-test111111-dataGroup xxx-test111111-data 1 10829798621 10829798652 31 consumer-3-90fde6a3-5952-4632-8e8e-25be77e51839 /10.0.42.20 consumer-3
xxx-test111111-dataGroup xxx-test111111-data 2 10972915843 10972915866 23 consumer-3-90fde6a3-5952-4632-8e8e-25be77e51839 /10.0.42.20 consumer-3
而我们需要查询所有,故通过 --all-groups
参数即可,这个参数在比较低的kafka版本可能会不支持,可以通过比较高本本的来查。
具体脚本如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# kafka的所属目录 运行命令依赖其下bin中相关命令
FAKFA_DIR=/usr/local/kafka/kafka_2.13-2.8.0
# 默认积压达到1万报警
DEFAULT_LIMIT=10000
# group信息的临时文件
GROUP_INFO_FILE=monitor-group-infos.txt
# 钉钉提醒机器人token
DING_TOKEN=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# kafka的broker 多个逗号隔开
BROKER=10.0.43.250:9092
# 写入文件
${FAKFA_DIR}/bin/kafka-consumer-groups.sh --bootstrap-server ${BROKER} --describe --all-groups > ${GROUP_INFO_FILE}
error_infos=()
# 遍历每一行 判断
while read line
do
if [[ -n ${line} ]]; then
if [[ $line =~ CONSUMER-ID ]] || [[ $line =~ anonymous ]];then
# nothing
echo ""
else
group=`echo $line | awk -F' ' '{print $1}'`
topic=`echo $line | awk -F' ' '{print $2}'`
partition=`echo $line | awk -F' ' '{print $3}'`
lag=`echo $line | awk -F' ' '{print $6}'`
# echo "$topic $lag"
# lag为- 或者 group中包含test的 直接忽略
if [ "${lag}" = "-" ] || [[ "${group}" =~ "test" ]];then
# nothing
echo ""
elif [ $((lag)) -ge $((DEFAULT_LIMIT)) ];then
# 超过 1万则报警
# echo ">$((DEFAULT_LIMIT)) ${line}"
error_infos[${#error_infos[*]}]=">Topic:${topic},Group:${group},Partition:${partition},积压数:${lag} \n\n"
fi
fi
fi
done < ${GROUP_INFO_FILE}
# 非空的 发送钉钉消息提醒
if [[ -n ${error_infos[*]} ]]; then
curl -s -XPOST -H 'Content-Type: application/json' https://oapi.dingtalk.com/robot/send?access_token=${DING_TOKEN} -d "{\"msgtype\":\"markdown\",\"markdown\":{\"title\":\"生产Kafka-Consumer-Group报警\",\"text\":\"生产Kafka-Consumer-Group订阅积压报警 \\n\\n${error_infos[*]}\"}}"
fi
最后,加上 crontab 定时任务则大功告成。
三 钉钉报警示例
钉钉收到的报警示例如下: