1.토픽 생성 ,삭제 및 조회
토픽(Topic)이란 메시지를 구분하는 단위입니다. 먼저 위에서 실행한 카프카 브로커에 토픽(Topic)을 생성해 보도록 하겠습니다.
생성될 토픽 정보는 아래와 같습니다.
- 토픽이름 : test
- replication-factor : 복제 대상 파티션 개수 1개 (없음)
- partitions : 파티션 개수 3개
경로 : pwd ~/kafka_2.12-3.4.0/bin
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic test
Created topic test
kafka_2.12-3.4.0/logs/controller.log
// 확인결과 로그에 나왔습니다.
[Controller id=0] New partition creation callback for test-2,test-1,test-0 (kafka.controller.KafkaController)
참고로 ..logs/controller.log 에서 카프카 로그를 확인해 보실 수 있습니다.
그럼 토픽이 정상적으로 생성되었는지 —describe 옵션을 통해 조회해 보겠습니다.
경로 : pwd ~/kafka_2.12-3.4.0/bin
./kafka-topics.sh --describe --bootstrap-server localhost:9092
Topic: test TopicId: pCNpxtZ2TSmOqoV3M-2a_A PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 2 Leader: 0 Replicas: 0 Isr: 0
정상적으로 test 토픽이 파티션 3개와 각각의 복제본(Replicas)은 없는 상태로 생성된 것을 확인하실 수 있습니다.
토픽 조회 명령어
bin/kafka-topics.sh --bootstrap-server=localhost:19092 --list
or
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
토픽 삭제 명령어
./kafka-topics.sh --delete --bootstrap-server 192.168.0.108:9092 --topic test
&
./kafka-topics.sh --delete --bootstrap-server 127.0.0.1:9092 --topic test
or
./kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
2. 이벤트 쓰기 (Producer - write)
Kafka 클라이언트는 이벤트 쓰기(+읽기)를 위해 네트워크를 통해 Kafka 브로커와 통신하며 브로커는 이벤트를 수신하면 설정에 따라 필요한 기간 동안 이벤트를 저장합니다.(파일)
kafka-console-producer.sh 를 이용하여 생성했던 브로커내 test 토픽에 hello ifuwanna kafka 총 3개의 이벤트(레코드)를 발행하여 test 토픽(파티션)에 저장해 보겠습니다.
경로 : pwd ~/kafka_2.12-3.4.0/bin
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
>hello
>ifuwanna
>kafka
#producer client를 종료하려면 Ctrl+C
3. 이벤트 읽기 (Consumer - read)
kafka-console-producer.sh 를 통해 test 토픽에 저장했던 이벤트(레코드)들을 읽어와 보도록 하겠습니다. 별도의 컨슈머 그룹 없이 --from-beginning 옵션을 통해 최초 시작지점의 오프셋부터 데이터를 가져오는 예제입니다.
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
hello
kafka
ifuwanna
- 컨슈머가 데이터를 polling()해서 Producer가 넣은 이벤트 3개 hello ifuwanna kafka 모두 조회해 온 것을 확인 하실 수 있습니다.
- 데이터가 순서대로 출력되지 않는 이유는 test토픽의 파티션이 3개이기 때문입니다. 컨슈머는 모든 파티션에 할당이 되었을 뿐이지 가져오는 것에는 순서가 없습니다. 순서를 유지하려면 파티션 하나를 지정하여 메시지를 관리해야 합니다.
- 이벤트는 Kafka에 영구적으로 저장되기 때문에 원하는 만큼 많은 소비자가 이벤트를 읽을 수 있습니다. 또 다른 터미널 세션을 열고 이전 명령을 다시 실행하여 이를 쉽게 확인할 수 있습니다.
4. 이벤트 읽기 (Consumer group)
여러개의 컨슈머의 논리그룹인 Consumer group은 이벤트를 처리할때 해당 그룹의 컨슈머들이 이벤트를 어디까지 처리했는지Consumer Offset을 통해 기억하고 그 이후 오프셋의 이벤트부터 처리해 나갑니다.
—group 옵션을 추가하여 아래 예제로 컨슈머 그룹이 어떻게 동작하는지 확인해 보겠습니다.
1. testgroup 이라는 컨슈머그룹을 지정하여 test 토픽에 들어있는 기존 이벤트 3개를 모두 읽어 들임
2. 다시 프로듀서를 통해 test 토픽에 1,2,3 이라는 이벤트를 신규로 추가
# 1.test 토픽의 기존 이벤트 읽기
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 -group testgroup --from-beginning --topic test
hello
kafka
ifuwanna
# 2. test 토픽에 1,2,3 이벤트 추가
$ ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
>1
>2
>3
현재 test 토픽에 총 6개의 이벤트를 넣었고 (hello ifuwanna kafka 1 2 3) testgroup 컨슈머 그룹에서 이중 3개를 읽어들인 상태입니다. 이상태에서 --describe 옵션을 통해 testgroup 그룹의 상태를 확인해 보겠습니다.
# 3. testgroup 컨슈머그룹의 현재 상태 확인
$ bin ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testgroup --describe
Consumer group 'testgroup' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testgroup test 0 1 1 0 - - -
testgroup test 1 2 2 0 - - -
testgroup test 2 0 3 3 - - -
- CURRENT-OFFSET LOG-END-OFFSET 모두 Partition#0은 1, Partition#1은 2인 것으로 보아 먼저 들어갔던 3개의 데이터는 파티션0,1에 할당되어 처리된 것을 알 수 있습니다.
- Partition#2의 CURRENT-OFFSET은 0이고 LOG-END-OFFSET 은 3인걸로 보아 추가 데이터(1,2,3)은 모두 Partition#2에 할당되었고 아직 해당 그룹에서 읽어 들이지 않았습니다.
해당 컨슈머그룹에서 위에서 추가로 넣었던 이벤트 1,2,3을 읽어 온 뒤 상태를 다시 확인해 보도록 하겠습니다.
# 4. test토픽의 나머지 1,2,3 이벤트 읽어들임
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 -group testgroup --from-beginning --topic test
1
2
3
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testgroup --describe
# 5. testgroup 컨슈머그룹의 현재 상태 확인
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testgroup test 0 1 1 0 - - -
testgroup test 1 2 2 0 - - -
testgroup test 2 3 3 0 - - -
- 해당 컨슈머그룹에서 기존에 읽었던 3개의 이벤트(hello ifuwanna kafka)는 제외하고 그 이후 오프셋의 이벤트부터 처리하여 (1,2,3) 된 것을 확인 하실 수 있습니다.
5. Consumer group 오프셋 재설정
컨슈머 그룹에서 구독중인 토픽(파티션)의 offset을 변경하고 싶은 경우 --reset-offsets 을 사용해여 offset을 재설정 할 수 있습니다.
# testgroup이 구독하는 test토픽의 모든 파티션의 오프셋을 가장 빠른 offset 으로 재설정
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testgroup --topic test --reset-offsets --to-earliest --execute
GROUP TOPIC PARTITION NEW-OFFSET
testgroup test 0 0
testgroup test 1 0
testgroup test 2 0
# testgroup이 구독하는 test 토픽 1번 파티션의 오프셋을 1로 재설정
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testgroup --topic test:1 --reset-offsets --to-offset 1 --execute
GROUP TOPIC PARTITION NEW-OFFSET
testgroup test 1 1
6. KAFKA 환경 종료
카프카를 종료하려면 콘솔에서 Ctrl-C 로 아래 순서대로 종료해 주시면됩니다.
- 생산자 및 소비자 클라이언트 중지
- Kafka 브로커를 중지
- ZooKeeper 서버 중지
정리
지금까지 간단하게 콘솔을 통해 카프카를 실행하고 토픽을 생성한 뒤 Producer Client를 이용하여 이벤트(레코드)를 브로커의 토픽(파티션)에 저장하고 Consumer Client로 이벤트를 읽어와 봤습니다.
'서버작업 > Kafka' 카테고리의 다른 글
(Ubuntu)Kafka 설치 방법 (0) | 2023.04.03 |
---|