programming language/Kakfa

[Kafka 기초 학습] 카프카 기초 개념 4 - 메시지와 메시지 포맷 과 메시지 전송 및 분석 실습 + 파티셔닝 이해

공대키메라 2026. 6. 9. 21:52

지난 시간에는 하이워터마크와 리더에포크에 대해 알아보았다. (지난 내용이 궁금하면 여기 클릭!)

 

이번에는 메시지와 메시지 포맷, 그리고 로그와 컴팩션에 대해서 좀 더 자세하게 보려고 한다.

 

물론 지난글인 카프카 기초 개념 2 - 자세한 개념과 프로듀서의 작동 방식, 그리고 topic 상세 정보 보기에서 살짝 봤는데 뭔가 아직도 헷갈렸다.

 

그도 그런게 책을 읽으면 책의 내용이 뭔가 차근차근 한다기보다는 널뛰기 해서 갑자기 이거 봣다가, 저 뒤에 가서 아 이거 자세히 못봣으니 여기서 보자. 그리고 뒤에 가서 아 이거 또 설명해야해요 이런 방식이다.

 

그렇기 때문에 계속 글을 다시금 보면서 부족한 부분은 별도의 시리즈로 빼고, 앞에 기초 개념 내용도 재정비를 할 것이다. 

 

해당 글의 내용은 Kafka 공식 문서 4.3 버전으로 implementation 섹션을 중심으로 읽고 내용을 추가했다.

 


1. 메세지

프로듀서가 지정된 Topic 으로 메시지 혹은 레코드를 전송한다.

 

여기서 메시지를 Kafka 공식문서에서 어떻게 설명하는지 보려고 한다.

 

메시지는 가변 길이 헤더, 가변 길이 불투명 키 바이트 배열, 그리고 가변 길이 불투명 값 바이트 배열로 구성됩니다. 헤더 형식은 다음 섹션에서 설명합니다.

키와 값을 불투명하게 유지하는 것은 올바른 결정입니다: 현재 직렬화 라이브러리 개발이 활발히 진행되고 있으며, 특정 방식이 모든 용도에 적합한 것은 아니기 때문입니다.

Kafka를 사용하는 특정 애플리케이션은 사용 목적에 따라 특정 직렬화 방식을 요구할 가능성이 높습니다.

RecordBatch 인터페이스는 NIO channel에 대한 대량 읽기 및 쓰기를 위한 특수 메서드를 제공하는 메시지 반복자입니다.

출처 : https://kafka.apache.org/43/implementation/messages/

 

메세지는 결국 데이터 통신을 위한 객체 를 말한다.

 

여기서 키와 값을 불투명하게(opaque) 유지하는게 좋은 결정이라고 한다. 

 

카프카에서 키와 값을 불투명하게(Opaque) 정의한다는 것은, 카프카 브로커(서버)가 메시지의 실제 내용이나 데이터 타입(JSON, String, Integer, 커스텀 객체 등)이 무엇인지 전혀 알지 못하게 단순한 '바이트 배열(Byte Array)'이라는 블랙박스 형태로 취급한다는 뜻 이라고 한다.

 

이의 장점은 직렬화 포맷에 대한 완벽한 디커플링(Decoupling), 브로커의 CPU 연산 최소화, Zero-Copy를 통한 네트워크/디스크 I/O 극대화를 들 수 있다.

 

 

또한, 메시지는 배치로 전송된다는 사실을 기억한다면 RecordBatch 인터페이스가 그 역할을 하는 반복자임을 알 수 있다.

 

NIO 구조에서는 끝임없이 loop를 돌면서 최대한 적은 thread를 활요하는 구조임으로 반복되는 구조이니 반복자라고 한 듯 하다.

 

2. 메시지 포맷 그리고 실제 전송

메시지(레코드라고도 함)는 항상 일괄 처리(batch) 방식으로 기록됩니다. 메시지 일괄 처리를 레코드 일괄 처리(record batch)라고 하며, 레코드 일괄 처리에는 하나 이상의 레코드가 포함됩니다.

드물게 레코드 일괄 처리에 단일 레코드만 포함될 수도 있습니다. 레코드 일괄 처리와 레코드는 각각 고유한 헤더를 가지고 있으며, 각 헤더의 형식은 아래에 설명되어 있습니다.

출처: https://kafka.apache.org/43/implementation/message-format/

 

레코드 일괄 처리에 단일 레코드만 포함될 수 도 있다는 말이 무슨 말인가?

 

그것은 메시지를 일괄 처리 하는 과정에서 하나만 들어왓는데 그 다음 들어오지 않으면 영원히 전송되지 않는 경우 설정값을 통해 최대 대기 시간을 설정할 수 있다. (linger.ms)

 

다음은 공식문서에서 소개하는 RecordBatch의 on-disk 포맷이다. 필자는 무슨 말인지 잘 모르니 옆에 주석을 달았다.

 

Record Batch

baseOffset: int64                      // 이 배치에 포함된 첫 번째 메시지의 기준 오프셋 번호
batchLength: int32                     // 배치 전체의 바이트 크기 (Zero-copy I/O 처리를 위한 핵심 값)
partitionLeaderEpoch: int32            // 파티션 리더의 세대 번호 (스플릿 브레인 현상 방지용)
magic: int8 (current magic value is 2) // 메시지 포맷 버전 (버전 2부터 Exactly-Once 지원)
crc: uint32                            // 데이터가 네트워크나 디스크에서 오염되었는지 검증하는 체크섬
attributes: int16                      // 배치의 속성들을 저장하는 16비트 플래그 공간
    bit 0~2:                           // 16비트 중 우측 3개 비트를 사용해 어떤 압축을 썼는지 표시
        0: no compression              // 압축 안 함
        1: gzip                        // GZIP 압축
        2: snappy                      // Snappy 압축
        3: lz4                         // LZ4 압축
        4: zstd                        // ZSTD 압축
    bit 3: timestampType               // 기록된 시간이 프로듀서 생성 시간인지 브로커 적재 시간인지 구분
    bit 4: isTransactional (0 means not transactional)  // 트랜잭션으로 묶인 메시지인지 여부
    bit 5: isControlBatch (0 means not a control batch) // 데이터가 아닌 내부 제어용 마커인지 여부
    bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction) // 로그 컴팩션 시 삭제 기준점 작동 여부
    bit 7~15: unused                                    // 추후 확장을 위해 사용하지 않고 비워둔 공간
lastOffsetDelta: int32 // 마지막 메시지 오프셋과 baseOffset의 차이값 (용량 절약 목적)
baseTimestamp: int64   // 배치의 기준 시간 (주로 첫 메시지 생성 시간)
maxTimestamp: int64    // 배치에 포함된 메시지 중 가장 늦은 시간 (로그 보관 주기 계산용)
producerId: int64      // 메시지를 생성한 프로듀서의 고유 ID (메시지 중복 적재 방지용)
producerEpoch: int16   // 프로듀서의 세대 번호 (네트워크 지연된 예전 프로듀서의 요청 차단용)
baseSequence: int32    // 이 배치의 시작 시퀀스 번호 (재전송 시 중복 방지용)
recordsCount: int32    // 이 배치 안에 들어있는 실제 단건 메시지(Record)의 총 개수
records: [Record]      // 실제 직렬화되고 압축된 메시지들의 배열 (페이로드)

 

 

