코딩은 마라톤

[RabbitMQ] Delayed Message Plugin를 활용한 메시지 지연 처리 (SpringBoot) 본문

Backend

[RabbitMQ] Delayed Message Plugin를 활용한 메시지 지연 처리 (SpringBoot)

anxi 2025. 2. 4. 18:34

[ 도입 배경 ]

저번 프로젝트에서 활동 자동 종료 기능을 구현해야 했습니다.

 

활동 자동 종료 과정:

1. 사용자가 자투리 시간을 입력한다.

2. 자투리 시간이 지나면, DB의 활동 상태는 자동으로 종료 상태로 변경된다.

 

자동으로 변경해야 하는데, 이 기능을 구현하려면 TTL을 사용해야 한다고 생각했습니다. 

그래서 활동 시작 시점에 TTL을 자투리 시간만큼 설정해두고 TTL이 지나면 비동기 처리되면 되지 않을까.. 생각하고 방법을 모색하던 중에,,,

 

 

RabbitMQDelayed Message Plugin을 알게 되었습니다.

 

 

[ RabbitMQ ]

RabbitMQ는 오픈소스로 개발된 메시지 브로커입니다. RabbitMQ는 여러 프로토콜을 지원하지만, 대다수는 AMQP(Advanced Message Queuing Protocol)에서 사용됩니다.

 

[ 메시지 브로커 ]

 

메시지 브로커는 공식 메시징 프로토콜들 사이에서 메시지를 변환하여, 서로 다른 언어로 작성된 애플리케이션들이 직접 통신(대화, 상호작용)할 수 있도록 합니다. 또한 메시지를 큐에 저장해 웹 서버가 빠르게 응답할 수 있도록 돕고, 작업 부하를 여러 소비자에게 분산하여 효율적인 처리를 가능하게 해줍니다.

 

[ AMQP ]

클라이언트 애플리케이션과 메시지 브로커 간의 통신을 지원하는 메시징 프로토콜

 

[ RabbitMQ 동작 과정 ]

 

 

  • Producer가 메시지를 발행
    • 애플리케이션이 시작될 때 Producer와 Exchange 사이에 채널이 설정됩니다.
    • Producer는 이 채널을 통해 메시지를 Exchange로 보냅니다.
  • Exchange가 메시지를 수신하고 라우팅 결정
    • Exchange는 메시지를 받은 후, 메시지 속성과 Exchange 유형에 따라 적절한 바인딩(메시지를 큐에 보낼 규칙)을 찾습니다.
  • 메시지가 적절한 큐로 전달
    • 선택된 바인딩을 사용하여 메시지를 지정된 큐로 보냅니다.
  • 메시지는 소비되기 전까지 큐에 저장
    • 메시지는 Consumer가 처리할 때까지 큐에 남아 있습니다.
  • Consumer가 메시지를 수신
    • Consumer는 애플리케이션 시작 시 설정된 채널을 통해 메시지를 가져와 처리합니다.

https://medium.com/cwan-engineering/rabbitmq-concepts-and-best-practices-aa3c699d6f08

 

RabbitMQ: Concepts and Best Practices

Introduction

medium.com

https://www.ibm.com/kr-ko/topics/message-brokers

 

메시지 브로커란 무엇인가요? | IBM

메시지 브로커를 사용하면 각기 다른 메시징 프로토콜을 메시지를 변환하여 애플리케이션, 시스템, 서비스들이 서로 통신하고 정보를 교환할 수 있습니다.

www.ibm.com


[ Delayed Message Plugin ]

RabbitMQ에서 TTL을 설정하여 메시지를 보내면, 일반적으로 메시지는 삭제됩니다.

허나, 큐에 Dead Letter Exchange(DLX)가 존재한다면 만료된 메시지는 DLX로 이동하며, DLX에서 메시지를 재처리하는 등 다양한 작업을 수행할 수 있습니다.

 

이러한 구현을 쉽게 할 수 있도록, RabbitMQ에서 Delayed Message Plugin이 나왔습니다.

 

https://docs.spring.io/spring-amqp/reference/amqp/delayed-message-exchange.html

 

Delayed Message Exchange :: Spring AMQP

To use a RabbitAdmin to declare an exchange as delayed, you can set the delayed property on the exchange bean to true. The RabbitAdmin uses the exchange type (Direct, Fanout, and so on) to set the x-delayed-type argument and declare the exchange with type

docs.spring.io

https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq

 

Scheduling Messages with RabbitMQ | RabbitMQ

For a while people have looked for ways of implementing delayed

www.rabbitmq.com


[ SpringBoot에서 적용하기 ]

1. Spring AMQP 의존성 추가

우선 RabbitMQ를 사용하려면 Spring AMQP 의존성을 추가해야 합니다.

implementation 'org.springframework.boot:spring-boot-starter-amqp'
testImplementation 'org.springframework.amqp:spring-rabbit-test'

 

 

2. Delayed Message Plugin Docker Container 생성 및 YML 설정

