kok202
카프카 - 02 -프로듀서 / 컨슈머

2020. 4. 10. 23:35[정리] 데이터베이스/[Message] Kafka

프로듀서의 주요 기능

메시지를 토픽 파티션에 매핑하고 파티션의 리더에 요청을 보내는 것

* auto.create.topics.enable = true 로 설정 되어있는 경우 카프카에 존재하지 않는 토픽에 메시지로 보내면 자동으로 토픽이 생성된다.

 

 

 

자바를 이용한 카프카 프로듀서

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaBookProducer1 {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "my-kafka001:9092,my-kafka002:9092,my-kafka003:9092");
    props.put("acks", "1");
    props.put("compression.type", "gzip");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<String, String>("peter-topic", "Apache Kafka is a distributed streaming platform"));
    producer.close();
  }
}

producer.send().get() 을 하면 동기처리로 바꿀 수 있다.

producer.send() 에 key 를 지정하여 특정 파티션에만 메시지를 보낼 수도 있다.

 

 

 

프로듀서의 주요 옵션

  • bootstrap.servers 
    클러스터 내 모든 서버가 클라이언트 요청을 받을 수 있다.
    그렇다고 하나의 서버만 등록하면 해당 서버가 장애가 날 경우 클러스터의 장점을 못살릴 확률이 높다.
    그러므로 모든 카프카 서버를 나열하는 것이 좋다.
  • acks
    프로듀서가 카프카 토픽의 리더에게 메시지를 보낸 후 요청을 완료하기 전 ack의 수
    • akcs = 0
      성능이 제일 빠르다.
      프로듀서가 서버로부터 acks 를 기다리지 않는다.
      이 경우 서버가 데이터를 받았는지 보장하지 않는다.
      클라이언트는 전송 실패에 대한 결과를 알지 못하기 때문에 retry 설정도 적용되지 않는다.
      메시지 손실 가능성이 높지만 빠른 전송이 필요한 경우 사용된다.
    • akcs = 1
      카프카가 리더의 데이터 기록이 끝나면 바로 ack 를 보내준다.
      요청, 응답을 하므로 acks = 0 보다는 2배의 시간이 걸린다고 보면 된다.
      리더가 ack 응답을 하고 팔로워가 리더를 복제하기 전에 죽어버리면 데이터 유실이 발생할 수도 있다.
      하지만 그럴 확률이 극히 드물다.
      특별한 경우가 아니라면 이 설정을 권장한다.
    • akcs = all (or -1)
      요청을 받은 리더는 ISR 에 있는 팔로워로부터 데이터에대한 ack 를 기다리고 ack 를 내려준다.
      무손실을 가장 강력하게 보장한다.
      전송 속도는 느리지만 메시지 손실이 없어야하는 경우 이 설정을 한다.
  • buffer.memory
    프로듀서가 카프카 서버로 데이터를 보내기전 잠시 대기할 수 있는 버퍼의 크기를 지정한다.
  • compression.type
    프로듀서가 데이터를 압축해서 보낼 때 어떤 타입으로 압축할지 결정한다.
  • retries
    전송에 실패한 데이터를 다시 보내는 횟수
  • batch.size
    프로듀서는 같은 파티션으로 보내는 여러 데이터를 배체로 함께 보내려한다.
    이 때 배치 크기 바이트 단위를 조정할 수 있다.
  • linger.ms
    배치 형태의 메시지를 보내기전 추가적인 메시지들을 위해 기다리는 시간을 명시한다.
  • max.request.size
    프로듀서가 보낼 수 있는 최대 메시지 바이트 사이즈. 기본값은 1MB 이다.

 

 

 

브로커의 min.insync.replicas 설정

프로듀서가 akcs=all 일 경우 프로듀서 요청에 akc 응답을 내려주기 위해 팔로워들의 akc 응답을 최소 몇개 기다릴지 지정하는 옵션이다.

 

 

 

컨슈머의 주요 기능

특정 파티션을 관리하고 있는 파티션 리더에게 메시지 가져오기 요청을 하는 것이다.

요청들은 로그의 오프셋을 명시하고 그 오프셋부터 로그 메시지를 수신한다.

컨슈머는 가져올 메시지의 위치를 조정할 수 있다.

컨슈머는 이미 읽은 데이터도 가져올 수 있다. (래빗엠큐는 불가능하다.)

* 오프셋의 저장위치가 주키퍼에서 카프카로 변경됨에 따라 올드 컨슈머와 뉴 컨슈머로 나뉘게 되었다.

 

 

 

콘솔 컨슈머로 메시지 가져오기

/usr/local/kafka/bin/kafka-console-consumer.sh --broker-list myKafka001:9092,myKafka002:9092,myKafka003:9092 --topic my-topic --from-beginning

from-beginning 옵션은 토픽의 처음부터 메시지를 가져오라는 옵션이다.

컨슈머를 실핼할 때는 항상 컨슈머 그룹이 필요하다.

이를 지정하지 않으면 (위의 예시) console-consumer-xxxx 와 같이 자동으로 컨슈머 그룹이 생성된다.

 

 

 

자바를 이용한 카프카 컨슈머

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class KafkaBookConsumer1 {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "peter-kafka001:9092,peter-kafka002:9092,peter-kafka003:9092");
    props.put("group.id", "peter-consumer");
    props.put("enable.auto.commit", "true");
    props.put("auto.offset.reset", "latest");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("peter-topic"));
    try {
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
          System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
      }
    } finally {
      consumer.close();
    }
  }
}

consumer.poll(100)에 주목하라

