1. 개요

Apache Kafka 는 분산 및 내결함성 스트림 처리 시스템입니다.

이 튜토리얼에서는 Kafka에 대한 Spring 지원과 네이티브 Kafka Java 클라이언트 API를 통해 제공하는 추상화 수준을 다룹니다.

Spring 카프카는 함께 간단하고 전형적인 Spring 템플릿 프로그래밍 모델을 제공 KafkaTemplate 를 통해 및 메시지 기반 POJO를 @KafkaListener의 어노테이션을.

2. 설치 및 설정

Kafka를 다운로드하고 설치하려면 여기 에서 공식 가이드를 참조 하십시오 .

또한 pom.xml에 spring-kafka 의존성을 추가해야합니다 .

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.8.RELEASE</version>
</dependency>

이 유물의 최신 버전은 여기 에서 찾을 수 있습니다 .

예제 애플리케이션은 Spring Boot 애플리케이션입니다.

이 문서에서는 서버가 기본 구성을 사용하여 시작되고 서버 포트가 변경되지 않는다고 가정합니다.

3. 토픽 구성

이전에는 명령 줄 도구를 실행하여 Kafka에서 토픽을 만들었습니다.

$ bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 \
  --topic mytopic

그러나 Kafka AdminClient 가 도입됨에 따라 이제 프로그래밍 방식으로 토픽를 만들 수 있습니다.

우리는 추가 할 필요가 KafkaAdmin 자동 유형의 모든 Bean에 대한 토픽를 추가합니다 스프링 빈, NewTopic를 :

@Configuration
public class KafkaTopicConfig {
    
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }
    
    @Bean
    public NewTopic topic1() {
         return new NewTopic("baeldung", 1, (short) 1);
    }
}

4. 메시지 생성

메시지를 생성하려면 먼저 ProducerFactory 를 구성해야합니다 . 이것은 Kafka Producer 인스턴스 를 만들기위한 전략을 설정합니다 .

그런 다음 Producer 인스턴스 를 래핑하고 Kafka 토픽에 메시지를 보내는 편리한 메서드를 제공 하는 KafkaTemplate이 필요 합니다.

생산자 인스턴스는 스레드로부터 안전합니다. 따라서 애플리케이션 컨텍스트 전체에서 단일 인스턴스를 사용하면 성능이 향상됩니다. 따라서 KakfaTemplate 인스턴스도 스레드로부터 안전하며 하나의 인스턴스를 사용하는 것이 좋습니다.

4.1. 생산자 구성

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        configProps.put(
          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        configProps.put(
          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

4.2. 메시지 게시

KafkaTemplate 클래스를 사용하여 메시지를 보낼 수 있습니다 .

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String msg) {
    kafkaTemplate.send(topicName, msg);
}

보내기 API는 반환 ListenableFuture의 개체를. 송신 스레드를 차단하고 송신 된 메시지에 대한 결과를 얻으려면 ListenableFuture 객체 get API를 호출 할 수 있습니다 . 스레드는 결과를 기다리지 만 생산자의 속도를 늦 춥니 다.

Kafka는 빠른 스트림 처리 플랫폼입니다. 따라서 후속 메시지가 이전 메시지의 결과를 기다리지 않도록 결과를 비동기 적으로 처리하는 것이 좋습니다.

콜백을 통해이를 수행 할 수 있습니다.

public void sendMessage(String message) {
            
    ListenableFuture<SendResult<String, String>> future = 
      kafkaTemplate.send(topicName, message);
	
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

        @Override
        public void onSuccess(SendResult<String, String> result) {
            System.out.println("Sent message=[" + message + 
              "] with offset=[" + result.getRecordMetadata().offset() + "]");
        }
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("Unable to send message=[" 
              + message + "] due to : " + ex.getMessage());
        }
    });
}

5. 메시지 소비

5.1. 소비자 구성

메시지를 사용하려면 ConsumerFactoryKafkaListenerContainerFactory 를 구성해야합니다 . Spring Bean Factory에서 이러한 Bean을 사용할 수있게되면 @KafkaListener 어노테이션을 사용하여 POJO 기반 소비자를 구성 할 수 있습니다 .

