非同期タスクのメトリクス収集術

こんにちはSREチームの宮後(@miya10kei)です。最近、7.1がリリースされて攻略に勤しんでいます⚔️

今回はPark Directの非同期タスク処理に対するメトリクスの収集について紹介します!

背景

Park Directでは非同期タスク処理にCeleryを採用し次の構成で利用しています。

非同期タスク処理の構成

機能拡張とサービス成長に伴い非同期タスクの量が日に日に増加しており、キュー詰まり(タスク登録数に処理が追いついていない)のような事象が起きていました。しかし、当時はタスクの処理件数や処理速度、キューにタスクが滞留していた時間といった基本的な情報を計測できていなかったため、正確には原因を把握できずにいました。

そこで、まずは実態を把握するために、必要なメトリクスを収集することにしました。

計測する情報

今回は次の4つのメトリクスを計測することにしました。

# 計測する情報 メトリクス名 タグ タイプ
1 キュー内に滞留しているタスク件数 worker.queue - Count
2 キューにタスクが滞留していた時間 worker.queue.duration task_name Distribution
3 タスクの処理数件 worker.tasks.count task_name task_status Count
4 タスクの処理時間 worker.tasks.duration task_name task_status Distribution

計測方法

Celeryには「Flower」というモニタリングツールがあります。このツールはブローカーと接続するだけで、様々な情報を簡単に取得できる便利な機能を提供しています。
しかし、キューにタスクが滞留していた時間など、ほしい情報を取得できないため、今回は採用を見送り、自前で実装することにしました。

メトリクス化する上での、基本的な方針は以下の2つになります

  • Custome MetricsとしてDatadogに直接メトリクスを転送する
  • 必要な情報をログ出力し、Datadogの「Generate Metrics from Ingested Logs」でメトリクス化する

それでは、前述の4メトリクスの計測方法をそれぞれ紹介します

1.キュー内に滞留しているタスク件数の計測

キュー内に滞留しているタスク件数はCeleryが利用するキュー(List型)のサイズを定期的に取得することで計測しました。EventBridgeからLambdaを定期実行し、取得したキューサイズをDatadogに送ることで実現しています。

実行しているLambdaのコードは以下になります。

import os

import datadog_lambda.metric as metric
import redis


def lambda_handler(event, context):
    env = os.getenv("TARGET_ENV")
    redis_host = os.getenv("REDIS_HOST")
    redis_port = os.getenv("REDIS_PORT")
    redis_db = os.getenv("REDIS_DB")

    r = redis.StrictRedis(host=redis_host, port=redis_port, db=redis_db)
    metric_value = float(r.llen("celery"))
    metric.lambda_metric(
        metric_name="worker.queues",
        value=metric_value,
    )
    # メトリクス転送前にLambdaが終了するのを防止するためFlushする
    metric.lambda_stats.flush()

2.キューにタスクが滞留していた時間の計測

キューにタスクが滞留していた時間は、CeleryClient側でタスクデータのヘッダーに送信タイムスタンプを設定し、Celery Worker側で「受信タイムスタンプ - 送信タイムスタンプ」を計算することで算出しています。

Celery Client側の実装は以下になります。すべてのタスク登録時にタイムスタンプを設定するため、celery.app.task.Taskapply_asyncを拡張することで実現しました。

import time

from celery import Celery, Task


class CustomTask(Task):

    def apply_async(self, *args, **kwargs):
        kwargs["headers"] = {"sent_timestamp_ms": round(time.time() * 1000)}
        return super().apply_async(*args, **kwargs)


app = Celery("Task")
app.Task = CustomTask

# ---

hello.apply_aync()

Celery Worker側の実装は以下になります。すべてのタスク処理で共通の処理をおこないたいため、デコレータで実現しました。Celery Client側で設定したヘッダー情報は current_task.request から取得することができます。

import logging
import time
from functools import wraps

from celery import current_task

logger = logging.getLogger(__name__)


def task_execution_log(func):

    @wraps(func)
    def wrapper(*args, **kwargs):
        task_request = current_task.request

        # 送信タイムスタンプ
        sent_timestamp_ms = task_request.sent_timestamp_ms
        # 受信タイムスタンプ
        received_timestamp_ms = round(time.time() * 1000)
        # キューにタスクが滞留していた時間
        staying_time_ms = received_timestamp_ms - sent_timestamp_ms

        logger.info(
            f"Task {func.__name__} --- Start (staying_time_ms={staying_time_ms})"
        )

        return func(*args, **kwargs)

    return wrapper

# ---

@task_execution_log
def hello():
  ...

前述の実装で次のログが出力されるようになるので、これをDatadogでパースしメトリクス化しています。

Task hello--- Start (staying_time_ms=10)

3.タスクの処理数件と処理時間の計測

タスクの処理件数はタスク終了時にログを出力することで計測し、処理時間は「処理終了タイムスタンプ - 受信タイムスタンプ」を計算することで計測しました。

実装は以下となり、前述と同じデコレーターに実装しています。(以下は、処理件数と処理時間の計測に関連のある実装のみを記載しています。)

def task_execution_log(func):

    @wraps(func)
    def wrapper(*args, **kwargs):
        # 受信タイムスタンプ
        received_timestamp_ms = round(time.time() * 1000)
        has_error = False
        try:
            response = func(*args, **kwargs)
        except Exception as e:
            has_error = True
            raise e
        finally:
            # 処理時間
            proc_time_ms = round(time.time() * 1000) - received_timestamp_ms
            if has_error:
                _logger.error(
                    f"Task {func.__name__} --- Abnormal End (proc_time_ms={proc_time_ms})"
                )
            else:
                _logger.info(
                    f"Task {func.__name__} --- Normal End (proc_time_ms={proc_time_ms})"
                )
        return response

    return wrapper


# ---
@task_execution_log
def hello():
  ...

前述の実装で次のログが出力されるようになるので、これをDatadogでパースしメトリクス化しています。

Task hello--- Normal End (proc_time_ms=10)
Task hello--- Abnormal End (proc_time_ms=10)

さいごに

最後にDatadogでの可視化結果を紹介して終わりたいと思います。

以前に「CeleryのMessage Priorities機能を利用した処理遅延の低減」で紹介したタスク優先度のメトリクスと合わせて、次のように可視化をしています。

非同期タスクのメトリクスを可視化したことで、キュー詰まりやボトルネックとなっているタスクを一目で確認できるようになり、日々の運用にとても役立っています。

非同期タスクの状態を手軽に可視化したくなったら参考にしてみてください!