그렇구나... 그러면 내가 실제로 메시지를 보내보고 이게 정말로 Record Batch 명세서처럼 나도 찍힐지 나의 테스트 환경에서 실행해보았다.

 

kafka-1 환경에 들어가서 다음 명령어들을 실행했고 출력된 값들을 그대로 보여주겠다.

 

중간 중간 실제 명령어의 결과에 주석을 달아서 보도록 하겠다.

 

07951ea781ed:/tmp/kafka-logs$ ls
__cluster_metadata-0   __consumer_offsets-21  __consumer_offsets-35  __consumer_offsets-49
__consumer_offsets-0   __consumer_offsets-22  __consumer_offsets-36  __consumer_offsets-5
__consumer_offsets-1   __consumer_offsets-23  __consumer_offsets-37  __consumer_offsets-6
__consumer_offsets-10  __consumer_offsets-24  __consumer_offsets-38  __consumer_offsets-7
__consumer_offsets-11  __consumer_offsets-25  __consumer_offsets-39  __consumer_offsets-8
__consumer_offsets-12  __consumer_offsets-26  __consumer_offsets-4   __consumer_offsets-9
__consumer_offsets-13  __consumer_offsets-27  __consumer_offsets-40  bootstrap.checkpoint
__consumer_offsets-14  __consumer_offsets-28  __consumer_offsets-41  cleaner-offset-checkpoint
__consumer_offsets-15  __consumer_offsets-29  __consumer_offsets-42  cluster-test-0
__consumer_offsets-16  __consumer_offsets-3   __consumer_offsets-43  cluster-test-1
__consumer_offsets-17  __consumer_offsets-30  __consumer_offsets-44  cluster-test-2
__consumer_offsets-18  __consumer_offsets-31  __consumer_offsets-45  log-start-offset-checkpoint
__consumer_offsets-19  __consumer_offsets-32  __consumer_offsets-46  meta.properties
__consumer_offsets-2   __consumer_offsets-33  __consumer_offsets-47  recovery-point-offset-checkpoint
__consumer_offsets-20  __consumer_offsets-34  __consumer_offsets-48  replication-offset-checkpoint
07951ea781ed:/tmp/kafka-logs$ cd cluster-test-0
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ 00000000000000000000.log \
  --deep-iteration \ // 깊이 들어가서 개별 레코드까지 다 분해해서 보기
  --print-data-log   // 실제 메시지 내용을 터미널에 출력해라!
bash: 00000000000000000000.log: command not found
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ cd cluster-test-0
bash: cd: cluster-test-0: No such file or directory
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ ls
00000000000000000008.log  00000000000000000008.snapshot  leader-epoch-checkpoint  partition.metadata
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ 00000000000000000008.log --deep-iteration --print-data-log
bash: 00000000000000000008.log: command not found
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ --files 00000000000000000008.log --deep-iteration --print-data-log
bash: --files: command not found
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ /usr/bin/kafka-dump-log.sh --files 00000000000000000008.log --deep-iteration --print-data-log
bash: /usr/bin/kafka-dump-log.sh: No such file or directory
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ kafka-dump-log.sh --files 00000000000000000008.log --deep-iteration --print-data-log
bash: kafka-dump-log.sh: command not found
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ /opt/kafka/bin/kafka-dump-log.sh --files 00000000000000000008.log --deep-iteration --print-data-log
Dumping 00000000000000000008.log
Log starting offset: 8
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ /opt/kafka/bin/kafka-dump-log.sh --files 00000000000000000000.log --deep-iteration --print-data-log
Dumping 00000000000000000000.log
Log starting offset: 0
Exception in thread "main" java.nio.file.NoSuchFileException: 00000000000000000000.log
        at java.base/sun.nio.fs.UnixException.translateToIOException(Unknown Source)
        at java.base/sun.nio.fs.UnixException.rethrowAsIOException(Unknown Source)
        at java.base/sun.nio.fs.UnixException.rethrowAsIOException(Unknown Source)
        at java.base/sun.nio.fs.UnixFileSystemProvider.newFileChannel(Unknown Source)
        at java.base/java.nio.channels.FileChannel.open(Unknown Source)
        at java.base/java.nio.channels.FileChannel.open(Unknown Source)
        at org.apache.kafka.common.record.FileRecords.openChannel(FileRecords.java:494)
        at org.apache.kafka.common.record.FileRecords.open(FileRecords.java:449)
        at kafka.tools.DumpLogSegments$.dumpLog(DumpLogSegments.scala:281)
        at kafka.tools.DumpLogSegments$.$anonfun$main$1(DumpLogSegments.scala:81)
        at kafka.tools.DumpLogSegments$.$anonfun$main$1$adapted(DumpLogSegments.scala:72)
        at scala.collection.ArrayOps$.foreach$extension(ArrayOps.scala:1324)
        at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:72)
        at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
        //아! 메시지가 현재 쌓여있지 않아서 들어가서 메시지 전송해버림.
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic cluster-test
>TEST MSG 1
>TEST MSG 2
>TEST MSG 3
>^C07951ea781ed:/tmp/kafka-logs/cluster-test-0$ LS
bash: LS: command not found
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ ls
00000000000000000008.index  00000000000000000008.snapshot   leader-epoch-checkpoint
00000000000000000008.log    00000000000000000008.timeindex  partition.metadata
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ ls -lh
total 16K
-rw-r--r-- 1 appuser appuser 10M Jun  9 06:05 00000000000000000008.index
-rw-r--r-- 1 appuser appuser 234 Jun  9 06:05 00000000000000000008.log
-rw-r--r-- 1 appuser appuser  10 Jun  7 14:20 00000000000000000008.snapshot
-rw-r--r-- 1 appuser appuser 10M Jun  9 05:08 00000000000000000008.timeindex
-rw-r--r-- 1 appuser appuser   8 Jun  9 06:05 leader-epoch-checkpoint
-rw-r--r-- 1 appuser appuser  43 May 31 07:02 partition.metadata
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ /opt/kafka/bin/kafka-dump-log.sh --files 00000000000000000008.log --deep-iteration --print-data-log
Dumping 00000000000000000008.log
Log starting offset: 8
baseOffset: 8 lastOffset: 8 count: 1 baseSequence: 0 lastSequence: 0 producerId: 5000 producerEpoch: 0 partitionLeaderEpoch: 8 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1780985132513 size: 78 magic: 2 compresscodec: none crc: 890345923 isvalid: true
| offset: 8 CreateTime: 1780985132513 keySize: -1 valueSize: 10 sequence: 0 headerKeys: [] payload: TEST MSG 1
baseOffset: 9 lastOffset: 9 count: 1 baseSequence: 1 lastSequence: 1 producerId: 5000 producerEpoch: 0 partitionLeaderEpoch: 8 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 78 CreateTime: 1780985134501 size: 78 magic: 2 compresscodec: none crc: 544469083 isvalid: true
| offset: 9 CreateTime: 1780985134501 keySize: -1 valueSize: 10 sequence: 1 headerKeys: [] payload: TEST MSG 2
baseOffset: 10 lastOffset: 10 count: 1 baseSequence: 2 lastSequence: 2 producerId: 5000 producerEpoch: 0 partitionLeaderEpoch: 8 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 156 CreateTime: 1780985137161 size: 78 magic: 2 compresscodec: none crc: 4198686687 isvalid: true
| offset: 10 CreateTime: 1780985137161 keySize: -1 valueSize: 10 sequence: 2 headerKeys: [] payload: TEST MSG 3

 

 

