Spring Boot로 Kafka cloud(Confluent, Conduktor 등)에 연결하여 데이터 주고 받기 (Configuration 방법 2가지)

728x90

서론


spring Boot를 사용하여 kafka에 연결하여 데이터를 주고 받으려고 Reference 자료들을 검색해보는데, 대부분의 자료들이 localhost:9092에 접속하여 로컬 컴퓨터에 있는 Kafka를 사용하는 예시를 보여주고 있었다. 

현재 수많은 Kafka Cloud들이 존재하고 편리한 UI를 제공하여 데이터를 확인하기 좋게 편의성을 제공하고 있는데, Kafka Cloud에 연결하는 Spring 예문을 찾아보기 어려운 점은 상당히 아쉽게 느껴졌다. localhost에 접속할 때와는 Configuration 설정에서 조금 다른 부분이 있기 때문이다.

실제로 kafka를 직접 로컬 서버에 띄워서 사용하기보다는 cloud를 통해 데이터를 관리하는 경우가 많으므로, cloud에 연결하여 데이터를 주고받는 Spring 예문이 필요하다고 생각되어 해당 글을 작성해본다. 본 글을 참고하면 Configuration이 잘못되어 낭비되는 시간을 최대한 줄일 수 있을 것이다. (본 글쓴이가 했던 삽질을 누군가는 하지 않길 바라는 마음에)


Cloud에서 API key 생성 및 topic 생성


우선 kafka cloud 계정이 있어야 한다. 계정이 없다면 가입을 해서 계정을 생성하자. 가입 자체는 어렵지 않을 것이다. 어떤 cloud를 사용해도 상관이 없다. 다만 본 글에서는 예시로 confluent와 conduktor를 살펴본다.

가입을 하면서 cluster를 생성할 때 region과 서버 공급자를 선택하는 경우가 있을 수 있는데, region은 최대한 가까운 곳으로 설정하기를 권장한다. 공급자는 AWS, Azure 등 무엇을 사용해도 상관없다.

1. Confluent를 사용하는 경우

API key 생성


계정을 생성할 때 동시에 cluster를 생성할 것이다. cluster에 접속하여 해당 화면까지 들어온다. 만약 가입후 재로그인했으면 해당 경로로 들어온다.
Home -> Environment -> default -> "클러스터 이름" -> API key
에서 Add key를 누름.



global access api key를 선택하고 next를 누르면 곧바로 생성이 완료된다.

 


해당 정보를 로컬 컴퓨터에 다운로드 해놓자.


그런 다음 그 아래 cluster settings를 보면 bootstrap server가 보인다. 해당 주소를 사용하여 cloud에 접속할 것이므로 해당 내용을 기록해둔다. 

topic 생성


아래 topics 탭을 누르고 add topic를 눌러서 topic을 생성해준다. partitions는 3으로 설정하고 create with default를 눌러 topic을 생성해준다.


이렇게 하면 클라우드 설정은 완료된다.

2. Conduktor를 사용하는 경우

API key 가져오기


conduktor의 경우 편의상 myplayground를 바로 사용한다. 좌측에서 admin 메뉴를 선택하고 myplayground에 들어가면 바로 Bootstrap 접속 Url과 username password가 자동적으로 생성되어있는 것을 확인할 수 있다. 


conduktor의 경우 username과 password를 새로 생성하는 방식은 아니고, clusters에 들어가서 내용을 수정할 수 있다. 그래도 해당 정보는 외부에 공개되지 않도록 주의하자.

topic 생성


console로 이동하여 topics에 들어가 topic을 생성하자. partitions 개수도 초기 설정이 3으로 되어있을 것이므로 건들지 말고 그대로 생성하면 된다.

 

3. 그 외


다른 Cloud 서비스를 사용해도 상관없다. 대부분의 Cloud 서비스는 비슷하게도 '접속 경로', 'username', 'password' 이 세가지를 제공할 것이기 때문에 해당 정보만 가져올 수 있으면 된다.

위 예시와 유사하게 사용할 topic만 생성해놓기만 하면 된다.


프로젝트 dependency 


어플리케이션 코드 구현으로 들어가기 앞서 dependency를 설정해준다.
gradle을 기준으로 아래 dependency를 추가하면 된다.

 

implementation 'org.apache.kafka:kafka-clients'
implementation 'org.springframework.kafka:spring-kafka'



