들어가며
사이드 프로젝트에서 푸시 알림을 활용한 서비스를 개발하고 있습니다. 그 과정에서 생각하고 배웠던 점들을 하나씩 작성하고자 합니다. 저번 글에서는 Queue에 대해 알아봤고, Queue 중에서도 Bull을 사용하는 것에 대해 알아봤습니다. 이번 시간에는 Bull에 대해 조금 더 자세히 알아보겠습니다.
Bull 활용하기
Bull은 Node.js에서 활용할 수 있는 Redis 기반의 큐 시스템 라이브러리입니다. 물론 기본적인 Redis를 활용해서 큐를 직접 구현할 수 있지만, Bull을 활용하면 Redis의 기본 기능 이상으로 큐를 효율적으로 활용할 수 있습니다. 저번 글에서는 Bull에 대해 간략하게 활용해봤다면, 이번 글에서는 Bull에 대해 조금 더 자세히 알아보겠습니다.
Bull을 사용하기 전에, 먼저 의존성을 설치하겠습니다.
$ npm install --save @nestjs/bull bull
$ npm install --save-dev @types/bull
Setting
설치가 완료됐다면, BullModule을 AppModule에 imports 해서 활용합니다.
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
@Module({
imports: [
BullModule.forRoot({
redis: {
host: 'localhost',
port: 6379,
},
}),
],
})
export class AppModule {}
The forRoot() method is used to register a bull package configuration object that will be used by all queues registered in the application (unless specified otherwise). A configuration object consist of the following properties:
forRoot() 메서드를 활용해서 설정하는데, 다음의 구성 요소를 설정할 수 있습니다.
- limiter: RateLimiter - 큐의 작업이 처리되는 속도를 제어하는 옵션입니다.
- redis: RedisOpts - Redis 연결을 구성하는 옵션입니다.
- defaultJobOptions: JobOpts - 새로운 jobs의 기본 설정을 제어하는 옵션입니다.
모든 옵션은 옵션이고, 큐 동작을 상세하게 제어할 수 있습니다.
Register Queue
위와 같이 BullModule을 설정했다면, 아래에서는 큐를 등록하는 방법에 대해 알아보겠습니다. 먼저 이름이 audio인 큐를 등록하겠습니다.
BullModule.registerQueue({
name: 'audio',
});
만약 audio와 같은 하나의 큐를 등록하는 것이 아닌, 여러 개의 큐를 등록하려면 콤마를 활용해서 이름을 추가하면 됩니다. registerQueue() 메서드는 큐의 인스턴스화 및 등록에 사용됩니다. 큐는 동일한 credential을 사용하여 동일한 Redis 데이터베이스에 접속하는 모듈 및 프로세스 간에 공유됩니다.
만약 특정 큐에 대해서 옵션을 다르게 설정하고 싶다면 아래와 같이 설정할 수 있습니다.
BullModule.registerQueue({
name: 'audio',
redis: {
port: 6380,
},
});
Named configurations
큐가 여러 Redis 인스턴스에 연결되어 있는 경우 Configuration이라는 이름의 기술을 사용할 수 있습니다. 이 기능을 사용하면 지정된 키로 여러 설정을 등록할 수 있습니다. 예를 들어 응용 프로그램에 등록된 몇 개의 큐에서 사용되는 추가 Redis 인스턴스(기본 인스턴스 이외)가 있다고 가정하면 다음과 같이 설정을 등록할 수 있습니다.
BullModule.forRoot('alternative-config', {
redis: {
port: 6381,
},
});
위에서 설정한 이름으로 registerQueue() options 객체의 설정을 가리킬 수 있습니다.
BullModule.registerQueue({
configKey: 'alternative-queue'
name: 'video',
});
Producers
큐에 작업을 추가하려면 먼저 다음과 같이 큐를 서비스에 삽입합니다.
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';
@Injectable()
export class AudioService {
constructor(@InjectQueue('audio') private audioQueue: Queue) {}
}
@InjectQueue 데코레이터는 registerQueue 메서드 호출에서 제공되는 이름으로 큐를 식별합니다. 이제 큐의 add 메서드를 호출해서 작업을 추가하겠습니다.
const job = await this.audioQueue.add({
foo: 'bar',
});
Named Jobs
작업은 고유한 이름을 가질 수 있습니다. 이를 통해 지정된 이름의 작업만 처리하는 전문 소비자를 생성할 수 있습니다.
const job = await this.audioQueue.add('transcode', {
foo: 'bar',
});
이름 있는 작업을 사용할 경우 큐에 추가된 고유 이름별로 프로세서를 생성해야 합니다. 이 내용은 Consumers에서 더 자세히 살펴보겠습니다.
Job Options
작업에 연결된 추가 옵션이 있을 수 있습니다. Queue.add() 메서드에서 job 인수 뒤에 옵션 개체를 전달합니다. 작업 옵션 속성은 다음과 같습니다
- priority: number - 우선순위, 최우선 순위를 1로 넣고, INT값의 끝까지를 우선순위 값으로 설정할 수 있습니다.
- delay: number - 이 작업을 처리할 수 있을 때까지 대기하는 시간입니다.
- attempts: number - 작업이 완료될 때까지 시도한 총횟수입니다.
- repeat: RepeatOpts - cron에 대한 명시에 따라 작업을 반복합니다. RepeatOpts의 인터페이스는 아래에 첨부했습니다.
- backoff: number | BackoffOpts - 작업이 실패할 경우 자동 재시도를 위한 백오프 설정입니다. BackoffOpts의 인터페이스는 아래에 첨부했습니다.
- lifo: boolean - true일 경우 왼쪽 끝이 아닌 오른쪽 끝에 작업을 추가합니다.
- timeout: number - 작업이 시간 초과 오류로 인해 실패할 때까지의 시간(밀리초)입니다.
- jobId: number | string - 기본적으로 작업 ID는 고유한 정수이지만, 이 설정을 사용하여 재정의할 수 있습니다. 이 옵션을 사용할 경우 jobId가 고유함을 확인하는 것은 사용자에게 달려있습니다. 기존 ID를 사용하여 작업을 추가하려고 하면 추가되지 않습니다.
- removeOnComplete: boolean | number - true일 경우 작업이 정상적으로 완료되면 삭제됩니다. 숫자는 유지할 작업의 양을 지정합니다. 기본 동작은 완료된 세트 내에 작업을 유지하는 것입니다.
- removeOnFail: boolean | number - true일 경우 모든 시도 후 작업이 실패하면 이 작업을 제거합니다. 숫자는 유지할 작업의 양을 지정합니다. 기본 동작은 실패한 세트 내의 작업을 보관 유지합니다.
- stackTraceLimit: number - 스택 트레이스에 기록되는 스택 트레이스 행의 양을 제한합니다.
interface RepeatOpts {
cron?: string; // Cron string
tz?: string; // Timezone
startDate?: Date | string | number; // Start date when the repeat job should start repeating (only with cron).
endDate?: Date | string | number; // End date when the repeat job should stop repeating.
limit?: number; // Number of times the job should repeat at max.
every?: number; // Repeat every millis (cron setting cannot be used together with this setting.)
count?: number; // The start value for the repeat iteration count.
}
interface BackoffOpts {
type: string; // Backoff type, which can be either `fixed` or `exponential`. A custom backoff strategy can also be specified in `backoffStrategies` on the queue settings.
delay: number; // Backoff delay, in milliseconds.
}
다음은 작업 옵션을 사용하여 작업을 사용자 정의하는 몇 가지의 예시입니다. 작업 시작을 지연시키려면 delay 속성을 사용합니다.
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ delay: 3000 }, // 3 seconds delayed
);
큐의 오른쪽 끝에 작업을 추가하려면 LIFO 속성을 true로 설정합니다.
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ lifo: true },
);
실패 시 재시도를 하는 설정은 attempts를 사용해야 합니다. 만약 특정 이유로 주문이 처리되지 않는 경우, 재시도를 할 수 있습니다. 작업이 처리되기 위해 2회 시도해야 함을 나타내는 attempts 옵션을 추가해 보겠습니다.
작업의 우선순위를 지정하려면 priority를 사용합니다.
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ priority: 2 },
);
또 다른 예시를 살펴보겠습니다. 가장 높은 우선순위는 1이며 사용하는 정수가 클수록 낮습니다. 아래의 경우 지불 금액에서 100을 초과하는 주문에 대해 높은 우선순위를 고려하고 금액이 같거나 낮은 주문에 대해서는 낮은 우선순위를 설정합니다.
Consumers
컨슈머는 큐에 추가된 작업을 처리하거나 큐에서 이벤트를 수신하거나 둘 다 수신하는 메서드를 정의하는 클래스입니다. 다음과 같이 @Processor() 데코레이터를 사용하여 컨슈머 클래스를 선언합니다.
import { Processor } from '@nestjs/bull';
@Processor('audio')
export class AudioConsumer {}
여기서 데코레이터의 string 인수(ex audio)는 클래스 메서드와 연관 짓는 큐의 이름입니다. 컨슈머 클래스 내에서 핸들러 메서드를 @Process() 데코레이터로 데코레이션함으로써 작업 핸들러를 선언합니다.
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('audio')
export class AudioConsumer {
@Process()
async transcode(job: Job<unknown>) {
let progress = 0;
for (i = 0; i < 100; i++) {
await doSomething(job.data);
progress += 10;
await job.progress(progress);
}
return {};
}
}
데코레이터 메서드는 (예를 들어 transcode()) 큐에 처리할 작업이 있을 때마다 호출됩니다. 이 핸들러 메서드는 작업 개체를 유일한 인수로 받습니다. 핸들러 메서드에 의해 반환된 값은 작업 개체에 저장되며 나중에(예: 완료된 이벤트의 리스너에서) 액세스 할 수 있습니다.
작업 핸들러 메서드는 다음과 같이 특정 유형의 작업(특정 이름의 작업)만 처리하도록 지정할 수 있습니다. 특정 개인 사용자 클래스에 각 작업 유형(이름)에 대응하는 여러 @Process() 핸들러를 포함할 수 있습니다. 명명된 작업을 사용할 때는 각 이름에 대응하는 핸들러가 있어야 합니다.
@Process('transcode')
async transcode(job: Job<unknown>) { ... }
Event Listeners
이벤트 리스너는 컨슈머 클래스(즉, @Processor() 데코레이터로 장식된 클래스) 내에서 선언해야 합니다. 이벤트를 파악하려면 이벤트 처리기를 선언해야 합니다. 예를 들어 작업이 큐에서 활성 상태가 되었을 때 방출되는 이벤트를 재생하려면 다음 구성을 사용합니다.
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('audio')
export class AudioConsumer {
@OnQueueActive()
onActive(job: Job) {
console.log(
`Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
);
}
...
이벤트 핸들러는 대응하는 이벤트가 송신될 때마다 호출됩니다. 여기서는 로컬 이벤트 핸들러와 글로벌 이벤트 핸들러의 차이점에 대해 설명하겠습니다
Local event listeners | Global event listeners | Handler method signature / When fired |
@OnQueueError() | @OnGlobalQueueError() | handler(error: Error) - An error occurred. error contains the triggering error. |
@OnQueueWaiting() | @OnGlobalQueueWaiting() | handler(jobId: number | string) - A Job is waiting to be processed as soon as a worker is idling. jobId contains the id for the job that has entered this state. |
@OnQueueActive() | @OnGlobalQueueActive() | handler(job: Job) - Job job has started. |
@OnQueueStalled() | @OnGlobalQueueStalled() | handler(job: Job) - Job job has been marked as stalled. This is useful for debugging job workers that crash or pause the event loop. |
@OnQueueProgress() | @OnGlobalQueueProgress() | handler(job: Job, progress: number) - Job job's progress was updated to value progress. |
@OnQueueCompleted() | @OnGlobalQueueCompleted() | handler(job: Job, result: any) Job job successfully completed with a result result. |
@OnQueueFailed() | @OnGlobalQueueFailed() | handler(job: Job, err: Error) Job job failed with reason err. |
@OnQueuePaused() | @OnGlobalQueuePaused() | handler() The queue has been paused. |
@OnQueueResumed() | @OnGlobalQueueResumed() | handler(job: Job) The queue has been resumed. |
@OnQueueCleaned() | @OnGlobalQueueCleaned() | handler(jobs: Job[], type: string) Old jobs have been cleaned from the queue. jobs is an array of cleaned jobs, and type is the type of jobs cleaned. |
@OnQueueDrained() | @OnGlobalQueueDrained() | handler() Emitted whenever the queue has processed all the waiting jobs (even if there can be some delayed jobs not yet processed). |
@OnQueueRemoved() | @OnGlobalQueueRemoved() | handler(job: Job) Job job was successfully removed. |
로컬 버전으로 job을 수신하는 메서드는 글로벌 버전으로 jobId(번호)를 수신합니다. 이 경우 실제 job에 대한 참조를 얻으려면 Queue#getJob 메서드를 사용합니다. 이 콜은 대기하고 있기 때문에 핸들러는 비동기로 선언해야 합니다. Queue 객체에 액세스 하려면(getJob() 콜을 발신하기 위해) 당연히 큐 객체를 삽입해야 합니다. 또한 큐를 삽입하는 모듈에 큐를 등록해야 합니다.
Queue Management
큐에는 일시 중지 및 재개, 다양한 상태의 작업 수 검색 등의 관리 기능을 수행할 수 있는 API가 있습니다. 다음 일시 중지/재개 예시와 같이 큐 개체에서 직접 이러한 메서드 중 하나를 호출합니다.
pause() 메서드를 사용하여 큐를 일시 정지합니다. 일시 중지된 대기열은 재개될 때까지 새 작업을 처리하지 않지만 현재 처리 중인 작업은 완료될 때까지 계속됩니다.
await audioQueue.pause();
일시정지 큐를 재개하려면 다음과 같이 resume() 메서드를 사용합니다.
await audioQueue.resume();
Async Configuration
bull 옵션을 정적인 대신 비동기적으로 전달할 수 있습니다. 이 경우 비동기 설정을 처리하는 몇 가지 방법을 제공하는 forRootAsync() 메서드를 사용합니다. 마찬가지로 큐 옵션을 비동기적으로 전달하려면 registerQueAsync() 메서드를 사용합니다.
BullModule.forRootAsync({
useFactory: () => ({
redis: {
host: 'localhost',
port: 6379,
},
}),
});
Factory를 활용하면, 다른 비동기 프로바이더와 마찬가지로 동작합니다. 주입을 통해 의존관계를 설정할 수 있습니다.
BullModule.forRootAsync({
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
redis: {
host: configService.get('QUEUE_HOST'),
port: +configService.get('QUEUE_PORT'),
},
}),
inject: [ConfigService],
});
또는 useClass를 활용해서 설정할 수 있습니다.
BullModule.forRootAsync({
useClass: BullConfigService,
});
위의 구성에서는 BullModule 내의 BullConfigService를 인스턴스 화하고 createSharedConfiguration()을 호출하여 옵션 개체를 제공합니다. 이는 다음과 같이 BullConfigService가 SharedBullConfigurationFactory 인터페이스를 구현해야 함을 의미합니다.
@Injectable()
class BullConfigService implements SharedBullConfigurationFactory {
createSharedConfiguration(): BullModuleOptions {
return {
redis: {
host: 'localhost',
port: 6379,
},
};
}
}
BullModule 내에서 BullConfigService가 생성되지 않도록 하고 다른 모듈에서 Import 된 공급자를 사용하려면 useExisting 구문을 사용할 수 있습니다.
BullModule.forRootAsync({
imports: [ConfigModule],
useExisting: ConfigService,
});
이 구성은 하나의 중요한 차이점을 가진 useClass와 동일하게 동작합니다. BullModule은 Import 된 모듈을 검색하여 새로운 ConfigService를 인스턴스 화하는 대신 기존 ConfigService를 재사용합니다.
마치며
앞으로도 팀의 발전을 돕는 개발자가 되기 위해 노력하려 합니다. 팀에 필요한 부분이 무엇일지 고민하면서, 팀에 도움이 된다면, 열심히 공부해서 실무에 적용할 수 있는 개발자가 되기 위해 노력하고 싶습니다. 팀의 성장에 기여할 수 있는 개발자가 되겠습니다.
참고 및 출처
'Project > 서버 개발' 카테고리의 다른 글
[Project] 프로젝트 삽질기12 (feat 직렬화) (0) | 2022.03.29 |
---|---|
[Project] 프로젝트 삽질기11 (feat pgAdmin4) (0) | 2022.03.28 |
[Project] 프로젝트 삽질기9 (feat Queue, bull) (1) | 2022.03.15 |
[Project] 프로젝트 삽질기8 (feat ormConfig) (0) | 2022.03.15 |
[Project] 프로젝트 삽질기7 (feat ORM 비교) (4) | 2022.03.13 |