여기서 맨 마지막을 정리해서 보자.

 

[배치 1]
baseOffset: 8                             // 이 배치에 포함된 첫 번째 메시지의 고유 오프셋 번호
batchLength: 78                           // 배치 전체의 바이트 크기
partitionLeaderEpoch: 8                   // 파티션 리더의 세대 번호
magic: 2                                  // 메시지 포맷 버전 (v2)
crc: 890345923                            // 데이터 무결성 체크섬
attributes: 0                             // 배치의 속성을 담는 16비트 플래그
    bit 0~2: 0 (no compression)           // 압축 알고리즘 선택 (0: 압축 안 함)
    bit 3: 0                              // 타임스탬프 타입
    bit 4: 0 (isTransactional)            // 트랜잭션 여부
    bit 5: 0 (isControlBatch)             // 제어 메시지 여부
    bit 6: 0                              // 로그 컴팩션 삭제 기준점 여부
    bit 7~15: 0                           // 미사용 예비 비트
lastOffsetDelta: 0                        // 마지막 오프셋과 baseOffset의 차이값
baseTimestamp: 1780985132513              // 배치의 기준 시간
maxTimestamp: 1780985132513               // 배치 내 가장 최신 메시지 시간
producerId: 5000                          // 프로듀서의 고유 ID
producerEpoch: 0                          // 프로듀서의 세대 번호
baseSequence: 0                           // 이 배치의 시작 시퀀스 번호
recordsCount: 1                           // 배치 내 실제 레코드 개수
records:                                  // 실제 데이터 배열
    offset: 8                             // 개별 레코드의 오프셋
    CreateTime: 1780985132513             // 레코드 생성 시간
    keySize: -1                           // 키 크기 (-1: 없음)
    valueSize: 10                         // 값 크기
    sequence: 0                           // 레코드별 시퀀스 번호
    headerKeys: []                        // 메시지 헤더 키 배열
    payload: TEST MSG 1                   // 실제 데이터 본문

[배치 2]
baseOffset: 9                             // 이 배치에 포함된 첫 번째 메시지의 고유 오프셋 번호
batchLength: 78                           // 배치 전체의 바이트 크기
partitionLeaderEpoch: 8                   // 파티션 리더의 세대 번호
magic: 2                                  // 메시지 포맷 버전 (v2)
crc: 544469083                            // 데이터 무결성 체크섬
attributes: 0                             // 배치의 속성을 담는 16비트 플래그
    bit 0~2: 0 (no compression)           // 압축 알고리즘 선택 (0: 압축 안 함)
    bit 3: 0                              // 타임스탬프 타입
    bit 4: 0 (isTransactional)            // 트랜잭션 여부
    bit 5: 0 (isControlBatch)             // 제어 메시지 여부
    bit 6: 0                              // 로그 컴팩션 삭제 기준점 여부
    bit 7~15: 0                           // 미사용 예비 비트
lastOffsetDelta: 0                        // 마지막 오프셋과 baseOffset의 차이값
baseTimestamp: 1780985134501              // 배치의 기준 시간
maxTimestamp: 1780985134501               // 배치 내 가장 최신 메시지 시간
producerId: 5000                          // 프로듀서의 고유 ID
producerEpoch: 0                          // 프로듀서의 세대 번호
baseSequence: 1                           // 이 배치의 시작 시퀀스 번호
recordsCount: 1                           // 배치 내 실제 레코드 개수
records:                                  // 실제 데이터 배열
    offset: 9                             // 개별 레코드의 오프셋
    CreateTime: 1780985134501             // 레코드 생성 시간
    keySize: -1                           // 키 크기 (-1: 없음)
    valueSize: 10                         // 값 크기
    sequence: 1                           // 레코드별 시퀀스 번호
    headerKeys: []                        // 메시지 헤더 키 배열
    payload: TEST MSG 2                   // 실제 데이터 본문

[배치 3]
baseOffset: 10                            // 이 배치에 포함된 첫 번째 메시지의 고유 오프셋 번호
batchLength: 78                           // 배치 전체의 바이트 크기
partitionLeaderEpoch: 8                   // 파티션 리더의 세대 번호
magic: 2                                  // 메시지 포맷 버전 (v2)
crc: 4198686687                           // 데이터 무결성 체크섬
attributes: 0                             // 배치의 속성을 담는 16비트 플래그
    bit 0~2: 0 (no compression)           // 압축 알고리즘 선택 (0: 압축 안 함)
    bit 3: 0                              // 타임스탬프 타입
    bit 4: 0 (isTransactional)            // 트랜잭션 여부
    bit 5: 0 (isControlBatch)             // 제어 메시지 여부
    bit 6: 0                              // 로그 컴팩션 삭제 기준점 여부
    bit 7~15: 0                           // 미사용 예비 비트
lastOffsetDelta: 0                        // 마지막 오프셋과 baseOffset의 차이값
baseTimestamp: 1780985137161              // 배치의 기준 시간
maxTimestamp: 1780985137161               // 배치 내 가장 최신 메시지 시간
producerId: 5000                          // 프로듀서의 고유 ID
producerEpoch: 0                          // 프로듀서의 세대 번호
baseSequence: 2                           // 이 배치의 시작 시퀀스 번호
recordsCount: 1                           // 배치 내 실제 레코드 개수
records:                                  // 실제 데이터 배열
    offset: 10                            // 개별 레코드의 오프셋
    CreateTime: 1780985137161             // 레코드 생성 시간
    keySize: -1                           // 키 크기 (-1: 없음)
    valueSize: 10                         // 값 크기
    sequence: 2                           // 레코드별 시퀀스 번호
    headerKeys: []                        // 메시지 헤더 키 배열
    payload: TEST MSG 3                   // 실제 데이터 본문

 

위에서 보면 이 Record Batch의 크기가 각각 78, 78, 78 byte로 ls -al 해서 보인 00000000000000000008.log 파일의 크기를 보면 정확히 일치한다.

 

여기서 단순하게 보내지 말고 뭔가 다양한 설정을 해서 보내보도록 하겠다.

 

