Timee Product Team Blog

タイミー開発者ブログ

flake8 pluginを書いてみた

こんにちは、タイミーのデータエンジニアリング部 データサイエンス(以下DS)グループ所属のYukitomoです。

今回はPythonのLinterとしてメジャーなflake8のプラグインの作り方を紹介したいと思います。

コードの記述形式やフォーマットを一定に保つため、black/isort/flake8などのformat/lintツールを使うことはpythonに限らずよく行われていますが、より細部のクラス名や変数名を細かく規制したい(例:このモジュールのクラスはこういう名前付けルールを設定したい等)、けれどコードレビューでそんな細かい部分を目視で指摘するのは効率的でない、といったケースはありませんか?そんな時、flake8のプラグインを用意して自動検出できるようにしておくと便利です。

ネット上には公式サイトを含めいくつかプラグイン作成の記事があるのですが、我々の想定ケースと微妙に異なる部分がありそのままでは利用できなかったため、

  • 最新のflake8(2024/7現在, 7.1.0)を用い
  • 比較的新しいパッケージマネージャーであるpoetry(1.8.3を想定)を利用して
  • 2種類のプラグインのそれぞれの作り方

を改めてここにまとめます。

準備するもの

  • Python: Versionは特に問いませんが、3.11.9で動作確認しています。
  • Poetry: 1.8 以上 (後述しますが1.8より導入されたpackage-mode = falseを指定しているため)。この記述を変えることで1.8以前のバージョンでも動くとは思いますが、この記事では1.8を前提としています。

上記が利用可能な環境をvenvやコンテナを利用して作成しておいてください。flake8本体はpyproject.tomlの依存モジュールとして導入されるため事前に準備する必要はありません(3.8以降で動作するはずですが、本記事では7.1を利用します)。

全体の構成

サンプルで利用するファイル群は以下の通りです。構文木を利用するタイプと1行ずつ読み込んでいくタイプと2種類あるため、それぞれをtype_a、type_bとしてサンプルを用意し、それら2つのサンプルを束ねる上位のプロジェクトを一つ用意しています。本来なら各プラグイン毎にユニットテスト等も実装すべきですが、本記事ではプラグインの書き方自体の紹介が目的のため割愛しています。なお、type A, type Bの呼称はflake8プラグインにおいて一般的な呼び名ではなく、本記事の中で2つのタイプを識別するために利用しているだけなので注意してください。

# poetry.lock 等本記事の本質と関係のないものは省略しています
(.venv) % tree .  # この位置を$REPOSITORY_ROOTとします。
.
├── pyproject.toml
└── plugins
    ├── type_a
    │   ├── pyproject.toml
    │   └── type_a.py
    └── type_b
        ├── pyproject.toml
        └── type_b.py

${REPOSITORY_ROOT}/pyproject.tomlは以下の通り。

# cat ${REPOSITORY_ROOT}/pyproject.toml
[tool.poetry]
name = "flake8 plugin samples"
version = "0.0.1"
description = "A sample project to demonstrate flake8 plugins"
authors = ["timee-datascientists"]
package-mode = false # この記述を外せばきっとpoetry 1.8より前でも動くはず

[tool.poetry.dependencies]
python = ">=3.11.9"

[[tool.poetry.source]]
name = "PyPI"
priority = "primary"

[tool.poetry.group.dev.dependencies]
# flake8を利用するので一緒によく利用されるblack/isortも導入
flake8 = "~7.1.0"
isort = "~5.13.2"
black = "~24.4.0"

# プラグインはローカルからeditable modeで登録
type_a = { path="./plugins/type_a", develop = true}
type_b = { path="./plugins/type_b", develop = true}

[build-system]
requires = ["poetry>=1.8"]
build-backend = "poetry.masonry.api"

Type A: AST Treeを利用する場合

Python codeの1ファイルをparseして抽象構文木(AST)として渡すタイプのプラグインです。ネットでflake8のプラグインを検索した時、こちらのタイプの実装例が出てくることが多く、また、構文木の処理が実装できるなら、こちらの方が使いやすいです。

構文木で渡されたpython ファイルを巡回し、その過程で違反を発見するとエラーを報告しますが、本記事のサンプルでは構文木の巡回結果は無視し、巡回後必ずエラーを報告しています。詳細はast.NodeVisitorを参照いただきたいのですが、各ノードを巡回する際に呼ばれるvisit()だけでなく、visit_FunctionDef() などファイル内で関数定義された場合、など個別の関数が用意されているので、これらを適切に上書きすることで、目的の処理を実現していくことになります。

なお、プラグインのコンストラクタには抽象構文木(ast)の他、lines, total_lines等公式ドキュメントのここに記述されているものを追加することができます。

以下にサンプルの実装(type_a/type_a.py)とプロジェクトの定義ファイル(type_a/pyproject.toml)を示します。

# type_a/type_a.py
import ast
from typing import Generator, List, Tuple

# プラグインの本体
class TypeAPluginSample:
    def __init__(
        self, tree: ast.AST  #, lines, total_lines: int = 0
    ) -> None:
        self.tree = tree

    def run(self) -> Generator[Tuple[int, int, str, None], None, None]:
        visitor = MyVisitor()
        visitor.visit(self.tree)
        # サンプルでは常にエラーを報告するが本来ならvisitorに結果を溜め込んで
        # 結果に応じてエラーをレポート
        if True:
            yield 0, 0, "DSG001 sample error message", None

# プラグインから利用する構文木の巡回機
class MyVisitor(ast.NodeVisitor):
    # visit() やvisit_FunctionDef()を目的に応じて上書き
    pass

# 他のサンプルでは必須っぽく書いてあるが、pyproject.tomlのentry-points
# の指定と被ってるなぁと思ってコメントアウトしても動いたので今はいらない気がする。
# def get_parser():
#    return TypeAPluginSample
(.venv) % cat plugins/type_a/pyproject.toml
# 親プロジェクトから直接ロードするため [project]の記述もしていますが
# プラグイン単体で独立したプロジェクトとするなら不要。
[project]
name = "type_a"
version = "0.1.0"
description = "Sample type a plugin"
authors = [{name = "timee-datascientists", email = "your.email@example.com"}]

[tool.poetry]
name = "type_a"
version = "0.1.0"
description = "Sample type-a plugin"
authors = ["timee-datascientists"]

[build-system]
requires = ["setuptools", "wheel", "poetry>=1.8.3"]
build-backend = "setuptools.build_meta"

[tool.poetry.dependencies]
python = ">=3.11.9"
flake8 = ">=7.1.0"

# ここでプラグインのクラス名を登録
[project.entry-points."flake8.extension"]
DSG = "type_a:TypeAPluginSample"

Type B: 1行ずつ処理する場合

