Docker

Docker Compose 로 Kafka Cluster 구축하기

hwanguu 2024. 5. 15. 22:01

Zookeeper 1대에 kafa 3대를 연결해서 사용해볼것이다.

 

Zookeeper란?

Zookeeper는 Kafka 클러스터의 핵심 구성 요소 중 하나로, 클러스터의 안정적인 운영과 데이터의 신뢰성을 보장하는 데 중요한 역할을 한다.

  1. 클러스터 구성 관리:
    • Kafka 클러스터의 브로커, 토픽, 파티션 등의 구성 정보를 저장하고 관리
    • 클러스터의 상태 변화를 감지하고 변경 사항을 클라이언트에 푸시하여 동기화
  2. 리더 및 파티션 할당:
    • Kafka의 리더 및 ISR(In-Sync Replica)를 관리. ISR은 특정 파티션의 리더와 동기화된 복제본으로, 데이터의 내구성과 가용성을 보장
    • Zookeeper는 리더와 ISR 목록을 유지하고, 필요할 때 리더를 재할당하거나 ISR을 업데이트
  3. 소비자 그룹 관리:
    • 소비자 그룹의 구성원과 소비자가 읽은 오프셋 정보를 저장하고 관리
    • Zookeeper를 통해 소비자 그룹의 리밸런싱(rebalancing)이 이루어지며, 파티션 할당이 변경
  4. 리더(Leader) 선출:
    • 파티션의 리더를 선출하는 역할을 한다. 리더는 쓰기 요청을 받는 파티션의 주요 책임자이며, 데이터의 복제를 관리
    • Zookeeper는 리더 후보 중 하나를 선출하여 해당 파티션의 리더로 지정
  5. 클러스터 상태 모니터링:
    • Kafka 클러스터의 상태를 모니터링하고 감시. 브로커의 가용성, ISR의 상태, 소비자 그룹의 상태 등을 추적
    • 클러스터의 상태가 변경되거나 장애가 발생할 때, 이를 감지하고 적절한 조치를 취함
  6. 일관성 보장:
    • Zookeeper는 분산 시스템에서 일관성을 보장하기 위한 핵심 요소로서, 클러스터 내의 모든 노드 간에 데이터의 일관성을 유지
    • 데이터의 변경 및 갱신을 원자적으로 처리하여 데이터의 일관성을 유지

 

yml 설정

 

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    user: "0:0"
    ports:
      - "2181:2181"
    volumes:
      - {host}:/var/lib/zookeeper/data
      - {host}:/var/lib/zookeeper/log

  kafka1:
    image: confluentinc/cp-kafka:latest
    user: "0:0"
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1    #   kafka 클러스터 내에서 브로커를 식별하는 고유 ID
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181   #   kafka 브로커가 Zookeeper에 연결하기 위한 주소. 
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092   #   
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT #   보안 프로토콜 PLAINTEXT 사용
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT   #   Kafka 브로커 간 통신에 사용되는 리스너 이름, 브로커간 통신에 PLAINTEXT 리스너 사용
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3     #   각 토픽 별 consumer offset 정보를 3개의 브로커에 복제함
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 #   트랜잭션 상태 로그를 3개의 브로커에 복제함
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2    #   트랜잭션 상태 로그를 최소 2개의 동기화된 복제본을 가지도록 설정
    volumes:                          #디렉토리 마운트file
      - {host}:/var/lib/kafka/data

  kafka2:
    image: confluentinc/cp-kafka:latest
    user: "0:0"
    depends_on:
      - zookeeper
    ports:
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
    volumes:
      - {host}:/var/lib/kafka/data
      
  kafka3:
    image: confluentinc/cp-kafka:latest
    user: "0:0"
    depends_on:
      - zookeeper
    ports:
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
    volumes:                          #디렉토리 마운트file
      - {host}:/var/lib/kafka/data
    
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:9092,kafka2:9093, kafka3:9094
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://localhost:8083
    depends_on:
      - kafka1
      - kafka2
      - kafka3

 

environment 설명 

 

user: "0:0" : 설정을 하지 않으면 volume으로 연결하려는 디렉터리가 Docker내부에서 접근을 하지 못하기 때문에 

       접근권한을 변경하여 접근할수 있도록 하기 위함.

주의!
만일 위의 설정을 하지 않으면 Docker가 실행되지 않을것이다. docker logs로 봐도 아래와 같은 에러가 출력될것이다.

===> User uid=1001 gid=0(root) groups=0(root)
===> Configuring ...
===> Running preflight checks ...
===> Check if /var/lib/zookeeper/data is writable ... Command [/usr/local/bin/dub path /var/lib/zookeeper/data writable] FAILED !

 

  • ZOOKEEPER

ZOOKEEPER_CLIENT_PORT : 클라이언트가 Zookeeper와 통신할 때 사용하는 포트를 정의 (default :2181)

 

