
はじめに
PB本部開発部の新谷です。この記事では、自社製品「Rtoaster」のCDPチームで開発したワークフロー機能について紹介します。
背景
弊チームでは、顧客の保有するデータを統合し、分析した結果を施策につなげるためのプラットフォームを開発しています。具体的には以下のような機能を提供しています。
- 顧客の保有するGCSなどの外部ストレージと、弊社CDPとの間のETL機能
- 取り込んだデータをもとにノーコードでセグメントを作成する機能
- 取り込んだテーブルや作成したセグメントを、弊社の保有する施策実行のためのプロダクトに連携する機能
特に1つ目のETL機能については、取り込み・加工・出力の1つ1つの処理を「ジョブ」という単位で実現しています。プロダクトの初期段階ではジョブ間の順序関係を明示的に指定する方法がなく、顧客が各ジョブのスケジュール実行のタイミングを調整して、ジョブの実行順序を保証する運用となっていました。これには以下のような問題があります。
- ジョブの実行間隔を作業者の経験に基づいて調整しなければならず、人に依存した運用となる
- データの急増などによりジョブの実行時間に変動が起きると、ジョブ間の順序関係が破綻する
- ジョブの集合によって一連のETL処理を実現するが、その集合を管理する方法がプロダクト内にないため、ジョブの管理コストが上がる
これらの課題を解決するために、顧客の指定した順序関係に従って一連のジョブを実行する、ワークフロー機能を実装しました。
技術選定
UI上でワークフローを組み立てる部分に関しては、以下のような要件がありました。
- ワークフロー組み立てに必要な以下の操作を、顧客が直感的にできること
◦ジョブの追加、削除
◦ジョブ間の順序関係の追加、削除
◦ワークフローの見た目の変更(ジョブの表示位置の変更、表示の倍率変更)
- 弊チームの採用しているVue.jsで使いやすいこと
以上を満たすライブラリとして、Drawflow (drawflow - npm) を採用しました。こちらは他に似たようなライブラリが少なく、選定工程でほぼ迷いがありませんでした。
またワークフロー実行に関しては以下のような要件がありました。
- ジョブの前後関係を守った上で、可能であれば並列実行する
- 前回失敗したところからワークフローを再実行できる
これらを満たすワークフローエンジンとして、Digdag(Home Default )とCloud Composer(Cloud Composer | Google Cloud ) が候補に挙げられました。今回はコストを抑えたかったことと、既存のジョブ実行の仕組みとの親和性から、Digdagを採用しました。
実装
UIで表現されたワークフローをもとに隣接リストを作る
こちらがワークフローのUIイメージです。ジョブの端点をドラッグ&ドロップで繋ぎ合わせることで、直感的に順序関係を設定できます。

ユーザーがワークフローを作成すると、ワークフロー内のジョブの結びつきが隣接リストとしてバックエンドに送られます。上のワークフローの場合は、以下のような隣接リストが生成されます。
[["START", ジョブ1"], ["START", "ジョブ2"], ["ジョブ1", "ジョブ3"], ["ジョブ2", "ジョブ3"]]
隣接リストをもとにdigファイルを作る
バックエンドではこの渡された隣接リストをもとに、Digdagによるワークフロー実行に必要なdigファイルを組み立てる処理を行います。例えば先ほどの隣接リストからは、次のようなdigファイルが生成されます。
timezone: Asia/Tokyo +group_0: _parallel: limit: 5 +job_1: sh>: {ジョブ1を実行するスクリプト} +job_2: sh>: {ジョブ2を実行するスクリプト} +job_3: sh>: {ジョブ3を実行するスクリプト} +success: sh>: {成功時の処理}
基本的に書かれた順番に処理が実行されますが、parallelの指定されたブロック内の処理は並列実行されます。以下ではこのdigファイルの組み立て方について説明します。
まずは隣接リストで表現されている前後関係を破らないように、ジョブを一列に並べる必要があります。そのため隣接リストをもとにトポロジカルソートします。

その後、基本的にはソート結果の順番にdigファイルに書けば良いのですが、一般にはソート結果が一意にはならない場合があります。例に挙げたDAGの場合、ジョブ1とジョブ2は入れ替えた結果もトポロジカルソートとなっています。つまりジョブ1とジョブ2はどの順番で実行しても順序関係を崩さないので、並列実行することができます。結果として先ほど記載した通りのdigファイルが生成されます。
timezone: Asia/Tokyo +group_0: _parallel: limit: 5 +job_1: sh>: {ジョブ1を実行するスクリプト} +job_2: sh>: {ジョブ2を実行するスクリプト} +job_3: sh>: {ジョブ3を実行するスクリプト} +success: sh>: {成功時の処理}
一般に、入れ替えてもトポロジカルソートのままであるようなジョブの集合がある場合、そのジョブ集合は並列実行が可能です。
DAGをもとにしたdigファイルの組み立てが終わったので、これをDigdagに渡せばワークフローを実行することができます。またUI上でのワークフローの定義は顧客が自由に行えるので、一般には閉路がありえるのですが、それもトポロジカルソートを行うことで弾くことができます。
今後の展望
現状ではワークフロー内の各ジョブの実行ステータスは、ワークフローが全て終わってから一度にUI上に反映されるようになっています。ワークフロー実行中に順次ジョブのステータスを反映する方が、ユーザーに親切になると思います。
またワークフロー内の並列実行されるジョブが全て成功しないと、後続のジョブに進まない仕様となっております。つまりジョブの結びつきを依存関係ではなく単なる順序関係として解釈しているのですが、この部分を改善することでワークフロー失敗時に実行されないジョブを減らすことができます。
その他にも、以下のような要望もあります。
- 複数のワークフロー内の共通処理を1つのワークフローとして定義し、それを使いまわしたい
- そもそもワークフローをコピーしたい
これらの改善案も今後検討していきたいと思っております。
ブレインパッドでは新卒採用・中途採用共にまだまだ仲間を募集しています。
ご興味のある方は、是非採用サイトをご覧ください!
www.brainpad.co.jp