非同期タスクの実行パラメータをAthenaで検索可能にする 〜障害調査を容易にする実装方法〜

こんにちはSREチームの宮後(@miya10kei)です。最近、デスクシェルフを買ってからデスク周りがとてもスッキリして大満足しています😏

前回に続いて非同期タスク関連の取り組みを紹介します!

ところで、、、障害が発生したときの調査って大変ですよね?非同期で実行される処理なんかは特に原因の特定が難しいと思っています💦

そこで、今回は障害時の調査用に非同期タスクの実行パラメータを永続化し、Athenaから検索できるようにした仕組みを紹介します。

背景

前回紹介した「非同期タスクのメトリクス収集術」で非同期タスクの状態を確認できるようになりました。しかし、障害時にどんなパラメータで実行されたのかといった詳細な情報まではまだ把握できません。エラーログにしっかりと必要な情報を出力できていれば良いのですが、なかなかそうはなっていないのが現状です、、、。

また、実行パラメータには場合によっては個人情報が含まれるケースがあるため、全員が閲覧できる場所においそれと出力することもできません。出力して良いものと悪いものを精査するのにも非常に時間がかかります。

そこで、今回はすべての非同期タスクの実行パラメータを通常のログ出力先とは別の場所に永続化し、特定の人だけが検索できるようにすることで障害調査の容易性とセキュリティのバランスを取ることにしました。

全体構成

Park Directでは通常のアプリケーションログはFluent BitでDatadogに転送し、開発者なら誰でも検索できるようになっています。

しかし、今回は以下の課題があったため、S3に永続化しAthenaから検索する構成としました。

  1. 実行パラメータログには個人情報が含まれるケースがある
  2. Datadogには個人情報を転送しない方針となっている
    1. 現状、ログ種別毎に閲覧者のアクセス制限をかける運用になっていないため
  3. パラメータログは大量に出力されるため、Datadogに転送するとコストがかかりすぎてしまう

DatadogとS3のどちらにログを転送するかはFluent Bitで振り分けるようにしています。

また、非同期タスクをキューに登録できなかった場合や、実行パラメータログと併せて処理結果も出力したいという事情なども考慮し、Celery ClientとWorkerの両方で実行パラメータログを出力するようにしました。

実現方法

それでは、具体的な実現方法を紹介していきます。今回はアプリケーション、Fluent Bit、S3、Athenaと複数の箇所に手をいれているため順番に説明していきます。

アプリケーションの実装

Celery Client側の実装

Celery Client側では以下の項目を実行パラメータログとして出力しています。

項目 説明
time 出力時刻
type Celery Clientのログであることを表す識別子
module タスク名
params タスクに渡されるパラメータ
tasks_id Celeryが払い出すタスクのID

実装は以下になります。すべての非同期タスク登録時に実行パラメータログを出力するため、celery.app.task.Taskapply_asyncを拡張することで実現しました。

import json
import logging
from datetime import datetime

import pytz
from celery import Celery, Task

_logger_client = logging.getLogger("celery_param_log_client")


class CustomTask(Task):

    def apply_async(self, *args, **kwargs):
        result = None
        try:
            result = super().apply_async(*args, **kwargs)
        finally:
            task_json = {
                "time": f"{datetime.now(pytz.timezone('Asia/Tokyo')).strftime('%Y-%m-%d %H:%M:%S.%f')}",
                "type": "client",
                "module": f"{self.__module__}.{self.__name__}",
                "params": {"args": f"{args}", "kwargs": f"{kwargs}"},
                "task_id": f"{result}",
            }
            _logger_client.info(json.dumps(task_json))
        return result


app = Celery("Task")
app.Task = CustomTask

# ---

hello.apply_aync()

Celery Worker側の実装

Celery Worker側では以下の項目を実行パラメータログとして出力しています。

項目 説明
time 出力時刻
type Celery Workerのログであることを表す識別子
module タスク名
params タスクに渡されるパラメータ
tasks_id Celeryが払い出すタスクのID
result 処理結果

