본문 바로가기

[Project] 프로젝트 삽질기38 (feat AWS SQS 적용)

어가며

리팩터링 하면서 AWS SQS를 활용한 기능을 개발하고 있습니다. 개발하는 과정에서, AWS SQS를 NestJS에서 어떻게 적용시킬 수 있는지 참고할 수 있는 자료가 거의 존재하지 않았습니다. 이번 기회에 직접 적용해보면서 삽질했던 과정을 정리하면 누군가는 제 글을 통해 도움을 얻을 수 있지 않을까 생각했습니다. Queue 생성은 해당 링크를 참고했고, Queue를 NestJS에서 활용하는 부분은 해당 링크를 참고했습니다.

 
 

 

 

 

 

 

AWS SQS 설정하기

AWS SQS를 활용하기 위해 먼저 IAM 설정을 진행하겠습니다.

 

IAM 설정

먼저 Nestjs에서 AWS SDK를 활용하기 위해서는 AWS IAM에서 제공하는 액세스 키 및 시크리 키 값이 필요합니다. 키 값들을 받아오기 위해 먼저 IAM 설정을 해보겠습니다.

 

 

 

 

IAM 계정의 왼쪽에서 사용자를 누른 후, 사용자 추가를 클릭합니다.

 

 

 

 

저는 test라는 사용자 이름을 설정했고, AWS 자격 증명 유형은 액세스 키를 설정했습니다. 그 후 우측 하단의 다음: 권한을 클릭합니다. 

 

 

그 후, SQS의 모든 권한을 설정하는 정책을 설정한 후 다음을 클릭합니다. 그 후 태그는 넘어갑니다.

 

 

 

 

그럼 우리가 설정한 값들을 검토하는 화면이 나옵니다. 검토해서 우리가 설정한 값이 맞다면 우측 하단의 사용자 만들기를 클릭합니다. 

 

 

 

 

그럼 이렇게 액세스 키와 비밀 액세스 키를 발급받을 수 있습니다. 이 값들을 어딘가에 복사한 후 SQS 설정을 시작하겠습니다. 

 

 

 

 

 

 

SQS 설정

SQS 설정을 진행하겠습니다.

 

 

 

먼저 AWS에서 AWS SQS에 들어가면 다음과 같은 화면이 나올 것입니다. 여기서 대기열 생성을 클릭합니다.

 

 

 

 

 

저는 메시지를 선입선출로 활용하기 위해 FIFO 방식을 사용했습니다. 활용하고자 하는 큐의 이름은 testQueue.fifo로 설정했습니다. FIFO 큐를 활용하고자 한다면, 큐 뒤에 .fifo를 반드시 붙여줘야 합니다.

 

 

 

 

 

그 후 다른 부분은 기본 설정으로 하지만, 메시지 수신 대기 시간만 10초로 설정했습니다.

 

 

 

 

 

그 후 Queue에 접근할 수 있는 액세스 정책을 설정합니다. 저는 지정된 AWS 계정이 접근할 수 있도록 설정했습니다. 이 값에는arn:aws:iam::{accountId}:user/{iam} 형태로 입력해야 합니다. 이때 accountId에 해당하는 값은 AWS 콘솔 우측 최상단 탭에 보이는 사용자 이름을 클릭하면 나옵니다.

 

 

 

 

 

accountId는 모자이크 한 부분의 번호를 기입하고 iam에 해당하는 부분은 IAM 설정해서 입력했던 사용자 이름을 입력하면 됩니다. 저의 경우는 test라는 사용자 이름을 입력했기에, test를 입력합니다.

 

 

 

 

 

그 후 다음 대기열 생성을 클릭하면 Queue가 생성됩니다. 생성한 Queue를 NestJS에 적용하는 방법에 대해 알아보겠습니다.

 

 

 

 

 

 


 

 

 

 

 

 

AWS SQS 적용하기

AWS SQS를 Node.js에서 활용하기 위해 공식 문서의 예제를 살펴봤습니다. 

 

 

 

 

 

 

공식 문서에서는 대기열 사용, 메시지 전송 및 수신, 가시성 제한 시간 초과 관리, 긴 폴링 활성화, 배달 못한 편지 대기열 사용 등의 예시가 존재했습니다. 여기서 대기열 사용은 SQS에서 직접 큐를 생성했기에 살펴보지 않아도 괜찮았습니다. 그럼 여기서 살펴볼 것은 메시지를 어떻게 전송하고 수신하는지와 관련된 내용이었습니다. 

 

 

// Load the AWS SDK for Node.js
var AWS = require('aws-sdk');
// Set the region 
AWS.config.update({region: 'REGION'});

// Create an SQS service object
var sqs = new AWS.SQS({apiVersion: '2012-11-05'});

