Spring Boot로 Kafka Cloud에 연결하여 Json 데이터 주고 받기 (Callback 함수 및 Shutdown Hook)

728x90

서론


지난 글에서 Kafka cloud에 연결하여 String으로 메시지를 주고 받았었다. 이번에는 Json으로 데이터를 주고 받는 코드를 구현할 것이다.

만약 Kakfa Cloud에 연결하기 위해 Configuration 설정하는 법이 알고 싶다면 지난 글을 참고하시길 바란다.

 

https://bascat-code.tistory.com/21

 

Spring Boot로 Kafka cloud(Confluent, Conduktor 등)에 연결하여 데이터 주고 받기 (Configuration 방법 2가지)

서론 spring Boot를 사용하여 kafka에 연결하여 데이터를 주고 받으려고 Reference 자료들을 검색해보는데, 대부분의 자료들이 localhost:9092에 접속하여 로컬 컴퓨터에 있는 Kafka를 사용하는 예시를 보여

bascat-code.tistory.com

 


또한 추가적으로 producer에서 메시지를 보낸 후 받은 callback을 확인해 볼 것이다. Spring에서 Kafka를 사용하는 예문을 검색해보면 대부분의 자료에서 callback을 Listenablefuture에 담는 것을 볼 수 있는데, 해당 클래스는 Spring Boot 3부터는 deprecated된 기능이라 Spring Boot 3부터는 지원이 되지 않는다. Spring Boot 3에서는 java 8 부터 추가된 Completable Future를 사용하고 있으므로 이번 글에서도 해당 클래스를 사용해 Callback check를 진행해보도록 한다.

Dependency


추가적으로 JSON 라이브러리나 Jackson 라이브러리가 필요하지 않다. Kafka 라이브러리에서 알아서 Json 데이터를 직렬화 및 역직렬화를 해주기 때문이다. 

 

implementation 'org.apache.kafka:kafka-clients'
implementation 'org.springframework.kafka:spring-kafka'

 

Json 데이터 주고 받기 위한 Configuration 수정


지난 번 글과 마찬가지로 2가지 Configuration 방법에 대해 모두 살펴볼 것이다.

방법1. application.properties

 

# Confluent cloud kafka setting
spring.kafka.bootstrap-servers=${CONFLUENT_BOOTSTRAP_SERVER}
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='${CONFLUENT_API_KEY}' password='${CONFLUENT_API_SECRET}';
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*


Confluent에 연결하는 경우의 application.properties 설정이다. 
key는 사용하지 않을 것이기 때문에 value의 serializer와 deserializer만 수정하면 된다.

지난 글과 비교하여 변경된 부분

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer


여기서 주의할 점은 apache.kafka.common에 있는 serializer를 가져오는 것이 아니라, springframework.kafka.support에 있는 serializer를 가져와야 한다는 것이다.

 

spring.kafka.consumer.properties.spring.json.trusted.packages=*

 


또한 위처럼 json trusted packages를 명시할 필요가 있다. 해당 설정을 추가해주지 않으면 Desialization 과정에서 에러가 발생하니 꼭 추가해주도록 한다.

 

방법2. @Configuration


application.properties가 아니라 @Configuration을 통해 kafka 연결 설정을 했을 경우에는 아래처럼 수정하면 된다. 개인적으로 코드레벨에서 config를 관리하는 이 방법을 좀 더 추천한다. 

 

package test.kafka.configuration;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class CloudKafkaConfig {

    @Value("${kafka.bootstrapAddress}")
    private String bootstrapAddress;
    @Value("${kafka.cloud.username}")
    private String cloudUsername;
    @Value("${kafka.cloud.password}")
    private String cloudPassword;

    @Bean
    public ProducerFactory<Object, Object> producerFactory(){
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, PlainLoginModule.class.getName() + " required " +
                "username='" + cloudUsername +
                "' password='" + cloudPassword + "';");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, PlainLoginModule.class.getName() + " required " +
                "username='" + cloudUsername +
                "' password='" + cloudPassword + "';");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<Object, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

 

지난 글과 비교하여 변경된 부분


ProducerFactory에서 serializer를 JsonSerializer.class로 변경한다.

 

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);


ConsumerFactory에서 deserializer를 JsonDeserializer.class로 변경하고, JsonDeserializer.TRUSTED_PACKAGES에 "*" 전체 경로를 입력한다.

 

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");



