こんにちは、ニーリーの佐古です。
現在開発速度や開発者体験の向上のため、取り組みの諸々を遂行しています。
Celeryの"接ぎ木"
もともと弊社プロダクトではバッチの実行などにCeleryを利用していた経緯もあり、
高速化のために処理の並列化を考えると
あるCelery TaskからべつのCelery Taskに処理を逃がすことが起きます。
これ自体は発想として割と自然なのですが、以下の問題があります。
単純に実装しづらい(特にWeb側)
Webリクエストなどバッチ以外のエントリポイントから、
気軽に並列処理を書こうとするとCeleryの構成や制約が邪魔になるのです。
非同期実行を行うには明示的なTask定義とapply_async()が必要で、
依存関係やエラーフローの扱いも暗黙的です。
エラーハンドリングというかエラー検知自体が困難
Task間の例外伝播がされず、モニタリングやトレースも分断され、
「どこで何が失敗したか」がわからない状態になりがちです。
Rx導入へ
ならI/Oの並列化が特に書きやすくてエラーハンドリングがしやすい
RxPYを導入しようということになりました。
以前から私自身がRxJava/RxJSアプリケーションの開発に携わっていることもあり
Rxを利用した実装のイデアを既に持っていたことも理由の一つです。
ところでRxって?
Rx(Reactive Extensions)は、
データの流れと非同期処理を「ストリーム」として扱えるライブラリ群です。
JavaのRxJava、JavaScriptのRxJS、PythonのRxPYなど複数の言語に実装があります。
「イベント」「I/O」「値の変換」「エラー処理」といったものを
.pipe(...) に繋げて書けるので、
非同期処理が直列の手続きとして扱えるようになるのが最大の強みです。
今回使っているRxPYはそのPython版で、from_iterable(...) のような入力を起点に、
.map(...), .flat_map(...), .filter(...), .reduce(...) などで
処理を組み立てていく構造になります。
が。
だいぶ学習コストが高い問題
Rx を初めて使うメンバーにとっては、オペレータの挙動やスケジューラの概念など、
初見で直感的に理解しづらいポイントが多く、
特に「非同期処理の見通しが良くなる」という効果を実感するまでには、
ある程度の習熟が必要です。
部分導入による問題 - アプリケーション全体はリアクティブではない
全体がリアクティブに構成されているアプリケーションの場合、
むしろ概ね以下の点を守ってもらえれば何とかなったりします。
- 「自分でsubscribeしないでね、基盤実装でやるから」
- 「Observableの入れ子に気を付けてね、値がemitされないことがあるから」
-
Single<?>を返す実装でSingle<Single<Integer>>とかをうっかり返す問題
-
が、今回はリアクティブでないアプリケーションに部分的にRxを導入するので
自分でリアクティブ実装とそうでない部分の境界線を引く必要があります。
うっかりCurrentThreadScheduler上ですべてのオペレーションを実行して
ただの直列処理を書く危険性もありますね。*1
スムーズなRx導入のために
便利で優秀な技術であっても使ってもらえなければダメなので、
ここで工夫を施すことにします。
ライブラリ整備
というわけで、
- コンテキストの切れ目を分かりやすくする
- できるだけスケジューラを意識させないようにする
ためのライブラリ実装を行いました。
WaitingScheduler*2
コンテキスト表現としてはコンテキストマネージャを実装するのが一番ですね。
スケジューラがコンテキストマネージャを兼ねれば
Rxとそうでないところの文脈を切るのに都合がよいでしょう。
__enter__と__exit__を実装すればそのクラスはコンテキストマネージャですので、こんな感じ。
class WaitingScheduler(ThreadPoolScheduler): def __init__(self, max_workers=None, observer=None): super().__init__(max_workers=max_workers) self._observer = observer or CurrentThreadScheduler() self._shutdown_subject: ReplaySubject | None = None def halt_shutdown(self) -> Self: if self._shutdown_subject is not None: raise RuntimeError("shutdown control already initialized") self._shutdown_subject = ReplaySubject(buffer_size=1) self._shutdown_subject.on_next(True) return self def allow_shutdown(self) -> None: if self._shutdown_subject is None: raise RuntimeError("halt_shutdown must be called before allow_shutdown") self._shutdown_subject.on_completed() def __enter__(self) -> "WaitingScheduler": return self def __exit__(self, exc_type, exc_val, exc_tb) -> bool: if self._shutdown_subject is not None: self._shutdown_subject.run() self.executor.shutdown(wait=True) return False @property def observer(self): return self._observer
to_observable
Rxに慣れている人向けに関数をスケジューラ未指定のObservable化する処理を定義します。
直前でops.observe_on(schduler)を呼んでおけばよいので普通のRxと同じです。
また、必ずしも要るかどうかはわかりませんが
関数をスケジューラ指定済みのObservable化する処理とを提供します。
スケジューラ未指定のもの(to_free_observableとしましょう)はこんな感じ。
def to_free_observable( blocking_func: Callable[..., Any], *args, **kwargs ) -> Observable: def action(observer, scheduler: Scheduler | None) -> Disposable: try: result = blocking_func(*args, **kwargs) observer.on_next(result or True) observer.on_completed() except Exception as e: observer.on_error(e) return Disposable() return create(action)
書き味。
というわけで書き味を試してみましょう。
with WaitingScheduler(cpu_count() * 2) as scheduler: rx.from_iterable(list(target_entities)).pipe( ops.subscribe_on(scheduler.observer), ops.flat_map(lambda entity: to_observable(some_heavy_task, scheduler, entity)), ops.observe_on(scheduler.observer), # 明示しなければカレントスレッドスケジューラがここに保持される ops.reduce(lambda acc, _: acc + 1, 0) ).subscribe( on_next= lambda count: _logger.info(f'[処理件数: {count}件]'), on_completed=lambda: _logger.info('バッチ処理が完了しました'), on_error=lambda e: _logger.exception(f'処理中にエラーが発生しました: {str(e)}') )
はい。閉じ込められている感が出せたので満足です。
値を返したい場合はFutureなどをon_nextで待ち構えさせておけばよいです。
注意
上記の例は見やすさ重視でlambda内でobservableを指定しているため、
コンテキスト離脱処理開始のタイミングでto_observableが呼び出される
(つまり、to_observableを呼び出す2つ目以降のオペレータ内のlambda)では
スケジューラが撤収にかかっていてobservableの生成に失敗する可能性があります。
おまけに付けた実装では
そのあたりを解決するための機能(halt_shutdown/allow_shutdown)がついていますが
特に問題なさそうなら素直にops.observe_on(scheduler)と書いた方がよさそうですよ。
実装のイディオム化
先の書き味の例とほぼ同じとなりますが、この書き方そのものが安全で再利用可能なパターンとして成立したため、
「イディオム」として明示的に共有することにしました。
result = concurrent.futures.Future() with WaitingScheduler(cpu_count() * 2) as scheduler: rx.from_iterable(list(target_entities)).pipe( ops.subscribe_on(scheduler.observer), ops.observe_on(scheduler), ops.flat_map(lambda entity: to_free_observable(some_heavy_task, entity)), ops.flat_map(lambda processed_entity: to_free_observable(another_heavy_task, processed_entity)), ops.observe_on(scheduler.observer), ops.reduce(lambda acc, _: acc + 1, 0), ).subscribe( on_next=result.set_result, on_error=result.set_exception, )
- スケジューラ兼コンテキストを作る
- 中でパイプラインを作って処理を流す
- ブロッキング処理はto_observableなどの関数でobservableに変換する
- スケジューラの扱いがよくわからなければ
to_observable(some_heavy_task, scheduler, arg)と書いておけばOK
- 以降の処理を行う(つまり復帰先の)スケジューラを指定する
- リダクションが必要なら(復帰先で)行う
- subscribeを宣言する
- (コンテキストから離脱したタイミングで確実に処理が成功または失敗で完了している)
- コンテキスト外ではエラーと結果をFutureから取り出す
これにより、
不慣れな人でもとりあえず安全に動くものが書ける世界をつくることを意図しています。
現時点での成果
1件のバッチ(処理件数: 15000+件)の高速化に採用されており、
もともと2~5時間だった処理時間を、現在では1時間半~2時間程度に圧縮できています。
というわけですでに他の処理でも採用予定があります。やったね。
今後の課題
バックプレッシャリング
RxPY自体には標準でバックプレッシャー機能がありません。マジでござるか。
次かその次あたりの採用個所では必要になってきますが、
一応別ライブラリを使えば実現できるらしいので
それをイディオムに組み込むことになるでしょう。
処理のキャンセル対応
Observableなんだからキャンセルできるべきです。 (多分標準のfrom_callableを利用してスケジューラの指定だけ遅らせればそれでよいはず
falsy値をEmitしたはずがTrueに化ける
これはただのポカです。
値を取得する必要が現状ないのでこそっと直します。
さいごに
よい道具は使われて初めてよい道具となるので、
よい道具があるのになぁと思っている人は環境に合わせ使われるための工夫をしていきましょう。