対象となるpython ファイルを1行ずつ処理していくタイプのプラグインです。公式ドキュメントにある通り、歴史的な経緯で2種類あるようですが、こちらの1行ずつ処理するタイプを使ったサンプルを見かけたことがありません。特に非推奨とされているわけでもないですし、実装したいルール自体がシンプルであればこちらの方法で実装するのもありだと私は思います。physical_lineもしくはlogical_lineを第一引数に設定し、physical_lineの場合はファイルに書かれている1行ずつ、logical_lineの場合はpython の論理行の単位で指定した関数が呼ばれます。physical_line, logical_lineの両方を同時に指定することはできず、他の変数を追加する場合もphysical_line/logical_lineは第一引数とする必要があります。

以下にサンプルの実装(type_b/type_b.py)とプロジェクトの定義ファイル(type_b/pyproject.toml)を示します。

# type_b/type_b.py
from typing import Optional

# プラグイン本体
def plugin_physical_lines(
    physical_line: Optional[str] = None,
    line_number: Optional[int] = None,
    filename: Optional[str] = None,
):
    if line_number == 2:
        yield line_number, "DSG002 sample error message"
(.venv) % cat plugins/type_b/pyproject.toml

# type_aのものとほぼ同じ。project.nameおよびtool.poetry.nameをtype_bに書き換えた後、
# 差分は以下。プラグイン本体の関数を指定してやれば良い。

:
[project.entry-points."flake8.extension"]
DSG = "type_b:plugin_physical_lines"

実行結果

以下のようなサンプルファイルを用意し、flake8を実行した結果を示します。

# sample.py
def main():
    print(
        'Hello, World!'
    )

if __name__ == '__main__':
    main()

実行結果

% flake8 sample.py
sample.py:0:1: DSG001 sample error message
sample.py:2:3: DSG002 sample error message

注意点

Type A, Type B両方とも公式ドキュメントに書いてある変数は全てコンストラクタに追加できるのですが、それぞれのタイプにおいて意味のあるものは限られるため、必要なもののみを追加すれば良いです。

まとめ

flake8 のプラグインの定義方法を2通りご紹介しました。

タイミーのデータサイエンスグループでは通常のformat/lintだけでカバーできない(けれど少しの工夫により機械作業で抽出できる)運用ルールを本記事のようなflake8プラグインを用いてCIで事前に検出することで、コードレビューはできるだけ本質的な部分に集中できるよう取り組んでいます。

We’re Hiring!

タイミーのデータエンジニアリング部・データアナリティクス部では、ともに働くメンバーを募集しています!!

現在募集中のポジションはこちらです!

「話を聞きたい」と思われた方は、是非一度カジュアル面談でお話ししましょう!

References

開発生産性カンファレンス2024に参加しました

タイミー QA Enabling Teamのyajiriです。

去る6月28日〜29日の2日間、ファインディ様主催の「開発生産性カンファレンス2024」に参加してきました。

(タイミーには世界中で開催されるすべての技術系カンファレンスに無制限で参加できる「Kaigi Pass」という制度があり、今回もこれを利用して新潟からはるばる参加してきました。) productpr.timee.co.jp

タイミーでは弊社VPoE(VP of ええやん Engineering)の赤澤の登壇でもご紹介した通り、チームトポロジーを組織に適用し、プロダクト組織の強化と改善にチャレンジしています。 speakerdeck.com

この登壇でも紹介されておりますが、私自身もイネイブリングチームの一員として、プロダクト組織全体のQA(品質保証)ケイパビリティの向上や、障害予防プロセスの改善に取り組んでいます。

開発生産性の観点から考える自動テスト

まずQAの視点で最も印象に残ったのは、皆さんもご存知のt_wadaさんによる「開発生産性の観点から考える自動テスト(2024/06版)」です。 speakerdeck.com

なぜ自動テストを書くのか?

この問いに対してt_wadaさんは「コストを削減するためではなく、素早く躊躇なく変化し続ける力を得るため」そして「信頼性の高い実行結果に短い時間で到達する状態を保つことで、開発者に根拠ある自信を与え、ソフトウェアの成長を持続可能にすること」と表現されていました。

(ここまで一言一句に無駄のない文章は久々に見た気がします)

タイミーでもアジャイル開発の中で高速なテストとフィードバックのサイクルを意識し、自動テストを含むテストアーキテクチャの強化に取り組んでいます。しかし、活動がスケールすると共にテストの信頼不能性(Flakiness)や実行時間の肥大化、費用対効果などの問題が発生します。

これらの問題に対する合理的な対応策を検討する上で、各々のテストの責務(タイプ)や粒度(レベル)を分類し、費用対効果と合目的性の高いものから重点的に対応していく必要があります。

そのためのツールとして「アジャイルテストの四象限」や「テストピラミッド」「テスティングトロフィー」などを活用し、テストレベルを整理し、テストのポートフォリオを最適化するアプローチを取っていましたが、具体的なアーキテクチャに落とし込んだ際に「これってどのテストレベルなんだっけ?」といった想定と実態の乖離がしばしば発生していました。

サイズで分類しテストダブルでテスト容易性を向上する

それを解決する手段として、テストレベルではなくテスト「サイズ」で整理する方法が提唱されました。

テストサイズの概念は古くは「テストから見えてくる グーグルのソフトウェア開発」、最近では「Googleのソフトウェアエンジニアリング」で紹介されていました。今回紹介されたのは、テストピラミッドにおいても具体的なテストタイプではなく「サイズ」で分類し、テストダブル(実際のコンポーネントの代わりに使用される模擬オブジェクト)を積極的に利用することでテスタビリティを向上させ、テストサイズを下げ、速度と決定性の高いテストが多く実装される状態を作るというアプローチです。

このアプローチは、タイミーのDevOpsカルチャーにも親和性が高く、ぜひ自動テスト戦略に取り入れたいと感じました。

おわりに

他にも魅力的で参考になる登壇が盛りだくさんで、丸々2日間の日程があっという間に過ぎる素晴らしいイベントでした。

主催のファインディ様やスポンサー、登壇者の皆さまに感謝するとともに、来年の開催も心より楽しみにしています。

H3を使用した BigQueryでの空間クラスタリングについて検証してみた

こんにちは、タイミーのデータエンジニアリング部データサイエンス(以下DS)グループ所属の菊地です。

今回は、H3を使用したBigQueryでの空間クラスタリングについて検証した内容を紹介したいと思います!

BigQueryでの空間クラスタリングとは

BigQueryにはクラスタリングという機能があり、うまく活用すると、クエリのパフォーマンスを向上させ、クエリ費用を削減できます。

クラスタリングは空間データにも適用でき、BigQuery がデフォルトで使用するS2インデックス システムを使用して、空間クラスタリングを行うことができます。

また、H3やGeohashなどの他の空間インデックスに対しても空間クラスタリングを行うことができ、今回はタイミーでも良く使用しているH3を使用して、空間クラスタリングを行う方法を検証してみました。

BigQueryでのクラスタリング及び空間クラスタリングについては、下記の記事が参考になるかと思います。

cloud.google.com cloud.google.com

H3を使用した BigQueryでの空間クラスタリングの検証

上記の参考記事でも挙げましたが、基本的にこちら記事の内容に沿いつつ、一部具体の実装が記載されていない箇所を補完しながら検証を行いました。 cloud.google.com

1. 検証用のテーブル作成