ZOOKEEPER_TICK_TIME : Zookeeper는 클라이언트와 연결된 상태를 유지하기 위해 주기적인 하트비트를 전송하는데

                                               기본 타임아웃을 정의  (default : 2000ms) 

        

 

  • KAFKA

KAFKA_BROKER_ID : Kafa 클러스터 내에서 브로커를 식별하는 고유 ID

 

KAFKA_ZOOKEEPER_CONNECT : Kafka 브로커가 Zookeeper에 연결하기 위한 주소

 

KAFKA_ADVERTISED_LISTENERS : 클라이언트와 다른 브로커가 이 브로커에 연결할 수 있는 주소

 

KAFKA_LISTENER_SECURITY_PROTOCOL_MAP : 리스너 이름과 보안 프로토콜의 매핑을 정의

 

KAFKA_INTER_BROKER_LISTENER_NAME : 브로커 간 통신에 사용되는 리스너의 이름을 정의

 

KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR : 각 토픽에 대한 consumer offset 정보를 정해진 개수의 브로커에 복제할 인자 수를 정의

 

KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR : 트랜잭션 상태 로그의 복제 인자 수를 정의

 

KAFKA_TRANSACTION_STATE_LOG_MIN_ISR : 트랜잭션 상태 로그 파티션의 최소 ISR(In-Sync Replicas) 수를 정의. 트랜잭션 상태 로그가 정상적으로 작동하기 위해 필요한 최소 동기화 복제본 수를 설정

 

 

Kafka 트랜잭션이란?

Kafka 트랜잭션은 메시지의 정확한 전달을 보장하는 기능을 제공한다. Kafka의 트랜잭션은 하나 이상의 프로듀서와 소비자 작업을 단일 트랜잭션으로 묶어 원자성을 보장한다. 즉, 모든 작업이 성공적으로 완료되거나 전부 실패하게 만든다. 이 과정에서 트랜잭션 상태 로그는 중요한 역할을 한다. (DB Transaction과 유사)

 

 

트랜잭션 상태 로그란?

트랜잭션 상태 로그는 Kafka 클러스터에서 트랜잭션의 상태를 저장하고 관리하는 특수한 토픽이다. 이는 트랜잭션이 커밋 또는 중단되는 과정을 추적하는 데 사용된다.

- 트랜잭션 상태 로그의 역할
  1. 트랜잭션 ID 추적: 각 트랜잭션에는 고유한 트랜잭션 ID가 있는데 트랜잭션 상태 로그는 이러한 ID와 트랜잭션의
     상태를 추적한다.
  2. 트랜잭션 상태 저장: 트랜잭션이 진행되는 동안 각 단계(시작, 커밋, 중단 등)의 상태를 저장
  3. 장애 복구: 브로커가 장애에서 복구될 때 트랜잭션 상태 로그를 참조하여 트랜잭션의 상태를 일관되게 유지

- 중요성
   1. 원자성 보장: 트랜잭션 상태 로그는 Kafka가 원자성을 제공하는 데 필수적이다. 모든 프로듀서와 소비자 작업이
       완전하게 완료되었는지 확인할 수 있다.
   2. 데이터 일관성: 트랜잭션 상태 로그는 메시지가 정확히 한 번 전달되도록 보장하여 데이터 일관성을 유지한다.

 

 

트랜잭션과 관련된 설정
  1. KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR:
    • 설명: 트랜잭션 상태 로그 토픽의 복제 인자 수를 정의
    • 역할: 트랜잭션 상태 로그가 여러 브로커에 복제되어, 단일 브로커 장애 시에도 트랜잭션의 상태를 복원할 수 있도록 보장
    • 예시: 3으로 설정된 경우, 트랜잭션 상태 로그의 각 파티션이 3개의 브로커에 복제
      즉, 트랜잭션 상태 로그의 내구성을 보장하기 위해 복제 인자를 설정

  2. KAFKA_TRANSACTION_STATE_LOG_MIN_ISR:
    • 설명: 트랜잭션 상태 로그 파티션의 최소 ISR(In-Sync Replicas) 수를 정의
    • 역할: 트랜잭션이 커밋 또는 중단되기 위해 필요한 최소 동기화된 복제본의 수를 설정. 이 값이 설정된 수준보다 낮으면, 트랜잭션 커밋이 실패하여 데이터 일관성이 유지
    • 예시: 2로 설정된 경우, 트랜잭션 커밋을 위해 최소 2개의 동기화된 복제본이 필요
      즉, 트랜잭션 커밋을 위해 필요한 최소 동기화 복제본의 수를 설정하여 데이터 일관성을 유지

 

 

 

Docker Compose 실행 및 ui 확인

 

도커 컴포즈 실행 :

docker compose up -d

 

 

도커 실행 확인 : docker ps

 

Zookeeper, ui, kafka 서버가 정상적으로 떴는지 확인.

 

localhost:8080 접속가능한지 확인

 

 

 

Kafka Producer/Consumer Test

