Project/페이스콕

AWS SQS 도입기

gilbert9172 2025. 2. 14. 20:56

❐ Description


SQS가 무엇이고, 왜 SQS를 도입하기로 했는지 기록해보자.

AWS SQS란?

 

 

 

 

 

❐ SQS를 도입하자.


현재 페이스콕 프로젝트의 Member 테이블은 Cafe24에 위치해 있고, Firebase에서 이를 복사해서 사용하는

구조로 되어있다. 여기에 더해 이번에 추가하는 Lesson 도메인에서도 Member 정보가 필요한 상황이다.

왜냐면 Lesson 도메인에서는 Member와 다른 테이블을 Join해서 데이터를 조회해야 하기 때문이다.

 

단순히 Member 테이블을 AWS Rds로 옮기면 해결될 문제라고 생각할 수 있지만, 아래의 문제가 발생한다.

  • Member 도메인에 관련된 비즈니스 로직을 모두 가져와야 한다.
  • OAuth 구현까지 가져와야 한다.

시간이 무한정이라면 충분히 할 수 있는 작업이지만, 현재 주어진 시간이 많이 남지 않았기 때문에

Member 테이블을 옮기는 방법은 선택할 수 없었다.

그럼 결국 Cafe24와 Firebase와 RDS의 Sync를 맞춰줘야만 한다.

 

총 3개의 아이디어로 논의를 했다.

 

1. Cafe24 DB 커넥션

NLB를 구축하여 Elastic IP를 할당 받아서, Cafe24에 접근 가능한 IP로 등록하는 방법

➔ 정합성 걱정 없음. (Cafe24에 위치한 Member를 쓰기 때문에)

➔ 구현 난이도 쉬움.

➔ Join을 할 수 없음.

 

2. SpringBoot에서 Firestore를 관찰

➔ SpringBoot 서버가 죽는다면? 추가적인 보상 정책 필요

➔ 약간의 지연으로 인해 데이터가 즉시 반영되지 않을 수 있음.

➔ Join 할 수 있음.

 

3. Firebase cloud Function에서 Event 발행

➔ Join 할 수 있음.

➔ 이벤트 소모되지 않는한, 무조건 1회 소모 가능(Kafka 또는 SQS 사용시)

➔ 속도도 느리지 않음.

➔ 추가 구축이 필요함.

➔ 작은 규모에서는 무료로 사용가능

 

 

안정성과 비용을 고려했을 때, 3번 방법을 채택하기로 결정했다. 그리고 이제 구현체로 Kafka를 사용할지 SQS를

사용하지 결정할 일만 남았다. Kafka는 기술적인 숙련도 미흡과 추가 Infra 구축 및 관리 부분에서 문제가 됐다.

 

그래서 비교적 빠른 구현과 관리 리소스가 적은 SQS를 선택했다. 무엇보다 현재 사용하려는 목적(Member 테이블

정합성)을 달성하기엔 SQS면 충분하다고 생각했다. 

 

 

 

 

 

❐ SQS(feat. SpringBoot3)


1. SQS 대기열 생성

AWS 콘솔에서 SQS 대기열을 생성해줘야 한다. 

  • 메시지를 무조건 1회 처리해야 하기 때문에 FIFO 선택

 

  • 표시 제한 시간 : 다른 소비자가 메시지를 처리하는 동안(20초) 다른 소비자는 볼 수 없음.
  • 메시지 보존 기간 : 메시지 소비가 실패한 경우 메시지 보존 기간 (기본: 4일)
  • 전송 지연 : 대기열에 메시지가 추가되었을 때 소비되는 시간 (0초 : 즉시 소비)
  • 최대 메시지 크기 : 
  • 메시지 수신 대기 시간 : 

 

추가적으로 DLQ도 생성해주어야 한다. 무조건 메시지가 1회 처리되어야 하는 환경에서 메시지가 N번 실패할 경우

해당 메시지를 휘발하지 않고 가지고 었어야 하기 때문이다. 또한 문제가 있는 메시지가 계속해서 SQS에서 재시도되면