var params = {
   // Remove DelaySeconds parameter and value for FIFO queues
  DelaySeconds: 10,
  MessageAttributes: {
    "Title": {
      DataType: "String",
      StringValue: "The Whistler"
    },
    "Author": {
      DataType: "String",
      StringValue: "John Grisham"
    },
    "WeeksOn": {
      DataType: "Number",
      StringValue: "6"
    }
  },
  MessageBody: "Information about current NY Times fiction bestseller for week of 12/11/2016.",
  // MessageDeduplicationId: "TheWhistler",  // Required for FIFO queues
  // MessageGroupId: "Group1",  // Required for FIFO queues
  QueueUrl: "SQS_QUEUE_URL"
};

sqs.sendMessage(params, function(err, data) {
  if (err) {
    console.log("Error", err);
  } else {
    console.log("Success", data.MessageId);
  }
});

 

 

 

위의 예시는 SQS에 메시지를 보낼 때 활용할 수 있는 코드입니다. 위 코드처럼 Queue에 메시지를 보낼 수 있겠지만, NestJS에서 SQS를 보다 잘 활용할 수 있는 방법이 없을까 고민했습니다. 이때 알 수 있었던 것이 @ssut/nestjs-sqs 라이브러리였습니다. 라이브러리를 활용하기 위해 먼저 하기와 같이 라이브러리를 설치했습니다.

 

 

npm i @ssut/nestjs-sqs
npm i aws-sdk

 

 

위와 같이 설치했다면, 메시지를 보내는 Producer와 메시지를 소비하는 Consumer를 설정해보겠습니다.

 

//producer.module.ts
import { Module } from '@nestjs/common';
import { SqsModule } from '@ssut/nestjs-sqs';
import { MessageProducer } from './producer.service';
import * as AWS from 'aws-sdk';
import { config } from '../config';

AWS.config.update({
    region: config.AWS_REGION, // aws region
    accessKeyId: config.ACCESS_KEY_ID, // aws access key id
    secretAccessKey: config.SECRET_ACCESS_KEY, // aws secret access key
});


@Module({
    imports: [
        SqsModule.register({
            consumers: [],
            producers: [
                {
                    name: config.TEST_QUEUE, // name of the queue
                    queueUrl: config.TEST_QUEUE_URL, 
                    region: config.AWS_REGION, // url of the queue
                },
            ],
        }),
    ],
    controllers: [],
    providers: [MessageProducer],
    exports: [MessageProducer]
})
export class ProducerModule { }

 

 

먼저 Producer를 설정하기 위해 위와 같이 ProducerModule을 설정했습니다. 먼저 AWS.config.update 설정을 통해 AWS 사용을 위한 준비를 하고, @ssut/nestjs-sqs 라이브러리에서 SqsModule을 불러와서 등록합니다. 이때 Producer 역할만 하기에, producer에 대한 설정만 진행합니다. 

 

 

//producer.service.ts
import { Injectable } from '@nestjs/common';
import { SqsService } from '@ssut/nestjs-sqs';
import { config } from '../config';
@Injectable()
export class MessageProducer {
    constructor(private readonly sqsService: SqsService) { }
    async sendMessage(body: any) {

        const message: any = JSON.stringify(body);

        try {
            await this.sqsService.send(config.TEST_QUEUE, {
            id: 'id',
            body: message,
            groupId: 'test',
            deduplicationId: 'test'
            });
        } catch (error) {
            console.log('error in producing image!', error);
        }

    }
}

 

 

그 후, SqsModule을 Module에서 등록했으니 sqsService를 주입받아서 활용할 수 있습니다. 이때 sendMessage라는 메서드를 활용해서 큐에 메시지를 보내는 방식을 알아보겠습니다. 간단합니다. sqsService.send 메서드를 활용하여 메시지를 보냅니다. send 메서드의 첫 번째 파라미터로 config.TEST_QUEUE라는 이름이 등록되어 있는데, 위에서 우리가 설정한 Queue의 이름을 여기에 작성하면 됩니다.

 

 

그 후, 우리는 FIFO를 활용하기 때문에 sqsService.send의 두 번째 파라미터로 객체를 넘겨줍니다. 이때 groupId와 deduplicationId를 반드시 넘겨줘야 합니다. 여기서 groupId는 FIFO 큐에서 필수 값인 메시지가 특정 그룹에 속하는 것을 의미하는 값입니다. 그리고 deduplicationId는 FIFO 큐에서 필수 값인 메시지 중복 제거용 ID 값입니다. 이 값은 모든 메시지에서 고유한 값이어야 합니다. 

 

 

 

//app.module.ts
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ProducerModule } from './producer/producer.module';

