AWS SQS로 비동기 메시징 구현하기

728x90

 

서론


메시지 큐를 사용하는 용도는 다양하다. 외부 서비스에 특정 정보나 상태 변경을 알리기 위해 비동기적으로 알림을 보내야 할 때 또는 MSA에서 각각의 서비스 정보 교환이 이뤄질 때 각 서비스의 의존성을 낮추는 느슨한 결합을 구현하기 위해 종종 사용한다.

수많은 Kafka, RabbitMQ 등 많은 메시지 큐 서비스가 있지만, 여기선 AWS의 SQS를 사용하여 비동기 메시징을 구현했다. SQS 역시 메시지 큐의 일종으로 어플리케이션 코드를 통해 해당 큐에 메세지를 보내거나 메세지를 받아올 수 있다. 

아래 본문에서는 SQS를 Spring으로 사용하는 법을 설명한다.

AWS 콘솔에서 SQS 사용 준비하기

IAM 액세스키 발급

 




IAM을 통해 SQS를 사용할 사용자 계정을 생성하고 해당 계정의 보안 자격 증명에서 액세스키를 생성한다. 생성하면 .csv로 다운로드 할 수 있으므로 해당 내용을 꼭 저장하도록 한다.

SQS 생성

 



AWS SQS로 들어가 새 대기열을 생성해준다. 표준을 선택하고 이름을 정한 후, 서버 측 암호화는 우선 비활성화시켜 생성한다.

생성 후에는 해당 대기열의 URL을 볼 수 있는데, 이를 기록해둔다.

Spring 환경 설정

dependency 

build.gradle에 아래와 같이 aws에서 제공하는 aws sqs sdk를 삽입해준다.

 

implementation 'com.amazonaws:aws-java-sdk-sqs:1.12.472'



AWS configuration


AWS SDK를 사용하려면 아래와 같이 @Configuration을 이용하여 AWS 사용 환경을 설정해줘야 한다.

 

@Configuration
public class AWSConfig {

    @Value("${aws.accessKey}")
    private String awsAccessKey;
    @Value("${aws.secretKey}")
    private String awsSecretKey;
    @Value("${aws.region}")
    private String awsRegion;

    public AWSCredentials awsCredentials() {
        return new BasicAWSCredentials(awsAccessKey, awsSecretKey);
    }

    public AWSCredentialsProvider awsCredentialsProvider() {
        return new AWSStaticCredentialsProvider(awsCredentials());
    }

    // SQS Configuration
    @Bean
    public AmazonSQS amazonSQSClient(){
        return AmazonSQSClientBuilder.standard().withCredentials(awsCredentialsProvider())
                .withRegion(awsRegion).build();
    }
}

 


AWS Configuration을 위해 메서드를 작성하는 방법은 각양각색이다. 다만 공통적으로 필요로 하는 매개변수가 있는데 그것은 다음과 같다.

- AWS 액세스키
- AWS 비밀키
- AWS 리전

해당 정보를 AmazonSQSClientBuilder의 메서드를 사용하여 주입하고 빌드하면 AmazonSQS가 리턴되며 이를 사용하여 메시지 큐를 사용할 수 있게 된다.

application.properties


AWS IAM에서 발급받은 액세스키와 비밀키 그리고 AWS region을 application.properties에 작성한다. application.properties에 작성된 값은 각 클래스에서 스프링의 @value 애노테이션을 통해 가져올 수 있다.

 

spring.config.import=optional:file:.env[.properties]

# AWS IAM
aws.accessKey=${AWS_ACCESS_KEY}
aws.secretKey=${AWS_SECRET_KEY}
aws.region=ap-northeast-2

# AWS SQS setting
aws.sqs.url.customer=${AWS_SQS_URL_CUSTOMER}
aws.sqs.url.restaurant=${AWS_SQS_URL_RESTAURANT}
aws.sqs.url.rider=${AWS_SQS_URL_RIDER}



보안 상의 이유로 액세스키와 비밀키 그리고 SQS의 URL은 .env 파일에 작성되었다.

@EnableScheduling

 

@EnableScheduling
@SpringBootApplication
public class CustomerApplication {

    public static void main(String[] args) {
    	SpringApplication.run(CustomerApplication.class, args);
    }

}


마지막으로 메시지를 받아오는 간격을 설정하기 위해 스프링의 스케줄링을 사용할 것이므로 @EnableScheduling 애노테이션을 Spring boot application 클래스에 붙여준다.

어플리케이션 코드


SQS를 사용하기 위한 클래스를 하나 생성하고 아래와 같이 작성한다. 우선 전체 코드는 다음과 같다. 

 

@Service
@Slf4j
public class SqsService {

    @Value("${aws.sqs.url.customer}")
    private String customerSqsUrl;
    @Value("${aws.sqs.url.restaurant}")
    private String restaurantSqsUrl;
    @Value("${aws.sqs.url.rider}")
    private String riderSqsUrl;

    private final AmazonSQS amazonSQSClient;

    public SqsService(AmazonSQS amazonSQSClient) {
        this.amazonSQSClient = amazonSQSClient;
    }

    public SendMessageResult sendToRestaurant(String data){
        SendMessageRequest sendMessageRequest = new SendMessageRequest(restaurantSqsUrl, data);
        return amazonSQSClient.sendMessage(sendMessageRequest);
    }

