自社プロダクトのデータ基盤における BigQuery SQLテストシステムについて

「データ活用をより多くの人が、より効率的に実施できるようになる取り組み」をエンジニア観点から自発的に実施するカルチャーを持つ、自社開発プロダクト「Rtoaster(アールトースター)」のエンジニアチーム。今回は、データ基盤チームで作成した BigQuery でのテストシステムを紹介します!


こんにちは、プロダクトビジネス本部開発部の柴内(データ基盤チーム)です。今回は、自社製品である「Rtoaster」プロダクトのデータ基盤チームで作成した BigQuery でのテストシステムについてご紹介します。

背景

データ基盤チームでは、

  1. Rtoaster製品からリアルタイムに連携される、WebやアプリのトラッキングといったデータをGCSや BigQuery に蓄積するデータレイク
  2. データレイクにあるデータを BigQuery で加工・変換して利用しやすい形式にしたデータマートやデータウェアハウス
  3. 上記の加工したデータを顧客向けにファイルやAPIとして出力するインターフェース

等の各種コンポーネントの開発・運用を行っています。

特に2については、速度やコストの関係からほとんどの処理を BigQuery のSQL上で処理を行っています。処理の際には、分析・集計の観点からより意味のある値を得るために、以下のようなことを考えてクエリ仕様を設計しています。

  • 重複排除(リロードの排除など)の詳細
  • 非ログインユーザーがログインユーザーになったときのセッションの処理をどうするか
  • ポップアップ要素が表示されてからクリックが判定されるまでの有効期限
    …など

ただ、通常の(例えばPythonでpandas.DataFrameで行うような)データ処理とは異なり、SQLでのデータ処理の特有の問題があります。

■ SQLの構造化が難しく複雑になりがち

特に分析・集計用途で用いられるクエリの場合、重複排除やノイズとなるデータ除外などの前処理が重たくなっていき、数千行のSQLになることが多々あります。それに加え、分析・集計用途で用いるクエリはWINDOW関数や非正規化のための膨大なJOINなど複雑性が高い傾向にあります。

そうなったクエリは人によるレビューもかなり難しくなります。通常のプログラミング言語のプログラムでは上から下に処理を読んでいけば大体の場合問題なく追えますが、SQLの場合は頭の中で関係代数モデルを構築しながら読んでいくことになるので、見落としが非常に発生しやすいです。とくに、次に述べるNULLの扱いは見落としがちです。

■ NULLの扱いが厄介

NULLは普通のプログラミングにおいてもバグの温床となりがちですが、特にSQLで厄介なのはNULLが原因で実行時エラーとなることがほとんどなく、バグがあること自体に気付きにくいというのがあります。JOINやGROUP BYを行うときにNULLの考慮が漏れていると紐付けや集計がうまくいかず、最悪の場合データの欠損につながります。

かといって一律にNULLを禁止するというわけにもいきません。特に BigQuery のような列志向DBの場合、NULLを効率的に利用しないとパフォーマンス向上やコスト抑制ができません。SQLを書く上ではNULLと共生する必要があります。

■ プロプライエタリなDB

BigQuery はプロプライエタリなSaaSとして提供されており、ローカルでサーバーを立てた検証はできません。クエリはデータのロード量に従って従量課金されるので、検証のために実データを用いるときには重課金にならないよう注意が必要です。

BigQueryのSQLのコアを抽出してOSSにしたZetaSQLはありますが、BigQuery そのものではないため利用用途はコードの整形など一部に限られます。

■ バグ発生時のリスクが大きい

SQL特有の問題ではなく集計・分析システムの話としては、集計した値を画面などに出力していたり、お客様にファイルとして提供していたりする場合だと、一度バグが発覚し異なる数値が出ていた場合、過去に遡って影響を調査・報告する必要があります。とくにデータ分析や集計で用いられる値は、マーケティングや戦略など企業の意思決定に影響することが多く、バグの影響のリスクが他のバグ(単に機能が利用できない等)よりも重大になりえます。

今回はそんな BigQuery での分析・集計用途のSQLの品質保証を行うために作成したテストシステムについてご紹介します。

実装

今回説明するテストシステムの処理の流れは概して以下のようになっています。

  1. Pythonのクラスで定義されたテストケースから検査用SQLクエリを生成
  2. 検査用SQLクエリを BigQuery に投入して結果を得る
  3. 1のテストケースの期待される値と結果を比較

以下それぞれのトピックについて説明していきます。

Pythonでのテストケース記述

SQLクエリを書く前に、最終的に満たしてほしい要件を元にテストケースを洗い出していきます。 既存システムからの移行の場合は既存の仕様を元にテスト設計し、新規の分析用途クエリの場合は一般にありうるデータなどをプロダクト知識・ドメイン知識をもとに作成します。

