본문 바로가기

[Project] 프로젝트 삽질기52 (feat 검색 로그 구축 2)

어가며

저번 글에서 검색 로그 구축을 위해 필요한 사전 지식을 정리했다면, 이번 글에서는 검색 로그 구축을 어떻게 할 수 있는지와 관련된 내용을 정리해보려 합니다. 바로 시작하겠습니다. 

 

 

 

 

 

 

 


 

 

 

 

 

 

Kinesis Streams

데이터 수집 아키텍처 위와 같이 구성했습니다. 그럼 먼저 Kinesis Streams, Kinesis Firehose를 구축하고, EC2에 연결하여 데이터를 전송하는 부분에 대해 알아보겠습니다.  

 

 

 

 

먼저 AWS에서 검색창에 Kinesis라고 검색하여 들어가면 위와 같은 화면이 나올 것입니다. 여기서 데이터 스트림 생성 버튼을 클릭합니다.

 

 

 

그 후 data stream 이름을 설정합니다. 저는 검색 로그를 저장할 것이기 때문에 search_log라고 입력하고, 용량 모드는 온디멘드를 선택했습니다. 만약 프로비저닝을 선택하면 프로비저닝 된 샤드 수를 입력하면 됩니다. 검색 기능은 언제 어떻게 사용자가 활용할지 예측이 불가능하기 때문에 온디맨드로 설정하고 다음으로 넘어갔습니다.

 

 

 

 

 

모두 입력이 완료됐다면 데이터 스트림 생성 버튼을 클릭합니다. 버튼 클릭 후, 생성된 kinesis stream의 status가 active 되면 kinesis data streams를 사용할 준비가 완료됐습니다. 

 

 

 

Kinesis Firehose

그럼 다음으로는 Kinesis Data Firehose를 이용해서 S3에 데이터를 전송하기 위해, Kinesis Firehose 구성해보겠습니다. 

 

 

 

먼저 Kinesis 콘솔 화면으로 들어가 Kinesis Data Firehose 서비스의 전송 스트림 생성 버튼을 클릭합니다.

 

 

 

소스는 Amazon Kinesis Data Streams, 대상은 Amazon S3를 선택해 줍니다.

 

 

 

 

소스는 위에서 생성했던 Kinesis Data Streams를 연결해야 하기 때문에 찾아보기 버튼을 클릭해 줍니다.

 

그 후 방금 생성했던 Kinesis Data Streams 이름인 search_log를 선택해 줍니다.

 

 

 

그 후엔, Firehose와 연결할 S3 설정을 해야 하는데요. S3 버킷에는 여러분들이 데이터를 저장할 S3 버킷 이름을 설정해 주시면 됩니다. 그리고 S3 버킷 접두사, S3 버킷 오류 출력 접두사는 아래와 같이 작성해 주시면 됩니다. 

 

 

 

S3 버킷 접두사는 

json-data-search/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/

 

S3 버킷 오류 출력 접두사는

 

error-json/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}

 

 

위와 같이 구성했습니다. 이렇게 구성하면, 로그 데이터가 데이터가 연, 월, 일, 시간 단위 별로 저장되며, 에러가 발생할 경우에도 연, 월, 일, 시간 단위 별로 로그가 저장됩니다. 

 

 

 

 

그리고 힌트, 압축 및 암호화 버퍼 토글을 클릭하면 위와 같이 나오는데요. 저는 버퍼 크기를 10 MiB, 버퍼 간격을 300초로 설정했습니다. 검색 로그를 완벽히 실시간으로 저장하지 않고, 준실시간성을 구성해도 괜찮겠다고 생각하여, 적당한 크기, 적당한 시간이 차오르고 지나면 데이터를 S3로 전달하도록 구성했습니다.  

 

 

 

이렇게 구성했으면, Kinesis Data Streams, Kinesis Data Firehose, S3까지 모두 생성을 완료했습니다. 그럼 이제 Kinesis Data Streams에 데이터를 전송하여, 전달한 데이터가 S3에 저장되는 과정을 구현해 보겠습니다. 

 

 

 

 

 

 

 


 

 

 

 

 

NestJS

NestJS에서 Kinesis Data Streams에 데이터를 추가하는 간단한 코드를 예시로 작성했습니다. aws-sdk를 활용하여 Kinesis를 활용하는 방식을 선택해서 활용했습니다. 

 

 

import { Injectable } from '@nestjs/common';
import Kinesis from 'aws-sdk/clients/kinesis';

import { ISearchLog } from '@apps/common-config/src/logging/interface/ISearchLog';

@Injectable()
export class SearchLogKinesisService
{
  private kinesis: Kinesis;

  constructor() {
    this.kinesis = new Kinesis({
      region: process.env.REGION,
      accessKeyId: process.env.ACCESS_KEY_ID,
      secretAccessKey: process.env.SECRET_ACCESS_KEY,
    };);
  }