実装は以下になります。すべての非同期タスク処理で共通の処理をおこないたいため、デコレータで実現しました。

import json
import logging

from datetime import datetime
from functools import wraps

import pytz


def task_execution_log(func):

    @wraps(func)
    def wrapper(*args, **kwargs):
        task_request = current_task.request
        has_error = False
        try:
            response = func(*args, **kwargs)
            _logger.info(response)
        except Exception as e:
            has_error = True
            raise e
        finally:
            task_json = {
                "time": f"{datetime.now(pytz.timezone('Asia/Tokyo')).strftime('%Y-%m-%d %H:%M:%S.%f')}",
                "type": "worker",
                "task_id": f"{task_request.id}",
                "module": f"{task_request.task}",
                "result": "N/A",
                "params": {"args": f"{args}", "kwargs": f"{kwargs}"},
            }
            if has_error:
                task_json["result"] = "Failure"
            else:
                task_json["result"] = "Success"
            _logger_worker.info(json.dumps(task_json))
        return response

    return wrapper

# ---

@task_execution_log
def hello():

Fluent Bitの設定

Fluent Bitを以下のように設定することで実行パラメータログのみをS3に転送するようにしています。(実行パラメータログに関わる部分のみを抜粋しています)

# FireLenseの仕様で16KB以上のログは複数行に分割されるため、マルチライン処理をおこなう
[FILTER]
    name                  multiline
    match                 *
    multiline.key_content log
    mode                  partial_message