また、実装中の段階で気付いた細かい挙動を追加したり、バグ修正の際にそれに関連したテストケースを追加することでテストを成長させていきます。 以下にテストケース定義の例を挙げます。テーブル名・スキーマなどは架空のものです。

class ItemJoinTestCase:
  NAME = '購買アイテムの名前が引ける'  # ← テストケース名

  def query(self, dataset: DatasetReference):
    return render_query('item.sql', dataset=dataset)  # テンプレートエンジンを利用してクエリをレンダーする

  @source('mart.purchase_data')  # ← Pythonのモジュール名
  def purchase_data(self):
    """ 購買データ """
    yield {'user_id': 'foobar', 'purchase_id': 123123}
    yield {'user_id': 'barbaz', 'purchase_id': 456456}
    yield {'user_id': 'geegee', 'purchase_id': 789789}

  @source('mart.item_mst')
  def item_mst(self):
    """ アイテムデータ """
    yield {'item_id': 123123, 'item_name': '石鹸'}
    yield {'item_id': 456456, 'item_name': 'カレー'}

  def expected(self):
    """ 期待されるデータ """
    yield {'user_id': 'foobar', 'item_name': '石鹸'}
    yield {'user_id': 'barbaz', 'item_name': 'カレー'}
    yield {'user_id': 'geegee', 'item_name': '不明'}

1ケースごとに1クラスとして定義を行います。各メソッドではテーブルデータを生成するようになっています。

クラスベースのテストでは、利点としてミックスインを用いてデータを共通化することで一部のデータだけ異なるテストケースが作りやすくなります。

class ItemMstMixin:
  """ アイテムデータ(共通) """
  @source('mart.item_mst')
  def item_mst(self):
    yield {'item_id': 123123, 'item_name': '石鹸'}
    yield {'item_id': 456456, 'item_name': 'カレー'}

class AnotherMstMixin:
  ...

class AnotherTestCase(ItemMstMixin, AnotherMstMixin):
  def expected(self):
    ...

expectedメソッドはSQLの結果として期待されるデータを記述します。この例ではアイテムの名前がJOINできる(できない場合は「不明」)というテストケースを記述しています。

アノテーションの「mart.purchase_data」等はPythonのモジュール名です。データ基盤のコードでは、テーブルの定義として取り込み元ファイル、テーブル名、データ変換ロジック、そしてデータ型などを1つずつPythonのファイルとして管理しています。*1 ここではそのモジュールの名前を指定しています。

# 例: mart/purchase_data.py
GCS_BUCKET = 'mart_datalake'
GCS_PATH = r'purchase_data/(?<site>\w+)/(?<timestamp>\d+)\.csv\.gz$'
SOURCE_FORMAT = 'CSV'
TABLE_ID = 'purchase_data'
DATASET_ID = '{site}'
SCHEMA = [
   {'name': 'user_id', 'type': 'STRING'},
   {'name': 'purchase_id', 'type': 'INTEGER'},
]
CLUSTERING = {'fields': ['user_id']}
...

ここでジェネレータを用いた理由の一つは可読性です。例えば時系列によるテストケースで繰り返しのパターンとなる際に、以下のように書けるので読みやすくなっています。

def purchase_data(self):
  for i in range(5):
    yield {'user_id': f'foobar{i}', 'purchase_id': 123123}
  for i in range(5):
    yield {'user_id': f'barbaz{i}', 'purchase_id': 456456}

親クラス(ミックスイン)からのコピーも簡単です。

def some_data(self):
  yield from super().some_data()
  yield {'user_id': 'aaa'}

クエリ生成

テストケースのクラス定義をメタプログラミングを用いて収集し、そのインスタンスから生成したデータからSQLを動的に生成します。定義したテーブルデータはCTEでダミーテーブルとして作り、テスト対象のクエリの参照テーブルをこの一時テーブルの名前で置換します。

このCTE形式を用いると、データのロードも発生しないため、BigQueryの消費バイトが0になるのでコストフリーで実行でき、実行速度も早くなる利点があります。

ただ、あまりにも複雑なクエリの場合はBigQueryのパース能力を超えてしまったというエラーが発生します。そのときはCREATE TEMP TABLEやINSERTを用いたクエリ生成方式に適宜スイッチします。

先程の例をとると以下のようなクエリが生成されます。