저는 Docker Compose를 사용하여 Delayed Message Pulgin 컨테이너를 생성했습니다.

하기 깃허브를 통해 Delayed Message Plugin의 버전을 확인하고 이미지를 다운로드 받을 수 있습니다.

 

version: '3.3'

services:
  rabbit:
    image: heidiks/rabbitmq-delayed-message-exchange:latest
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=password
    ports:
      - "5672:5672"
      - "15672:15672"

 

https://github.com/heidiks/rabbitmq-delayed-message-exchange

 

GitHub - heidiks/rabbitmq-delayed-message-exchange: Docker image of RabbitMQ with management and compatible version of the delay

Docker image of RabbitMQ with management and compatible version of the delayed message exchange plugin - heidiks/rabbitmq-delayed-message-exchange

github.com

 

application.yml

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: password

 

1, 2번을 통해 RabbitMQ와 플러그인 설정이 완료됐습니다.

 

3. RabbitMQ Configuration 작성

@Configuration
@RequiredArgsConstructor
public class RabbitMQConfiguration {

    private final RabbitProperties rabbitProperties;

    private final FinishActivityQueueProperty finishActivityQueueProperty;

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter rabbitMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(rabbitProperties.getHost());
        connectionFactory.setPort(rabbitProperties.getPort());
        connectionFactory.setUsername(rabbitProperties.getUsername());
        connectionFactory.setPassword(rabbitProperties.getPassword());
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
        return connectionFactory;
    }

    @Bean
    public CustomExchange finishActivityExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(finishActivityQueueProperty.getExchange(), "x-delayed-message", true, false, args);
    }

    @Bean
    Queue finishActivityQueue() {
        return new Queue(finishActivityQueueProperty.getQueue(), false);
    }

    @Bean
    Binding bindingFinishActivityQueue(CustomExchange finishActivityExchange) {
        return BindingBuilder.bind(finishActivityQueue())
                .to(finishActivityExchange)
                .with(finishActivityQueueProperty.getRoutingKey())
                .noargs();
    }
}

 

RabbitMQ 동작과정에서 보았던 Exchange, Binding, Queue를 설정해야 합니다.

여기서 Delayed Message Plugin을 사용하기 위해 Exchange를 추가로 설정해야 합니다.

 

Exchange를 설정할 때, "x-delayed-message" 타입을 지정하여 지연 메시지(Delayed Message) 를 처리할 수 있도록 구성해야 합니다.

또한, "x-delayed-type" 속성을 설정하여 메시지가 어떤 방식으로 라우팅될지 지정할 수 있습니다. 상기 코드와 같이 "direct"로 설정하면 특정 라우팅 키를 가진 메시지만 해당 큐로 전달됩니다.

 

4. 메시지를 발행하는 Producer, 메시지를 소비하는 Consumer 구현

하기 코드부터 제가 프로젝트에서 직접 구현한 코드가 아닌 하나의 클래스에서 가독성있게 보여주기 위해 일부 변경되었습니다.

만일 실제 코드를 보고 싶다면, 

https://github.com/KUSITMS-30th-TEAM-C/backend/pull/93/files#diff-5c1f000142e52bbb2ae5a581d122a3e5f3c068202c597ebd06e8f4ed3954584c

 