# CeleryClientから出力されるClientパラメータログのタグをcelery-param-log-clientに変更
[FILTER]
    Name          rewrite_tag
    Match         client-firelens-*
    Rule          log (^.*\"type\":\s\"client\".*$) celery-param-log-client false

# CeleryWorkerから出力されるWorkerパラメータログのタグをcelery-param-log-workerに変更
[FILTER]
    Name          rewrite_tag
    Match         worker-firelens-*
    Rule          log (^.*\"type\":\s\"worker\".*$) celery-param-log-worker false

# celery-param-log-clientとcelery-param-log-workerタグのログをS3に転送
[OUTPUT]
    Name s3
    Match celery-param-log-*
    region ap-northeast-1
    bucket ${バケット名}
    s3_key_format /$TAG[0]/year=%Y/month=%m/day=%d/%H%M%S-$UUID.gz
    upload_timeout 1m
    compression gzip
    use_put_object On

S3の設定

前述の設定でS3には実行パラメータログが以下の構成で保存されます。

バケット名
├── celery-param-log-client
│   └──year=yyyy
│       └──month=MM
│           └──day=dd
│               ├──HHmmSS-xxxxxx.gz
│               ├──HHmmSS-xxxxxx.gz
│               └──HHmmSS-xxxxxx.gz
└── celery-param-log-worker
    └──year=yyyy
        └──month=MM
            └──day=dd
                ├──HHmmSS-xxxxxx.gz
                ├──HHmmSS-xxxxxx.gz
                └──HHmmSS-xxxxxx.gz

本バケットの閲覧者を制限する必要があるため、S3バケットに以下のポリシーを設定しています。

{
  "Version": "2008-10-17",
  "Statement": [
    {
      "Effect": "Deny",
      "Principal": "*",
      "Action": "s3:*",
      "Resource": [
        "arn:aws:s3:::{バケット名}",
        "arn:aws:s3:::{バケット名}/*"
      ],
      "Condition": {
        "ArnNotLike": {
          "aws:PrincipalArn": [
            "arn:aws:iam::*:role/aws-reserved/sso.amazonaws.com/*/AWSReservedSSO_{ロール名}_*"
          ]
        }
      }
    }
  ]
}

Athenaの設定

AthenaからS3内のデータを検索できるようにするために以下のDDLでテーブルを作成します。また、パーティションを自動読み込みするためにyear/month/dayに対してPartition Projectionを設定しています。(以下はCeleryWorker側の例です)

CREATE EXTERNAL TABLE `worker_log`(
  `time` timestamp COMMENT 'from deserializer', 
  `type` string COMMENT 'from deserializer', 
  `task_id` string COMMENT 'from deserializer', 
  `module` string COMMENT 'from deserializer', 
  `result` string COMMENT 'from deserializer', 
  `params` struct<args:string,kwargs:string> COMMENT 'from deserializer', 
  `source` string COMMENT 'from deserializer', 
  `container_id` string COMMENT 'from deserializer', 
  `container_name` string COMMENT 'from deserializer', 
  `ecs_cluster` string COMMENT 'from deserializer', 
  `ecs_task_arn` string COMMENT 'from deserializer', 
  `ecs_task_definition` string COMMENT 'from deserializer')
PARTITIONED BY ( 
  `year` string, 
  `month` string, 
  `day` string)
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION
  's3://{バケット名}/celery-param-log-worker'
TBLPROPERTIES (
  'classification'='json', 
  'projection.day.digits'='2', 
  'projection.day.range'='1,31', 
  'projection.day.type'='integer', 
  'projection.enabled'='true', 
  'projection.month.digits'='2', 
  'projection.month.range'='1,12', 
  'projection.month.type'='integer', 
  'projection.year.format'='yyyy', 
  'projection.year.interval'='1', 
  'projection.year.interval.unit'='YEARS', 
  'projection.year.range'='NOW-31DAYS,NOW+9HOUR', 
  'projection.year.type'='date', 
  'storage.location.template'='s3://{バケット名}/celery-param-log-worker/year=${year}/month=${month}/day=${day}/', 
)

ここまで実施することでA、thenaで次のSQLを発行することで、非同期タスクに渡された実行パラメータを確認することできるようになりました🎉🎉🎉

SELECT * FROM worker_log
WHERE year = '2024' AND month = '11' AND day = '19'

Tips: 非同期タスクのロストに対する監視

Athenaから実行パラメータログを検索できるようにしたことで、障害時の調査が容易におこなえるようになりました。一方で、もう一つ別の恩恵があったのでTipsとして紹介します。

Park Directでは大量の非同期タスクを日々捌いているのですが、ある時、一部の非同期タスクが処理されないままロストしているのではないか?という疑惑が生じました。

そんなとき、CeleryClientとCeleryWorkerの両方で実行パラメータログを取得していたことが功を奏し、Athenaから次のSQLを発行することでロストしたであろう非同期タスクの件数を把握することができました。

WITH current_jst AS (
    SELECT current_timestamp AT TIME ZONE 'Asia/Tokyo' - INTERVAL '1' DAY AS jst_timestamp
)

SELECT count(1) AS lost_count
FROM client_logd c
WHERE
    time IS NOT NULL
    AND (c.year, c.month, c.day) = (
        SELECT 
            CAST(EXTRACT(YEAR FROM jst_timestamp) AS varchar),
            LPAD(CAST(EXTRACT(MONTH FROM jst_timestamp) AS varchar), 2, '0'),
            LPAD(CAST(EXTRACT(DAY FROM jst_timestamp) AS varchar), 2, '0')
        FROM current_jst
    )
    AND NOT EXISTS (
        SELECT 1
        FROM worker_log w
        WHERE c.task_id = w.task_id
    );

弊社ではBIツールとしてRedashを使用しており、データソースとしてAthenaに指定することができます。Redashアラートとして上記SQLを定期実行しSlack通知することで、非同期タスクのロストに対して監視をおこなっています。

さいごに

今回は非同期タスクの実行パラメータログをAthenaから検索するための仕組みを紹介しました。

非同期タスクはスケーラビリティ向上やリソースを効率的に利用できるという面でメリットがありますが、その反面、障害時の調査が難しいというデメリットがあると思っています。
実行パラメータを永続化し、検索できるようにするだけで障害調査やリカバリー対応が格段に実施しやすくなります。

もし、同じことで困っている方がいたら、ぜひ参考にしてみてください!!!