본문 바로가기

Spring

[Spring] RabbitMQ를 이용한 이벤트 예약

 

프로젝트를 진행하다가 유저가 특정 이벤트를 등록하고 승인 받은 후 해당 이벤트의 시작 시점이 되면 이벤트의 상태가 변화 되도록 하는 프로세스가 필요했다. 방법은 여러가지가 있지만 난 Message Broker를 이용하기로 했고 그중에서도 RabbitMQ를 이용하기로 했다.

 

RabbitMQ 설치

https://www.rabbitmq.com/download.html

 

Downloading and Installing RabbitMQ — RabbitMQ

Downloading and Installing RabbitMQ The latest release of RabbitMQ is 3.12.2. See change log for release notes. See RabbitMQ support timeline to find out what release series are supported. Experimenting with RabbitMQ on your workstation? Try the community

www.rabbitmq.com

윈도우 기준 위 사이트에서 표시된 부분의 링크를 클릭하면 설치 프로그램을 다운 받을 수 있다. 설치 프로그램 실행시 Erlang이 설치 되지 않은 상태면 Erlang을 먼저 설치해야 된다는 안내가 뜨면서 Erlang다운로드 사이트로 이동한다. 거기서 Erlang 다운받은 후 설치 프로그램을 재 실행한다.

 

로컬에서 테스트를 위해 이렇게 직접 다운 받았지만 AWS 사용시 docker-compose를 이용하거난 AWS에서 제공하는 MQ를 이용하는 것이 더 편하다.

 

RabbitMQ 세팅

 

설치 후 RabbitMQ Service - start나 rabbitmq-server를 실행하면 RabbitMQ이 실행된다.

sbin 폴더에서 cmd 접속후 rabbitmq-plugins enable rabbitmq_management  해당 명령어를 통해 RabbitMQ management를 활성화 시킨다. 15672 포트가 이미 사용시 해당 15672를 사용하는 프로세스를 죽이고 재실행하거나 RabbitMQ management 포트를 변경시키는 방법을 적용해야 한다.

활성화 후 자동으로 사이트가 켜지는 경우가 있지만 아닌 경우 http://localhost:15672를 통해 접속이 가능하다

Default로 guest라는 admin 계정이 생긴다. username, password 둘다 guest를 입력하고 접속한다. 

우선 Exchange를 생성해준다

Exchange는 Message가 Send 되면 먼저 Exchange에서 모으고 정해진 규칙에 따라 Queue로 메세지들이 분배하는 역할을 한다. 

  • Name에 자신이 만들고자 하는 Exchange이름을 적는다.
  • Server가 자기 자신과 통신하는 것이고 복잡한 Routing key 패턴이 필요한 작업이 아니기에 direct type로 설정하였다.
  • Durability를 Durable로 설정해야 브로커가 재실행되더라도 Exchange가 보존되어있다.
  • 각 Type에 대해서는 상황에 맞춰서 선택해야 한다.

 

그 후 Queue를 생성해준다. 

Type만 classic으로 변경한 후 이름을 작성후 queue를 추가해준다.

 

그 후 다시 아까 만들었던 exchange와 queue를 바인딩 시켜준다.

direct type에서는 Routing key로 바인딩 된다.

지금 화면상에는 Routing key와 queue name이 같게 설정이 되어 있는 데 Routing key는 원하는 값으로 설정이 가능하다.

 

Spring RabbitMQ

 

Gradle 의존성 추가

implementation 'org.springframework.boot:spring-boot-starter-amqp'

 

RabbitMQConfig

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;

import java.util.HashMap;
import java.util.Map;

public class RabbitMQConfig {
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("delay_exchange", "x-delayed-message", true, false, args);
    }

    @Bean
    public Queue queue() {
        return new Queue("delay_queue", true);
    }

    @Bean
    public Binding binding(Queue queue, CustomExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("delay_queue").noargs();
    }
//    @Bean
//    MessageConverter messageConverter() {
//        return new Jackson2JsonMessageConverter();
//    }
}
  • Delay message를 이용하기 위해서는 plugin을 사용해야 하는 데 이를 사용하기 위해 CustomExchange를 추가시켜준다. 
  • 아까 생성해준 Queue도 생성해주고 바인딩 규칙으로 이를 묶어준다. 
  • Binding에서 with()의 인자로 Routing key가 들어간다.

프로젝트에선 String만 보냈기에 MessageConverter가 필요없었지만 만약 Message에 추가적인 정보를 넣거나 DTO를 보내고 싶으면 밑에 주석 처리된 부분을 사용하면 된다.

 

Sender

@Service
@RequiredArgsConstructor
@Transactional
public class ConferenceAdminService {
    private final RabbitTemplate rabbitTemplate;
    private final ConferenceRepository conferenceRepository;

    public void permitConference(Long conferenceId){
        Conference conference = conferenceRepository.findById(conferenceId).orElseThrow(() -> new CustomException(CONFERENCE_NOT_FOUND));
        conference.permitted();
        
        long differenceInMillis = getDifferenceTimeFromNow(conference.getStartDate());
        // Message Sender 부분
        MessageProperties messageProperties = new MessageProperties();
        if (differenceInMillis > 0) 
            messageProperties.setHeader("x-delay", differenceInMillis);
        Message message = new Message(String.valueOf(conference.getId()).getBytes(), messageProperties);
        rabbitTemplate.send("delay_exchange", "delay_queue", message);
    }

    public long getDifferenceTimeFromNow(LocalDateTime startTime){
        long now = LocalDateTime.now().toInstant(ZoneOffset.UTC).toEpochMilli();;
        long start = startTime.toInstant(ZoneOffset.UTC).toEpochMilli();
        long differenceInMillis = start - now;
        return differenceInMillis;
    }
}

이벤트 시작시간에서 현재 시간을 뺀 만큼 delay주는 방법을 채택했다.

send의 두번째 인자는 queue이름이 아닌 Routing key가 들어가야 한다.

 

Receiver

Api를 호출하면 message 호춯과 함께 조회 쿼리가 날아간다.

 

결론

 이벤트 예약에 관해서는 다양한 방법이 있다. 그 중 가장 많이 고민한 방법은 QuartzMessage broker를 이용한 방법이다. Quartz는 이벤트마다 하나의 스케쥴러를 할당하는 방법으로 구현할 수 있었는데 스케쥴러 하나당 쓰레드를 하나씩 잡아 먹는다. 물론 쓰레드 풀에서 꺼내다 쓰긴 하지만 이벤트가 늘어남에 따라 메모리 사용량이 높아지는 문제가 발생한다. 또한 이벤트 참가자들에게 공지사항이나 단체 메시지등을 보낼 때에도 Message Broker를 사용할 수 있기 때문에 Message broker를 사용하기로 결정했다.

 Message broker 중에서는 kafka, RabbitMQ 두개를 고민했는데 두개 다 해당 기능 구현이 가능했지만 RabbitMQ가 이미 만들어진 플러그인이 있어서 쉽게 구현이 가능했기에 이를 채택했다.

'Spring' 카테고리의 다른 글

[Spring] @SpringBootTest vs @DataJpaTest  (0) 2023.08.10
[Spring] 패키지 구조(계층형, 도메인형)  (0) 2023.01.14