Zookeeper 1대에 kafa 3대를 연결해서 사용해볼것이다.
Zookeeper란?
Zookeeper는 Kafka 클러스터의 핵심 구성 요소 중 하나로, 클러스터의 안정적인 운영과 데이터의 신뢰성을 보장하는 데 중요한 역할을 한다.
- 클러스터 구성 관리:
- Kafka 클러스터의 브로커, 토픽, 파티션 등의 구성 정보를 저장하고 관리
- 클러스터의 상태 변화를 감지하고 변경 사항을 클라이언트에 푸시하여 동기화
- 리더 및 파티션 할당:
- Kafka의 리더 및 ISR(In-Sync Replica)를 관리. ISR은 특정 파티션의 리더와 동기화된 복제본으로, 데이터의 내구성과 가용성을 보장
- Zookeeper는 리더와 ISR 목록을 유지하고, 필요할 때 리더를 재할당하거나 ISR을 업데이트
- 소비자 그룹 관리:
- 소비자 그룹의 구성원과 소비자가 읽은 오프셋 정보를 저장하고 관리
- Zookeeper를 통해 소비자 그룹의 리밸런싱(rebalancing)이 이루어지며, 파티션 할당이 변경
- 리더(Leader) 선출:
- 파티션의 리더를 선출하는 역할을 한다. 리더는 쓰기 요청을 받는 파티션의 주요 책임자이며, 데이터의 복제를 관리
- Zookeeper는 리더 후보 중 하나를 선출하여 해당 파티션의 리더로 지정
- 클러스터 상태 모니터링:
- Kafka 클러스터의 상태를 모니터링하고 감시. 브로커의 가용성, ISR의 상태, 소비자 그룹의 상태 등을 추적
- 클러스터의 상태가 변경되거나 장애가 발생할 때, 이를 감지하고 적절한 조치를 취함
- 일관성 보장:
- Zookeeper는 분산 시스템에서 일관성을 보장하기 위한 핵심 요소로서, 클러스터 내의 모든 노드 간에 데이터의 일관성을 유지
- 데이터의 변경 및 갱신을 원자적으로 처리하여 데이터의 일관성을 유지
yml 설정
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
user: "0:0"
ports:
- "2181:2181"
volumes:
- {host}:/var/lib/zookeeper/data
- {host}:/var/lib/zookeeper/log
kafka1:
image: confluentinc/cp-kafka:latest
user: "0:0"
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1 # kafka 클러스터 내에서 브로커를 식별하는 고유 ID
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 # kafka 브로커가 Zookeeper에 연결하기 위한 주소.
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092 #
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT # 보안 프로토콜 PLAINTEXT 사용
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT # Kafka 브로커 간 통신에 사용되는 리스너 이름, 브로커간 통신에 PLAINTEXT 리스너 사용
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 # 각 토픽 별 consumer offset 정보를 3개의 브로커에 복제함
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 # 트랜잭션 상태 로그를 3개의 브로커에 복제함
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2 # 트랜잭션 상태 로그를 최소 2개의 동기화된 복제본을 가지도록 설정
volumes: #디렉토리 마운트file
- {host}:/var/lib/kafka/data
kafka2:
image: confluentinc/cp-kafka:latest
user: "0:0"
depends_on:
- zookeeper
ports:
- "9093:9093"
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
volumes:
- {host}:/var/lib/kafka/data
kafka3:
image: confluentinc/cp-kafka:latest
user: "0:0"
depends_on:
- zookeeper
ports:
- "9094:9094"
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
volumes: #디렉토리 마운트file
- {host}:/var/lib/kafka/data
kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:9092,kafka2:9093, kafka3:9094
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://localhost:8083
depends_on:
- kafka1
- kafka2
- kafka3
environment 설명
user: "0:0" : 설정을 하지 않으면 volume으로 연결하려는 디렉터리가 Docker내부에서 접근을 하지 못하기 때문에
접근권한을 변경하여 접근할수 있도록 하기 위함.
주의!
만일 위의 설정을 하지 않으면 Docker가 실행되지 않을것이다. docker logs로 봐도 아래와 같은 에러가 출력될것이다.
===> User uid=1001 gid=0(root) groups=0(root)
===> Configuring ...
===> Running preflight checks ...
===> Check if /var/lib/zookeeper/data is writable ... Command [/usr/local/bin/dub path /var/lib/zookeeper/data writable] FAILED !
- ZOOKEEPER
ZOOKEEPER_CLIENT_PORT : 클라이언트가 Zookeeper와 통신할 때 사용하는 포트를 정의 (default :2181)
ZOOKEEPER_TICK_TIME : Zookeeper는 클라이언트와 연결된 상태를 유지하기 위해 주기적인 하트비트를 전송하는데
기본 타임아웃을 정의 (default : 2000ms)
- KAFKA
KAFKA_BROKER_ID : Kafa 클러스터 내에서 브로커를 식별하는 고유 ID
KAFKA_ZOOKEEPER_CONNECT : Kafka 브로커가 Zookeeper에 연결하기 위한 주소
KAFKA_ADVERTISED_LISTENERS : 클라이언트와 다른 브로커가 이 브로커에 연결할 수 있는 주소
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP : 리스너 이름과 보안 프로토콜의 매핑을 정의
KAFKA_INTER_BROKER_LISTENER_NAME : 브로커 간 통신에 사용되는 리스너의 이름을 정의
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR : 각 토픽에 대한 consumer offset 정보를 정해진 개수의 브로커에 복제할 인자 수를 정의
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR : 트랜잭션 상태 로그의 복제 인자 수를 정의
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR : 트랜잭션 상태 로그 파티션의 최소 ISR(In-Sync Replicas) 수를 정의. 트랜잭션 상태 로그가 정상적으로 작동하기 위해 필요한 최소 동기화 복제본 수를 설정
Kafka 트랜잭션이란?
Kafka 트랜잭션은 메시지의 정확한 전달을 보장하는 기능을 제공한다. Kafka의 트랜잭션은 하나 이상의 프로듀서와 소비자 작업을 단일 트랜잭션으로 묶어 원자성을 보장한다. 즉, 모든 작업이 성공적으로 완료되거나 전부 실패하게 만든다. 이 과정에서 트랜잭션 상태 로그는 중요한 역할을 한다. (DB Transaction과 유사)
트랜잭션 상태 로그란?
트랜잭션 상태 로그는 Kafka 클러스터에서 트랜잭션의 상태를 저장하고 관리하는 특수한 토픽이다. 이는 트랜잭션이 커밋 또는 중단되는 과정을 추적하는 데 사용된다.
- 트랜잭션 상태 로그의 역할
1. 트랜잭션 ID 추적: 각 트랜잭션에는 고유한 트랜잭션 ID가 있는데 트랜잭션 상태 로그는 이러한 ID와 트랜잭션의
상태를 추적한다.
2. 트랜잭션 상태 저장: 트랜잭션이 진행되는 동안 각 단계(시작, 커밋, 중단 등)의 상태를 저장
3. 장애 복구: 브로커가 장애에서 복구될 때 트랜잭션 상태 로그를 참조하여 트랜잭션의 상태를 일관되게 유지
- 중요성
1. 원자성 보장: 트랜잭션 상태 로그는 Kafka가 원자성을 제공하는 데 필수적이다. 모든 프로듀서와 소비자 작업이
완전하게 완료되었는지 확인할 수 있다.
2. 데이터 일관성: 트랜잭션 상태 로그는 메시지가 정확히 한 번 전달되도록 보장하여 데이터 일관성을 유지한다.
트랜잭션과 관련된 설정
- KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR:
- 설명: 트랜잭션 상태 로그 토픽의 복제 인자 수를 정의
- 역할: 트랜잭션 상태 로그가 여러 브로커에 복제되어, 단일 브로커 장애 시에도 트랜잭션의 상태를 복원할 수 있도록 보장
- 예시: 3으로 설정된 경우, 트랜잭션 상태 로그의 각 파티션이 3개의 브로커에 복제
즉, 트랜잭션 상태 로그의 내구성을 보장하기 위해 복제 인자를 설정- KAFKA_TRANSACTION_STATE_LOG_MIN_ISR:
- 설명: 트랜잭션 상태 로그 파티션의 최소 ISR(In-Sync Replicas) 수를 정의
- 역할: 트랜잭션이 커밋 또는 중단되기 위해 필요한 최소 동기화된 복제본의 수를 설정. 이 값이 설정된 수준보다 낮으면, 트랜잭션 커밋이 실패하여 데이터 일관성이 유지
- 예시: 2로 설정된 경우, 트랜잭션 커밋을 위해 최소 2개의 동기화된 복제본이 필요
즉, 트랜잭션 커밋을 위해 필요한 최소 동기화 복제본의 수를 설정하여 데이터 일관성을 유지
Docker Compose 실행 및 ui 확인
도커 컴포즈 실행 :
docker compose up -d
도커 실행 확인 : docker ps

