Project/페이스콕

RabbitMQ 도입기

gilbert9172 2025. 2. 20. 03:23

 

❐ Description


  • RabbitMQ에 대해서 공부
  • 왜 SQS를 걷어내고 RabbitMQ를 도입하게 된건가
  • RabbitMQ와 SpringBoot 연동
  • 로컬에서 RabbitMQ 테스트 

 

 

 

 

 

❐ RabbitMQ란?


[TODO] RabbitMQ란? 포스팅 링크
[TODO] 표준 메세징 프로토콜 정리하기 (참고 링크)

 

 

 

 

❐ RabbitMQ 도입하기


1️⃣ Private EC2 접근하기

우선 ssh 커맨드로 public EC2에 접근한다. 현재 RabbitMQ가 구동되는 EC2는 private-subnet에 있기 때문에

public EC2에서 또 ssh 커맨드를 사용해야 한다. 그러나 public EC2에는 pem 파일이 없다. 

따라서 내 로컬 PC에서 public EC2에 pem 파일을 옮겨줘야 한다.

 

이럴 때 scp 커맨드를 사용하면 쉽게 파일을 옮길 수 있다.

scp -i ${pem_file} ${pem_file_path} ubuntu@${public_ec2_ip}:${target_dir}

 

 

2️⃣ RabbitMQ 설치하기

sudo apt update -y && sudo apt upgrade -y

sudo apt install -y rabbitmq-server

# /lib/systemd/systemd-sysv-install
sudo systemctl enable rabbitmq-server

sudo systemctl start rabbitmq-server

sudo rabbitmq-plugins enable rabbitmq_management

sudo systemctl restart rabbitmq-server

정상 실행 확인

 

 

3️⃣ Public EC2 ➙ Private EC2

# 5672 port 확인
ubuntu@ip-10-0-1-52:~$ telnet ${PRIVATE_IP_V4} 5672
Trying 10.0.4.195...
Connected to 10.0.4.195.
Escape character is '^]'.
Connection closed by foreign host.

# 15672 port 확인
ubuntu@ip-10-0-1-52:~$ curl -I http://${PRIVATE_IP_V4}:15672
HTTP/1.1 200 OK
content-length: 2884
content-security-policy: script-src 'self' 'unsafe-eval' 'unsafe-inline'; object-src 'self'
content-type: text/html
date: Thu, 20 Feb 2025 16:04:21 GMT
etag: "2482175333"
last-modified: Thu, 04 Apr 2024 19:54:06 GMT
server: Cowboy
vary: origin

 

왜 telnet으로 접근하면 바로 커넥션이 끊기는 것일까?

RabbitMQ의 AMQP 프로토콜(5672 포트)은 단순 Telnet 클라이언트로 직접 유지할 수 없다. 

왜냐면 Telnet으로 접속하면 RabbitMQ는 잘못된 클라이언트로 간주하고 연결을 즉시 종료하기 때문이다.

따라서 RabbitMQ 자체는 정상적으로 실행되고 있고, Security Group도 정상적으로 설정됐다고 해석할 수 있다.

 

 

4️⃣ DashBoard 접근하기

우선 내 PC에서 private-subnet에 위치해 있는 RabbitMQ 대시보드에 접근하기 위해서 터널링은 필수다. 

로컬 15672포트를 private-ec2의 15672 포트로 터널링
ssh -L 15672:${private_ec2_ip}:15672 -i ${pem_file} ubuntu@${public_ec2_ip}

 

터널링이 성공적으로 수행되면 localhost:15672로 접근해보자. 접근 후 Blank 화면과

`undefined: There is no template at js/tmpl/undefined.ejs` 경고가 노출된다면 아래의 명령어를 입력하자.

명령어 의미 : RabbitMQ의 웹 기반 관리 UI 플러그인을 오프라인일 때도 활성화

 

rabbitmq-plugins enable --offline rabbitmq_management

 

그리고 guest 계정은 EC2에서 직접 접속하는 경우에만 로그인 가능하고, 원격에서 접속하려면 계정을 새로 생성해야 한다.