@Module({
  imports: [
    ProducerModule,
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

 

 

그 후 Producer와 관련된 모듈을 AppModule에 등록하면 Producer를 쉽게 활용할 수 있습니다. 그럼 이제 Consumer를 설정해보겠습니다.

 

 

//consumer.module.ts
import { Module } from '@nestjs/common';
import { SqsModule } from '@ssut/nestjs-sqs';
import { MessageHandler } from './messageHandler';
import * as AWS from 'aws-sdk';
import { config} from '../config';


AWS.config.update({
    region: config.AWS_REGION,
    accessKeyId: config.ACCESS_KEY_ID,
    secretAccessKey: config.SECRET_ACCESS_KEY,
});
@Module({
    imports: [
        SqsModule.register({
            consumers: [
                {
                    name: config.TEST_QUEUE, // name of the queue 
                    queueUrl: config.TEST_QUEUE, // the url of the queue
                    region: config.AWS_REGION,
                },
            ],
            producers: [],
        }),
    ],
    controllers: [],
    providers: [MessageHandler],
})
export class ConsumerModule { }

 

 

먼저 ConsumerModule을 생성해서, Producer 설정과 같은 형태로 SqsModule을 등록합니다. 이때 Consumer에 관한 설정만 담아둡니다. 

 

 

//messageHandler.ts
import { Injectable } from '@nestjs/common';
import { SqsMessageHandler } from '@ssut/nestjs-sqs';
import * as AWS from 'aws-sdk';
import { config } from '../config';


console.log('config.AWS_REGION', config);
@Injectable()
export class MessageHandler {
    constructor() { }
    @SqsMessageHandler(config.TEST_QUEUE, false)
    async handleMessage(message: AWS.SQS.Message) {
        const obj: any = JSON.parse(message.Body) as {
            message: string;
            date: string;
        };
        const { data } = JSON.parse(obj.Message);

        // use the data and consume it the way you want // 

    }
}

 

 

그 후, messageHandler.ts 파일을 만들고 AWS SQS에서 메시지를 수신한 후 활용하는 로직을 작성하겠습니다. 간단하게 SqsMessageHandler 데코레이터를 활용하여 큐의 메시지를 가져올 수 있습니다. 

 

 

// Load the AWS SDK for Node.js
var AWS = require('aws-sdk');
// Set the region
AWS.config.update({region: 'REGION'});

// Create an SQS service object
var sqs = new AWS.SQS({apiVersion: '2012-11-05'});

var queueURL = "SQS_QUEUE_URL";

var params = {
 AttributeNames: [
    "SentTimestamp"
 ],
 MaxNumberOfMessages: 10,
 MessageAttributeNames: [
    "All"
 ],
 QueueUrl: queueURL,
 VisibilityTimeout: 20,
 WaitTimeSeconds: 0
};

sqs.receiveMessage(params, function(err, data) {
  if (err) {
    console.log("Receive Error", err);
  } else if (data.Messages) {
    var deleteParams = {
      QueueUrl: queueURL,
      ReceiptHandle: data.Messages[0].ReceiptHandle
    };
    sqs.deleteMessage(deleteParams, function(err, data) {
      if (err) {
        console.log("Delete Error", err);
      } else {
        console.log("Message Deleted", data);
      }
    });
  }
});

 

 

공식 문서에 맞게 활용한다면, 메시지를 받기 위해 reseiveMessage 메서드를 활용하고, 메시지를 받았을 경우 메시지를 지우는 deleteMessage 메서드를 활용해야 하지만, 위의 SqsMessageHandler 메서드를 활용하면 이 과정을 줄일 수 있습니다. 

 

 

import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ConsumerModule } from './consumer/consumer.module';
import { ProducerModule } from './producer/producer.module';

@Module({
  imports: [
    ProducerModule,
    ConsumerModule
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

 

 

 

그 후, AppModule에 ConsumerModule을 import 하면 AWS SQS를 활용할 수 있습니다. 

 

 

 

 

 

 


 

 

 

 

 

 

마치며

앞으로도 팀의 발전을 돕는 개발자가 되기 위해 노력하려 합니다. 팀에 필요한 부분이 무엇일지 고민하면서, 팀에 도움이 된다면, 열심히 공부해서 실무에 적용할 수 있는 개발자가 되기 위해 노력하고 싶습니다. 팀의 성장에 기여할 수 있는 개발자가 되겠습니다. 

 

 

 

 

 

 


 

 

 

 

 

참고 및 출처

 

Amazon SQS에서 대기열 사용 - AWS SDK for JavaScript

이 페이지에 작업이 필요하다는 점을 알려 주셔서 감사합니다. 실망시켜 드려 죄송합니다. 잠깐 시간을 내어 설명서를 향상시킬 수 있는 방법에 대해 말씀해 주십시오.

docs.aws.amazon.com

 

Node.js 환경에서 SQS 를 사용해봅니다. (1)

Node.js 환경에서 SQS 를 이용하여 데이터를 주고 받아 봅시다.

velog.io

 

@ssut/nestjs-sqs

[![Test](https://github.com/ssut/nestjs-sqs/workflows/Test/badge.svg)](https://github.com/ssut/nestjs-sqs/actions?query=workflow%3ATest) [![npm version](https://badge.fury.io/js/%40ssut%2Fnestjs-sqs.svg)](https://badge.fury.io/js/%40ssut%2Fnestjs-sqs). Lat

www.npmjs.com

 

Using Amazon SQS in Nest.

I have been working lately with Amazon SQS on a nest project. I thought of sharing the experience...

dev.to