Configuration 설정하기

kafka cloud에 접속하기 위한 configuration 설정 방법으로 두 가지를 제시할 것이다. 하나는 application.properties에 작성하는 법이고, 다른 하나는 @Configuration을 사용하여 클래스 내에서 작성하는 법이다. 

기본적으로 confluent나 conduktor는 application.properties를 통해 (혹은 application.yaml으로) 설정하는 방법을 알려준다. application.properties를 통해 설정하면 쉽고 간단하지만 가독성이 떨어지는 단점이 있다. @Configuration을 사용하여 설정을 스프링 컴포넌트로 등록하는 방식은 조금 복잡해 보일 수 있으나, 가독성이 좋아 추후 관리하기 용이하다.

둘 중에 본인이 더 선호하는 방식을 선택해 설정하면 된다.

공통사항

server url과 username(api key), password(secret)가 노출되지 않기 위해서 .env 파일을 생성하고 아래와 같이 입력해준다.

 

CONDUKTOR_BOOTSTRAP_SERVER=
CONDUKTOR_USERNAME=
CONDUKTOR_PASSWORD=

 


각 등호의 우변에 클라우드로부터 받은 정보 세 가지 (Bootstrap url, username, password)를 각각 대응하여 입력한다. 글쓴이는 Conduktor 클라우드를 사용했기 때문에 이처럼 적었지만, 해당 변수 명은 마음대로 작성해도 무방하다.

방법1. application.properties


가장 간단한 방법이다. application.properties을 통해 다음과 같이 작성하면 spring이 알아서 클라우드 연결 설정을 완료한다.

 

spring.config.import=optional:file:.env[.properties]
# Conduktor cloud kafka setting
spring.kafka.bootstrap-servers=${CONDUKTOR_BOOTSTRAP_SERVER}
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='${CONDUKTOR_USERNAME}' password='${CONDUKTOR_PASSWORD}';
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer



위는 conduktor를 사용했을 때 예시다. confluent를 사용할 때에도 ${}로 표시된 변수만 바꾸어서 입력하면 된다. 

이렇게 하면 설정은 완료된다. 이 방법을 선택했다면 바로 Producer와 Consumer 구현 코드로 넘어가면 된다.

해설


각 항목을 잠깐 해설하자면, bootstrap-servers는 서버 url을 가리키고, security.protocol 은 Authentication 방법을 가리키고 sasl은 그 auth 방식에 필요한 정보를 담고 있다. auth 방식은 클라우드에 따라 다를 수 있으므로 클라우드에서 지정한 방식으로 설정하도록 한다.

아래에는 key와 value의 직렬화 및 역직렬화 방식을 String으로 지정했는데 이건 본 글쓴이가 테스트 데이터로 단순 text를 보낼 것이기 때문이다. json으로도 데이터를 전송할 수도 있는데, 그럴 경우 이 항목들을 수정해야한다. json으로 데이터를 전송하는 것은 다음 번에 진행해볼 예정이다. 본 글에서는 String으로 데이터를 전송할 것이다.

방법2. @Configuration


application.properties를 사용하지 않고 Spring의 @Configuration을 통해 설정할 수도 있다. 우선 Configuration으로 사용할 클래스 하나를 생성하고 다음처럼 코드를 작성한다.

 

package test.kafka.configuration;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class CloudKafkaConfig {

    @Value("${kafka.bootstrapAddress}")
    private String bootstrapAddress;
    @Value("${kafka.cloud.username}")
    private String cloudUsername;
    @Value("${kafka.cloud.password}")
    private String cloudPassword;

    @Bean
    public ProducerFactory<Object, Object> producerFactory(){
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, PlainLoginModule.class.getName() + " required " +
                "username='" + cloudUsername +
                "' password='" + cloudPassword + "';");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, PlainLoginModule.class.getName() + " required " +
                "username='" + cloudUsername +
                "' password='" + cloudPassword + "';");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<Object, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}


그리고 Spring의 @Value를 통해 .env에 기록된 값을 가져오기 위해 application.properties에는 아래와 같이 작성해준다.

 

kafka.bootstrapAddress=${CONDUKTOR_BOOTSTRAP_SERVER}
kafka.cloud.username=${CONDUKTOR_USERNAME}
kafka.cloud.password=${CONDUKTOR_PASSWORD}



