Node.js로 만든 Worker Application으로 시계열 데이터 저장 & 실시간 스트리밍하기

2025년 8월 7일

들어가며

현재 진행중인 프로젝트(가상화폐 자동 거래 플랫폼)에서 티커 데이터를 실시간으로 받아와 저장하고, 이를 동시에 실시간으로 클라이언트에게 제공하는 기능을 구현할 필요가 생기게 되었다. 따라서 해당 기능을 수행하는 Worker를 구현하는 과정을 서술해보고자 한다.

대상 데이터 파악

저장이 필요한 데이터는 각 가상화폐 거래소에서 제공하는 티커들의 OHLCV1 데이터로, 대부분의 거래소에서 WebSocket을 통한 실시간 데이터 전송을 지원하기에 이를 활용하기로 했다.

데이터 받아오기

거래소에서 제공하는 WebSocket 데이터를 받아오기 위해서는 먼저 WebSocket 객체를 생성하여 거래소에서 제공하는 URL로 연결을 수립해야 한다.

connect() {
    this.ws = new WebSocket(this.config.ws);
}

연결이 완료되면 message 이벤트 리스너를 통해 서버로부터 받은 메시지에 대한 처리를 수행하면 된다.

this.ws.on("message", (message: string) => this.onMessage(message));

이러한 방식으로 간단하게 실시간 데이터 처리를 구현할 수 있다. 하지만 현재 방식의 경우 여러개의 거래소를 지원하기 위해 불필요한 중복 코드가 양산된다는 단점이 있다. 따라서 모든 거래소에 필요한 공통 메소드를 담은 추상 클래스를 정의한 후, 각 거래소 구현체에서 이를 상속하여 거래소마다 필요한 기능을 구현할 수 있도록 수정하였다. 구현한 추상 클래스의 구조는 다음과 같다.

export abstract class Worker {
    protected config: WorkerConfig;
    protected exchangeId: string | null;
    protected ws: WebSocket | null;

    constructor(config: WorkerConfig) {
        this.config = config;
        this.exchangeId = null;
        this.ws = null;
    }

    async initialize() {
        logger.info(`Starting worker for exchange: $ {this.exchangeId}`);
        this.connect();
    }

    invalidate() { ... }

    async shutdown() {
        try {
            this.invalidate();

            if (this.ws) {
                logger.info("Closing WebSocket connection...");
                this.ws.close();
            }

            logger.info("Worker shutdown complete.");
        } catch(e) {
            logger.error(`Error during worker shutdown: ${e}`);
        }
    }

    connect() {
        this.ws = new WebSocket(this.config.ws);

        this.ws.on("open", () => {
            this.onOpen();
        });

        this.ws.on("message", (message: string) => this.onMessage(message));

        this.ws.on("error", (e: Error) => this.onError(e));

        this.ws.on("close", () => this.onClose());
    }

    onOpen() {
        logger.info("Connected to WebSocket.");
    }

    abstract onMessage(message: string) : void;

    onError(e: Error) {
        logger.error(`WebSocket error: ${e}`);
        if (this.ws) this.ws.close();
    }

    onClose() {
        logger.warn(`WebSocket connection closed. Reconnecting...`);

        this.invalidate();
        this.reconnect();
    }

    reconnect() { ... }
}

이를 상속받은 특정 거래소(ex. Bybit)의 구현부는 다음과 같다.

onOpen() {
    super.onOpen();
    logger.info(`Subscribing symbols: ${this.config.symbols}`);
    this.ws?.send(JSON.stringify({ op: "subscribe", args: this.config.symbols.map((s) => `kline.1.${s}`) }));
}

async onMessage(message: string) {
    const parsed = JSON.parse(message);
    if (parsed.success && parsed.op === "subscribe") {
        logger.info("Successfully subscribed symbols.");
    } else if (parsed.topic && parsed.topic.startsWith("kline")) {
        const symbol = parsed.topic.split(".")[2];
        const data = parsed.data[0];
        const candleData: CandleData = {
            exchangeId: this.exchangeId ! ,
            symbolId: symbol,
            open: parseFloat(data.open),
            high: parseFloat(data.high),
            low: parseFloat(data.low),
            close: parseFloat(data.close),
            volume: parseFloat(data.volume),
            completed: data.confirm,
            timestamp: data.start
        };

        ...
    } else if (parsed.success && parsed.ret_msg === "pong") {
        logger.debug("Received pong response.");
    }
}