検証用のテーブルとして、経度と緯度のランダムポイントを、H3セルID(解像度13)に変換したテーブルを作成します。

DECLARE H3_INDEX_RESOLUTION INT64 DEFAULT 13;

-- 連番を格納しておくためだけのテーブル
-- CTEだと後続のテーブル作成が遅かったので実テーブルにしてます
CREATE OR REPLACE TABLE `tmp.tmprows` as
SELECT x FROM UNNEST(GENERATE_ARRAY(1, 10000)) AS x;

-- 経度と緯度のランダムポイントを、H3セルID(解像度13)に変換したテーブル
DROP TABLE IF EXISTS `tmp.h3_points`;
CREATE OR REPLACE TABLE `tmp.h3_points`
CLUSTER BY h3_index
AS 
WITH points AS (
  SELECT 
    `carto-os`.carto.H3_FROMLONGLAT(RAND() * 360 - 180, RAND() * 180 - 90, H3_INDEX_RESOLUTION) AS h3_index
    -- 後の検証のために追加
    , RAND() AS amount
  FROM 
    `tmp.tmprows` AS _a
    CROSS JOIN `tmp.tmprows` AS _b
)
select 
  h3_index
  , amount
FROM points

テーブルのストレージ情報と内容は以下のようになります。

2. クラスタリングによる絞り込みが効かないクエリ例

次に、参考記事で紹介されているように、親セルID(今回は解像度7)をWHERE句で指定してクエリを実行してみましたが、このクエリはテーブルをフルスキャンしてしまいます。

DECLARE PARENT_CELL_ID STRING DEFAULT '870000000ffffff'; -- H3解像度7のセルID

SELECT
    ROUND(SUM(amount), 6) AS sum_amount
FROM 
    `tmp.h3_points`
WHERE `carto-os`.carto.H3_TOPARENT(h3_index, 7) = PARENT_CELL_ID

ジョブ情報と結果

H3インデックスでクラスタリングを行っているにもかかわらず、テーブルをフルスキャンしてしまう理由としては、

H3_ToParentにはビット演算が関係し、複雑すぎて BigQuery のクエリアナライザが、クエリの結果がクラスタ境界にどのように関連しているかを把握できないために発生します。

参考記事では言及されています。

3. クラスタリングによる絞り込みが効くクエリ例

次に、クラスタリングによる絞り込みが適用されるクエリを検証してみます。

「2. クラスタリングによる絞り込みが効かないクエリ例」との違いとしては、低解像度の親セルに含まれる、高解像度セルの開始IDと終了IDを取得し、WHERE句で指定していることです。

DECLARE H3_PARENT_ID STRING DEFAULT '870000000ffffff';  -- H3解像度7のセルID
DECLARE H3_INDEX_RESOLUTION INT64 DEFAULT 13;

DECLARE RANGE_START STRING;
DECLARE RANGE_END STRING;

-- 低解像度の親セルに含まれる、高解像度セルの開始IDと終了IDを取得しセットする
SET (RANGE_START, RANGE_END) = (
  SELECT AS STRUCT
    `carto-os`.carto.H3_TOCHILDREN(H3_PARENT_ID, H3_INDEX_RESOLUTION)[0],
    ARRAY_REVERSE(`carto-os`.carto.H3_TOCHILDREN(H3_PARENT_ID, H3_INDEX_RESOLUTION))[0]
);

SELECT
  ROUND(SUM(amount), 6) AS sum_amount
FROM
  `tmp.h3_points`
WHERE 
  h3_index BETWEEN RANGE_START AND RANGE_END

ジョブ情報と結果は以下のようになっており、スキャン量が削減され、クエリのパフォーマンスも向上しています。クエリ結果も「2. クラスタリングによる絞り込みが効かないクエリ例」の結果と合致しています。

ジョブ情報と結果

まとめ

H3を使用した BigQueryでの空間クラスタリングについて検証してきました。

タイミーでは位置情報を活用した分析を行うシーンが多く、うまく活用することで機械学習時の特徴量生成や、BIツールからのクエリ最適化に繋げることができる可能性があるので、今後のデータ分析に活かしていきたいと思います。

We’re Hiring!

タイミーのデータエンジニアリング部・データアナリティクス部では、ともに働くメンバーを募集しています!!

現在募集中のポジションはこちらです!

「話を聞きたい」と思われた方は、是非一度カジュアル面談でお話ししましょう!

【イベントレポート】Kotlin Fest 2024に参加しました

2024年6月22日(土)にKotlin Fest 2024が開催されました。Kotlin Festは「Kotlinを愛でる」というビジョンを掲げた技術カンファレンスです。タイミーのAndroidエンジニアはエンジニアの成長を支援する制度の一つであるKaigi Passを利用して参加しました。

本投稿では、Kotlin Fest 2024に参加したメンバー(中川、haru、みかみ、しゃむむらたtick-tack)が気になったセッションや感想のレポートします!

メンバーによるレポート

中川編

効果的なComposable関数のAPI設計

私が気になったセッションは、haru067さんによる「効果的なComposable関数のAPI設計」です。このセッションでは、Composable関数を書くときに引数をどのように定義すべきかという、現場で直面する具体的な疑問に対して、様々なケーススタディを通じて考察が行われました。

セッションでは以下のプラクティスに触れられました:

  • State hoisting
  • Slot API
  • DSLでのslot APIの活用
  • デフォルト引数
  • Property drilling

特に印象的だったのは、これらのプラクティスが常に最適な解決策とは限らないという点が強調されていたことです。むやみに使うのではなく、適切な場面で使うことが重要であるという、現場での経験に裏打ちされた具体的なアドバイスが参考になりました。

haru編

Kotlinで愉しむクリエイティブコーディング

まず最初にご紹介するのは、畠山 創太 さんによる Kotlinで愉しむクリエイティブコーディング です。

私はクラブイベントにたまに行くので、VJさんという存在を元々知っていたのですが、そんなVJさんの中でもジェネ系と呼ばれる画面をリアルタイムに生成するライブコーディング的なアプローチのVJさんとプライベートで繋がりがあり、それに利用されているフレームワークなどを知っていました。

そんな中、このセッションではKotlinでリアルタイムにグラフィックスを処理できて、ジェネ系VJにも使えそうなOPENRNDRが紹介されていました。

OPENRNDRはProcessingやTouch Designerなどのジェネ系VJで使われるフレームワークとよく似たフレームワークで、KotlinベースのDSLでグラフィックス処理を記述することができます。

このセッションでは、OPENRNDRで書かれたいくつかのデモ(Boidsなど)が紹介され、OPENRNDRでできることの自由度や簡単に記述できることを紹介していました。

OpenGLベースのグラフィックスバックエンドをもち、RealSense, Kinect, TensorFlow, DMXなど多種多様な連携先が存在しており、これらを使えばセッションで紹介されていた以上のこともできそうだなと感じました。

Okioに愛を込めて

次にご紹介するのは、RyuNen344さんによるOkioに愛を込めてです。

OkioはBlock社が開発しているKotlin向けのI/O ライブラリで、OkHttpやMoshiのベースにも使われているライブラリです。