07951ea781ed:/tmp/kafka-logs/cluster-test-0$ cat <<EOF > /tmp/kafka-logs/custom-producer.properties
compression.type=snappy
EOF
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ echo "traceId=abc-123,version=2|TEST MSG WITH HEADER FINAL" | /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic cluster-test \
  --command-config /tmp/kafka-logs/custom-producer.properties \
  --reader-property "parse.headers=true" \
  --reader-property "headers.delimiter=|" \
  --reader-property "headers.separator=," \
  --reader-property "headers.key.separator==" \
  --reader-property "ignore.error=false"
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ ls -l /tmp/kafka-logs/cluster-test-0/00000000000000000008.log
-rw-r--r-- 1 appuser appuser 315 Jun  9 06:37 /tmp/kafka-logs/cluster-test-0/00000000000000000008.log
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ /opt/kafka/bin/kafka-dump-log.sh --files 00000000000000000008.log --deep-iteration --print-data-log | tail -n 15
Dumping 00000000000000000008.log
Log starting offset: 8
baseOffset: 8 lastOffset: 8 count: 1 baseSequence: 0 lastSequence: 0 producerId: 5000 producerEpoch: 0 partitionLeaderEpoch: 8 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1780985132513 size: 78 magic: 2 compresscodec: none crc: 890345923 isvalid: true
| offset: 8 CreateTime: 1780985132513 keySize: -1 valueSize: 10 sequence: 0 headerKeys: [] payload: TEST MSG 1
baseOffset: 9 lastOffset: 9 count: 1 baseSequence: 1 lastSequence: 1 producerId: 5000 producerEpoch: 0 partitionLeaderEpoch: 8 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 78 CreateTime: 1780985134501 size: 78 magic: 2 compresscodec: none crc: 544469083 isvalid: true
| offset: 9 CreateTime: 1780985134501 keySize: -1 valueSize: 10 sequence: 1 headerKeys: [] payload: TEST MSG 2
baseOffset: 10 lastOffset: 10 count: 1 baseSequence: 2 lastSequence: 2 producerId: 5000 producerEpoch: 0 partitionLeaderEpoch: 8 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 156 CreateTime: 17809851307951ea781ed:/tmp/kafka-logs/cluster-test-0$ cat <<EOF > /tmp/kafka-logs/custom-producer.properties
compression.type=snappy                                                                                                 EOF
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ echo "traceId=abc-123,version=2|TEST MSG WITH HEADER FINAL" | /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic cluster-test \
  --command-config /tmp/kafka-logs/custom-producer.properties \
  --reader-property "parse.headers=true" \
  --reader-property "key.separator=|" \
  --reader-property "headers.separator=," \
  --reader-property "headers.key.separator==" \
  --reader-property "ignore.error=false"
org.apache.kafka.common.KafkaException: No headers delimiter found on line number 1: 'traceId=abc-123,version=2|TEST MSG WITH HEADER FINAL'
        at org.apache.kafka.tools.LineMessageReader.parse(LineMessageReader.java:177)
        at org.apache.kafka.tools.LineMessageReader$1.hasNext(LineMessageReader.java:129)
        at org.apache.kafka.tools.ConsoleProducer.loopReader(ConsoleProducer.java:99)
        at org.apache.kafka.tools.ConsoleProducer.start(ConsoleProducer.java:71)
        at org.apache.kafka.tools.ConsoleProducer.main(ConsoleProducer.java:60)
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ echo "traceId=abc-123,version=2|TEST MSG WITH HEADER FINAL" | /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic cluster-test \
  --command-config /tmp/kafka-logs/custom-producer.properties \
  --reader-property "parse.headers=true" \
  --reader-property "headers.delimiter=|" \
  --reader-property "headers.separator=," \
  --reader-property "headers.key.separator==" \
  --reader-property "ignore.error=false"
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ /opt/kafka/bin/kafka-dump-log.sh --files 00000000000000000008.log --deep-iteration --print-data-log | tail -n 15
Dumping 00000000000000000008.log
Log starting offset: 8
baseOffset: 8 lastOffset: 8 count: 1 baseSequence: 0 lastSequence: 0 producerId: 5000 producerEpoch: 0 partitionLeaderEpoch: 8 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1780985132513 size: 78 magic: 2 compresscodec: none crc: 890345923 isvalid: true
| offset: 8 CreateTime: 1780985132513 keySize: -1 valueSize: 10 sequence: 0 headerKeys: [] payload: TEST MSG 1
baseOffset: 9 lastOffset: 9 count: 1 baseSequence: 1 lastSequence: 1 producerId: 5000 producerEpoch: 0 partitionLeaderEpoch: 8 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 78 CreateTime: 1780985134501 size: 78 magic: 2 compresscodec: none crc: 544469083 isvalid: true
| offset: 9 CreateTime: 1780985134501 keySize: -1 valueSize: 10 sequence: 1 headerKeys: [] payload: TEST MSG 2
baseOffset: 10 lastOffset: 10 count: 1 baseSequence: 2 lastSequence: 2 producerId: 5000 producerEpoch: 0 partitionLeaderEpoch: 8 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 156 CreateTime: 1780985137161 size: 78 magic: 2 compresscodec: none crc: 4198686687 isvalid: true
| offset: 10 CreateTime: 1780985137161 keySize: -1 valueSize: 10 sequence: 2 headerKeys: [] payload: TEST MSG 3
baseOffset: 11 lastOffset: 11 count: 1 baseSequence: 0 lastSequence: 0 producerId: 4002 producerEpoch: 0 partitionLeaderEpoch: 8 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 234 CreateTime: 1780987030892 size: 81 magic: 2 compresscodec: none crc: 3658067560 isvalid: true
| offset: 11 CreateTime: 1780987030892 keySize: -1 valueSize: 13 sequence: 0 headerKeys: [] payload: VERIFY_DATA_4
baseOffset: 12 lastOffset: 12 count: 1 baseSequence: 0 lastSequence: 0 producerId: 4013 producerEpoch: 0 partitionLeaderEpoch: 8 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 315 CreateTime: 1780989866788 size: 120 magic: 2 compresscodec: none crc: 1433641583 isvalid: true
| offset: 12 CreateTime: 1780989866788 keySize: -1 valueSize: 26 sequence: 0 headerKeys: [traceId,version] payload: TEST MSG WITH HEADER FINAL
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ /opt/kafka/bin/kafka-dump-log.sh --files 00000000000000000008.log --deep-iteration --print-data-log
Dumping 00000000000000000008.log
Log starting offset: 8
baseOffset: 8 lastOffset: 8 count: 1 baseSequence: 0 lastSequence: 0 producerId: 5000 producerEpoch: 0 partitionLeaderEpoch: 8 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1780985132513 size: 78 magic: 2 compresscodec: none crc: 890345923 isvalid: true
| offset: 8 CreateTime: 1780985132513 keySize: -1 valueSize: 10 sequence: 0 headerKeys: [] payload: TEST MSG 1
baseOffset: 9 lastOffset: 9 count: 1 baseSequence: 1 lastSequence: 1 producerId: 5000 producerEpoch: 0 partitionLeaderEpoch: 8 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 78 CreateTime: 1780985134501 size: 78 magic: 2 compresscodec: none crc: 544469083 isvalid: true
| offset: 9 CreateTime: 1780985134501 keySize: -1 valueSize: 10 sequence: 1 headerKeys: [] payload: TEST MSG 2
baseOffset: 10 lastOffset: 10 count: 1 baseSequence: 2 lastSequence: 2 producerId: 5000 producerEpoch: 0 partitionLeaderEpoch: 8 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 156 CreateTime: 1780985137161 size: 78 magic: 2 compresscodec: none crc: 4198686687 isvalid: true
| offset: 10 CreateTime: 1780985137161 keySize: -1 valueSize: 10 sequence: 2 headerKeys: [] payload: TEST MSG 3
baseOffset: 11 lastOffset: 11 count: 1 baseSequence: 0 lastSequence: 0 producerId: 4002 producerEpoch: 0 partitionLeaderEpoch: 8 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 234 CreateTime: 1780987030892 size: 81 magic: 2 compresscodec: none crc: 3658067560 isvalid: true
| offset: 11 CreateTime: 1780987030892 keySize: -1 valueSize: 13 sequence: 0 headerKeys: [] payload: VERIFY_DATA_4
baseOffset: 12 lastOffset: 12 count: 1 baseSequence: 0 lastSequence: 0 producerId: 4013 producerEpoch: 0 partitionLeaderEpoch: 8 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 315 CreateTime: 1780989866788 size: 120 magic: 2 compresscodec: none crc: 1433641583 isvalid: true
| offset: 12 CreateTime: 1780989866788 keySize: -1 valueSize: 26 sequence: 0 headerKeys: [traceId,version] payload: TEST MSG WITH HEADER FINAL
07951ea781ed:/tmp/kafka-logs/cluster-test-0$

 

 

