kok202
카프카 설치, 간단한 실행

2019. 3. 25. 15:24[정리] 데이터베이스/[Message] Kafka

mac에서 카프카 설치하기

1. brew install kafka

2. brew install zookeeper


mac에서 카프카 실행

1. zkServer start

2. brew services start kafka


설정 변경 : 

1. cd /usr/local/etc/kafka

2. open server.properties

# 변경 : advertised.listeners=PLAINTEXT://localhost:9092


실행

토픽 생성 : kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

토픽 Produce : kafka-console-producer --broker-list localhost:9092 --topic test

토픽 Produce : kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning


# 2181 은 zookeeper의 포트다.

# 9092 는 Kafa의 포트다.

#

# replicas : partiton 의 복제본. 

# 원본을 leader partition 이라고 부르고 replica를 follower partition 이라고 한다. 

# ES의 primary shard와 replica shard와 같은 관계

# 하지만 카프카는 데이터베이스가 아님을 잊지말 것








강의 출처 : https://www.youtube.com/watch?v=udnX21__SuU&index=4&list=PLkz1SCf5iB4enAR00Z46JwY9GGkaS2NON

ID 

Value 

설명 

 1

 Producer

 카프카에게 데이터를 보내는 어플리케이션

 2

 Consumer

 카프카에서 데이터를 받는 어플리케이션

 3

 Broker

 카프카 서버

 4

 Cluster

 카프카 클러스터

 5

 Topic

 카프카 스트림을 식별하기 위한 이름

 6

 Partitions

 Topic을 분산 저장하는데 분산 저장되는 단위

 7

 Offset

 Partition 안에서 데이터의 위치를 찾기 위한 offset

 8

 Consumer group

 같은 일을 처리하는 Consumer 들의 논리적 집합

 이 덕분에 Consumer의 Scalable이 가능하다.


즉 데이터를 찾기위해선 3개의 정보가 필요하다.

1. Topic 이름

2. Partition

3. Offset








카프카 자바 Producer api 예제

강의 : https://www.youtube.com/watch?v=twvdT6A1eeE&index=10&list=PLkz1SCf5iB4enAR00Z46JwY9GGkaS2NON

소스 : https://github.com/LearningJournal/ApacheKafkaTutorials/blob/master/ProducerExamples/SimpleProducer.java


String topicName = "test"; 

String key = "Key1"; 

String value = "Value-1"; 


// 카프카와 연결하는 서버 설정

Properties props = new Properties(); 

props.put("bootstrap.servers", "localhost:9092,localhost:9093"); props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); 

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 


//카프카에 데이터 전송

Producer<String, String> producer = new KafkaProducer <>(props); 

ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key ,value); 

producer.send(record); 

producer.close(); 


* 동기 비동기 처리가 가능하도록 메소드가 존재한다.


메세지 생성시 작업흐름도

1. Producer Record의 데이터를 serializer로 맵핑함

2. Partitioner로 이동해서 Partition 번호를 받음 (라운드 로빈 방식으로 받음)

3. Partition Buffer로 이동 

4. Kafka Broker로 이동








카프카 자바 Consumer api 예제

소스 참조 : https://github.com/LearningJournal/ApacheKafkaTutorials/blob/master/ConsumerExample/ManualConsumer.java


String topicName = "test"; 

String groupName = "new_test_topic_group"; 


Properties props = new Properties(); 

props.put("bootstrap.servers", "localhost:9092,localhost:9093");

props.put("group.id", groupName); 

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringSerializer"); 

props.put("enable.auto.commit", "false"); 


try (Kafka Consumer<String, String> consumer) { 

consumer = new KafkaConsumer<>(props); 

consumer.subscribe(Arrays.asList(topicName)); 

while (true){ 

ConsumerRecords<String, String  records = consumer.poll(100); 

for (ConsumerRecord<String, String> record : records)

System.out.println(String.valueOf(record.key()) + record.value()); 

consumer.commitAsync(); 

} catch(Exception e){ 

e.printStackTrace(); 

} finally{ 

consumer.commitSync(); 

consumer.close();