CeleryのMessage Priorities機能を利用した処理遅延の低減

こんにちは、SREチームの宮後(@miya10kei)です。
バイクに乗っていて気持ちが良い季節になってきましたね🌸

メッセージキューを利用した非同期タスクを扱っていて、誰しも優先度順にタスクを処理したいなと思ったことがあるのではないでしょうか?

今回はCeleryの機能を利用して実現することができたので紹介したいと思います。

Celeryってなに?

Celeryは分散メッセージキュー機能を提供するPythonベースのOSSです。
メッセージキューのBrokerとしてRedisやRabbitMQ、Amazon SQSなどを使用でき、分散環境での非同期タスクの実行を実現しています。

公式サイトを引用すると次の説明になりますね。

Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system. It’s a task queue with focus on real-time processing, while also supporting task scheduling.
引用元:https://docs.celeryq.dev/en/stable/index.html#celery-distributed-task-queue

Celery自体はPythonで書かれていますが、他にもNode.jsやPHP向けのクライアントライブラリも提供されています。

Park DirectでのCeleryの使い方

Park Directでは非同期タスクの処理にCeleryを採用し次の構成で利用しています。

BrokerのバックエンドとしてElastiCache for Redisを使用しています。
Celery ClientとしてバックエンドのAPIとバッチからタスクを登録し、Celery Workerとして専用のECSタスクが処理をおこなう構成になっています。

課題

機能拡張とサービス成長に伴い、定期実行される処理や対象データが増加したことで短時間で大量のタスクがQueueに登録されはじめました。これにより、長い時は数時間単位でタスクが遅延してしまうこともあり事業に少なからず影響を及ぼしていました。

最初はECSタスクのオートスケーリングでスループットの改善を試みたものの、RDSの負荷を考慮するとスケールアウトにも限界があり別の対策を講じる必要がでてきました。

そこでタスクに優先度を設定する方法を検討し、CeleryのMessage Priorities機能を採用することにしました。

Message Priorities機能

それでは本題のMessage Priorities機能の話に移ります。

Celeryではタスクに対してPriorityを設定することで、処理順序を組み替える機能が提供されています。(Message Priorities機能

この機能を利用するとPriority毎に独立したQueueが作成されます。 ClientはPriorityを指定してタスクを登録することで該当するQueueにタスクを積むことができます。WorkerはよりPriortyの高いQueueから優先してタスクを取得し処理がおこなわれるようになります。

ClientとWorkerのサンプル実装は次のようになります。

Worker

from celery import Celery

app = Celery("task", broker="redis://localhost/0")
app.conf.broker_transport_options = {
    "priority_steps": list(range(3)),  # priorityを3つに区切る設定
    "sep": ":",
    "queue_order_strategy": "priority",
}


@app.task
def hello(name: str):
    print(f"Hello ${name}")

Client

from task import hello

for i in range(10):
    hello.apply_async(kwargs={"name": "miya10kei(2)"}, priority=2)

for i in range(10):
    hello.apply_async(kwargs={"name": "miya10kei(1)"}, priority=1)

for i in range(10):
    hello.apply_async(kwargs={"name": "miya10kei(0)"}, priority=0)

これを実行すると次のような出力となりPriorityの小さいタスクが優先して処理されることが確認できます。

[2024-03-27 20:26:58,723: WARNING/ForkPoolWorker-1] Hello $miya10kei(2)
[2024-03-27 20:26:59,730: WARNING/ForkPoolWorker-1] Hello $miya10kei(2)
[2024-03-27 20:27:02,741: WARNING/ForkPoolWorker-1] Hello $miya10kei(0)
[2024-03-27 20:27:03,749: WARNING/ForkPoolWorker-1] Hello $miya10kei(0)
...
[2024-03-27 20:27:11,794: WARNING/ForkPoolWorker-1] Hello $miya10kei(0)
[2024-03-27 20:27:12,797: WARNING/ForkPoolWorker-1] Hello $miya10kei(1)
[2024-03-27 20:27:13,805: WARNING/ForkPoolWorker-1] Hello $miya10kei(1)
...
[2024-03-27 20:27:21,855: WARNING/ForkPoolWorker-1] Hello $miya10kei(1)
[2024-03-27 20:27:22,861: WARNING/ForkPoolWorker-1] Hello $miya10kei(2)
[2024-03-27 20:27:23,868: WARNING/ForkPoolWorker-1] Hello $miya10kei(2)
...
[2024-03-27 20:27:27,891: WARNING/ForkPoolWorker-1] Hello $miya10kei(2)

Park Directでの使い方

Park Directでは以下の3種類のPriorityを使用できるようにしました。

Priority 説明
High (priority=0) 緊急度の高いタスク
例:問い合わせに対する返信メールを送信するタスクなど
Middle (priority=1) High/Lowでないタスク
Low (priority=2) 短時間に大量に発生する可能性のあるタスク
例:一括処理系のタスクなど

※ 現状はMiddleとLowの2種類のみを使用しており、Middleについては許容可能な遅延の範囲で運用できています。
※ Highは今後より優先して処理したいタスクが発生した場合に備えて用意しました。

また、DefaultのPriorityをpriority=1にすることでdelayメソッドでタスクを登録した場合にMiddleの優先度が設定されるようにしています。

設定方法は以下になります。

from celery import Celery

app = Celery("task", broker="redis://localhost/0")
app.conf.broker_transport_options = {
    "priority_steps": list(range(3)),
    "sep": ":",
    "queue_order_strategy": "priority",
}
app.conf.task_default_priority = 1  # Default Priorityの設定

※ Client側のタスク登録時のpriority指定はapply_asyncメソッドでのみ可能です。そのため、delayメソッドを使用している箇所の修正を最小限にしたい場合はおすすめです!

まとめ

最後にこの機能を導入した事による効果をお伝えして終わります。

導入前は数時間単位で遅延が発生することがありましたが、導入後はMiddleのタスクの遅延を数秒単位まで抑えることができ、十分に許容できるレベルまで改善されました。

Priority毎のタスクの処理件数とQueue内の滞留時間(上:Middle、下:Low)

同じ悩みを持ったことがある方は是非導入を検討してみてください!