다음 메시지를 어떻게 보내는지 확인하자.

07951ea781ed:/tmp/kafka-logs/cluster-test-0$ echo "traceId=abc-123,version=2|TEST MSG WITH HEADER FINAL" | /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic cluster-test \
  --command-config /tmp/kafka-logs/custom-producer.properties \
  --reader-property "parse.headers=true" \
  --reader-property "key.separator=|" \
  --reader-property "headers.separator=," \
  --reader-property "headers.key.separator==

 

key는 | 로 분리하고, header는 , 로 분리해서 key로 넣어버린다. 그리고 key와 value는=로 분해한다. 

 

그리고  log 파일에서 마지막 11, 12 offset을 분석해보자.

 

[오프셋 11 - 단순 텍스트 전송]
baseOffset: 11                            // 배치의 시작 오프셋 번호
lastOffset: 11                            // 배치의 마지막 오프셋 번호
count: 1                                  // 배치 내 레코드 개수
baseSequence: 0                           // 이 배치의 시작 시퀀스 번호
lastSequence: 0                           // 이 배치의 마지막 시퀀스 번호
producerId: 4002                          // 프로듀서의 고유 ID
producerEpoch: 0                          // 프로듀서의 세대 번호
partitionLeaderEpoch: 8                   // 파티션 리더의 세대 번호
isTransactional: false                    // 트랜잭션 포함 여부
isControl: false                          // 제어 메시지(Commit/Abort) 여부
deleteHorizonMs: OptionalLong.empty       // 로그 컴팩션 삭제 기준 시간
position: 234                             // 로그 파일 내 물리적 바이트 위치
CreateTime: 1780987030892                 // 배치 생성 시간
size: 81                                  // 배치 전체의 바이트 크기
magic: 2                                  // 메시지 포맷 버전 (v2)
compresscodec: none                       // 압축 알고리즘 (적용 안 됨)
crc: 3658067560                           // 데이터 무결성 체크섬
isvalid: true                             // 데이터 정상 여부
| records:                                // 실제 데이터 부분
    offset: 11                            // 개별 레코드의 오프셋
    CreateTime: 1780987030892             // 레코드 생성 시간
    keySize: -1                           // 키 크기 (-1: 없음)
    valueSize: 13                         // 값(Payload) 크기
    sequence: 0                           // 레코드별 시퀀스 번호
    headerKeys: []                        // 메시지 헤더 (없음)
    payload: VERIFY_DATA_4                // 실제 데이터 본문

[오프셋 12 - 헤더 파싱 최종 성공]
baseOffset: 12                            // 배치의 시작 오프셋 번호
lastOffset: 12                            // 배치의 마지막 오프셋 번호
count: 1                                  // 배치 내 레코드 개수
baseSequence: 0                           // 이 배치의 시작 시퀀스 번호
lastSequence: 0                           // 이 배치의 마지막 시퀀스 번호
producerId: 4013                          // 프로듀서의 고유 ID
producerEpoch: 0                          // 프로듀서의 세대 번호
partitionLeaderEpoch: 8                   // 파티션 리더의 세대 번호
isTransactional: false                    // 트랜잭션 포함 여부
isControl: false                          // 제어 메시지(Commit/Abort) 여부
deleteHorizonMs: OptionalLong.empty       // 로그 컴팩션 삭제 기준 시간
position: 315                             // 로그 파일 내 물리적 바이트 위치
CreateTime: 1780989866788                 // 배치 생성 시간
size: 120                                 // 배치 전체의 바이트 크기 (헤더가 추가되어 81 -> 120으로 증가)
magic: 2                                  // 메시지 포맷 버전 (v2)
compresscodec: none                       // 압축 알고리즘 (적용 안 됨)
crc: 1433641583                           // 데이터 무결성 체크섬
isvalid: true                             // 데이터 정상 여부
| records:                                // 실제 데이터 부분
    offset: 12                            // 개별 레코드의 오프셋
    CreateTime: 1780989866788             // 레코드 생성 시간
    keySize: -1                           // 키 크기 (-1: 없음)
    valueSize: 26                         // 값(Payload) 크기
    sequence: 0                           // 레코드별 시퀀스 번호
    headerKeys: [traceId,version]         // 파싱된 메시지 헤더 키 (성공적으로 분리됨)
    payload: TEST MSG WITH HEADER FINAL   // 헤더가 제거된 실제 데이터 본문

 

 

 

이번에는 consumer에서 어떻게 쌓였는지 확인해보고, 연속으로 메시지를 특정 파티션으로 보내고 이를 출력하려고 한다.

 