Zookeeper, ui, kafka 서버가 정상적으로 떴는지 확인.
localhost:8080 접속가능한지 확인

Kafka Producer/Consumer Test
docker ps 후 kafka 서버 3대중 아무곳이나 접속한다
docker exec -it [컨테이너ID] /bin/bash
토픽 생성
kafka-topics --create --topic test-topic --bootstrap-server host.docker.internal:9092 --partitions 3 --replication-factor 3
위왕같이 test-topic을 생성한다
Consumer 실행
kafka-console-consumer --topic test-topic --bootstrap-server host.docker.internal:9094 host.docker.internal:9092 host.docker.internal:9093 --from-beginning
Producer 실행
kafka-console-producer --topic test-topic --bootstrap-server host.docker.internal:9093 host.docker.internal:9092 host.docker.internal:9094
Consumer, Producer는 같은 Kafka 컨테이너에서 확인해도 되고 다른 Kafka 컨테이너에서 확인해도 상관없다.
producer에서 메세지를 발생하면 consumer에서 확인이 가능하다.
consumer

producer

또한 Kafka UI에서도 확인 가능하다.

test-topic를 클릭하면 해당 토픽에대한 상세정보를 볼 수 있다.

위 정보는 아래 명령어로도 확인할 수 있다.
kafka-topics --describe --topic test-topic --bootstrap-server host.docker.internal:9093