docker ps 후 kafka 서버 3대중 아무곳이나 접속한다

docker exec -it [컨테이너ID] /bin/bash

 

토픽 생성

kafka-topics --create --topic test-topic --bootstrap-server host.docker.internal:9092 --partitions 3 --replication-factor 3

 

위왕같이 test-topic을 생성한다

 

 

Consumer 실행

kafka-console-consumer --topic test-topic --bootstrap-server host.docker.internal:9094 host.docker.internal:9092 host.docker.internal:9093 --from-beginning

 

Producer 실행

kafka-console-producer --topic test-topic --bootstrap-server host.docker.internal:9093 host.docker.internal:9092 host.docker.internal:9094

 

Consumer, Producer는 같은 Kafka 컨테이너에서 확인해도 되고 다른 Kafka 컨테이너에서 확인해도 상관없다.

 

producer에서 메세지를 발생하면 consumer에서 확인이 가능하다.

 

consumer 

 

producer

 

 

또한 Kafka UI에서도 확인 가능하다.

 

test-topic를 클릭하면 해당 토픽에대한 상세정보를 볼 수 있다.

 

 

위 정보는 아래 명령어로도 확인할 수 있다.

kafka-topics --describe --topic test-topic --bootstrap-server host.docker.internal:9093

 

어떤 메세지를 전송했는지도 타임스탬프와 함께 Offset 내용을 볼 수 있다.

 

 

 

+ 추가)

안정적인 연결을 위해 zookeeper 3대, kafka 3대, ui로 구성하는 docker-compose.yml

version: '3'
services:
  zookeeper1:
    image: confluentinc/cp-zookeeper:7.3.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    user: "0:0"
    ports:
      - "2181:2181"
    volumes:
      - C:\Users\user\Documents\myProject\kafka\docker\zookeeper\data1:/var/lib/zookeeper/data
      - C:\Users\user\Documents\myProject\kafka\docker\zookeeper\log1:/var/lib/zookeeper/log
      
  zookeeper2:
    image: confluentinc/cp-zookeeper:7.3.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    user: "0:0"
    ports:
      - "2182:2181"
    volumes:
      - C:\Users\user\Documents\myProject\kafka\docker\zookeeper\data2:/var/lib/zookeeper/data
      - C:\Users\user\Documents\myProject\kafka\docker\zookeeper\log2:/var/lib/zookeeper/log
      
  zookeeper3:
    image: confluentinc/cp-zookeeper:7.3.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    user: "0:0"
    ports:
      - "2183:2181"
    volumes:
      - C:\Users\user\Documents\myProject\kafka\docker\zookeeper\data3:/var/lib/zookeeper/data
      - C:\Users\user\Documents\myProject\kafka\docker\zookeeper\log3:/var/lib/zookeeper/log

  kafka1:
    image: confluentinc/cp-kafka:7.3.1
    user: "0:0"
    depends_on:
      - zookeeper1
      - zookeeper2
      - zookeeper3
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1    #   kafka 클러스터 내에서 브로커를 식별하는 고유 ID
      KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181,zookeeper2:2182,zookeeper3:2183   #   kafka 브로커가 Zookeeper에 연결하기 위한 주소. 
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092   #   
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT #   보안 프로토콜 PLAINTEXT 사용
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT   #   Kafka 브로커 간 통신에 사용되는 리스너 이름, 브로커간 통신에 PLAINTEXT 리스너 사용
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3     #   각 토픽 별 consumer offset 정보를 3개의 브로커에 복제함
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 #   트랜잭션 상태 로그를 3개의 브로커에 복제함
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2    #   트랜잭션 상태 로그를 최소 2개의 동기화된 복제본을 가지도록 설정
      KAFKA_LOG_DIRS: /kafka/logs
    volumes:                          #디렉토리 마운트file
      - C:\Users\user\Documents\myProject\kafka\docker\kafka\data1:/kafka/logs

  kafka2:
    image: confluentinc/cp-kafka:7.3.1
    user: "0:0"
    depends_on:
      - zookeeper1
      - zookeeper2
      - zookeeper3
    ports:
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181,zookeeper2:2182,zookeeper3:2183
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_LOG_DIRS: /kafka/logs
    volumes:
      - C:\Users\user\Documents\myProject\kafka\docker\kafka\data2:/kafka/logs
      
  kafka3:
    image: confluentinc/cp-kafka:7.3.1
    user: "0:0"
    depends_on:
      - zookeeper1
      - zookeeper2
      - zookeeper3
    ports:
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181,zookeeper2:2182,zookeeper3:2183
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_LOG_DIRS: /kafka/logs
    volumes:                          #디렉토리 마운트file
      - C:\Users\user\Documents\myProject\kafka\docker\kafka\data3:/kafka/logs
    
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:9092,kafka2:9093,kafka3:9094
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://localhost:8083
    depends_on:
      - kafka1
      - kafka2
      - kafka3