まず、Kotlinの標準ライブラリが充実しているのに、なぜOkioを採用するのかという話から始まりました。

いくつかの理由を紹介されていましたが、地味に落とし穴だなと思ったのは、Kotlinが元々JVMをターゲットとした言語としてスタートしているが故にJava標準ライブラリを呼び出しているところが多々あったり、それをKMPから使えなかったりするというところでした。

そんな中、OkioはJava標準ライブラリなどへの依存がなく、それでいて使い勝手の良いI/Oライブラリになっているということで、これから直接的・間接的問わず利用する頻度は増えていきそうでした。

これからKotlin向けのライブラリを作る上では、JVM以外のターゲットで使われることも前提として考えないといけないと思いました。

そして、綺麗なダジャレでセッションは終了。お見事でした。

みかみ編

例外設計について考えて Kotlin(Spring Boot&Arrow)で実践する

「例外設計について考えて Kotlin(Spring Boot&Arrow)で実践する」というセッションを紹介します。例外設計の重要性とプロダクト開発に与える影響について深く掘り下げ、KotlinとArrowライブラリを活用した柔軟な例外設計の実践方法が詳しく説明されていた発表でした。

特に印象的だったのは「例外設計とモデリング」についてです。このセッションでは、例外を「技術的例外とビジネス例外」および「予期する例外と予期しない例外」の組み合わせで大きく4つに分類できるという説明がありました。そしてそれぞれの例外に対して、ドメイン駆動設計(DDD)の考え方を基に、具体的にどのようにコードに反映させるかが紹介されました。例外をドメインに結びつけて考えることにより、プロダクト開発に良い影響を与える例外設計を行うことができると感じました。

例外自体は普段の実装でも意識しますが、その複雑さのため設計に関しては深く意識できていないことが多いと感じています。本セッション内容を通してプロダクト開発をより良くしていくための例外設計の考えた方と実践に挑戦していきたいと感じました。

しゃむ編

しゃむ(@arus4869)です。FF16を最近ようやくクリアできたので、FFVIIリバースやり始めました。最高ですね。

KotlinのLinterまなびなおし2024

私が気になったセッションは「KotlinのLinterまなびなおし2024」です。このセッションでは、各種Lintツールの紹介だけでなく、Lintツールを効果的に活用するための実践的なアドバイスも多数紹介されました。

中でも特に気になったのはkonsistです。konsistは、標準セットルールがなく、各プロジェクトの特性に合わせたルール設定が可能である点が魅力的でした。また、テスト環境やユニットテストでの動作が主な特徴で、アノテーションを活用することで特定の用途に応じたルール設定ができる点も興味深かったです。

またセッションの中で、Lintルールを段階的に導入することでチームの負担を軽減しつつ、徐々にコード品質を向上させるアプローチも印象的でした。

このセッションを通じて、KotlinのLintの効果的な使い方について多くの知見を得ることができ、学び直しの良い機会になりました。ありがとうございました。

むらた編

むらた(@orerus)です。最近夫婦でカイロソフトさんのアプリにハマっています。

withContextってスレッド切り替え以外にも使えるって知ってた?

さて、早速ですが私が気付きを得たセッションとしてT45Kさんによる「withContextってスレッド切り替え以外にも使えるって知ってた?」 を紹介させていただきます。スライドも公開されています。

Kotlin coroutinesを使っていると頻繁に登場する withContext ですが、セッションタイトルでズバリ指摘されている通り、私もスレッドの切り替え用関数であるかのように意識してしまっていたことに気づきました。

使い方が間違っているわけではありませんが、セッションで紹介されている通り、withContextの挙動は正確にはスレッド切り替えではなく「CoroutineContextを切り替える」(厳密には既存のCoroutineContextと引数で渡されたCoroutineContextをマージする)ことにあります。そのうえで、渡されたブロックをcoroutineContextで指定されているcoroutineDispatcherにて実行するという形になります。(詳細については是非T45Kさんのスライド資料を参照ください)