스프링 관리 Bean에서 @KafkaListener 어노테이션을감지하려면 구성 클래스에 @EnableKafka 어노테이션이 필요합니다.

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        props.put(
          ConsumerConfig.GROUP_ID_CONFIG, 
          groupId);
        props.put(
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        props.put(
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> 
      kafkaListenerContainerFactory() {
   
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

5.2. 메시지 소비

@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group foo: " + message);
}

토픽 에 대해 각각 다른 그룹 ID를 가진 여러 리스너를 구현할 수 있습니다 . 또한 한 소비자는 다양한 토픽의 메시지를 수신 할 수 있습니다.

@KafkaListener(topics = "topic1, topic2", groupId = "foo")

Spring은 또한 리스너에서 @Header 어노테이션을 사용하여 하나 이상의 메시지 헤더 검색을 지원합니다 .

@KafkaListener(topics = "topicName")
public void listenWithHeaders(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message"
        + "from partition: " + partition);
}

5.3. 특정 파티션에서 메시지 사용

파티션이 하나만있는 baeldung 항목을 만들었습니다 .

그러나 파티션이 여러 개인 Topic의 경우 @KafkaListener 는 초기 오프셋을 사용하여 Topic의 특정 파티션을 명시 적으로 구독 할 수 있습니다.

@KafkaListener(
  topicPartitions = @TopicPartition(topic = "topicName",
  partitionOffsets = {
    @PartitionOffset(partition = "0", initialOffset = "0"), 
    @PartitionOffset(partition = "3", initialOffset = "0")}),
  containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message"
        + "from partition: " + partition);
}

때문에 initialOffset는 청취자 0으로 설정되어이 수신기가 초기화 될 때마다 소모 재, 파티션 0 및 3에서 모든 이전 메시지 소모 될 것이다.

오프셋을 설정할 필요가없는 경우 @TopicPartition 어노테이션 partitions 속성을 사용 하여 오프셋이없는 파티션 만 설정할 수 있습니다.

@KafkaListener(topicPartitions 
  = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))

5.4. 리스너를위한 메시지 필터 추가

사용자 지정 필터를 추가하여 특정 유형의 메시지를 사용하도록 리스너를 구성 할 수 있습니다. RecordFilterStrategyKafkaListenerContainerFactory 로 설정 하면 됩니다 .

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
  filterKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(
      record -> record.value().contains("World"));
    return factory;
}

그런 다음이 컨테이너 팩토리를 사용하도록 리스너를 구성 할 수 있습니다.

@KafkaListener(
  topics = "topicName", 
  containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
    System.out.println("Received Message in filtered listener: " + message);
}

이 리스너 에서 필터와 일치하는 모든 메시지가 삭제됩니다.

6. 사용자 지정 메시지 변환기

지금까지 우리는 문자열을 메시지로 보내고받는 것에 대해서만 다루었습니다. 그러나 사용자 지정 Java 개체를 보내고받을 수도 있습니다. 이를 위해서는 ProducerFactory 에서 적절한 serializer를 구성 하고 ConsumerFactory 에서 deserializer를 구성해야합니다 .

간단한 빈 클래스에서 살펴 보자 , 우리는 메시지로 전송합니다 :

public class Greeting {

    private String msg;
    private String name;

    // standard getters, setters and constructor
}

6.1. 사용자 지정 메시지 생성

이 예에서는 JsonSerializer 를 사용 합니다 .

ProducerFactoryKafkaTemplate 의 코드를 살펴 보겠습니다 .

@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
    // ...
    configProps.put(
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
      JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
    return new KafkaTemplate<>(greetingProducerFactory());
}

이 새로운 KafkaTemplate사용 하여 인사말 메시지 를 보낼 수 있습니다 .

kafkaTemplate.send(topicName, new Greeting("Hello", "World"));

6.2. 사용자 지정 메시지 사용

마찬가지로, ConsumerFactoryKafkaListenerContainerFactory수정 하여 Greeting 메시지를 올바르게 역 직렬화 해 보겠습니다 .

@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
    // ...
    return new DefaultKafkaConsumerFactory<>(
      props,
      new StringDeserializer(), 
      new JsonDeserializer<>(Greeting.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting> 
  greetingKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(greetingConsumerFactory());
    return factory;
}

spring-kafka JSON serializer 및 deserializer는 Spring-kafka 프로젝트에 대한 선택적 Maven 의존성이기도 한 Jackson 라이브러리를 사용합니다 .

따라서 pom.xml에 추가해 보겠습니다 .

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.7</version>
</dependency>

최신 버전의 Jackson을 사용하는 대신 spring-kafka pom.xml추가 된 버전을 사용하는 것이 좋습니다 .

마지막으로 인사말 메시지 를 사용하기 위해 리스너를 작성해야 합니다.

@KafkaListener(
  topics = "topicName", 
  containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
    // process greeting message
}

7. 결론

이 기사에서는 Apache Kafka에 대한 Spring 지원의 기본 사항을 다뤘습니다. 메시지를 보내고받는 데 사용되는 클래스를 간략하게 살펴 보았습니다.

이 기사의 전체 소스 코드는 GitHub 에서 찾을 수 있습니다 .

코드를 실행하기 전에 Kafka 서버가 실행 중이고 Topic가 수동으로 생성되었는지 확인하십시오.