# 계정, 비밀번호 입력
sudo rabbitmqctl add_user ${user_id} ${user_password}

# 태그 부여
sudo rabbitmqctl set_user_tags ${user_id} administrator

# 권한 부여(configure / write / read)
sudo rabbitmqctl set_permissions -p ${vhost} ${user_id} ".*" ".*" ".*"

 

 

 

 

 

❐ RabbitMQ (with. SpringBoot)


1️⃣ 의존성 주입

현재 프로젝트는 멀티 모듈 구조이기 때문에 infra 모듈에 의존성을 추가했다.

Maven Repository Link
implementation("org.springframework.boot:spring-boot-starter-amqp:3.4.2")

 

 

2️⃣ Yaml 파일 & Yaml을 로드하는 클래스

spring:
  rabbitmq:
    host: ${RABBITMQ_HOST}
    port: ${RABBITMQ_PORT}
    username: ${RABBITMQ_USER}
    password: ${RABBITMQ_PW}
    virtual-host: ${virtual_host}
@ConfigurationProperties(prefix = "spring.rabbitmq")
data class RabbitMQProperties(
    val host: String,
    val port: Int,
    val username: String,
    val password: String,
    val virtualHost: Int
)

 

⚠️`@ConfigurationProperties`를 사용했다면, Spring Context가, 스캔할 수 있도록 추가 설정이 필요⚠️

@ConfigurationPropertiesScan // 이 부분
class FacecockApiApplication

fun main(args: Array<String>) {
    runApplication<FacecockApiApplication>(*args)
}

 

 

3️⃣ RabbitMQ config 작성

@Configuration
class RabbitMQConfig(
    private val properties: RabbitMQProperties
) {

    @Bean
    fun changingConnectionFactory(): ConnectionFactory {
        val factory = ConnectionFactory()
        factory.host = properties.host
        factory.port = properties.port
        factory.username = properties.username
        factory.password = properties.password
        factory.virtualHost = properties.virtualHost
        return factory
    }
    
}

 

 

4️⃣ Docker 환경 구축(🔥🔥🔥)

RabbitMQ는 json파일과 conf 파일로 최초에 컨테이너가 올라갈 때 Queue, Exchange 등의 설정을

미리 정의하고 자동으로 생성할 수 있다. 물론 RebbitMQ Management에 접속해서, GUI를 통해 추가할 수도 있다.

하지만 Queue 또는 Exchange가 많아진다면 매번 설정하기가 번거롭고 설정을 누락하는 실수가 생길 수 있다.

더보기
{
  "vhosts": [
    {
      "name": "/local"
    },
    {
      "name": "/test"
    }
  ],
  "users": [
    {
      "name": "local",
      "password": "local",
      "hashing_algorithm": "rabbit_password_hashing_sha256",
      "tags": [
        "administrator"
      ]
    },
    {
      "name": "test",
      "password": "test",
      "hashing_algorithm": "rabbit_password_hashing_sha256",
      "tags": [
        "administrator"
      ]
    }
  ],
  "permissions": [
    {
      "user": "local",
      "vhost": "/local",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    },
    {
      "user": "test",
      "vhost": "/test",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    }
  ],
  "queues": [
    {
      "name": "account_modify_queue",
      "vhost": "/local",
      "durable": true,
      "auto_delete": false,
      "arguments": {
        "x-queue-type": "classic"
      }
    },
    {
      "name": "test_account_modify_queue",
      "vhost": "/test",
      "durable": true,
      "auto_delete": false,
      "arguments": {
        "x-queue-type": "classic"
      }
    }
  ],
  "exchanges": [
    {
      "name": "account_exchange",
      "vhost": "/local",
      "type": "topic",
      "durable": true
    },
    {
      "name": "test_account_exchange",
      "vhost": "/test",
      "type": "topic",
      "durable": true
    }
  ],
  "bindings": [
    {
      "source": "account_exchange",
      "vhost": "/local",
      "destination": "account_modify_queue",
      "destination_type": "queue",
      "routing_key": "account.*.update",
      "arguments": {}
    },
    {
      "source": "test_account_exchange",
      "vhost": "/test",
      "destination": "test_account_modify_queue",
      "destination_type": "queue",
      "routing_key": "test.account.*.update",
      "arguments": {}
    }
  ]
}
더보기
# 🔥필수🔥 : json 파일 경로를 정의한다.
load_definitions = /etc/rabbitmq/definitions.json
loopback_users = none
default_user = guest
default_pass = guest
더보기
FROM rabbitmq:3-management