Producer 및 Consumer 구현

data model

producer 및 consumer를 구현하기 앞서 Json으로 보낼 데이터 모델을 생성한다. 해당 모델을 통해 @RequestBody를 통해 입력 받을 것이기 때문이다. 여기선 예시로 User라는 클래스를 생성해 이름과 나이를 입력 받도록 한다.

package test.kafka.dto;

import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class User {
    private String name;
    private int age;
}

 

Producer


기존의 configuartion에서 KafkaTemplate의 제네릭 타입을 Object로 지정했기 때문에 따로 제네릭을 수정할 필요는 없다. spring의 message builder를 사용하여 보낼 메시지를 생성한다. 메시지 타입은 기존에 생성한 User 데이터 모델로 지정한다.

 

@Component
@Slf4j
public class KafkaProducerForJson {

    private final KafkaTemplate<Object, Object> kafkaTemplate;
    private final String topic = "demo_spring";

    @Autowired
    public KafkaProducerForJson(KafkaTemplate<Object, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(User user){
        Message<User> message = MessageBuilder.withPayload(user).setHeader(KafkaHeaders.TOPIC, topic).build();
        kafkaTemplate.send(message);
        log.info("produced message: name={}, age={}", message.getPayload().getName(), message.getPayload().getAge());
    }
}



topic 이름은 반드시 기존에 생성해둔 topic 이름과 동일하게 지정한다.


Consumer 


consumer도 크게 바뀌는 부분은 없다. 아래처럼 @Payload를 User로 지정하면, 전송된 객체에서 원하는 property를 추출할 수 있다.

 

@Component
@Slf4j
public class KafkaConsumerForJson {

    @KafkaListener(topics="demo_spring", groupId = "spring-test")
    public void listener(@Headers MessageHeaders messageHeaders, @Payload User user){
        log.info("Received message: header={}, payload name={}, age={}", messageHeaders, user.getName(), user.getAge());
    }

}



Json 데이터 주고 받기 테스트

Controller 생성


@RequestBody를 통해 user 데이터를 받아온다.

@RestController
public class KafkaController {

    public KafkaController(KafkaProducerForJson kafkaProducerForJson) {
        this.kafkaProducerForJson = kafkaProducerForJson;
    }

    @PostMapping("/publish/json")
    public void sendJson(@RequestBody User user){
        kafkaProducerForJson.sendMessage(user);
    }
}



Postman으로 테스트

 



이처럼 정상적으로 데이터가 전송되고 수신했음을 log를 통해 알 수 있다.


callback 및 shutdown hook 추가하기


이렇게만 끝내면 뭔가 심심하다. Producer로 데이터를 보낼 때 callback을 통해 정상적으로 Topic이 전달되었는지 확인하는 코드를 추가해보자. Consumer의 경우에는 server가 Shutdown이 일어날 때 마지막으로 소비된 메시지의 파티션과 offset을 알려주는 shutdown hook을 추가해 볼 것이다.

producer에 callback 추가

 

@Component
@Slf4j
public class KafkaProducerWithCallback {

    private final KafkaTemplate<Object, Object> kafkaTemplate;
    private final String topic = "demo_spring";

    @Autowired
    public KafkaProducerWithCallback(KafkaTemplate<Object, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(User user){
        Message<User> message = MessageBuilder.withPayload(user).setHeader(KafkaHeaders.TOPIC, topic).build();

        CompletableFuture<SendResult<Object, Object>> future = kafkaTemplate.send(message);
        future.whenComplete((result, e) -> {
           if (e == null){
               int partition = result.getRecordMetadata().partition();
               long offset = result.getRecordMetadata().offset();
               User sentUser = (User) result.getProducerRecord().value();
               log.info("produced message topic={}, partition={}, offset={}, payload: name={}, age={}",
                       topic, partition, offset, sentUser.getName(), sentUser.getAge());
           } else{
               log.error("Error occurred while producing message: {}", e.getMessage());
           }
        });
    }
}




KafkaTemplate의 send 메서드는 future 객체를 반환한다. 이것을 CompletableFuture 객체에 담고 whenComplete 메서드를 사용하면 콜백 메세지를 확인할 수 있다. 만약 send가 정상적으로 실행되지 않았으면 e가 표시될 것이고, send가 정상적으로 진행되었다면, e는 null의 값을 가지며, result를 통해 record에 대한 정보를 추출할 수 있다.


consumer 에 shutdown hook 추가


이번에는 server shutdown이 일어날 때 consumer가 받은 마지막 offset을 표시하는 로직을 추가해 볼 것이다. 우선 Consumer 로직을 다음과 같이 수정한다.

 

@Component
@Slf4j
public class KafkaConsumerWithShutdown {

