こんにちはSREチームの宮後(@miya10kei)です。最近、7.1がリリースされて攻略に勤しんでいます⚔️
今回はPark Directの非同期タスク処理に対するメトリクスの収集について紹介します!
背景
Park Directでは非同期タスク処理にCeleryを採用し次の構成で利用しています。
機能拡張とサービス成長に伴い非同期タスクの量が日に日に増加しており、キュー詰まり(タスク登録数に処理が追いついていない)のような事象が起きていました。しかし、当時はタスクの処理件数や処理速度、キューにタスクが滞留していた時間といった基本的な情報を計測できていなかったため、正確には原因を把握できずにいました。
そこで、まずは実態を把握するために、必要なメトリクスを収集することにしました。
計測する情報
今回は次の4つのメトリクスを計測することにしました。
# | 計測する情報 | メトリクス名 | タグ | タイプ |
---|---|---|---|---|
1 | キュー内に滞留しているタスク件数 | worker.queue | - | Count |
2 | キューにタスクが滞留していた時間 | worker.queue.duration | task_name | Distribution |
3 | タスクの処理数件 | worker.tasks.count | task_name task_status | Count |
4 | タスクの処理時間 | worker.tasks.duration | task_name task_status | Distribution |
計測方法
Celeryには「Flower」というモニタリングツールがあります。このツールはブローカーと接続するだけで、様々な情報を簡単に取得できる便利な機能を提供しています。
しかし、キューにタスクが滞留していた時間など、ほしい情報を取得できないため、今回は採用を見送り、自前で実装することにしました。
メトリクス化する上での、基本的な方針は以下の2つになります
- Custome MetricsとしてDatadogに直接メトリクスを転送する
- 必要な情報をログ出力し、Datadogの「Generate Metrics from Ingested Logs」でメトリクス化する
それでは、前述の4メトリクスの計測方法をそれぞれ紹介します
1.キュー内に滞留しているタスク件数の計測
キュー内に滞留しているタスク件数はCeleryが利用するキュー(List型)のサイズを定期的に取得することで計測しました。EventBridgeからLambdaを定期実行し、取得したキューサイズをDatadogに送ることで実現しています。
実行しているLambdaのコードは以下になります。
import os import datadog_lambda.metric as metric import redis def lambda_handler(event, context): env = os.getenv("TARGET_ENV") redis_host = os.getenv("REDIS_HOST") redis_port = os.getenv("REDIS_PORT") redis_db = os.getenv("REDIS_DB") r = redis.StrictRedis(host=redis_host, port=redis_port, db=redis_db) metric_value = float(r.llen("celery")) metric.lambda_metric( metric_name="worker.queues", value=metric_value, ) # メトリクス転送前にLambdaが終了するのを防止するためFlushする metric.lambda_stats.flush()
2.キューにタスクが滞留していた時間の計測
キューにタスクが滞留していた時間は、CeleryClient側でタスクデータのヘッダーに送信タイムスタンプを設定し、Celery Worker側で「受信タイムスタンプ - 送信タイムスタンプ」を計算することで算出しています。
Celery Client側の実装は以下になります。すべてのタスク登録時にタイムスタンプを設定するため、celery.app.task.Taskのapply_asyncを拡張することで実現しました。
import time from celery import Celery, Task class CustomTask(Task): def apply_async(self, *args, **kwargs): kwargs["headers"] = {"sent_timestamp_ms": round(time.time() * 1000)} return super().apply_async(*args, **kwargs) app = Celery("Task") app.Task = CustomTask # --- hello.apply_aync()
Celery Worker側の実装は以下になります。すべてのタスク処理で共通の処理をおこないたいため、デコレータで実現しました。Celery Client側で設定したヘッダー情報は current_task.request から取得することができます。
import logging import time from functools import wraps from celery import current_task logger = logging.getLogger(__name__) def task_execution_log(func): @wraps(func) def wrapper(*args, **kwargs): task_request = current_task.request # 送信タイムスタンプ sent_timestamp_ms = task_request.sent_timestamp_ms # 受信タイムスタンプ received_timestamp_ms = round(time.time() * 1000) # キューにタスクが滞留していた時間 staying_time_ms = received_timestamp_ms - sent_timestamp_ms logger.info( f"Task {func.__name__} --- Start (staying_time_ms={staying_time_ms})" ) return func(*args, **kwargs) return wrapper # --- @task_execution_log def hello(): ...
前述の実装で次のログが出力されるようになるので、これをDatadogでパースしメトリクス化しています。
Task hello--- Start (staying_time_ms=10)
3.タスクの処理数件と処理時間の計測
タスクの処理件数はタスク終了時にログを出力することで計測し、処理時間は「処理終了タイムスタンプ - 受信タイムスタンプ」を計算することで計測しました。
実装は以下となり、前述と同じデコレーターに実装しています。(以下は、処理件数と処理時間の計測に関連のある実装のみを記載しています。)
def task_execution_log(func): @wraps(func) def wrapper(*args, **kwargs): # 受信タイムスタンプ received_timestamp_ms = round(time.time() * 1000) has_error = False try: response = func(*args, **kwargs) except Exception as e: has_error = True raise e finally: # 処理時間 proc_time_ms = round(time.time() * 1000) - received_timestamp_ms if has_error: _logger.error( f"Task {func.__name__} --- Abnormal End (proc_time_ms={proc_time_ms})" ) else: _logger.info( f"Task {func.__name__} --- Normal End (proc_time_ms={proc_time_ms})" ) return response return wrapper # --- @task_execution_log def hello(): ...
前述の実装で次のログが出力されるようになるので、これをDatadogでパースしメトリクス化しています。
Task hello--- Normal End (proc_time_ms=10) Task hello--- Abnormal End (proc_time_ms=10)
さいごに
最後にDatadogでの可視化結果を紹介して終わりたいと思います。
以前に「CeleryのMessage Priorities機能を利用した処理遅延の低減」で紹介したタスク優先度のメトリクスと合わせて、次のように可視化をしています。
非同期タスクのメトリクスを可視化したことで、キュー詰まりやボトルネックとなっているタスクを一目で確認できるようになり、日々の運用にとても役立っています。
非同期タスクの状態を手軽に可視化したくなったら参考にしてみてください!