2019. 3. 25. 15:24ㆍ[정리] 데이터베이스/[Message] Kafka
mac에서 카프카 설치하기
1. brew install kafka
2. brew install zookeeper
mac에서 카프카 실행
1. zkServer start
2. brew services start kafka
설정 변경 :
1. cd /usr/local/etc/kafka
2. open server.properties
# 변경 : advertised.listeners=PLAINTEXT://localhost:9092
실행
토픽 생성 : kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
토픽 Produce : kafka-console-producer --broker-list localhost:9092 --topic test
토픽 Produce : kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
# 2181 은 zookeeper의 포트다.
# 9092 는 Kafa의 포트다.
#
# replicas : partiton 의 복제본.
# 원본을 leader partition 이라고 부르고 replica를 follower partition 이라고 한다.
# ES의 primary shard와 replica shard와 같은 관계
# 하지만 카프카는 데이터베이스가 아님을 잊지말 것
강의 출처 : https://www.youtube.com/watch?v=udnX21__SuU&index=4&list=PLkz1SCf5iB4enAR00Z46JwY9GGkaS2NON
ID |
Value |
설명 |
1 |
Producer |
카프카에게 데이터를 보내는 어플리케이션 |
2 |
Consumer |
카프카에서 데이터를 받는 어플리케이션 |
3 |
Broker |
카프카 서버 |
4 |
Cluster |
카프카 클러스터 |
5 |
Topic |
카프카 스트림을 식별하기 위한 이름 |
6 |
Partitions |
Topic을 분산 저장하는데 분산 저장되는 단위 |
7 |
Offset |
Partition 안에서 데이터의 위치를 찾기 위한 offset |
8 |
Consumer group |
같은 일을 처리하는 Consumer 들의 논리적 집합 이 덕분에 Consumer의 Scalable이 가능하다. |
즉 데이터를 찾기위해선 3개의 정보가 필요하다.
1. Topic 이름
2. Partition
3. Offset
카프카 자바 Producer api 예제
강의 : https://www.youtube.com/watch?v=twvdT6A1eeE&index=10&list=PLkz1SCf5iB4enAR00Z46JwY9GGkaS2NON
String topicName = "test"; String key = "Key1"; String value = "Value-1"; // 카프카와 연결하는 서버 설정 Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092,localhost:9093"); props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //카프카에 데이터 전송 Producer<String, String> producer = new KafkaProducer <>(props); ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key ,value); producer.send(record); producer.close(); |
* 동기 비동기 처리가 가능하도록 메소드가 존재한다.
메세지 생성시 작업흐름도
1. Producer Record의 데이터를 serializer로 맵핑함
2. Partitioner로 이동해서 Partition 번호를 받음 (라운드 로빈 방식으로 받음)
3. Partition Buffer로 이동
4. Kafka Broker로 이동
카프카 자바 Consumer api 예제
String topicName = "test"; String groupName = "new_test_topic_group"; Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092,localhost:9093"); props.put("group.id", groupName); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("enable.auto.commit", "false"); try (Kafka Consumer<String, String> consumer) { consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topicName)); while (true){ ConsumerRecords<String, String records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.println(String.valueOf(record.key()) + record.value()); consumer.commitAsync(); } } catch(Exception e){ e.printStackTrace(); } finally{ consumer.commitSync(); consumer.close(); } |
'[정리] 데이터베이스 > [Message] Kafka' 카테고리의 다른 글
카프카 - 03 - 운영가이드 (0) | 2020.04.14 |
---|---|
카프카 - 02 -프로듀서 / 컨슈머 (0) | 2020.04.10 |
카프카 - 01 - 카프카의 디자인 (0) | 2020.04.10 |
카프카 - 00 - 개요 (0) | 2020.04.10 |
카프카 인트로 (0) | 2019.03.25 |