    private volatile long lastConsumedOffset = -1;
    private volatile long lastPartition = -1;

    @KafkaListener(id = "listener_shutdown", topics = "demo_spring", groupId = "spring-test")
    public void listener(@Headers MessageHeaders messageHeaders, @Payload User user, ConsumerRecord<?, ?> record) {
        log.info("Received message: header={}, payload name={}, age={}", messageHeaders, user.getName(), user.getAge());
        long offset = record.offset();
        lastConsumedOffset = offset;
        long partition = record.partition();
        lastPartition = partition;
    }

    public long getLastConsumedOffset() {
        return lastConsumedOffset;
    }

    public long getLastPartition(){
        return lastPartition;
    }
}


변수로 lastconsumedOffset을 만들어준다. partition 정보도 알고 싶다면 partion 변수도 만들어준다.

KafkaListener는 매개변수로 ConsumerRecord를 가져올 수 있는데, 이 record를 통해 offset 정보 및 partition 정보를 가져올 수 있다. (Producer의 callback에 record 정보가 들어있는 것과 유사하다.) 어쨌든 이 record 정보로 부터 offset과 partition을 실시간으로 계속 업데이트하여 변수에 들고 있는다.


DisposableBean

Spring에서는 DisposableBean을 통해 shutdown hook을 손쉽게 구현할 수 있다. destroy() 메서드를 overriding하여 서버 종료시 동작할 로직을 안에 넣으면 된다. 

 

@Component
@Slf4j
public class ShutdownHook implements DisposableBean {

    private final KafkaConsumerWithShutdown consumer;

    @Autowired
    public ShutdownHook(KafkaConsumerWithShutdown consumer) {
        this.consumer = consumer;
    }

    @Override
    public void destroy() {
        System.out.println("----------------- The consumer server is shutting down --------------------");
        long lastConsumedOffset = consumer.getLastConsumedOffset();
        long lastPartition = consumer.getLastPartition();
        log.info("Last partition before shutdown: " + lastPartition);
        log.info("Last consumed offset before shutdown: " + lastConsumedOffset);
    }
}



앞서 consumer를 @Component를 통해 스프링에 등록해두었으므로 spring이 @Autowired를 통해 주입해준다. 

server shutdown이 일어나면 서버가 종료되기 직전 destroy() 메서드가 동작하게 되고 consumer가 들고 있는 offset 정보와 partition 정보를 log에 보여주게 된다.


Callback 및 Shutdown 테스트 하기

controller 수정

@RestController
public class KafkaController {
private final KafkaProducerWithCallback kafkaProducerWithCallback;
    
    public KafkaController(KafkaProducerWithCallback kafkaProducerWithCallback) {
        this.kafkaProducerWithCallback = kafkaProducerWithCallback;
    }
    
    @PostMapping("/publish/callback")
    public void sendMessageWithCallback(@RequestBody User user){
        kafkaProducerWithCallback.sendMessage(user);
    }
}



postman으로 테스트


메시지를 전송하면 kafka broker에 보낸 메시지 정보와 topic, partition, offset 정보도 함께 콜백으로 받는 것을 확인할 수 있다.

 


서버 종료 시 consumer에서 마지막으로 받은 메시지의 partition 번호와 offset 번호를 log를 통해 출력해준다.

글을 마치며


Spring Boot 3를 사용하여 kafka cloud에 연결하여 메시지를 주고 받는 test를 진행해보았다. 편의상 하나의 서버만 띄워 동작을 확인하기 위해 서버 위에 producer와 consumer를 모두 구현했지만, 실사용에서는 producer와  consumer가 별도의 서버에서 구현되어야 할 것이다.

해당 글에서는 kafka의 properties를 대부분 건들지 않고 default 설정을 그대로 사용했다는 것을 유념하길 바란다. 실제 사용에서는 본인의 어플리케이션에 적합하게 동작하도록 kafka의 properties를 수정해야 할 것이다.