  public async putData(log: ISearchLog): Promise<void> {
    const params = {
      Data: JSON.stringify(log),
      PartitionKey: String(log.body.userId),
      StreamName: 'searchLog',
    };

    try {
      await this.kinesis.putRecord(params).promise();
    } catch (error) {
      throw error;
    }
  }
}

 

 

SearchLogKinesisService 클래스 안에 Kinesis를 주입받고, putData 메서드를 활용해서 데이터를 추가합니다. params의 Data 프로퍼티에 데이터를 직렬화하여 전달하고, PartitionKey는 임시로 userId를 활용하여 파티셔닝 할 수 있도록 설정했습니다. StreamName은 Data Streams의 이름을 작성했습니다. 

 

 

 

위와 같이 구성하여 AWS SDK를 활용하여 데이터를 전달하면, 정상적으로 S3에 데이터가 위와 같이 데이터가 폴더별로 나뉘어 데이터가 저장되는 것을 보실 수 있을 겁니다. 

 

단 데이터를 저장할 때, 추가적으로 데이터를 가공해서 저장해야 할 필요가 있었습니다. 즉 서버 내에서 Kinesis로 데이터를 전달하면, Firehose에서 lambda를 연결하여 lambda에서 데이터를 가공한 후 S3로 데이터를 저장하는 프로세스를 구축했습니다. 

 

그럼 어떻게 Firehose와 lambda를 연결했는지에 대해 알아보겠습니다. 

 

 

 

 

 

 


 

 

 

 

Lambda

Kinesis Firehose에 lambda를 연결해서 데이터를 변환하는 작업을 처리해 보겠습니다.

 

 

 

 

 

Firehose를 생성할 때, AWS Lambda를 사용하여 소스 레코드 변형이라는 부분의 데이터 변환 활성화 버튼을 클릭하면, AWS Lambda 함수를 연결할 수 있는 화면이 나옵니다. 여기서 함수를 생성해도 되고, 직접 Lambda를 생성해서 연결해 줘도 됩니다.

 

저는 직접 Lambda를 생성해서 연결해 주는 방식으로 Firehose와 Lambda를 연결해 보겠습니다. 

 

 

'use strict';

module.exports.handler = async (event, context) => {
  try {
    const records = event.records;
    const transformedRecords = records.map((record) => {
      const payload = Buffer.from(record.data, 'base64').toString('utf-8');
      const transformedPayload = transformPayload(payload);
      const flattenedPayload = flattenPayload(transformedPayload);

      // 라인 딜리미터(\n) 추가
      const flatOutputWithDelimiter = JSON.stringify(flattenedPayload) + '\n';
      const base64Output = Buffer.from(flatOutputWithDelimiter).toString(
        'base64'
      );

      return {
        recordId: record.recordId,
        result: 'Ok',
        data: base64Output,
      };
    });

    // 변환된 레코드 반환
    return { records: transformedRecords };
  } catch (error) {
    console.error('Error:', error);
    throw error;
  }
};

function transformPayload(payload) {
  const jsonData = JSON.parse(payload);

  const transformedPayload = {
    publishTime: jsonData.publishTime,
    body: jsonData.body
  };

  return transformedPayload;
}

function flattenPayload(payload) {
  const flattenedPayload = {
    publishTime: payload.publishTime,
    userId: payload.body.userId,
    keyword: payload.body.keyword
  };

  return flattenedPayload;
}

 

 

위와 같이 lambda를 구성했습니다. event의 records 프로퍼티에 로그 데이터가 존재하고, 여러 개의 로그 데이터 변환을 처리하기 위해 로직을 구성했습니다. record를 payload로 변환하고, payload를 transformPayload 함수를 활용하여 객체로 변환했습니다. 그 후 flattenPayload 함수를 활용하여 데이터를 변환한 후, 변환한 데이터를 return 처리했습니다.

 

이 함수를 Firehose에 연결하여 데이터를 전송하면 

 

 

{"publishTime":"2023-09-17T16:28:42.234","userId":"test","keyword":"test"}
{"publishTime":"2023-09-17T16:28:42.354","userId":"test","keyword":"test"}

 

 

위와 같이 데이터가 S3에 저장되는 것을 볼 수 있습니다. 

 

 

 

 

 

 


 

 

 

 

 

 

 

 

 

마치며

지금까지 로그 데이터를 적재하는 과정을 알아봤습니다. 다음 글에서는 S3에 저장된 데이터를 AWS Glue, AWS Athena를 활용해서 데이터를 출력해 보는 작업을 해보겠습니다. 

 

 

 

 

 

 

 


 

 

 

 

 

출처

 

Workshop Studio

 

catalog.us-east-1.prod.workshops.aws