리소스 낭비 발생하는데 DLQ가 있으면, 비정상 메시지를 빠르게 식별하고, 자동 재처리를 방지할 수 있다.

  • 우선 DLQ를 생생해준다.
  • 기존에 생성한 대기열에 DLQ를 연결해준다.

 

2. IAM 정책 추가

생성한 대기열에 Message를 요리조리 다룰수 있게 IAM 정책추가를 해줘야한다.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:SendMessage",
                "sqs:ReceiveMessage",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes"
            ],
            "Resource": ${queue_url}
        }
    ]
}

 

 

3. Add Dependency

현재 프로젝트는 멀티 모듈 구조이기 때문에 SQS 관련 의존성은 Infra 모듈에 추가했다.

implementation("io.awspring.cloud:spring-cloud-aws-starter:3.3.0")
implementation("io.awspring.cloud:spring-cloud-aws-sqs:3.3.0")
implementation("software.amazon.awssdk:sqs:2.30.21")
implementation("software.amazon.awssdk:auth:2.30.21")

 

 

4. SQS Configuration

총 4개의 프로파일(로컬 / 테스트 / 개발계 / 운영계)에 대해서 설정을 진행해줘야 한다고 판단했다.

그래서 SqsConfig 인터페이스를 작성하고, 각 프로파일에 맞게 구현하기로 결정했다.

 

interface SqsConfig {

    // SQS 통신의 핵심
    fun sqsAsyncClient(): SqsAsyncClient
    
    // 소비자의 정책을 정의
    fun defaultSqsListenerContainerFactory(): SqsMessageListenerContainerFactory<Any> {
        return SqsMessageListenerContainerFactory.builder<Any>()
            .configure { opt ->
                opt.acknowledgementMode(AcknowledgementMode.MANUAL)
            }
            .acknowledgementResultCallback(AckResultCallback())
            .sqsAsyncClient(sqsAsyncClient())
            .build()
    }

    // Template 관련
    fun sqsTemplate(): SqsTemplate {
        return SqsTemplate.newTemplate(sqsAsyncClient())
    }
}

 

여기서 하나 봐야할 부분은 `defaultSqsListenerContainerFactory( )` 부분이다.

SpringBoot에서 메시지를 Poll하게 되는데, 그 이후 메시지 수신 여부를 AWS에 알려줘야 한다.

 

이 설정을 AcknowledgementMode를 사용해서 할 수 있다. 

Document link
  • ON_SUCCESS : 메시지를 성공적으로 소비하면 승인한다.
  • ALWAYS : 메시지 소비 결과에 상관없이 승인한다.
  • MANUAL : 메시지 소비 결과에 따라, 개발자가 추가적인 조치를 할 수 있다.

나의 경우에는 Acknowledge의 결과에 따라 로그를 찍는 컴포넌트를 작성해줬다.

@Slf4j
@Component
class AckResultCallback : AcknowledgementResultCallback<Any> {

    private val logger: Logger = LoggerFactory.getLogger(AckResultCallback::class.simpleName)

    override fun onSuccess(messages: MutableCollection<Message<Any>>) {
        logger.info("Ack with success at {}", OffsetDateTime.now())
    }

    override fun onFailure(messages: MutableCollection<Message<Any>>, t: Throwable) {
        logger.info("Ack with failed at {}", OffsetDateTime.now())
    }
}

 

 

그럼 이제 구현체를 만들어줘야 한다. (여기서는 개발계 설정 코드를 가져왔다.)

@Profile("dev")
@Configuration
class DevSqsConfig : SqsConfig {

    @Value("\${spring.cloud.aws.credentials.access-key}")
    private val accessKey: String? = null

    @Value("\${spring.cloud.aws.credentials.secret-key}")
    private val secretKey: String? = null

    @Bean
    override fun sqsAsyncClient(): SqsAsyncClient {
        val credential = StaticCredentialsProvider.create(
            AwsBasicCredentials.create(accessKey, secretKey)
        )
        
        return SqsAsyncClient.builder()
            .region(Region.AP_NORTHEAST_2)
            .credentialsProvider(credential)
            .build()
    }

    @Bean
    override fun defaultSqsListenerContainerFactory(): SqsMessageListenerContainerFactory<Any> {
        return super.defaultSqsListenerContainerFactory()
    }