해당 코드에 대해서 간단히 설명하자면, 먼저 onOpen 메소드에서 티커 구독 요청을 담은 Payload를 거래소에 보낸다. 이후 onMessage 메소드에서 앞서 요청한 티커 데이터에 대한 처리를 수행하는데, 추후에 서술할 데이터 저장을 위한 전처리 단계라고 볼 수 있다. 추가로 해당 코드에는 나와있지 않지만 WebSocket 연결이 지속적으로 유지될 수 있도록 keepalive 패킷도 특정 interval마다 보내도록 구현하였다.

데이터 실시간 스트리밍 구현

메시지 큐 시스템 선택

이제 WebSocket에서 실시간으로 받아온 데이터를 클라이언트에 뿌려주는 기능을 구현해야 한다. 이를 위해서는 메시지 큐 시스템을 사용해야 하는데, 대표적으로 Apache Kafka, RabitMQ, Redis가 있다. 결론부터 말하자면 Redis Pub/Sub를 사용하여 구현하였는데, Redis를 선택한 이유를 나름대로 정리해보면 다음과 같다.

항목Apache KafkaRabbitMQRedis Pub/Sub
지연 시간낮음보통매우 낮음
처리량매우 높음보통높음
메시지 영속성OOX
운영 복잡도높음보통낮음

실시간으로 거래에 반영되야 하는 지표이기에 지연 시간과 처리량을 가장 중요한 척도로 보았는데 이 경우 Apache Kafka와 Redis Pub/Sub가 가장 적절했다. 하지만 두 시스템의 경우 메시지 영속성과 운영 복잡도 부분에서 차이가 존재했다.

메시지 영속성의 경우 우리 시스템에서는 해당 데이터(가격 데이터)가 실시간이 아니면 애초에 의미가 없을뿐만 아니라 데이터를 가공하여 별도의 DB(InfluxDB)에 저장할 계획이므로 중요하지 않은 부분이라고 판단했다.

운영 복잡도의 경우 Apache Kafka에 비해 Redis가 월등히 낮았는데, 이미 기존 서비스에서 Redis를 사용 중이기도 했고 Apache Kafka의 경우 지금 당장 도입하기에는 너무 과하다고 판단하여 최종적으로 Redis Pub/Sub로 구현하기로 결정했다.

Redis Pub/Sub 구현

사용할 메시지 큐 시스템을 정했다면 이제 기존 WebSocket 메시지 처리 구현부에 해당 메시지를 Publish하는 기능을 추가한다. Redis 라이브러리는 TypeScript 지원이 잘 되어있는 ioredis를 사용하였다.

이 글을 작성하는 시점에서 다시 찾아보니 신규 프로젝트의 경우 node-redis 라이브러리 사용을 권장하고 있다.

단순히 앞서 가공한 데이터를 stringify하여 Redis 인스턴스에 publish 해주면 된다. 이때 주의해야할 점은 기존에 사용하던 Redis Client는 재사용할 수 없다는 것이다. 그렇기에 Publish용 별도의 Redis 인스턴스를 생성하여 이를 통해 데이터를 보내주었다.

const channel = "ticker:" + this.exchangeId + "_" + symbol;

this.redisPublisher.publish(channel, JSON.stringify(candleData)).then((n) => {
    logger.debug(`[Redis] [${symbol}] Published data to ${n} subscriber(s).`);
}).catch((e) => logger.error(`[Redis] Unable to publish data: ${e}`));

실제로 구현이 완료된 것을 확인해 보면 다음과 같이 데이터가 잘 들어오는 것을 볼 수 있다.

1754654317955-redispubsubavif

데이터 저장