예시로는 conduktor를 사용했지만 다른 cloud를 사용할 때에도 이와 유사한 방식으로 작성하면 된다.

해설

ProducerFactory는 kafka producer client를 생성하기 위한 클래스로 설정 내용을 Map에 담아넣고 해당 클래스의 생성자를 사용해서 빈으로 등록하면 Spring 컨테이너가 빈을 인식하여 producer를 사용할 수 있게 된다. 

ConsumerFactory도 마찬가지로 kafka consumer client를 생성하기 위한 클래스로 map에 설정 정보를 매핑하고 생성자를 통해 생성하여 빈으로 등록하면 된다.

KafkaTemplate은 kafka topic에 메시지를 전송하기 위해 Spring에서 제공하는 클래스로 producerFactory를 통해 producer를 등록했어도 KafkaTemplate이 있어야 KafkaTemplate의 메서드를 통해 토픽에 메시지를 전달할 수 있다.

각 props항목에 대한 해설은 위 appication.properties 방법의 해설과 동일하다.


Producer 및 Consumer 구현

Producer


KafkaTemplate을 사용하면 producer를 구현하는 것은 매우 간단하다.

 

package test.kafka.messaging;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaProducerForString {

    private final KafkaTemplate<Object, Object> kafkaTemplate;
    private final String topic = "demo_spring";

    @Autowired
    public KafkaProducerForString(KafkaTemplate<Object, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message){
        kafkaTemplate.send(topic, message);
        log.info("Produced message={}", message);
    }
}


기존에 생성해둔 KafkaTemplate을 @Autowired 의존성 주입을 통해 가져오고 위와 같이 send() 메서드를 통해 데이터를 보내기만 하면 끝이다.

여기서 주의해야할 점은 send()에 들어가는 topic이름이 기존에 생성해둔 topic이름과 같아야 한다는 것이다.

Consumer


@KafkaListener 애노테이션을 사용하면 consumer 구현도 매우 간단하게 완료된다.

 

package test.kafka.messaging;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaConsumerForString {

    @KafkaListener(topics="demo_spring", groupId = "spring-test")
    public void listener(@Headers MessageHeaders messageHeaders, @Payload String message){
        log.info("Received message: header={}, payload={}", messageHeaders, message);
    }

}



메시지를 받고자 하는 메서드에 @KafkaListener 애노테이션을 붙여주기만 하면 된다. 역시 topic의 이름을 기존에 생성한 토픽과 동일하게 해줘야 하며, consumer의 경우는 추가적으로 group id를 지정해야하는 점을 잊으면 안 된다. @Payload 애노테이션을 사용하여 손쉽게 message 데이터를 가져올 수 있다.


Controller 생성 및 테스트


데이터를 주고받는 테스트를 하기 위해 아래와 같이 RestController를 생성해준다. @RequestBody를 통해 String 입력값을 가져올 것이다.

 

@RestController
public class KafkaController {

    private final KafkaProducerForString kafkaProducerForString;

    public KafkaController(KafkaProducerForString kafkaProducerForString) {
        this.kafkaProducerForString = kafkaProducerForString;
    }

    @PostMapping("/publish/string")
    public void sendString(@RequestBody String data){
        kafkaProducerForString.sendMessage(data);
    }
}




POSTMAN으로 테스트


Postman을 사용하여 메시지를 전송하고 로그를 확인한다.

 



정상적으로 데이터가 전송되었음을 확인할 수있다.


클라우드 플랫폼에서도 UI를 통해 해당 메시지가 제대로 전송되었음을 확인할 수 있다.

다음 글에서 계속


솔직히 구현코드가 조금 심심하게 느껴졌을 것이다. 군더더기는 모두 빼고 메시지 전송이라는 기본 기능에 집중하기 위해 부가적인 역할을 하는 코드는 모두 제거했기 때문이다. 

현재는 모든 동작 방식이 default값으로 동작하고 있지만, 원하는 kafka 동작 방식이 따로 있다면 property 추가로 인해 configuration이 복잡해질 수 있고, 원하는 기능에 따라 producer나 comsumer에 추가되는 코드가 있을 수 있다.

다음 글에서는 String이 아니라 json으로 데이터를 전송해보고, producer에서 해당 데이터를 전송하고 받은 callback을 검사하는 로직을 추가해볼 것이다.