Feat: (#78) 자투리 시간을 초과한 활동은 자동으로 종료처리한다 by anxi01 · Pull Request #93 · KUSITMS-30t

✅ PR 유형 어떤 변경 사항이 있었나요? 새로운 기능 추가 버그 수정 코드에 영향을 주지 않는 변경사항(오타 수정, 탭 사이즈 변경, 변수명 변경) 코드 리팩토링 주석 추가 및 수정 문서 수정 빌

github.com

에서 확인 가능합니다!

 

[ Producer ]

@Component
@RequiredArgsConstructor
@Log4j2
public class RabbitMessagePublisher {

    private final RabbitTemplate rabbitTemplate;
    private final FinishActivityQueueProperty queueProperty;
    
    public void publishMessageWithDelay(long delayTime, FinishActivityMessage message) {
        try {
            rabbitTemplate.convertAndSend(
                    queueProperty.getExchange(),
                    queueProperty.getRoutingKey(),
                    message,
                    messagePostProcessor -> {
                        messagePostProcessor.getMessageProperties().setDelayLong(delayTime);
                        return messagePostProcessor;
                    }
            );
        } catch (Exception e) {
            log.error("[MessagePublisher] - publishMessageWithDelay Failed", e);
        }
    }
    
    public void publishFinishActivityMessage(Activity activity) {
        int spareTime = activity.getSpareTime();
        FinishActivityMessage finishActivityMessage = FinishActivityMessage.builder()
                .activityId(activity.getId())
                .spareTime(spareTime)
                .build();
        
        long delayTime = (long) spareTime * 60 * 1000;  // TTL을 분 단위로 설정
        
        publishMessageWithDelay(delayTime, finishActivityMessage);
    }
}

 

위와 같이 메시지를 발행할 때, setDelayLong() 메소드를 이용하여 메시지에 TTL을 적용할 수 있습니다.

 

[ Consumer ] 

@Component
@RequiredArgsConstructor
@Log4j2
public class RabbitMessageConsumer {

    private final ActivityJpaRepository activityJpaRepository;

    @RabbitListener(queues = "${finish-activity-queue.queue}")
    public void consumeMessage(FinishActivityMessage message) {
        try {
            if (message == null) {
                log.error("[FinishActivityMessageConsumer] Message is null");
                return;
            }
            ActivityJpaEntity activity = activityJpaRepository.findById(message.activityId()).orElseThrow(ActivityErrorCode.NOT_EXIST_ACTIVITY::toException);
            activity.finish();
        } catch (Exception e) {
            log.error("[FinishActivityMessageConsumer] Error processing message", e);
        }
    }
}

 

Producer를 통해 메시지가 발행되면 TTL이 지난 뒤 Consumer에서 처리됩니다.

위 로직에서는 message를 통해 현재 진행 중인 활동을 찾아서 종료시키고 있습니다.

 

5. 비즈니스 로직에 메시지 발행 로직 추가

RabbitMQ를 사용하기 위한 초기 설정부터, Producer와 Consumer까지 구현되었습니다.

이제 활동이 시작된 후, Producer를 통해 메시지를 발행해야 합니다.

 

@Service
@RequiredArgsConstructor
@Log4j2
public class UserActivitySelectService {

    private final ActivityRepository activityRepository;

    private final RabbitMessagePublisher rabbitMessagePublisher;

    public UserActivitySelectResponse userActivitySelect(Member member, UserActivitySelectRequest userActivitySelectRequest) {
        ....
        Activity activity = Activity.create(member.getId(), null, userActivitySelectRequest.spareTime(), userActivitySelectRequest.type(), userActivitySelectRequest.keyword(), userActivitySelectRequest.title(), userActivitySelectRequest.content(), userActivitySelectRequest.location());
        Activity savedActivity = activityRepository.save(activity);
        rabbitMessagePublisher.publishFinishActivityMessage(savedActivity);        
        return new UserActivitySelectResponse(savedActivity.getId(), savedActivity.getTitle(), savedActivity.getKeyword());
    }
}

 

위와 같이 활동 선택 로직에서 메시지를 발행할 수 있게 추가하면, 활동 선택 후 메시지가 자투리 시간만큼 TTL이 설정되어 발행됩니다.

이후 Consumer에서 TTL이 지난 후 메시지를 처리함으로써 자동으로 활동 상태를 변경할 수 있습니다.


[ 결과 ] 

TTL 이후 rabbit mq 메시지 발행
활동 상태가 true로 잘 변경되는 모습

 

386412498-7a8302e1-d634-4ac2-a862-b5239405becf.mov
1.57MB

 

 

프로젝트를 끝내고 전시회를 통해 사용자 실테스트를 진행하면서 잘 작동하였습니다.

다수의 요청에도 RabbitMQ의 Delayed Message Plugin이 원활하게 작동함을 확인했으며, 비동기 처리 덕분에 시스템 성능을 저하시키지 않으면서 효율적으로 메시지를 처리하였습니다. 

 

[ 한계 ]


저희 프로젝트에서는 자투리 시간이 최대 3시간인 단기의 지연 시간 처리였지만, 며칠, 몇주 등의 장기적인 스케쥴링 해결책은 될 수 없다고 합니다.

 

또한 제가 개발 당시에 찾아봤을 때, 수십만건까지는 가능하지만 그 이상의 대용량 트래픽 처리에는 한계가 있다는 글을 보았습니다.

(분명 공식 문서에 적혀 있었는데,, 아무리 찾아봐도 어디에 적혀있는지 모르겠네요ㅜㅜ)

 

[ 후기 ]

RabbitMQ의 Delayed Message Plugin을 사용해서 TTL이 지난 메시지를 처리하는 방법에 대해서 알아봤습니다.

 

그때 당시에는 RabbitMQ와 Kafka 둘 중 어떤 것을 사용할 지 고민했었는데, 지금 생각해보니 Redis의 Pub, Sub에 대해서도 고민했음 좋았을 거 같습니다.

 

Delayed Message Plugin을 사용하기 위해 여러 글을 참고하였지만, 생각보다 글이 많이 없더라고요.. (설정이 간단해서 그런가 🧐)

하지만 처음 이걸 사용하려고 맞닥뜨렸을 때, 메시지를 지연하거나 Exchange 설정 등을 진행하면서 꽤나 애먹은 기억이 있네요 😂

 

메시지 지연 처리에 대해 고민하고 있는 개발자 분께 도움이 될 수 있기를 바랍니다!

 

P.S. 만약 메시지 지연 처리에 대한 방법을 알고 계시다면 댓글로 자유롭게 남겨주시면 참고해서 리팩토링 해보겠습니다 😁