데이터 실시간 스트리밍까지 구현했다면 이제 이 데이터들을 효율적으로 저장해야 한다. 우리가 저장하고자 하는 데이터는 티커의 가격 데이터로 시계열 데이터에 해당한다. 해당 데이터의 경우 매우 잦은 읽기/쓰기, Aggregation, TTL 기능이 수반되어야 하기에 기존 DB(RDBMS, NoSQL)가 아니라 시계열 데이터 저장에 특화된 구조를 가진 InfluxDB를 사용하기로 했다.

데이터 저장 구현

데이터 저장 구현의 경우 InfluxDB에서 제공하는 JavaScript 라이브러리를 사용하여 기존 Worker 로직에 추가하는 방식으로 구현하였다. 앞서 구현한 Redis Publish 채널을 구독하여 데이터를 받아 이를 InfluxDB에 삽입하는 방식으로 구현하였는데, 굳이 Redis를 경유하여 데이터를 처리한 이유는 다음과 같다.

  1. 거래소마다 데이터 형식이 다르기 때문에 이미 전처리된 데이터를 DB에 바로 삽입하기 위함
  2. 각 거래소 Worker마다 코드를 작성하면 불필요한 코드 중복이 생겨 공통 로직으로 처리하기 위함

이를 구현한 코드는 다음과 같다.

async startProcessor() {
    this.redisSubscriber.on("pmessage", (pattern: string, channel: string, message: string) => {
        const data = JSON.parse(message);
        if (!data.completed) return;

        const point = convertCandleDataToPoint(data);
        this.writeApi.writePoint(point);

        this.processedSymbols.add(data.symbolId);

        // Flush data once reaches predefined symbols count
        if (this.processedSymbols.size >= this.config.symbols.length && !this.isFlushing) this.flushData();
    });

    // Fallback flush logic
    this.intervalId = setInterval(async () => {
        if (this.processedSymbols.size && !this.isFlushing) this.flushData();
    }, 60 * 1000); // 1 minute interval

    await this.redisSubscriber.psubscribe("ticker:*");
}

private async flushData() {
    if (this.isFlushing) return;
    this.isFlushing = true;

    try {
        await this.writeApi.flush();
        logger.info(`Wrote data for symbols (${this.processedSymbols.size}): ${[...this.processedSymbols].join(", ")}`);
        this.processedSymbols.clear(); // Reset symbol set after flushing
    } catch (e) {
        logger.error(`Failed to write data: ${e}`);
    } finally {
        this.isFlushing = false;
    }
}

해당 코드의 경우 pmessage이벤트 리스너를 통해 들어온 데이터를 InfluxDB에 맞는 형식으로 변환한 뒤 writeApi.writePoint() 메소드를 호출하여 InfluxDB에 삽입한다. 이때 writePoint() 메소드의 경우 호출 시 데이터가 바로 DB에 반영되는 것이 아니라 내부 버퍼에 데이터가 저장된 후, 이후 flush 과정을 거쳐 DB에 반영된다.

여기서 InfluxDB 내부 로직에 따라 자동으로 flush가 이루어지지만, 보다 빠른 반영을 위해 자체 규칙에 따라 임의로 flushData() 메소드를 호출하여 즉시 데이터가 DB에 반영되도록 구현하였다.

이제 결과를 확인해보면 아래와 같이 데이터가 정상적으로 쌓이고 있음을 알 수 있다.

influx-webui.png

마무리

이렇게 시계열 데이터를 받아와서 저장하고 실시간으로 스트리밍해주는 기능을 구현해 보았는데, 정상적으로 작동은 되긴 하나 작업하면서도 많은 개선의 여지가 있음을 느꼈다. 추후에 개선할 수 있는 사항 몇 가지를 정리하면서 글을 마치고자 한다.

  1. Worker Health Check 기능 (via Redis Hash)
  2. WebSocket 연결 해제 시 retry 로직 (Exponential Backoff)
  3. Sentry 연동
  4. Containerize & Auto-scaling

Footnotes

  1. Open, High, Low, Close, Volume: 시가, 고가, 저가, 종가, 거래량

댓글