そのため、 withContext(Dispatchers.IO) のように切り替え先のスレッド (厳密には CoroutineDispatcher ) を指定するだけでなく、 withContext(Job() + Dispatchers.Default + CoroutineName("BackgroundCoroutine")) のように、複数のCoroutineContextを合成する形で引数を指定することができるんですね。(CoroutineContextの要素についてはこちらを参照ください

なお、 withContext 以外のコルーチンビルダー( launchasync など)についても、引数で指定されたCoroutineContextと既存のCoroutineContextをマージして用いる挙動は同じです。

今回のセッションを通じて、Kotlin coroutinesへの理解がさらに深まりました。とても良いセッションをありがとうございました!

tick-tack編

まだ JUnit を使ってるの? kotest を使って快適にテストを書こう

Kotest についての HowTo を熱く語っておられるセッションで Kotest への愛を感じました。最近よく名前を聞くライブラリな気がします。

タイミーでも hamcrest を採用していますが Java 向けのテストライブラリは Kotlin の予約語が使われていてエスケープしないととても見づらいことがあります。やっぱり Kotlin first に書けるのは非常に気持ちがいいですね。Kotest は Runner が JUnit で安定した環境で動かせるのもグッド。

個人的にセッション内で刺さったポイントとしては EventuallyProperty Based Testing です。

Eventually

内部で非同期処理を実行するメソッドのテストを書くときに実行しても assertion のタイミングが変更前で失敗するといったケースはよくあります。そういう時に eventually を使うと一定時間評価しつづけ期待する結果に変わったら成功と見なしてループを抜けてくれます。めちゃめちゃかしこい。逆に一定時間変更がないことを評価する continually もあるそうです。

Property Based Testing

都度実行する度に自前で用意しなくても、ランダムに自動生成された property を利用して複数回テストするといったことができます。境界値テストを用意する場合に役立ちそうです。


さっそく assertion だけですが触ってみました。

記述方法だけでも inifix で name shouldBe "tick-taku" みたいに書けて最高にワクワクします。楽しくテストが書けそうですね。

触ってみていいなと思ったのが、例えばインスタンスが別だけど中の property が同じな事だけ確認したい場合はこんな感じに書けました。1つずつ取り出して equals とかしなくてもスッキリしていいですね。

data class User(val id: Long, val name: String, val age: Int)

checkAll(
    iterations = 3,
    Arb.long(),
    Arb.string(1..10, Codepoint.katakana()),
    Arb.int(1..100)
) { id, name, age ->
    val user = User(id = id, name = name, age = age)
    repository.save(user)

    repository.getUser() shouldBeEqualToComparingFields user
}

一応あまり有用な例ではないですが上で紹介した property testing の checkAll や property のランダム生成もせっかくなので書いてみました。

個人的には Google の Truth が好きでしたが推し変しそうです。Android プロジェクトに導入するのもよさそうでした。

まとめ

Kotlin Fest 2024はKotlinという言語の可能性を改めて再認識するとともに熱意と活気に満ちたイベントでした。また、普段リモートワークで働くタイミーのエンジニアにとってもチームメンバーと対面で交流する貴重な機会でした。今回得られた知見を活かして今後のプロダクト開発にもさらに力を入れていきたいと思います、次回のKotlin Festも楽しみにしています!

BigQueryとLookerStudioのニッチな落とし穴についてまとめてみた

こんにちは、タイミーでデータアナリストをしているyuzukaです。 主にプロダクトの分析に携わっています。

ビジネス職からデータアナリストに転向して約1年経った私が、1年前の自分に教えてあげたい、BigQueryや LookerStudioに関する落とし穴を、いくつか挙げてみようと思います。

はじめに

弊社では、分析環境として BigQueryを採用しています。LookerStudioを使って、 BigQueryのデータを参照してダッシュボードを作ることもよくあります。

BigQueryの SQLを使った分析を進めていく中で、想定と異なるデータが出てきてしまい、原因を特定するのに苦労し、無駄な時間を費やしてしまった経験が何度もあります(実際には、そんな過程もきっと無駄ではないと信じたい)。

こちらのブログを読んでいただいたみなさまには、同じ苦労を味わっていただきたくないので、私が今までにハマってきた落とし穴をいくつか紹介します。

1. BigQueryで使える一部の記法は、LookerStudioでサポートされておらず、接続エラーになる

BigQueryでは正常に動いていたクエリが、LookerStudioを使った途端に謎のエラーになることがあります。

これは、一部の記法が LookerStudioでサポートされていないことに起因しているようです。

私が遭遇した範囲では、以下の2つの記法でエラーになることが確認できています。

  • DECLARE , CREATE

    DECLARE , CREATE を使うと、事前に変数や関数の内容を宣言できます。 DECLARE , CREATE を含むクエリを書くと、BigQueryでは正常に動きますが、LookerStudioではエラーになります。

    これを回避するには、大人しくLookerStudioのパラメータ機能を使うなどするのが良さそうです。

  • QUALIFY句

    QUALIFY句は WHERE句と異なり、Window関数の結果で絞り込めるという特徴があります。 基本的に、QUALIFY句を使ったクエリは、BigQueryでは正常に動きますが、LookerStudioではエラーになります。

    これは QUALIFY句と WHERE句を併用することで回避できるようです(なにゆえ・・・)

    (参考記事:BigQuery "QUALIFY" Function is not supported by data studio?

    なので QUALIFY句を使うときは、なるべく習慣的に WHERE句をつけるようにしています。

SELECT
    column1
    ,ROW_NUMBER()OVER(PARTITION BY xx ORDER BY yy) AS rank
    FROM
        table
    WHERE
        true -- エラー回避のためだけに追加
    QUALIFY
        rank = 1

2. LAST_VALUEは使い方を間違えると、最後の値を返さないことがある

LAST_VALUEを使っても、なぜか最後の値が返ってこないことがあります。

これは、LAST_VALUEの処理範囲がデフォルトで「RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW(最初から現在までの行)」になっているためです(公式ドキュメント)。

つまり、以下のようなクエリを書いた場合、

SELECT
    LAST_VALUE(aa)OVER(PARTITION BY bb ORDER BY ymd) AS rank
    FROM
        table

① まずはymdが古い順に並び替える

② 最初から現在の行までで、ymdが最新の場所を探す → 現在の行になる

③ 現在の行のaaが返ってきてしまう

ということになっているようです。

これを回避するには、処理範囲を「ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING(最初から最後の行まで)」と指定するか、以下のようにFIRST_VALUEとDESCを使う形にするのが良さそうです。

SELECT
    FIRST_VALUE(aa)OVER(PARTITION BY bb ORDER BY ymd DESCAS rank
    FROM
        table

3. 日付の表示フォーマットでYYYY を使うと、正しい西暦が返ってこないことがある

LookerStudioなどの日付の表示フォーマットで、西暦の表示形式に「YYYY」を指定すると、正しい西暦が返ってこないことがあります。

これは、YYYYが単純な西暦ではなく、「その暦週の基準年」を返しているからでした。

簡単に言うと、「新年度の1月1日と同じ週に属する日については、新年度に属することにする」という考え方になっているそうです。

単純な西暦を出したい場合は、大文字の「YYYY」ではなく小文字の「yyyy」を使わなければならないようです。暦週の基準年を出したいケースはそうないと思うので、とりあえず「西暦は小文字」と覚えてしまうのが良さそうです。

もはや SQLの話ではないですが、当時こちらの答えに辿り着くまでにちょっぴり苦労しており、どうしても紹介したかったので最後にご紹介しました。

おわりに

ここまで、私が経験してきた BigQuery・LookerStudio のニッチな落とし穴についてまとめてみました。

今回の記事が、少しでもみなさまの業務のお役に立てれば幸いです。

(「それはニッチな落とし穴でもなんでもないよ」「他にもこんなのがあるよ」など、ご意見ご感想ありましたら、当ブログやXなどでコメントいただけますと幸いです)

分析の正確性を担保するためには、このような落とし穴を知っておくことも大事ですが、実際には、これらを理解したところで、毎回1つもミスをせず、一発で正しいクエリを書きあげることは難しいのではないかと思います。

常に自分の書いたクエリを疑いつつ、実際のデータを見て検証したり、別の指標と比較して違和感がないか確かめたり、必要に応じて他の人にクエリのレビューをお願いしたり、といった工夫の方が、個人的には大事なのかなと思っています。

We’re Hiring!

タイミーでは、一緒に働くメンバーを募集しています。

https://hrmos.co/pages/timee/jobs

カジュアル面談も実施していますので、少しでも興味を持っていただけましたら気軽にお申し込みください!

個人的にもアナリストやデータ関連職の方と繋がりたいと思っているので、よければXのフォローもよろしくお願いします。

dbt 1.8のUnit Tests 実施とその知見(時間ロックとSQLの分割について)

株式会社タイミーのkatsumiです!

dbtのバージョン1.8以上を利用することで、unit testsが利用可能になります。今までもSingular テスト(単一テスト)やGeneric テスト(汎用テスト)は可能でしたが、テストデータを利用した単体テストも行うことができます。

導入準備

dbt-coreの場合

dbt v1.8 以上を利用してください。

dbt-cloudの場合

2024/06/12時点では dbt「Keep on latest version」を選択することで利用できます。

弊社ではunit-test用の環境のみlatest versionを利用しています。

Unit Testの基本

# run data and unit tests
dbt test

# run only data tests
dbt test --select test_type:data

# run only unit tests
dbt test --select test_type:unit

# run tests for one_specific_model
dbt test --select "one_specific_model"

# run data tests limited to one_specific_model
dbt test --select "one_specific_model,test_type:data"

# run unit tests limited to one_specific_model
dbt test --select "one_specific_model,test_type:unit"

unit-testに関係する新しいコマンドが追加されました。このコマンドは、以前のデータテストで使用していたselect機能と同様に、特定のテストケースを選択して実行することができます。

ymlによるテストレコードの書き方

  - name: test_name
    description: "テストの説明"
    model: my_model
    given:
      - input: ref('users')
        rows:
          - {id: 1, user_email: example@example.com}
          
    expect:
      rows:
        - {id: 1, domain: example.com}

name: test_name

これはテストの名前です。この名前はテストケースを識別するために使用します。

description: “テストの説明”

これはテストケースの説明です。この説明には、テストが何を意図しているのか、テストの目的や背景について記載します。

model: my_model

これはテスト対象となるモデルの名前です。ここでは「my_model」がテスト対象のモデルとして指定されています。

given

データの内容です。ここでは「id: 1」で「user_email」「example@example.com」のユーザーを指定しています。このデータがテストの入力として使用されます。

expect

これは期待される結果を指定します。テストが成功するためには、モデルが「id: 1」のユーザーに対して「domain」が「example.com」として返される必要があります。期待される結果と実際の結果が一致するかどうかを検証します。

ファイルによるテストレコードの書き方

unit_tests:
  - name: test_my_model
    model: my_model
    given:
        - input: ref('users')
            format: csv
            fixture: users

プロジェクトのtests/fixturesディレクトリにあるCSVファイル名を指定することで利用できます。test-pathsオプションを使用することで、ディレクトリ構成を柔軟に指定することもできます。

未定義のカラムの挙動

未入力のカラムに関しては、safe_cast(null as INT64)のように型が定義されたnullのデータで補完されます。リレーションが必要なものや、ロジックに影響を与えるカラムの記入が必要になります。

実施における知見

大規模なクエリは”ephemeral”で細かいテスト行う。

  • with句が複数ありテストケースが複雑で見通しが悪くなるケースがあります。弊社ではSQLのテスト単位のロジックを”ephemeral”で分けて個別のmodelにてテストを書く実装を試しています。
  • 通常のモデルと同じ書き方でテストを実施することが可能です。
WITH 処理1_cte AS (
    SELECT * FROM {{ ref('処理1のephemeral') }}
)

, 処理2_cte AS (
    SELECT * FROM {{ ref('処理2のephemeral') }}
)

, 処理3_cte AS (
    SELECT * FROM {{ ref('処理3のephemeral') }}
)

時系列系の時間の停止をマクロで行う。

  • テストしたいケースにはcurrent_datetimeなど現在の時刻を利用するものがあります。その場合、テストを書く際に時間を固定する必要があります。
  • dbtのユニットテストでは、YAMLファイル上でdbtのマクロを置き換える機能があります。この機能を利用して、時間を固定する実装を行っています。
  - name: test_case
    model: my_model
    overrides:
      macros:
        current_datetime_jst: "date('2024-01-01')"
{{ config(materialized='ephemeral') }}

SELECT 
-- ここにロジックを書く
FROM {{ ref('users') }} AS users
WHERE DATETIME_TRUNC(created_at, MONTH) = DATE_TRUNC({{ current_datetime_jst() }}, MONTH)

Testに関するSQLの確認ができる。

  • 実際の仕組みとしてはテスト用のSQLが生成され、フィクスチャ(テストデータ)も含めたSQLが実行されます。debugコマンドやコンパイルされたSQLを確認することで、テストの挙動をチェックできます。
  • テストケースの問題が起きた時にSQLにて要因分析を行いました。

まとめ

重要指標の計算や複雑な時系列処理、プロダクトのロジックを再現する箇所では、テストケースを用意していこうと考えています。またテストケースを先に定義したのちにクエリを書くことも簡単にできるようになったように感じます。信頼性の高いモデルにするために、重要な機能になっていきそうです。

以上、unit-testsを試した時に得られた知見のまとめでした。この情報が役立てば幸いです!

We’re Hired

タイミーでは、一緒に働くメンバーを募集しています!!

参考資料

  • Unit tests | dbt Developer Hub

https://docs.getdbt.com/docs/build/unit-tests

  • Unit Testing

https://github.com/dbt-labs/dbt-core/discussions/8275

dbt snapshotの内部クエリを理解して正確に挙動を把握しよう!

はじめに

こんにちは☀️okodooooonです

最近、社内のdbt snapshotモデルでパフォーマンスの問題が発生し、その解決に苦労しました。dbt snapshotの内部処理が公式ドキュメントなどで提示されておらず、詳細なクエリを理解していなかったためです。

そこで、今回、dbt snapshotの内部クエリについて解説してみることにしました。ただし、今回の解説内容は、ドキュメントで説明されている通りの挙動がどのようにSQLで表現されているのか確認したもので、新しい発見やTipsみたいなものは特にないです!

内部処理をしっかり理解することで、dbtによって抽象化された処理をより効果的に活用できることもあるかな〜と思っておりますので、どなたかの参考になれば幸いです!

(今回解説するクエリは、dbt-bigqueryで生成されるクエリです)

dbt snapshotとは(ざっくり)

SCD Type2 Dimensionという思想に従って、過去時点の状態の遷移を蓄積できるような仕組みです。

ソースシステム側ではステータス変更が行われると、そのナチュラルキーのレコードが上書き処理されますが、その上書き処理前後のレコードをそれぞれ有効期限付きで保存します

公式Doc: https://docs.getdbt.com/docs/build/snapshots

今回の例

以下のようなモデルを仮定して、snapshotのクエリを見ていきたいと思います。

モデルファイル上の定義はこんな感じです。

{% snapshot snapshotted_sample_table %}

    {{
        config(
          target_schema='sample_dataset',
          strategy='timestamp',
          unique_key='id',
          updated_at='updated_at',
          invalidate_hard_deletes=True,
        )
    }}
    select * from {{ source('sample_dataset', 'sample_data') }}

{% endsnapshot %}

ソーステーブル側で一意であるカラムをunique_key, レコード更新日時を記録するカラムをupdated_atに指定しています。

左のテーブルがsnapshot化されることで、右のように有効期限(dbt_valid_from, dbt_valid_to)とsnapshot後のレコードに対するユニークキー(dbt_scd_id)が付与されます

全体の流れ

dbt snapshotはBigQueryにおいて2つのクエリを実行しています。

  • ソーステーブルと宛先テーブルからデータを抽出して、snapshot先にmergeするためのtmpテーブルを、update,delete,insertそれぞれの処理ごとに分割して作成する処理
  • tmpテーブルでラベリングされた処理ごとにMERGEクエリを実行する処理

それぞれ実行されるクエリの詳細は以下のようになります。

tmpテーブル作成クエリ全文 (クリックで展開)

```sql
    create or replace table `sample_project`.`sample_dataset`.`sample_table__dbt_tmp`

    OPTIONS(
      description="""""",    
      expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)
    )
    as (
      with snapshot_query as (
    SELECT
        *
    FROM
        `sample_project`.`sample_dataset`.`sample_table`
    ),

    snapshotted_data as (
        select *,
            id as dbt_unique_key
        from `sample_project`.`sample_dataset`.`snapshotted_sample_table`
        where dbt_valid_to is null
    ),

    insertions_source_data as (
        select
            *,
            id as dbt_unique_key,
            updated_at as dbt_updated_at,
            updated_at as dbt_valid_from,
            nullif(updated_at, updated_at) as dbt_valid_to,
            to_hex(md5(concat(coalesce(cast(id as string), ''), '|',coalesce(cast(updated_at as string), '')))) as dbt_scd_id
        from snapshot_query
    ),

    updates_source_data as (
        select
            *,
            id as dbt_unique_key,
            updated_at as dbt_updated_at,
            updated_at as dbt_valid_from,
            updated_at as dbt_valid_to
        from snapshot_query
    ),

    deletes_source_data as (
        select
            *,
            id as dbt_unique_key
        from snapshot_query
    ),


    insertions as (
        select
            'insert' as dbt_change_type,
            source_data.*
        from insertions_source_data as source_data
        left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where snapshotted_data.dbt_unique_key is null
           or (
                snapshotted_data.dbt_unique_key is not null
            and (
                (snapshotted_data.dbt_valid_from < source_data.updated_at)
            )
        )
    ),

    updates as (
        select
            'update' as dbt_change_type,
            source_data.*,
            snapshotted_data.dbt_scd_id
        from updates_source_data as source_data
        join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where (
            (snapshotted_data.dbt_valid_from < source_data.updated_at)
        )
    ),

    deletes as (
        select
            'delete' as dbt_change_type,
            source_data.*,
    current_timestamp()
 as dbt_valid_from,       
    current_timestamp()
 as dbt_updated_at,
    current_timestamp()
 as dbt_valid_to,
            snapshotted_data.dbt_scd_id
        from snapshotted_data
        left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where source_data.dbt_unique_key is null
    )

    select * from insertions
    union all
    select * from updates
    union all
    select * from deletes
    );

```

merge実行クエリ全文 (クリックで展開)

```sql
merge into `sample-project`.`sample_dataset`.`sample_table` as DBT_INTERNAL_DEST
using `sample-project`.`sample_dataset`.`sample_table__dbt_tmp` as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id

when matched
  and DBT_INTERNAL_DEST.dbt_valid_to is null
  and DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete')
     then update
     set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to

when not matched
  and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
     then insert (`id`, `foo`, `bar`, `created_at`, `updated_at`, `dbt_updated_at`, `dbt_valid_from`, `dbt_valid_to`, `dbt_scd_id`)
     values (`id`, `foo`, `bar`, `created_at`, `updated_at`, `dbt_updated_at`, `dbt_valid_from`, `dbt_valid_to`, `dbt_scd_id`)

```

上記クエリ内の各CTEで行われる処理をざっくりまとめると以下のような処理のフローになります。 処理の詳細を詳しく見ていきたいのですが、クエリ自体がちょっと長いので、insert, update, deleteそれぞれの処理に分割して詳細を見ていこうと思います!

snapshot内部処理の詳細

delete処理:宛先テーブルに存在するレコードがソーステーブルでdeleteされていた場合

tmpテーブル生成クエリのうち、ソース側でdeleteされたレコードをmerge用レコードに変換する処理の抜粋(クリックで展開)

-- 宛先履歴テーブルから履歴が確定していないレコードを抽出
snapshotted_data as (
    select *,
        -- unique_keyに指定したカラムをdbt_unique_keyとする
        id as dbt_unique_key
    from {{ 宛先テーブル }}
    where dbt_valid_to is null
),
deletes_source_data as (
    select
        *,
        -- unique_keyに指定したカラムをdbt_unique_keyとする
        id as dbt_unique_key
    from {{ ソーステーブル }}
)
deletes as (
    select
        'delete' as dbt_change_type,
        source_data.*,
                current_timestamp() as dbt_valid_from,       
                current_timestamp() as dbt_updated_at,
                current_timestamp() as dbt_valid_to,
        snapshotted_data.dbt_scd_id
    from snapshotted_data
    left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
    where source_data.dbt_unique_key is null
)

tmpテーブル生成の処理の内訳は以下のようになります。

【処理の概要】
- 履歴が確定していない(valid_toに値が入っていない)レコード群を宛先テーブルから抽出
- 履歴が確定していないレコードのうち、ソーステーブルに存在しない(削除された)レコードに絞り込み
- dbt_valid_from, dbt_valid_toをクエリの実行時刻に設定
- dbt_change_typeを’delete’に設定

ソーステーブル側で削除されたmerge用レコードをmergeするクエリ(クリックで展開)

merge into {{宛先テーブル}}
using {{マージ用tmpテーブル}}
on {{宛先テーブル}}.dbt_scd_id = {{マージ用tmpテーブル}}.dbt_scd_id

when matched
  and {{宛先テーブル}}.dbt_valid_to is null
  and {{マージ用tmpテーブル}}.dbt_change_type in ('delete')
     then update
     set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to

【処理の概要】
- dbt_scd_idをキーにして宛先テーブルとマージ用tmpテーブルを結合
- 宛先テーブルの履歴が未確定で、tmpテーブルのdbt_change_typeが’delete’の場合
    - 宛先テーブルのdbt_valid_toをtmpテーブルのdbt_valid_to(クエリ実行時刻)に上書き

以下図に表したような処理の流れによって、ソーステーブル側で削除されたレコードdbt_valid_toにsnapshot時の時刻が入るようになります。

update処理:宛先テーブルと比較してソーステーブルのレコードがupdateされていた場合

tmpテーブル生成クエリのうち、ソース側でupdateされたレコードをmerge用レコードに変換する処理の抜粋(クリックで展開)

-- 宛先履歴テーブルから履歴が確定していないレコードを抽出
snapshotted_data as (
    select *,
        -- unique_keyに指定したカラムをdbt_unique_keyとする
        id as dbt_unique_key
    from {{ 宛先テーブル }}
    where dbt_valid_to is null
),
updates_source_data as (
    select
        *,
        -- unique_keyに指定したカラムをdbt_unique_keyとする
        id as dbt_unique_key,
        updated_at as dbt_updated_at,
        updated_at as dbt_valid_from,
        updated_at as dbt_valid_to
    from {{ ソーステーブル }}
),
updates as (
    select
        'update' as dbt_change_type,
        source_data.*,
        snapshotted_data.dbt_scd_id
    from updates_source_data as source_data
    join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
    where (
        (snapshotted_data.dbt_valid_from < source_data.updated_at)
    )
)

【処理の概要】
- 履歴が確定していないレコード群を宛先テーブルから抽出
- ソーステーブルから抽出したレコードのdbt_valid_from, dbt_valid_toを現在時刻に設定
- 履歴が確定していないレコードのうち、宛先のdbt_valid_fromより後にupdated_atがソーステーブルに存在するレコードに絞る
- dbt_change_typeを’update’に設定

ソーステーブル側でupdateされたmerge用レコードをmergeするクエリ(クリックで展開)

merge into {{宛先テーブル}}
using {{マージ用tmpテーブル}}
on {{宛先テーブル}}.dbt_scd_id = {{マージ用tmpテーブル}}.dbt_scd_id

when matched
  and {{宛先テーブル}}.dbt_valid_to is null
  and {{マージ用tmpテーブル}}.dbt_change_type in ('update')
     then update
     set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to

【処理の概要】
- dbt_scd_idをキーにして宛先テーブルとマージ用tmpテーブルを結合
- 宛先テーブルの履歴が未確定で、tmpテーブルのdbt_change_typeが’update’の場合
    - 宛先テーブルのdbt_valid_toをtmpテーブルのdbt_valid_to(現在時刻)に上書き

以下図に表したような処理の流れによって、宛先テーブルの履歴が未確定のデータのうち、ソースで更新が走ったレコードのdbt_valid_toにスナップショット時の日時が入ります。

insert処理:宛先テーブルに無いレコードがソーステーブル側に新規で作成されていた場合

tmpテーブル生成クエリのうち、insert対象のレコードをmerge用レコードに変換する処理の抜粋(クリックで展開)

-- 宛先履歴テーブルから履歴が確定していないレコードを抽出
snapshotted_data as (
    select *,
        -- unique_keyに指定したカラムをdbt_unique_keyとする
        id as dbt_unique_key
    from {{ 宛先テーブル }}
    where dbt_valid_to is null
),

insertions_source_data as (
    select
        *,
        id as dbt_unique_key,
        updated_at as dbt_updated_at,
        updated_at as dbt_valid_from,
        nullif(updated_at, updated_at) as dbt_valid_to,
        to_hex(md5(concat(coalesce(cast(id as string), ''), '|',coalesce(cast(updated_at as string), '')))) as dbt_scd_id
    from {{ ソーステーブル }}
),

insertions as (
    select
        'insert' as dbt_change_type,
        source_data.*
    from insertions_source_data as source_data
    left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
    where snapshotted_data.dbt_unique_key is null
       or (
            snapshotted_data.dbt_unique_key is not null
        and (
            (snapshotted_data.dbt_valid_from < source_data.updated_at)
        )
    )
),

【処理の概要】
- ソーステーブルのunique_keyにしていたカラムとupdated_atに指定していたカラムを組み合わせてsurrogate_keyを生成
- ソーステーブルに対して履歴未確定の宛先テーブルをLEFT JOINして以下の条件に絞る
    - 宛先テーブルに指定したunique_keyが存在しないが、ソーステーブルには存在するレコード
    - 宛先テーブルに指定したunique_keyのレコードが存在して、ソーステーブル側のupdated_atが宛先テーブルのvalid_fromよりも後のレコード
- dbt_change_typeを’insert’に設定

ソーステーブル側でinsertされたmerge用レコードをmergeするクエリ(クリックで展開)

merge into {{宛先テーブル}}
using {{マージ用tmpテーブル}}
on {{宛先テーブル}}.dbt_scd_id = {{マージ用tmpテーブル}}.dbt_scd_id

when not matched
  and {{マージ用tmpテーブル}}.dbt_change_type = 'insert'
     then insert (`id`, `foo`, `bar`, `created_at`, `updated_at`, `dbt_updated_at`, `dbt_valid_from`, `dbt_valid_to`, `dbt_scd_id`)
     values (`id`, `foo`, `bar`, `created_at`, `updated_at`, `dbt_updated_at`, `dbt_valid_from`, `dbt_valid_to`, `dbt_scd_id`)

【処理の概要】
- dbt_scd_idをキーにして宛先テーブルとマージ用tmpテーブルを結合
- dbt_scd_idがマッチしなくて、dbt_change_type=’update’の場合にinsert処理を実行
- 宛先テーブルのdbt_valid_toをtmpテーブルのdbt_valid_to(現在時刻)に上書き

以下図に表したような処理の流れによって、指定したユニークキーが宛先に存在しないか、履歴が未確定のレコードのうちソース側で前回実行からupdateが走ったものがinsertされます。

check戦略の場合

上で紹介したのは snapshot_strategy=timestamp の場合のスナップショットの挙動であり、ソーステーブル側でupdated_at に指定したカラムが更新された場合に、すべてのプロパティの情報を履歴的に保持するものです。

dbtにはもう一つのスナップショット戦略として、check 戦略があります。

{% snapshot snapshotted_sample_table %}

    {{
        config(
          target_schema='sample_dataset',
          strategy='check',
          unique_key='id',
          invalidate_hard_deletes=True,
            check_cols=[
                            'foo',
                            'bar',
                'created_at',
                'updated_at',
            ],
        )
    }}
    select * from {{ source('sample_dataset', 'sample_data') }}

{% endsnapshot %}

このモデルでは全カラムを選択していますが、特定のカラムの変更のみを履歴的にトラッキングする仕組みです。

strategy=check においても、strategy=timestamp の時と同様に、snapshot処理はtmpテーブルを作成するクエリとmerge処理を実行するクエリに分割されます。

strategy=checkの場合のtmpテーブル作成クエリ(クリックで展開)
strategy=checkの場合のmerge実行クエリ(クリックで展開)

merge実行クエリはstrategy=timestampの時と変わらず、tmpテーブルの生成方法が異なっているので、詳しく見ていこうと思います

check戦略の詳細

insert 用データや update 用データを出力するCTEでは、以下のようなWHERE条件が使用されます。

((
    snapshotted_data.`foo` != source_data.`foo`
    or
    (
        ((snapshotted_data.`foo` is null) and not (source_data.`foo` is null))
        or
        ((not snapshotted_data.`foo` is null) and (source_data.`foo` is null))
    ) 
    or
    snapshotted_data.`bar` != source_data.`bar`
    or
    (
        ((snapshotted_data.`bar` is null) and not (source_data.`bar` is null))
        or
        ((not snapshotted_data.`bar` is null) and (source_data.`bar` is null))
    ) 
    or
    snapshotted_data.`created_at` != source_data.`created_at`
    or
    (
        ((snapshotted_data.`created_at` is null) and not (source_data.`created_at` is null))
        or
        ((not snapshotted_data.`created_at` is null) and (source_data.`created_at` is null))
    )
    or
    snapshotted_data.`updated_at` != source_data.`updated_at`
    or
    (
        ((snapshotted_data.`updated_at` is null) and not (source_data.`updated_at` is null))
        or
        ((not snapshotted_data.`updated_at` is null) and (source_data.`updated_at` is null))
    )
))

この条件により、insert と update の対象となるレコードの抽出条件は次のようになります。

  • insert用データの抽出条件
( 宛先にユニークキーが存在しない ) 
OR
  ( 
    (宛先にユニークキーが存在する)
    AND 
    (ユニークキー以外のcheck_colsに指定したカラムが、宛先とソースで何かしら変化が発生している)
  )
  • update用データの抽出条件
(宛先にユニークキーが存在する)
AND
(ユニークキー以外のcheck_colsに指定したカラムが、宛先とソースで何かしら変化が発生している)

checkで指定されたカラムの変更をどのように追跡しているかを確認できました。

まとめ

今回はdbt snapshotの内部処理をdelete, update, insertの処理に分解して説明してみました。

公式ドキュメントで説明されている通りの処理が生成されるSQLによって行われていることが確認できました。

dbt snapshotを使用している際に、期待した挙動が得られない場合や何かしらエラーが発生したときに、この情報が役立てば幸いです!

We’re Hired

タイミーでは、一緒に働くメンバーを募集しています!!