    @Bean
    override fun sqsTemplate(): SqsTemplate {
        return super.sqsTemplate()
    }
}

 

 

여기서 중요한 부분은 `credentialProvider(...)` 부분이다. 최초에 저 부분을 아래와 같이 작성했다.

credentialsProvider(DefaultCredentialsProvider.create())

얼핏봤을 때는 문제가 없는 코드다. 기본적으로 AWS 정보를 가져와서 Credential을 만들어 주는 코드다.

하지만 문제는 로컬 환경이 아닐 때 발생한다. 

 

로컬 환경에서는 AWS CLI 환경변수나 `~/.aws/credentials` 파일을 통해 인증이 자동으로 되어 문제가 없다.

하지만 배포 환경에서는 AWS Credentials가 설정되지 않았거나, IAM Role이 없어서 인증 실패 가능성이 높다.

결과적으로 AWS SDK가 credential을 생성하는데 필요한 Key를 찾지 못해 SQS 요청이 실패한 것이다.

 

 

5. YAML 파일 작성

Yaml 파일은 아래의 형식으로 작성해줬다.

spring:
  config:
    activate:
      on-profile: dev
  cloud:
    aws:
      sqs:
        enabled: true
        endpoint: https://sqs.ap-northeast-2.amazonaws.com
      credentials:
        access-key: ${AWS_PUBLIC_KEY}
        secret-key: ${AWS_PRIVATE_KEY}
      region:
        static: ap-northeast-2

custom:
  aws:
    sqs:
      queue-url: ${MEMBER_UPDATE_SQS}

 

 

 

5. Publisher & Listener

이번에는 SQS에 메시지를 발행하고 소비하기 위한 Publisher와 Listener를 구현할 것이다.

 

1️⃣ Publisher

@Component
@PropertySource(value = ["classpath:application-sqs.yaml"], factory = YamlLoadFactory::class)
class SqsMessagePublisher(
    private val sqsTemplate: SqsTemplate,
    @Value("\${custom.aws.sqs.queue-url}") private val queueUrl: String
) {

    fun send(payload: String) {
        sqsTemplate.send { sendOpsTo ->
            sendOpsTo
                .queue(queueUrl)
                .messageDeduplicationId(UUID.randomUUID().toString())
                .payload(payload)
        }

    }
}
  • Yaml 파일을 읽기 위해, Custom PropertySourceFactory(YamlLoadFactory) 구현
  • messageDeduplicationId : FIFO 큐에서 중복된 메시지를 Id 기반으로 식별 (중복된 경우 대기열에 추가 ❌)

 

2️⃣ Listener

@Component
class SqsMessageListener(
    private val eventPublisher: EventPublisher
) {
    @SqsListener("MemberUpdateQueue.fifo")
    fun handleMessage(message: String, acknowledgement: Acknowledgement) {
        
        // Poll한 메시지의 값으로 SpringEvent 생성
        val coachEvent = CoachEvent(message)
        
        // SpringEvent 발행 
        eventPublisher.publishEvent(coachEvent)

        // SpringEvent 소비완료 후 SQS Message 소비 완료하기
        acknowledgement.acknowledge()
    }
}

`@SqsListener` 애노테이션을 사용하면 쉽게 Listener를 구현할 수 있다.

 

해당 애노테이션을 살펴보면 아래와 같은 속성들을 가지고 있으며, 각자 필요에 따라 설정하면 된다. 

 

 

 

 

 

❐ LocalStack


1.  LocalStack이란?

SQS를 어떻게 테스트 할 수 있을지 구글링을 하다가 LocalStack 이라는 `테스트 컨테이너`를 알게됐다.

LocalStack은 AWS 서비스를 로컬 환경에서 에뮬레이션할 수 있도록 도와주는 오픈 소스 도구로,

실제 AWS 환경을 사용하지 않고도, 로컬에서 AWS의 다양한 서비스(S3, SQS, DynamoDB 등)를 모의

테스트할 수 있는 환경을 제공한다.

 

 

2.  LocalStack을 사용하게 된 이유 

우선 SQS로 메시지를 Publish, Poll하는 과정을 AWS 리소스 없이 확인하고 싶었다.