어떤 메세지를 전송했는지도 타임스탬프와 함께 Offset 내용을 볼 수 있다.
+ 추가)
안정적인 연결을 위해 zookeeper 3대, kafka 3대, ui로 구성하는 docker-compose.yml
version: '3'
services:
zookeeper1:
image: confluentinc/cp-zookeeper:7.3.1
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
user: "0:0"
ports:
- "2181:2181"
volumes:
- C:\Users\user\Documents\myProject\kafka\docker\zookeeper\data1:/var/lib/zookeeper/data
- C:\Users\user\Documents\myProject\kafka\docker\zookeeper\log1:/var/lib/zookeeper/log
zookeeper2:
image: confluentinc/cp-zookeeper:7.3.1
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
user: "0:0"
ports:
- "2182:2181"
volumes:
- C:\Users\user\Documents\myProject\kafka\docker\zookeeper\data2:/var/lib/zookeeper/data
- C:\Users\user\Documents\myProject\kafka\docker\zookeeper\log2:/var/lib/zookeeper/log
zookeeper3:
image: confluentinc/cp-zookeeper:7.3.1
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
user: "0:0"
ports:
- "2183:2181"
volumes:
- C:\Users\user\Documents\myProject\kafka\docker\zookeeper\data3:/var/lib/zookeeper/data
- C:\Users\user\Documents\myProject\kafka\docker\zookeeper\log3:/var/lib/zookeeper/log
kafka1:
image: confluentinc/cp-kafka:7.3.1
user: "0:0"
depends_on:
- zookeeper1
- zookeeper2
- zookeeper3
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1 # kafka 클러스터 내에서 브로커를 식별하는 고유 ID
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181,zookeeper2:2182,zookeeper3:2183 # kafka 브로커가 Zookeeper에 연결하기 위한 주소.
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092 #
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT # 보안 프로토콜 PLAINTEXT 사용
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT # Kafka 브로커 간 통신에 사용되는 리스너 이름, 브로커간 통신에 PLAINTEXT 리스너 사용
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 # 각 토픽 별 consumer offset 정보를 3개의 브로커에 복제함
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 # 트랜잭션 상태 로그를 3개의 브로커에 복제함
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2 # 트랜잭션 상태 로그를 최소 2개의 동기화된 복제본을 가지도록 설정
KAFKA_LOG_DIRS: /kafka/logs
volumes: #디렉토리 마운트file
- C:\Users\user\Documents\myProject\kafka\docker\kafka\data1:/kafka/logs
kafka2:
image: confluentinc/cp-kafka:7.3.1
user: "0:0"
depends_on:
- zookeeper1
- zookeeper2
- zookeeper3
ports:
- "9093:9093"
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181,zookeeper2:2182,zookeeper3:2183
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_LOG_DIRS: /kafka/logs
volumes:
- C:\Users\user\Documents\myProject\kafka\docker\kafka\data2:/kafka/logs
kafka3:
image: confluentinc/cp-kafka:7.3.1
user: "0:0"
depends_on:
- zookeeper1
- zookeeper2
- zookeeper3
ports:
- "9094:9094"
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181,zookeeper2:2182,zookeeper3:2183
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_LOG_DIRS: /kafka/logs
volumes: #디렉토리 마운트file
- C:\Users\user\Documents\myProject\kafka\docker\kafka\data3:/kafka/logs
kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:9092,kafka2:9093,kafka3:9094
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://localhost:8083
depends_on:
- kafka1
- kafka2
- kafka3'Docker' 카테고리의 다른 글
| +번외) Kafka 명령어 정리 (0) | 2024.05.18 |
|---|---|
| VueJs Docker를 이용해서 node 20.12.2 버전 빌드하기(with jenkins,ssh) (0) | 2024.05.07 |
| Grafana에서 PostgreSql 모니터링 하기(postgresql_exporter연동) (0) | 2024.04.28 |
| Springboot와 Prometheus, Grafana Docker Compose로 연동하기 (0) | 2024.03.31 |
| Docker 명령어 (0) | 2024.02.09 |