    public SendMessageResult sendToRider(String data){
        SendMessageRequest sendMessageRequest = new SendMessageRequest(riderSqsUrl, data);
        return amazonSQSClient.sendMessage(sendMessageRequest);
    }

    @Scheduled(fixedDelay = 1000)
    public void receive() throws JsonProcessingException {
        try{
            ReceiveMessageResult receiveMessageResult = amazonSQSClient.receiveMessage(customerSqsUrl);
            if(!receiveMessageResult.getMessages().isEmpty()){
                Message message = receiveMessageResult.getMessages().get(0);
                String messageBody = message.getBody();
                log.info("message body={}", messageBody);
                amazonSQSClient.deleteMessage(customerSqsUrl, message.getReceiptHandle());
            }
        } catch (QueueDoesNotExistException e){
            log.error("Queue Dose not exist {}", e.getMessage());
        }
    }
}



각 부분에 대해 아래에 상세히 설명한다.

설정 값 가져오기

@Value("${aws.sqs.url.customer}")
private String customerSqsUrl;
@Value("${aws.sqs.url.restaurant}")
private String restaurantSqsUrl;
@Value("${aws.sqs.url.rider}")
private String riderSqsUrl;

private final AmazonSQS amazonSQSClient;

public SqsService(AmazonSQS amazonSQSClient) {
    this.amazonSQSClient = amazonSQSClient;
}


AWS의 SQS에 메시지를 보내거나 SQS로부터 메시지를 받아오기 위해서는 해당 SQS의 url이 필요하다. 위 코드는 application.properties에서 url을 가져오고, 이전에 Configuration을 통해 생성한 amazonSQS 클라이언트를 생성자를 통해 가져오는 것이다.

본 글쓴이는 아래와 같이 받는 큐와 보내는 큐가 달라서 큐를 세가지 생성했으므로 이처럼 queue url을 세가지를 가져왔다. 

 



하지만 그냥 테스트 용도로 사용할거면 아래처럼 queue를 하나만 생성해도 무방하다. 보내는 큐와 받는 큐가 일치하게 만들어서 테스트한다.

 



물론 url을 굳이 application.properties에 기록할 필요 없이 바로 코드에 작성해도 무방하다. 

메세지 받아오기

 

@Scheduled(fixedDelay = 1000)
public void receive() throws JsonProcessingException {
    try{
        ReceiveMessageResult receiveMessageResult = amazonSQSClient.receiveMessage(customerSqsUrl);
        if(!receiveMessageResult.getMessages().isEmpty()){
            Message message = receiveMessageResult.getMessages().get(0);
            String messageBody = message.getBody();
            log.info("message body={}", messageBody);
            amazonSQSClient.deleteMessage(customerSqsUrl, message.getReceiptHandle());
        }
    } catch (QueueDoesNotExistException e){
        log.error("Queue Dose not exist {}", e.getMessage());
    }
}

 


SQS로부터 메시지를 가져오는 메서드다. 

@Scheduled를 사용하여 1초에 한 번씩 메시지를 가져오게 하였다.

메시지를 가져오는 방법은 정말 간단한데, 그냥 amazonSQS 객체의 receiveMessage()에 해당 큐의 url을 담고 실행하기만 하면 된다. 그 메서드의 반환값으로는 receiveMessageResult라는 객체가 반환되는데 여기에 getMessages().get() 메서드를 통해 메시를 하나 가져올 수 있고 다시 이 메시지에서 getBody()를 사용하면 메세지의 Body에 해당하는 내용만 추출할 수 있다.

마지막에 deleteMessage() 메서드를 통해 해당 큐에서 방금 받는 것에 성공한 메시지는 삭제하도록 할 수 있다. (삭제를 안 하면 큐에 계속 쌓이게 된다.)

여튼 해당 메시지를 받아오고 나서 따로 실행할 로직이 있다면 따로 클래스를 만들어 작성한 후, getBody() 메서드 이후에 동작시키면 정상적으로 동작할 것이다.

메세지 보내기

public SendMessageResult sendToRestaurant(String data){
    SendMessageRequest sendMessageRequest = new SendMessageRequest(restaurantSqsUrl, data);
    return amazonSQSClient.sendMessage(sendMessageRequest);
}

 


보내는 건 더 쉽다.
url과 보낼 메시지를 인자로 넣어서 SQS의 sendMessageRequest 객체를 하나 생성한다.
해당 객체를 amazonSQS 객체의 sendMessage 메서드의 매개변수로 넣고 실행해주기만 하면 된다.

실행결과

메시지를 보냈을 경우


AWS console에서 큐에 메시지가 쌓인 것을 확인 할 수 있다.
메시지 송신 및 수신에서 메시지 폴링을 누르면 메시지 큐에 쌓인 메시지들을 확인할 수 있다.



메시지를 받는 경우


log를 통해 아래와 같이 메시지를 수신한 것을 알 수 있다.

 

글을 마치며

AWS SQS는 유료로 제공하는 서비스인 만큼(?) 사용성은 정말 편리하다. standard queue를 사용하면 무제한에 가까운 TPS가 가능하다고 하니 사용하지 않을 이유가 없는 것 같다.