不许转载
kafka 部署
把 kafka 部署到 k8s 后,我们肯定是通过 service 从 k8s 外部访问 kafaka。这里的 service 要么是 NodePort, 要么是 LoadBalancer 类型。我们使用的方式是 LoadBalancer。
我们先看下面这张图,这是 kafka 在集群中的网络拓扑。当我们通过地址 12.345.67:31090 访问到 kafka 后,kafka 返回的访问地址是类似这样的 endpoint jettopro-kafka.jettopro-poc.svc.cluster.local:9092。这是 k8s 集群内部能访问的 headless service endpoint 地址,从集群外部自然使用这个地址是访问不通的。
所以,我们需要解决两个问题:
- kafka 不同的 pod 需要不通的对外能访问的地址
- 不能使用 kafka 默认的
advertised.listeners
解决方案
问题1,我们为每个 pod 创建类型是 LoadBalancer 的 service 并且监听不同的端口。这样通过 LB IP + port 就能找到特定的 kafka broker。
service 示例如下:
apiVersion: v1
kind: Service
metadata:
name: kafka-{index}
spec:
externalTrafficPolicy: Local
type: LoadBalancer
selector:
statefulset.kubernetes.io/pod-name: kafka-{index}
ports:
- protocol: TCP
port: {9092+index}
targetPort: 9092
这里如果不是云主机,也可以使用NodePort类型来暴露kafka服务。
问题2,我们在容器启动的时候,执行脚本动态获取 pod 编号,生成容器需要的环境变量 KAFKA_CFG_ADVERTISED_LISTENERS(对应 kafka broker 的配置 advertised.listener)
HOSTNAME=`hostname -s`
if [[ $HOSTNAME =~ (.*)-([0-9]+)$ ]]; then
ORD=${BASH_REMATCH[2]}
PORT=$((ORD + 9092))
#12.345.67.8 是 LB 的 ip
export KAFKA_CFG_ADVERTISED_LISTENERS="PLAINTEXT://12.345.67.8:$PORT"
else
echo "Failed to get index from hostname $HOST"
exit 1
fi
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: kafka
spec:
selector:
matchLabels:
app: kafka
serviceName: kafka
replicas: 3
updateStrategy:
type: RollingUpdate
podManagementPolicy: OrderedReady
template:
metadata:
labels:
app: kafka
spec:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- kafka
topologyKey: "kubernetes.io/hostname"
containers:
- name: kafka
command:
- bash
- -ec
- |
HOSTNAME=`hostname -s`
if [[ $HOSTNAME =~ (.*)-([0-9]+)$ ]]; then
ORD=${BASH_REMATCH[2]}
PORT=$((ORD + 9092))
export KAFKA_CFG_ADVERTISED_LISTENERS="PLAINTEXT://12.345.67.8:$PORT"
else
echo "Failed to get index from hostname $HOST"
exit 1
fi
exec /entrypoint.sh /run.sh
imagePullPolicy: Always
image: "bitnami/kafka:2"
env:
- name: ALLOW_PLAINTEXT_LISTENER
value: "yes"
- name: KAFKA_CFG_ZOOKEEPER_CONNECT
value: "zookeeper-0.zookeeper-hs:2181,zookeeper-1.zookeeper-hs:2181,zookeeper-2.zookeeper-hs:2181"
- name: KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR
value: "3"
- name: KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR
value: "3"
- name: KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
value: "3"
ports:
- containerPort: 9092
volumeMounts:
- name: kafka-data
mountPath: /bitnami
securityContext:
runAsUser: 1000
fsGroup: 1000
volumeClaimTemplates:
- metadata:
name: kafka-data
spec:
accessModes: [ "ReadWriteOnce" ]
storageClassName: alicloud-disk-efficiency
resources:
requests:
storage: 20Gi