만약 AWS 리소스를 사용하게 된다면, 대기열이 늘어날 때 마다 총 3개(개발 / 운영 / 테스트)의 대기열을

추가해야 한다. 이는 곧 AWS 사용료 증가로 이어질 확률이 높다.

 

 

3.  LocalStack의 단점

  • 기능 제한 : 모든 AWS 서비스가 100% 동일하게 작동하는 것은 아니다.
  • 퍼포먼스 이슈 : 대규모 테스트에서는 성능이 낮을 수도 있다.
  • AWS 최신 기능 지원이 느릴 수 있음 : LocalStack이 따라가기까지 시간이 걸린다.

 

 

 

 

 

❐ LocalStack Setting


1.  의존성 주입

Maven Repository Link
testImplementation("org.testcontainers:localstack:1.20.4")

 

 

2. DockerCompose.yaml 작성하기

나의 경우 Local 개발 환경을 docker compose로 구성했기 때문에, 기존 Yaml 파일에

LocalStack 구성을 추가해줬다.

  • environment
    • SERVICES=sqs (SQS 서비스만 사용)
    • DEBUG=1 (Debug mode on)
    • PERSISTENCE = 1 (데이터 영구저장)
더보기

Yaml 설정을 할 때 다음과 같은 에러를 마주쳤다.

ERROR: 'rm -rf "/tmp/localstack"': exit code 1; output: b"rm: cannot remove '/tmp/localstack': Device or resource busy\n"

이 오류는 도커 컨테이너를 실행할 때 /tmp/localstack 디렉토리를 삭제하려고 시도했지만,

해당 디렉토리가 사용 중이어서 삭제할 수 없는 경우 발생하는 문제다. 

 

관련해서 구글링을 하는 과정에서 LocalStack 가이드 문서를 참고해서 문제를 해결할 수 있었다. 

위 내용을 요약하자면,

  • `DATA_DIR` `HOST_TMP_DIR` 같은 개별 설정은 이상 지원되지 않음.
  • 데이터를 유지하려면 PERSISTENCE=1 사용. 그러면 /var/lib/localstack/state 저장.
  • LocalStack은 볼륨 마운트를 분석하여 자동으로 임시 폴더(HOST_TMP_FOLDER)를 설정함.

 

 

3. LocalStack init.sh

LocalStack 컨테이너를 구동할 때, 자동으로 대기열을 생성해주는 Shell script를 작성했다.

#!/bin/bash
awslocal sqs create-queue \
    --queue-name MemberUpdateQueue.fifo \
    --attributes '{
        "FifoQueue": "true",
        "ContentBasedDeduplication": "true"
    }' \
    --endpoint-url http://localhost:4566
  • "FifoQueue": "true"  ➩  FIFO 대기열임을 설정
  • "ContentBasedDeduplication": "true"  ➩  중복되는 메시지를 방지하기 위한 설정

 

위 도커 파일을 volumes 부분에서 mount도 해줘야 한다.

volumes:
  - ./localstack/init/init-sqs.sh:/etc/localstack/init/ready.d/init-sqs.sh

 

 

4. docker compose up -d

이제 모든 준비가 끝났으니 컨테이너를 띄어보자.

 

1️⃣ SQS 서비스가 제대로 구동중인지 health check를 진행해보자.

curl -f http://localhost:4566/_localstack/health

정상적으로 구동중임을 확인!

 

 

2️⃣ init.sh이 정상 실행됐는지 확인해보자.

docker logs ${container_name} | grep ${queue_name}
# docker logs facecock-localstack | grep MemberUpdateQueue.fifo

init.sh도 문제없이 정상 실행됨을 확인!

 

 

 

 

 

❐ Test 코드 작성


진행할 테스트 플로우는 아래와 같다. 

  • SQS에 `test.fifo` 대기열 생성
  • Publish & Poll Message
  • Publish Spring Event
  • Verify Method Call