COPY rabbitmq.conf /etc/rabbitmq/rabbitmq.conf
COPY definitions.json /etc/rabbitmq/definitions.json
COPY init-mq.sh /init-mq.sh

RUN chmod +x /init-mq.sh

CMD ["bash", "-c", "/init-mq.sh"]
더보기
#!/bin/bash

# RabbitMQ의 Management Plugin을 활성화하는 명령어
rabbitmq-server --load-definition /etc/rabbitmq/definitions.json

sleep 15

rabbitmq-plugins enable --offline rabbitmq_management
더보기
// 설정 추가
rabbitmq:
  build: ./rabbitmq/
  image: facecock-rabbitmq:latest
  container_name: facecock-rabbitmq
  ports:
    - '5672:5672'
    - '15672:15672'
  environment:
    - RABBITMQ_DEFAULT_USER=local
    - RABBITMQ_DEFAULT_PASS=local
  volumes:
    - facecock_rabbitmq_data:/var/lib/rabbitmq

 

추가적으로 definitions.json을 적용하는 다른 방식을 document에서 확인할 수 있다.

 

 

5️⃣ Message Producer & Message Consumer 정의

@Component
class RabbitMQConsumer(
    private val eventPublisher: EventPublisher
) {
    @RabbitListener(queues = ["큐 이름 입력"])
    fun receiveMessage(message: String) {
        val coachEvent = CoachEvent(message)
        eventPublisher.publishEvent(coachEvent)
    }
}
@Component
class MessageProducer(
    private val rabbitMqTemplate: RabbitTemplate
) {
    fun sendMessage(exchange: String, routingKey: String, message: String) {
        rabbitMqTemplate.convertAndSend(exchange, routingKey, message)
    }
}

 

 

6️⃣ DLX (Dead Letter Exchange)

TODO : DLX (참고 블로그)

 

 

 

 

 

❐ RabbitMQ Test


1️⃣ 추가 설정

우선 테스트 환경에서 필요한 Yaml 설정 하나가 더 있다. 

listener:
  simple:
    auto-startup: false

 

왜 위 설정을 추가할까?

  1. SpringBoot 테스트를 할 때 `RabbitMQConsumer` 빈으로 등록된다.
  2. 이때 `@RabbitListener`에 정의된 xxx-queue를 찾는다.
  3. 현재 vhost`/test`에는 xxx-queue가 없기 때문에 에러가 발생한다.

위 설정을 추가한다면 테스트 환경에서는 `@RabbitListener`를 비활성 처리할 수 있다.

 

 

2️⃣ 테스트 작성

@SpringBootTest
@ActiveProfiles("test")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class RabbitMQMessagingTest @Autowired constructor(
    private var publisher: ApplicationEventPublisher,
    private var coachJpaRepository: CoachJpaRepository,
    private var messageProducer: MessageProducer,
    private var messageMQReceiver: RabbitMQReceiver,
    @SpykBean private var coachService: CoachService
) {

    @BeforeEach
    fun setUp() {
        val coach = Coach(1, "coach1")
        coachJpaRepository.save(coach)
    }

    @Test
    @DisplayName("publish & consume")
    fun verify_update_coach_method_called() {
        // 1. 메시지 전송
        messageProducer.sendMessage(
            "test_account_exchange",
            "test.account.modify.update",
            "1"
        )

        // 2. 메시지 수신
        val receiveMessage = messageMQReceiver.receiveMessage("test_account_modify_queue")

        // 3. Event 발행
        publisher.publishEvent(
            CoachEvent(receiveMessage)
        )

        verify(exactly = 1) {
            coachService.updateCoach(any())
        }
    }
}

 

성공적으로 테스트가 통과함을 확인할 수 있다.