WITH 
  DUMMY_TABLE_purchase_data AS (
    SELECT col.* FROM (
      SELECT ARRAY<STRUCT<`user_id` STRING, `purchase_id` INT64>>
      [STRUCT('foobar', 123123),
       STRUCT('barbaz', 456456),
       STRUCT('geegee', 789789)] AS col
    ) AS t, UNNEST(t.col) AS col
  ),
  DUMMY_TABLE_item_mst AS (
    SELECT col.* FROM (
      SELECT ARRAY<STRUCT<`user_id` STRING, `purchase_id` INT64>>
      [STRUCT('foobar', 123123),
       STRUCT('barbaz', 456456),
       STRUCT('geegee', 789789)] AS col
    ) AS t, UNNEST(t.col) AS col
  )
(
  ... テスト対象のクエリ ...
)

pytestでの比較・CIへの組み込み

生成したクエリは、実際の BigQuery にリクエストが行われ、その結果はdictのリストとしてexpectedの結果と比較されます。dictのlist同士をassertで比較しているので、どこのカラムが違うかが一目で理解できるようになっています。

pytest-html での差分比較結果
pytest-html での差分比較結果

テストシステムにはpytestを用いています。pytestのエコシステムに乗っかることで、pytest-xdistでテストの並列化やpytest-htmlでHTMLのレポートを出力を行い効率的なテストが実現できています。

また、CTEによるノーロードクエリであり、かつpytest-xdistによる並列化を行っているので実行速度も数百のテストケースが2~3分程度で完了するようになっています。これらをCIに組み込むことによって、変更が既存の仕様に影響がないかをコミット毎に検証する回帰テストを効率的に実行でき、開発者の体験(安心感)が向上しました。

Airflowへの応用

このテストのシステムは、Airflowを用いたワークフロー処理の統合テストにも応用しています。

具体的には、テストケースクラスの記述方式はそのままに、クエリ生成の部分で一時的なデータセットを作りそこに実体テーブルを生成するように変更しました。そして、Docker上のAirflowでSequentialExecutorやDebugExecutorでDAGを実行し、ワークフローの最終成果物の対象テーブルの中身とexpectedを比較します。

このとき、Airflowのログファイルやpytestの結果ファイルをDockerコンテナから取り出す処理(あるいはボリュームマウント)を入れておりデバッグのとき便利にしています。

結果

以前は、分析・集計用のSQLは、一人が一気に書き上げるようなケースが多く、具体的にどういう処理がされているかはクエリを注意深く読み込むことでしか解釈できませんでしたし、それが正しく処理しているかについての確証は曖昧なところがありました。

このテストシステムを導入することで、レビューのときに、SQLを見るほかにテストケースを見ることで具体的な処理イメージが掴め、レビュアーの負担が軽減しました。また、クエリのミスや漏れの指摘があった際にテストケースを追加することで修正対応済みであることと、その内容が正しいことがわかりやすくなりました。

また、他の人(あるいは時間が経った自分自身)が処理の変更・修正を行う際にも、既存の仕様を壊していないかの品質が保証されるので、開発者体験も向上しました。

実際のテストケースの例、数百を超えるテストケースをCIでチェックしている
実際のテストケースの例、数百を超えるテストケースをCIでチェックしている

今後の展望

テストケースの自動生成

テストケースは今のところ、ドメイン知識や実際の発生する挙動に基づいて人力で作成しています。ですが、テーブルの元データをPythonで生成しているので、プログラマティックな生成も可能です。よって、仕様だけのランダムにテストケースを生成する(Property-based Testing)ことも可能です。ただ、課題として「ランダムに生成した元データからどうやって正しく期待されるデータを作成するか?」「時系列上の制約条件をどう定義するか?(案: 時相論理を使う?)」などが考えられます。

テスト仕様のドキュメント化

現在一部のテーブル定義リファレンスをYAMLで書かれたスキーマ定義からLaTeX経由でPDFにしたものとして作成しています。この仕組みを応用することで、テストのクラス定義から、「元のテーブルデータからこういう処理がされる」という仕様を具体的にテーブルデータと共に例示した内部向けドキュメントとして作成することが可能と思われます。こうすることで、開発担当者以外にもプロダクト担当者・サポート担当者に共有することでナレッジを共有することが期待できます。

他ツールとの統合

dbtなどのモデリングツールと統合して、それらのスキーマ定義を読み込んで利用したりテストケースを生成したりする拡張もアイデアとして考えられます。


ブレインパッドでは、今回ご紹介したような「データ活用をより多くの人が、より効率的に実施できるようになる取り組み」をエンジニア観点から自発的に行っています。共に便利なシステムを作る仲間を募集しています。ぜひご応募ください!
www.brainpad.co.jp

https://hrmos.co/pages/brainpad/jobs/hrmos.co

*1:ここでPythonモジュールとして定義している理由の一つとして、かつては変換関数自体をpickleでシリアライズして、非同期実行される変換ロジックをコンポーネント間の継続渡しで行うという設計になっていたという経緯があります。