07951ea781ed:/tmp/kafka-logs/cluster-test-0$ /opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic cluster-test \  
--from-beginning \ 
--formatter-property \ 
print.partition=true \
--formatter-property \
print.key=true \
--formatter-property print.offset=true
Partition:2     Offset:0        null    TEST MSG WITH HEADER 1
Partition:2     Offset:1        null    TEST MSG WITH HEADER 2
Partition:2     Offset:2        null    TEST MSG WITH HEADER 3
Partition:2     Offset:3        null    TEST MSG WITH HEADER 1
Partition:2     Offset:4        null    TEST MSG WITH HEADER 1
Partition:2     Offset:5        null    TEST MSG WITH HEADER 2
Partition:2     Offset:6        null    TEST MSG WITH HEADER 2
Partition:2     Offset:7        null    TEST MSG WITH REAL SNAPPY
Partition:0     Offset:8        null    TEST MSG 1
Partition:0     Offset:9        null    TEST MSG 2
Partition:0     Offset:10       null    TEST MSG 3
Partition:0     Offset:11       null    VERIFY_DATA_4
Partition:0     Offset:12       null    TEST MSG WITH HEADER FINAL
Partition:1     Offset:0        null    TEST MSG WITH HEADER 1
Partition:1     Offset:1        null    TEST MSG WITH HEADER 2
Partition:1     Offset:2        null    TEST MSG WITH HEADER FINAL
Partition:1     Offset:3        null    TEST MSG WITH REAL SNAPPY
Partition:1     Offset:4        ROOM-101        TEST MSG WITH REAL SNAPPY
^[[A^[[A^[[A^CProcessed a total of 18 messages
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ echo "traceId=abc-123,version=2|ROOM-101#TEST MSG WITH REAL SNAPPY" \
| /opt/kafka/bin/kafka-console-producer.sh  \
--bootstrap-server localhost:9092 \ 
--topic cluster-test \
--producer-property \ 
compression.type=snappy \
--reader-property "parse.key=true" \ 
--reader-property "key.separator=#" \ 
--reader-property "parse.headers=true" \
--reader-property "headers.delimiter=|" \
--reader-property "headers.separator=," \
--reader-property "headers.key.separator==" \ 
--reader-property "ignore.error=false"
Warning: --producer-property is deprecated and will be removed in a future version. Use --command-property instead.
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ echo "traceId=abc-123,version=2|ROOM-101#TEST MSG WITH REAL SNAPPY" \
| /opt/kafka/bin/kafka-console-producer.sh  \
--bootstrap-server localhost:9092 \ 
--topic cluster-test \
--producer-property \ 
compression.type=snappy \
--reader-property "parse.key=true" \ 
--reader-property "key.separator=#" \ 
--reader-property "parse.headers=true" \
--reader-property "headers.delimiter=|" \
--reader-property "headers.separator=," \
--reader-property "headers.key.separator==" \ 
--reader-property "ignore.error=false"
Warning: --producer-property is deprecated and will be removed in a future version. Use --command-property instead.
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ echo "traceId=abc-123,version=2|ROOM-101#TEST MSG WITH REAL SNAPPY" \
| /opt/kafka/bin/kafka-console-producer.sh  \
--bootstrap-server localhost:9092 \ 
--topic cluster-test \
--producer-property \ 
compression.type=snappy \
--reader-property "parse.key=true" \ 
--reader-property "key.separator=#" \ 
--reader-property "parse.headers=true" \
--reader-property "headers.delimiter=|" \
--reader-property "headers.separator=," \
--reader-property "headers.key.separator==" \ 
--reader-property "ignore.error=false"
Warning: --producer-property is deprecated and will be removed in a future version. Use --command-property instead.
07951ea781ed:/tmp/kafka-logs/cluster-test-0$ /opt/kafka/bin/kafka-console-consumer.sh   --bootstrap-server localhost:9092   --topic cluster-test   --from-beginning   --formatter-property print.partition=true   --formatter-property print.key=true   --formatter-property print.offset=true
Partition:1     Offset:0        null    TEST MSG WITH HEADER 1
Partition:1     Offset:1        null    TEST MSG WITH HEADER 2
Partition:1     Offset:2        null    TEST MSG WITH HEADER FINAL
Partition:1     Offset:3        null    TEST MSG WITH REAL SNAPPY
Partition:1     Offset:4        ROOM-101        TEST MSG WITH REAL SNAPPY
Partition:1     Offset:5        ROOM-101        TEST MSG WITH REAL SNAPPY
Partition:1     Offset:6        ROOM-101        TEST MSG WITH REAL SNAPPY
Partition:1     Offset:7        ROOM-101        TEST MSG WITH REAL SNAPPY
Partition:0     Offset:8        null    TEST MSG 1
Partition:0     Offset:9        null    TEST MSG 2
Partition:0     Offset:10       null    TEST MSG 3
Partition:0     Offset:11       null    VERIFY_DATA_4
Partition:0     Offset:12       null    TEST MSG WITH HEADER FINAL
Partition:2     Offset:0        null    TEST MSG WITH HEADER 1
Partition:2     Offset:1        null    TEST MSG WITH HEADER 2
Partition:2     Offset:2        null    TEST MSG WITH HEADER 3
Partition:2     Offset:3        null    TEST MSG WITH HEADER 1
Partition:2     Offset:4        null    TEST MSG WITH HEADER 1
Partition:2     Offset:5        null    TEST MSG WITH HEADER 2
Partition:2     Offset:6        null    TEST MSG WITH HEADER 2
Partition:2     Offset:7        null    TEST MSG WITH REAL SNAPPY

 

자. 필자가 이번에는 특정 파티션으로 가도록 설정했다.

 

그러면 이거 거기에 정말 쌓여 있는지 볼 수 있나?

 

07951ea781ed:/tmp/kafka-logs$ cd cluster-test-1
07951ea781ed:/tmp/kafka-logs/cluster-test-1$ /opt/kafka/bin/kafka-dump-log.sh --files 00000000000000000000.log --deep-iteration --print-data-log | tail -n 20
Dumping 00000000000000000000.log
Log starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: 0 lastSequence: 0 producerId: 4004 producerEpoch: 0 partitionLeaderEpoch: 4 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1780987235254 size: 116 magic: 2 compresscodec: none crc: 1064063112 isvalid: true
| offset: 0 CreateTime: 1780987235254 keySize: -1 valueSize: 22 sequence: 0 headerKeys: [traceId,version] payload: TEST MSG WITH HEADER 1
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: 0 lastSequence: 0 producerId: 4009 producerEpoch: 0 partitionLeaderEpoch: 4 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 116 CreateTime: 1780989467128 size: 116 magic: 2 compresscodec: none crc: 1310233403 isvalid: true
| offset: 1 CreateTime: 1780989467128 keySize: -1 valueSize: 22 sequence: 0 headerKeys: [traceId,version] payload: TEST MSG WITH HEADER 2
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: 0 lastSequence: 0 producerId: 4011 producerEpoch: 0 partitionLeaderEpoch: 4 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 232 CreateTime: 1780989737752 size: 120 magic: 2 compresscodec: none crc: 3042542737 isvalid: true
| offset: 2 CreateTime: 1780989737752 keySize: -1 valueSize: 26 sequence: 0 headerKeys: [traceId,version] payload: TEST MSG WITH HEADER FINAL
baseOffset: 3 lastOffset: 3 count: 1 baseSequence: 0 lastSequence: 0 producerId: 4014 producerEpoch: 0 partitionLeaderEpoch: 4 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 352 CreateTime: 1780990281749 size: 141 magic: 2 compresscodec: snappy crc: 1108105268 isvalid: true
| offset: 3 CreateTime: 1780990281749 keySize: -1 valueSize: 25 sequence: 0 headerKeys: [traceId,version] payload: TEST MSG WITH REAL SNAPPY
baseOffset: 4 lastOffset: 4 count: 1 baseSequence: 0 lastSequence: 0 producerId: 4017 producerEpoch: 0 partitionLeaderEpoch: 4 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 493 CreateTime: 1780990861022 size: 128 magic: 2 compresscodec: none crc: 3704284927 isvalid: true
| offset: 4 CreateTime: 1780990861022 keySize: 8 valueSize: 25 sequence: 0 headerKeys: [traceId,version] key: ROOM-101 payload: TEST MSG WITH REAL SNAPPY
baseOffset: 5 lastOffset: 5 count: 1 baseSequence: 0 lastSequence: 0 producerId: 4018 producerEpoch: 0 partitionLeaderEpoch: 4 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 621 CreateTime: 1780990935124 size: 128 magic: 2 compresscodec: none crc: 1338254084 isvalid: true
| offset: 5 CreateTime: 1780990935124 keySize: 8 valueSize: 25 sequence: 0 headerKeys: [traceId,version] key: ROOM-101 payload: TEST MSG WITH REAL SNAPPY
baseOffset: 6 lastOffset: 6 count: 1 baseSequence: 0 lastSequence: 0 producerId: 4019 producerEpoch: 0 partitionLeaderEpoch: 4 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 749 CreateTime: 1780990940520 size: 128 magic: 2 compresscodec: none crc: 826279710 isvalid: true
| offset: 6 CreateTime: 1780990940520 keySize: 8 valueSize: 25 sequence: 0 headerKeys: [traceId,version] key: ROOM-101 payload: TEST MSG WITH REAL SNAPPY
baseOffset: 7 lastOffset: 7 count: 1 baseSequence: 0 lastSequence: 0 producerId: 4020 producerEpoch: 0 partitionLeaderEpoch: 4 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 877 CreateTime: 1780990944970 size: 128 magic: 2 compresscodec: none crc: 2875652434 isvalid: true
| offset: 7 CreateTime: 1780990944970 keySize: 8 valueSize: 25 sequence: 0 headerKeys: [traceId,version] key: ROOM-101 payload: TEST MSG WITH REAL SNAPPY

 


이걸 보기 좋게 정리하면 다음과 같다.

 

[Offset 0]
- Batch Metadata
  baseOffset: 0, lastOffset: 0, count: 1, baseSequence: 0, lastSequence: 0
  producerId: 4004, producerEpoch: 0, partitionLeaderEpoch: 4
  isTransactional: false, isControl: false, deleteHorizonMs: OptionalLong.empty
  position: 0, CreateTime: 1780987235254, size: 116, magic: 2
  compresscodec: none, crc: 1064063112, isvalid: true
- Record Data
  offset: 0, CreateTime: 1780987235254, sequence: 0
  keySize: -1, valueSize: 22
  headerKeys: [traceId,version]
  payload: TEST MSG WITH HEADER 1

--------------------------------------------------

[Offset 1]
- Batch Metadata
  baseOffset: 1, lastOffset: 1, count: 1, baseSequence: 0, lastSequence: 0
  producerId: 4009, producerEpoch: 0, partitionLeaderEpoch: 4
  isTransactional: false, isControl: false, deleteHorizonMs: OptionalLong.empty
  position: 116, CreateTime: 1780989467128, size: 116, magic: 2
  compresscodec: none, crc: 1310233403, isvalid: true
- Record Data
  offset: 1, CreateTime: 1780989467128, sequence: 0
  keySize: -1, valueSize: 22
  headerKeys: [traceId,version]
  payload: TEST MSG WITH HEADER 2

--------------------------------------------------

[Offset 2]
- Batch Metadata
  baseOffset: 2, lastOffset: 2, count: 1, baseSequence: 0, lastSequence: 0
  producerId: 4011, producerEpoch: 0, partitionLeaderEpoch: 4
  isTransactional: false, isControl: false, deleteHorizonMs: OptionalLong.empty
  position: 232, CreateTime: 1780989737752, size: 120, magic: 2
  compresscodec: none, crc: 3042542737, isvalid: true
- Record Data
  offset: 2, CreateTime: 1780989737752, sequence: 0
  keySize: -1, valueSize: 26
  headerKeys: [traceId,version]
  payload: TEST MSG WITH HEADER FINAL

--------------------------------------------------

[Offset 3] (Snappy 압축 적용됨)
- Batch Metadata
  baseOffset: 3, lastOffset: 3, count: 1, baseSequence: 0, lastSequence: 0
  producerId: 4014, producerEpoch: 0, partitionLeaderEpoch: 4
  isTransactional: false, isControl: false, deleteHorizonMs: OptionalLong.empty
  position: 352, CreateTime: 1780990281749, size: 141, magic: 2
  compresscodec: snappy, crc: 1108105268, isvalid: true
- Record Data
  offset: 3, CreateTime: 1780990281749, sequence: 0
  keySize: -1, valueSize: 25
  headerKeys: [traceId,version]
  payload: TEST MSG WITH REAL SNAPPY

--------------------------------------------------

[Offset 4] (메시지 Key 할당됨)
- Batch Metadata
  baseOffset: 4, lastOffset: 4, count: 1, baseSequence: 0, lastSequence: 0
  producerId: 4017, producerEpoch: 0, partitionLeaderEpoch: 4
  isTransactional: false, isControl: false, deleteHorizonMs: OptionalLong.empty
  position: 493, CreateTime: 1780990861022, size: 128, magic: 2
  compresscodec: none, crc: 3704284927, isvalid: true
- Record Data
  offset: 4, CreateTime: 1780990861022, sequence: 0
  keySize: 8, key: ROOM-101, valueSize: 25
  headerKeys: [traceId,version]
  payload: TEST MSG WITH REAL SNAPPY

--------------------------------------------------

[Offset 5]
- Batch Metadata
  baseOffset: 5, lastOffset: 5, count: 1, baseSequence: 0, lastSequence: 0
  producerId: 4018, producerEpoch: 0, partitionLeaderEpoch: 4
  isTransactional: false, isControl: false, deleteHorizonMs: OptionalLong.empty
  position: 621, CreateTime: 1780990935124, size: 128, magic: 2
  compresscodec: none, crc: 1338254084, isvalid: true
- Record Data
  offset: 5, CreateTime: 1780990935124, sequence: 0
  keySize: 8, key: ROOM-101, valueSize: 25
  headerKeys: [traceId,version]
  payload: TEST MSG WITH REAL SNAPPY

--------------------------------------------------

[Offset 6]
- Batch Metadata
  baseOffset: 6, lastOffset: 6, count: 1, baseSequence: 0, lastSequence: 0
  producerId: 4019, producerEpoch: 0, partitionLeaderEpoch: 4
  isTransactional: false, isControl: false, deleteHorizonMs: OptionalLong.empty
  position: 749, CreateTime: 1780990940520, size: 128, magic: 2
  compresscodec: none, crc: 826279710, isvalid: true
- Record Data
  offset: 6, CreateTime: 1780990940520, sequence: 0
  keySize: 8, key: ROOM-101, valueSize: 25
  headerKeys: [traceId,version]
  payload: TEST MSG WITH REAL SNAPPY

--------------------------------------------------

[Offset 7]
- Batch Metadata
  baseOffset: 7, lastOffset: 7, count: 1, baseSequence: 0, lastSequence: 0
  producerId: 4020, producerEpoch: 0, partitionLeaderEpoch: 4
  isTransactional: false, isControl: false, deleteHorizonMs: OptionalLong.empty
  position: 877, CreateTime: 1780990944970, size: 128, magic: 2
  compresscodec: none, crc: 2875652434, isvalid: true
- Record Data
  offset: 7, CreateTime: 1780990944970, sequence: 0
  keySize: 8, key: ROOM-101, valueSize: 25
  headerKeys: [traceId,version]
  payload: TEST MSG WITH REAL SNAPPY

 

다시 현재 파티션의 상태를 봐보자.

/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic cluster-test

 

07951ea781ed:/tmp/kafka-logs/cluster-test-1$ /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic cluster-test
Topic: cluster-test     TopicId: CXJ_chL2SwOcbd1qxm1j7w PartitionCount: 3       ReplicationFactor: 3    Configs: min.insync.replicas=1
        Topic: cluster-test     Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 1,2,3      Elr:    LastKnownElr:
        Topic: cluster-test     Partition: 1    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3      Elr:    LastKnownElr:
        Topic: cluster-test     Partition: 2    Leader: 2       Replicas: 2,3,1 Isr: 1,2,3      Elr:    LastKnownElr:

 

그렇다는 말은 현재 Partitions 1의 리더가 1번 브로커 혹은 노드에 있다는 말로, partition 0, partition2 에는 각각 replication이 위치해 있어서 서버가 내려가도 서비스가 중단되지 않도록 한다는 말이다.

 

파티션 번호 리더 브로커 (대장) 상태
cluster-test-0 Node 3 Node 3이 이 파티션의 쓰기/읽기 책임자
cluster-test-1 Node 1 Node 1이 이 파티션의 쓰기/읽기 책임자
cluster-test-2 Node 2 Node 2이 이 파티션의 쓰기/읽기 책임자

 

3. 파티셔닝 이해하기

사실 파티셔닝에 대해서는 간간히 나오기는 했는데, 필자는 이를 명확히 정리하지는 않았다.

 

메시지만 정리하니 필자가 재미가 없다. 

 

그래서 파티션과 파티셔닝, 그리고 파티셔닝 전략에 대해 추가 정리하려고 한다.

 

파티션

하나의 토픽을 여러 개로 나누어 분산 저장하는 '파일'

 

파티셔닝

파티셔닝은 파티션을 하는 것이다(?)

즉, 데이터를 각 파티션에 분산시키는 과정을 의미한다.

 

근데 이 파티셔닝이 왜 필요하지? 우선 다음 그림을 보자.

 

https://www.youtube.com/watch?v=8L-1HSTf97E&list=PLf38f5LhQtheK16nwnCYFqH23WUUvZfSb&index=3

 

해당 그림은 Apache Kafka 101에서 캡쳐한 화면이다. (여기 클릭)

특정 ID를 가진 온도계들이 온도를 측정해서 전송했다.

 

그런데 시간에 따른 온도를 측정하기 위해서 봣는데 메시지가 각 파티션으로 들어가버리고 만다.

 

이것은 현재 메시지를 전송하는데 키가 없기 때문이다.

 

그러면 우리는 시간에 따른 특정 기기의 온도 변화를 보기 위해서 어떻게 해야하는가?

 

시간에 따라서 메시지를 순서대로 정렬하기 위해서는 결국 하나의 파티션에 넣어야 한다. 

 

kafka에서의 순서 보장은 무조건 파티션 단위로만 가능하다. 글로벌한 순서 보장은 제공하지 않는다.

 

 

그러면 어떤 메시지가 순서를 보장하기 위해서 특정 파티션에 들어갈 수 있게 설정할 수 있나?

 

message에 키가 있으면 그게 가능하다!

 

다만 위의 예시는 현재 키가 없기 때문에 특정 아이디에 대한 온도값이 특정 파티션으로 순차적으로 쌓이지 않았다.

 

 

kafka에서는 내부적으로 key를 hash 처리해서 어느 파티션으로 넣을 수 있도록 설정할 수 있다.

 

 

 

그래서? 파티션과 파티셔닝의 장점이 뭐야?

 

파티션  → 물리적 분산 구조
           ├── 병렬 처리로 처리량 향상
           ├── 브로커 분산으로 수평 확장
           └── 복제로 장애 대응

파티셔닝 → 메시지를 어디에 넣을지 전략
           ├── 키 기반으로 순서 보장
           ├── 데이터 지역성으로 Consumer 단순화
           └── 커스텀으로 비즈니스 로직 반영

 

여기서 필자는 파티션이 어떻게 해서 병렬처리로 처리량이 향상되는지 이해를 못했다.

 

그건 아직 consumer쪽을 제대로 이해하지 못햇기 때문인 것 같다.

 

 

찾아보니 kafka는 각 파티션을 consumer 하나에게 전담으로 맡긴다. 

파티션이 3개라면 consumer 3개가 각자 맡은 파티션만 읽는다. 그러니 서로 같은 데이터를 두고 경쟁할 일이 없으니 동시처리가 가능한 것이다. 

 

일반적인 큐 시스템은 여러 Consumer가 하나의 큐에서 메시지를 꺼내려고 경쟁한다. 이러면 "내가 이미 읽은 거야, 네가 읽어" 하는 충돌을 막으려고 락이 필요하고, 락이 생기면 결국 순서대로 처리하게 되면서 병렬의 의미가 퇴색된다.

 

Kafka는 아예 구조적으로 이 문제를 피한다. 파티션을 나눠가지는 순간 담당 영역이 분리되기 때문에, 각 Consumer는 자기 파티션만 보면 되고 다른 Consumer가 뭘 하는지 신경 쓸 필요가 없다. 락도 없고 경합도 없으니 진짜 동시에 돌아간다.

 

결국 병렬 처리가 가능한 이유는 "파티션 수만큼 독립적인 작업 단위가 생기기 때문"이고, 처리량을 늘리고 싶으면 파티션과 Consumer를 같이 늘리면 된다.


 

이번 시간에는 공식 문서에서 알려주는 Message와 Message Format을 알아봤고 이를 직접 메시지를 보내고 출력하는 실습을 해보았다.

 

특히나 실습을 하면서 중요했던 부분은 이를 직접 실행하고 해보는것 자체가 어려웠다.

 

그리고 특정 파티션으로 메시지를 보내는 것도 해보았는데 파티션과 파티셔닝이 이해가 안돼서 이를 좀 정리했다.

 

 

무진장 어렵네... 

 

스스로 정보를 찾아서 글을 정리하고 실습도 해보니 좀 저자의 마음을 알 것 같다. 

 

이게 이렇게 개념과 쓰임이 여러군데 엮여 있어서 중구난방(?)으로 할 수 밖에 없구나...

 

 

다음 글에서는 프로듀서와 컨슈머에 대해서 조사하고 실습해볼 예정이다.

 

 

참고:

https://kafka.apache.org/43/implementation/message-format/

https://www.youtube.com/watch?v=8L-1HSTf97E&list=PLf38f5LhQtheK16nwnCYFqH23WUUvZfSb&index=3