컨슈머는 카프카에 폴링을하는 것을 계속 유지해야한다. 그렇지 않으면 종료된 것으로 간주되어 컨슈머에게 할당한 파티션을 다른 컨슈머에게 전달한다. poll(n) 은 타임 아웃 주기이고 데이터가 컨슈머 버퍼에 없다면 poll(n) 은 얼마나 오랫동안 블락할지 조정한다.

 

 

 

컨슈머의 주요 옵션

  • bootstrap.servers
    프로듀서와 동일하다.
  • fetch.min.bytes
    한번에 가져올 수 있는 최소 데이터 사이즈
  • fetch.max.bytes
    한번에 가져올 수 있는 최대 데이터 사이즈
  • fetch.max.wait.ms
    가져온 데이터가 fetch.min.bytes 에 설정된 데이터보다 데이터가 적은 경우 요청에 응답을 기다리는 최대 시간
  • group.id
    컨슈머가 속한 컨슈머 그룹의 식별자
  • enable.auto.commit
    백드라운드로 주기적으로 오프셋을 커밋하라.
  • auto.offset.reset
    카프카에서 초기 오프셋이 없거나 오프셋이 존재하지 않을 경우 리셋한다.
  • auto.commit.interval.ms
    주기적으로 오프셋을 커밋하는 시간
  • request.timeout.ms
    요청에 대해 응답을 기다리는 최대 시간
  • session.timeout.ms
    컨슈머가 그룹 코디네이터에게 하트비트를 보내지 않을 경우, 장애로 판단하고 리밸런스하기까지 기다리는 시간
  • heartbeat.interval.ms
    컨슈머가 그룹 코디네이터에게 얼마나 자주 하트비트를 보낼지 조정하는 시간 (poll() 메소드) 일반적으로 session.timeout.ms 의 1/3
  • max.poll.records
    단일 호출 poll 에대한 최대 레코드 수
  • max.poll.interval.ms
    컨슈머가 계속 하트 비트는 보내는데 이전에 가져간 데이터를 처리하느라 데이터를 안가져갈 경우, 해당 컨슈머가 특정 파티션을 점유할 수 없도록 장애라고 판단한다. 제대로 데이터를 가져가는지 체크하는 시간

 

 

 

토픽 하나를 파티션을 3개로 분할한경우

컨슈머를 이용해 메시지를 가져올 때 메시지 순서가 프로듀서가 생성한 순서가 아닐 수 있다.

왜냐하면 프로듀서가 데이터를 생성할 때 메시지는 파티션에 라운드 로빈으로 들어가고 컨슈머는 파티션에서 데이터를 가져오기 때문이다.

파티션 내부에서는 순서가 보장되지만 파티션과 팣티션 사이에서는 순서가 보장되지 않는다.

프로듀스 순서와 컨슘 순서를 보장받고 싶다면 토픽의 파티션 수를 1로 지정하라.

다만 이 경우 분산 처리는 불가능하다.

 

 

 

컨슈머 그룹

카프카는 동일한 토픽에 여러 컨슈머 그룹이 붙을 수 있다. (my-topic.my-consumer.groupId)

컨슈머 그룹안에는 여러 개의 컨슈머가 생성 될 수 있다. (my-topic.my-consumer.concurrency)

컨슈머 그룹들은 파티션의 오프셋을 그룹마다 각자 관리한다.

같은 컨슈머 그룹안의 컨슈머들은 파티션(+오프셋) 에대한 소유권을 공유한다.

이 방식의 가장 큰 장점은 하나의 데이터를 다양한 용도로 사용할 수 있다는 점이다.

 

동일한 토픽에 여러개의 컨슈머 그룹이 붙을 수 있는 것은 기존의 메시징 큐와는 다른 완전히 새로운 기능이다.

기존의 메시징 큐는 메시지를 읽고나면 데이터가 큐에서 삭제되기 때문에 불가능했다.

효과적인 자원 활용을 위해선 토픽안의 파티션 수와 consumer.concurrency 를 잘 조정해야한다.

 

 

 

poll

컨슈머가 poll 할 때마다 컨슈머 그룹은 아직 읽지 않은 메시지를 가져온다.

이렇게 동작할 수 있는 이유는 컨슈머 그룹이 메시지를 어디까지 가져갔는지 알 수 있기 때문이다.

컨슈머 그룹의 컨슈머들은 각각의 파티션에 자신이 가져간 오프셋을 기록하고 있다.

각 파티션에 현재 위치를 다읽었고 이를 알리는 동작으로 커밋이라고 한다.

 

 

 

자동 커밋

오프셋을 사용자가 다루지않고 주기적으로 자동 커밋하는 방식이다.

이 경우 중간에 리밸런스가 발생하면 커밋하지 않은 내용을 다른 컨슈머에서 처리하게되서 메시지가 중복 처리될 수도 있다.

자동 커밋 환경에서는 중복을 완벽하게 제거하는 것은 불간으하다.

 

 

수동 커밋

메시지가 완전히 처리될 때까지 메시지가 가져간 것으로 간주되어서는 안되는 경우에 사용한다.

이 경우 컨슈머 단에서 메시지 처리 완료후 consumer.commitSync() 로 커밋을 직접해준다.

 

 

 

특정 파티션 / 오프셋 할당

컨슈머는 토픽을 subscribe 하고 카프카가 컨슈머 그룹의 커슈머들에게 직접 파티션을 공평하게 분배한다.

하지만 특정 파티션을 세밀하게 제어하기를 원할 경우 TopicPartition 을 이용해 특정 파티션만을 바라보도록 할 수 있다.

consumer.seek(partition0, 2) 와 같이 사용해서 특정 파티션의 특정 오프셋부터 읽을 수도 있다.