@SpringBootTest
@ActiveProfiles("test")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class SqsMessagePublisherTest @Autowired constructor(
    private val publisher: ApplicationEventPublisher,
    private val coachJpaRepository: CoachJpaRepository,
    private val sqsAsyncClient: SqsAsyncClient
) {
    @SpykBean
    private lateinit var coachService: CoachService

    private lateinit var queueUrl: String

    @BeforeAll
    fun setUp() {
        val createQueueResponse = sqsAsyncClient.createQueue(
            CreateQueueRequest.builder()
                .queueName("test.fifo")
                .attributes(
                    mapOf(
                        QueueAttributeName.FIFO_QUEUE to "true",
                        QueueAttributeName.CONTENT_BASED_DEDUPLICATION to "true"
                    )
                )
                .build()
        )
        queueUrl = createQueueResponse.get().queueUrl()

        val coach = Coach(1, "coach1")
        coachJpaRepository.save(coach)
    }
    
    @Test
    @DisplayName("SQS에서 Poll 받은 메시지를 처리하는 리스너 호출 Test")
    fun verify_update_coach_method_called() {
        // 1. Publish Message
        val messageBody = "1"
        sqsAsyncClient.sendMessage(
            SendMessageRequest.builder()
                .queueUrl(queueUrl)
                .messageGroupId("message-group-id")
                .messageBody(messageBody)
                .build()
        )

        // 2. Poll Message
        val receiveMessageResponse = sqsAsyncClient.receiveMessage(
            ReceiveMessageRequest.builder()
                .queueUrl(queueUrl)
                .maxNumberOfMessages(1)
                .waitTimeSeconds(1)
                .build()
        )

        // 3. Get payload from polled message
        val receivedMessageBody = receiveMessageResponse.get()
            .messages()[0]
            .body()

        // 4. Publish Spring Event
        publisher.publishEvent(
            CoachEvent(receivedMessageBody)
        )

        // 5. Verify method call
        verify(exactly = 1) {
            coachService.updateCoach(any())
        }
    }
}

 

1️⃣ @SpykBean

테스트 과정에서 Mocking된 빈을 주입하고 싶었다. 하지만 MockK에서는 @MockBean, @SpyBean을 지원하지

않았고, 구글링을 하는 과정에서 Ninja-Squad/springmockk를 사용해야 함을 알게됐다.

 

그럼 왜 MockK에서는 @MockBean, @SpyBean와 같은 애너테이션을 지원하지 않는 것일까?

  • Mockk는 Spring Boot 전용 Mocking 프레임워크가 아니라 Kotlin 특화 Mocking 라이브러리
  • Spring Boot는 기본적으로 Mockito를 공식 지원하며, Mockk 지원을 중단함.

결과적으로 Mockk를 사용하면서 Spring Boot의 @MockBean, @SpyBean 기능을 활용하려면

springmockk 의존성을 추가해야 한다.

 

 

2️⃣ @Autowired 생성자

관련해서 읽은 논리적인 블로그

최초에는 아래와 같이 코드를 작성했다.

@Autowired
private lateinit var publisher: ApplicationEventPublisher

@Autowired
private lateinit var coachJpaRepository: CoachJpaRepository

//...

 

하지만 생성자 주입을 사용하게 되면 `lateinit var` 키워드를 사용하지 않고 보다 깔끔히 작성할 수 있다.

@SpringBootTest
class SqsMessagePublisherTest @Autowired constructor(
    private val publisher: ApplicationEventPublisher,
    private val coachJpaRepository: CoachJpaRepository,
    private val sqsAsyncClient: SqsAsyncClient
) {
    //...
}

 

 

3️⃣ Test 결과

(많은 실패가 있었지만) 결국 성공했다!!

 

 

 

 

 

❐ 개발계 배포하기


로컬에서는 통과했던 테스트가 CI 환경에서는 실패하는 문제를 만났다. 위에서 잠깐 언급 했었던 이유가 바로 이 문제

때문이다. `DevSqsConfig.kt` 소스의 credential 부분을 수정하니 배포까지 무리없이 성공했다.

 

 

 

 

 

❐ 많이 사용한 Command


# 현재 Credential
aws sts get-caller-identity

# 현재 Queue List
awslocal sqs list-queues

# 도커 로그 확인
docker logs facecock-localstack | grep sqs

 

 

 

 

 

❐ 참고 자료