Timee Product Team Blog

タイミー開発者ブログ

dbt snapshotの内部クエリを理解して正確に挙動を把握しよう!

はじめに

こんにちは☀️okodooooonです

最近、社内のdbt snapshotモデルでパフォーマンスの問題が発生し、その解決に苦労しました。dbt snapshotの内部処理が公式ドキュメントなどで提示されておらず、詳細なクエリを理解していなかったためです。

そこで、今回、dbt snapshotの内部クエリについて解説してみることにしました。ただし、今回の解説内容は、ドキュメントで説明されている通りの挙動がどのようにSQLで表現されているのか確認したもので、新しい発見やTipsみたいなものは特にないです!

内部処理をしっかり理解することで、dbtによって抽象化された処理をより効果的に活用できることもあるかな〜と思っておりますので、どなたかの参考になれば幸いです!

(今回解説するクエリは、dbt-bigqueryで生成されるクエリです)

dbt snapshotとは(ざっくり)

SCD Type2 Dimensionという思想に従って、過去時点の状態の遷移を蓄積できるような仕組みです。

ソースシステム側ではステータス変更が行われると、そのナチュラルキーのレコードが上書き処理されますが、その上書き処理前後のレコードをそれぞれ有効期限付きで保存します

公式Doc: https://docs.getdbt.com/docs/build/snapshots

今回の例

以下のようなモデルを仮定して、snapshotのクエリを見ていきたいと思います。

モデルファイル上の定義はこんな感じです。

{% snapshot snapshotted_sample_table %}

    {{
        config(
          target_schema='sample_dataset',
          strategy='timestamp',
          unique_key='id',
          updated_at='updated_at',
          invalidate_hard_deletes=True,
        )
    }}
    select * from {{ source('sample_dataset', 'sample_data') }}

{% endsnapshot %}

ソーステーブル側で一意であるカラムをunique_key, レコード更新日時を記録するカラムをupdated_atに指定しています。

左のテーブルがsnapshot化されることで、右のように有効期限(dbt_valid_from, dbt_valid_to)とsnapshot後のレコードに対するユニークキー(dbt_scd_id)が付与されます

全体の流れ

dbt snapshotはBigQueryにおいて2つのクエリを実行しています。

  • ソーステーブルと宛先テーブルからデータを抽出して、snapshot先にmergeするためのtmpテーブルを、update,delete,insertそれぞれの処理ごとに分割して作成する処理
  • tmpテーブルでラベリングされた処理ごとにMERGEクエリを実行する処理

それぞれ実行されるクエリの詳細は以下のようになります。

tmpテーブル作成クエリ全文 (クリックで展開)

```sql
    create or replace table `sample_project`.`sample_dataset`.`sample_table__dbt_tmp`

    OPTIONS(
      description="""""",    
      expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)
    )
    as (
      with snapshot_query as (
    SELECT
        *
    FROM
        `sample_project`.`sample_dataset`.`sample_table`
    ),

    snapshotted_data as (
        select *,
            id as dbt_unique_key
        from `sample_project`.`sample_dataset`.`snapshotted_sample_table`
        where dbt_valid_to is null
    ),

    insertions_source_data as (
        select
            *,
            id as dbt_unique_key,
            updated_at as dbt_updated_at,
            updated_at as dbt_valid_from,
            nullif(updated_at, updated_at) as dbt_valid_to,
            to_hex(md5(concat(coalesce(cast(id as string), ''), '|',coalesce(cast(updated_at as string), '')))) as dbt_scd_id
        from snapshot_query
    ),

    updates_source_data as (
        select
            *,
            id as dbt_unique_key,
            updated_at as dbt_updated_at,
            updated_at as dbt_valid_from,
            updated_at as dbt_valid_to
        from snapshot_query
    ),

    deletes_source_data as (
        select
            *,
            id as dbt_unique_key
        from snapshot_query
    ),


    insertions as (
        select
            'insert' as dbt_change_type,
            source_data.*
        from insertions_source_data as source_data
        left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where snapshotted_data.dbt_unique_key is null
           or (
                snapshotted_data.dbt_unique_key is not null
            and (
                (snapshotted_data.dbt_valid_from < source_data.updated_at)
            )
        )
    ),

    updates as (
        select
            'update' as dbt_change_type,
            source_data.*,
            snapshotted_data.dbt_scd_id
        from updates_source_data as source_data
        join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where (
            (snapshotted_data.dbt_valid_from < source_data.updated_at)
        )
    ),

    deletes as (
        select
            'delete' as dbt_change_type,
            source_data.*,
    current_timestamp()
 as dbt_valid_from,       
    current_timestamp()
 as dbt_updated_at,
    current_timestamp()
 as dbt_valid_to,
            snapshotted_data.dbt_scd_id
        from snapshotted_data
        left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where source_data.dbt_unique_key is null
    )

    select * from insertions
    union all
    select * from updates
    union all
    select * from deletes
    );

```

merge実行クエリ全文 (クリックで展開)

```sql
merge into `sample-project`.`sample_dataset`.`sample_table` as DBT_INTERNAL_DEST
using `sample-project`.`sample_dataset`.`sample_table__dbt_tmp` as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id

when matched
  and DBT_INTERNAL_DEST.dbt_valid_to is null
  and DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete')
     then update
     set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to

when not matched
  and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
     then insert (`id`, `foo`, `bar`, `created_at`, `updated_at`, `dbt_updated_at`, `dbt_valid_from`, `dbt_valid_to`, `dbt_scd_id`)
     values (`id`, `foo`, `bar`, `created_at`, `updated_at`, `dbt_updated_at`, `dbt_valid_from`, `dbt_valid_to`, `dbt_scd_id`)

```

上記クエリ内の各CTEで行われる処理をざっくりまとめると以下のような処理のフローになります。 処理の詳細を詳しく見ていきたいのですが、クエリ自体がちょっと長いので、insert, update, deleteそれぞれの処理に分割して詳細を見ていこうと思います!

snapshot内部処理の詳細

delete処理:宛先テーブルに存在するレコードがソーステーブルでdeleteされていた場合

tmpテーブル生成クエリのうち、ソース側でdeleteされたレコードをmerge用レコードに変換する処理の抜粋(クリックで展開)

-- 宛先履歴テーブルから履歴が確定していないレコードを抽出
snapshotted_data as (
    select *,
        -- unique_keyに指定したカラムをdbt_unique_keyとする
        id as dbt_unique_key
    from {{ 宛先テーブル }}
    where dbt_valid_to is null
),
deletes_source_data as (
    select
        *,
        -- unique_keyに指定したカラムをdbt_unique_keyとする
        id as dbt_unique_key
    from {{ ソーステーブル }}
)
deletes as (
    select
        'delete' as dbt_change_type,
        source_data.*,
                current_timestamp() as dbt_valid_from,       
                current_timestamp() as dbt_updated_at,
                current_timestamp() as dbt_valid_to,
        snapshotted_data.dbt_scd_id
    from snapshotted_data
    left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
    where source_data.dbt_unique_key is null
)

tmpテーブル生成の処理の内訳は以下のようになります。

【処理の概要】
- 履歴が確定していない(valid_toに値が入っていない)レコード群を宛先テーブルから抽出
- 履歴が確定していないレコードのうち、ソーステーブルに存在しない(削除された)レコードに絞り込み
- dbt_valid_from, dbt_valid_toをクエリの実行時刻に設定
- dbt_change_typeを’delete’に設定

ソーステーブル側で削除されたmerge用レコードをmergeするクエリ(クリックで展開)

merge into {{宛先テーブル}}
using {{マージ用tmpテーブル}}
on {{宛先テーブル}}.dbt_scd_id = {{マージ用tmpテーブル}}.dbt_scd_id

when matched
  and {{宛先テーブル}}.dbt_valid_to is null
  and {{マージ用tmpテーブル}}.dbt_change_type in ('delete')
     then update
     set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to

【処理の概要】
- dbt_scd_idをキーにして宛先テーブルとマージ用tmpテーブルを結合
- 宛先テーブルの履歴が未確定で、tmpテーブルのdbt_change_typeが’delete’の場合
    - 宛先テーブルのdbt_valid_toをtmpテーブルのdbt_valid_to(クエリ実行時刻)に上書き

以下図に表したような処理の流れによって、ソーステーブル側で削除されたレコードdbt_valid_toにsnapshot時の時刻が入るようになります。

update処理:宛先テーブルと比較してソーステーブルのレコードがupdateされていた場合

tmpテーブル生成クエリのうち、ソース側でupdateされたレコードをmerge用レコードに変換する処理の抜粋(クリックで展開)

-- 宛先履歴テーブルから履歴が確定していないレコードを抽出
snapshotted_data as (
    select *,
        -- unique_keyに指定したカラムをdbt_unique_keyとする
        id as dbt_unique_key
    from {{ 宛先テーブル }}
    where dbt_valid_to is null
),
updates_source_data as (
    select
        *,
        -- unique_keyに指定したカラムをdbt_unique_keyとする
        id as dbt_unique_key,
        updated_at as dbt_updated_at,
        updated_at as dbt_valid_from,
        updated_at as dbt_valid_to
    from {{ ソーステーブル }}
),
updates as (
    select
        'update' as dbt_change_type,
        source_data.*,
        snapshotted_data.dbt_scd_id
    from updates_source_data as source_data
    join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
    where (
        (snapshotted_data.dbt_valid_from < source_data.updated_at)
    )
)

【処理の概要】
- 履歴が確定していないレコード群を宛先テーブルから抽出
- ソーステーブルから抽出したレコードのdbt_valid_from, dbt_valid_toを現在時刻に設定
- 履歴が確定していないレコードのうち、宛先のdbt_valid_fromより後にupdated_atがソーステーブルに存在するレコードに絞る
- dbt_change_typeを’update’に設定

ソーステーブル側でupdateされたmerge用レコードをmergeするクエリ(クリックで展開)

merge into {{宛先テーブル}}
using {{マージ用tmpテーブル}}
on {{宛先テーブル}}.dbt_scd_id = {{マージ用tmpテーブル}}.dbt_scd_id

when matched
  and {{宛先テーブル}}.dbt_valid_to is null
  and {{マージ用tmpテーブル}}.dbt_change_type in ('update')
     then update
     set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to

【処理の概要】
- dbt_scd_idをキーにして宛先テーブルとマージ用tmpテーブルを結合
- 宛先テーブルの履歴が未確定で、tmpテーブルのdbt_change_typeが’update’の場合
    - 宛先テーブルのdbt_valid_toをtmpテーブルのdbt_valid_to(現在時刻)に上書き

以下図に表したような処理の流れによって、宛先テーブルの履歴が未確定のデータのうち、ソースで更新が走ったレコードのdbt_valid_toにスナップショット時の日時が入ります。

insert処理:宛先テーブルに無いレコードがソーステーブル側に新規で作成されていた場合

tmpテーブル生成クエリのうち、insert対象のレコードをmerge用レコードに変換する処理の抜粋(クリックで展開)

-- 宛先履歴テーブルから履歴が確定していないレコードを抽出
snapshotted_data as (
    select *,
        -- unique_keyに指定したカラムをdbt_unique_keyとする
        id as dbt_unique_key
    from {{ 宛先テーブル }}
    where dbt_valid_to is null
),

insertions_source_data as (
    select
        *,
        id as dbt_unique_key,
        updated_at as dbt_updated_at,
        updated_at as dbt_valid_from,
        nullif(updated_at, updated_at) as dbt_valid_to,
        to_hex(md5(concat(coalesce(cast(id as string), ''), '|',coalesce(cast(updated_at as string), '')))) as dbt_scd_id
    from {{ ソーステーブル }}
),

insertions as (
    select
        'insert' as dbt_change_type,
        source_data.*
    from insertions_source_data as source_data
    left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
    where snapshotted_data.dbt_unique_key is null
       or (
            snapshotted_data.dbt_unique_key is not null
        and (
            (snapshotted_data.dbt_valid_from < source_data.updated_at)
        )
    )
),

【処理の概要】
- ソーステーブルのunique_keyにしていたカラムとupdated_atに指定していたカラムを組み合わせてsurrogate_keyを生成
- ソーステーブルに対して履歴未確定の宛先テーブルをLEFT JOINして以下の条件に絞る
    - 宛先テーブルに指定したunique_keyが存在しないが、ソーステーブルには存在するレコード
    - 宛先テーブルに指定したunique_keyのレコードが存在して、ソーステーブル側のupdated_atが宛先テーブルのvalid_fromよりも後のレコード
- dbt_change_typeを’insert’に設定

ソーステーブル側でinsertされたmerge用レコードをmergeするクエリ(クリックで展開)

merge into {{宛先テーブル}}
using {{マージ用tmpテーブル}}
on {{宛先テーブル}}.dbt_scd_id = {{マージ用tmpテーブル}}.dbt_scd_id

when not matched
  and {{マージ用tmpテーブル}}.dbt_change_type = 'insert'
     then insert (`id`, `foo`, `bar`, `created_at`, `updated_at`, `dbt_updated_at`, `dbt_valid_from`, `dbt_valid_to`, `dbt_scd_id`)
     values (`id`, `foo`, `bar`, `created_at`, `updated_at`, `dbt_updated_at`, `dbt_valid_from`, `dbt_valid_to`, `dbt_scd_id`)

【処理の概要】
- dbt_scd_idをキーにして宛先テーブルとマージ用tmpテーブルを結合
- dbt_scd_idがマッチしなくて、dbt_change_type=’update’の場合にinsert処理を実行
- 宛先テーブルのdbt_valid_toをtmpテーブルのdbt_valid_to(現在時刻)に上書き

以下図に表したような処理の流れによって、指定したユニークキーが宛先に存在しないか、履歴が未確定のレコードのうちソース側で前回実行からupdateが走ったものがinsertされます。

check戦略の場合

上で紹介したのは snapshot_strategy=timestamp の場合のスナップショットの挙動であり、ソーステーブル側でupdated_at に指定したカラムが更新された場合に、すべてのプロパティの情報を履歴的に保持するものです。

dbtにはもう一つのスナップショット戦略として、check 戦略があります。

{% snapshot snapshotted_sample_table %}

    {{
        config(
          target_schema='sample_dataset',
          strategy='check',
          unique_key='id',
          invalidate_hard_deletes=True,
            check_cols=[
                            'foo',
                            'bar',
                'created_at',
                'updated_at',
            ],
        )
    }}
    select * from {{ source('sample_dataset', 'sample_data') }}

{% endsnapshot %}

このモデルでは全カラムを選択していますが、特定のカラムの変更のみを履歴的にトラッキングする仕組みです。

strategy=check においても、strategy=timestamp の時と同様に、snapshot処理はtmpテーブルを作成するクエリとmerge処理を実行するクエリに分割されます。

strategy=checkの場合のtmpテーブル作成クエリ(クリックで展開)
strategy=checkの場合のmerge実行クエリ(クリックで展開)

merge実行クエリはstrategy=timestampの時と変わらず、tmpテーブルの生成方法が異なっているので、詳しく見ていこうと思います

check戦略の詳細

insert 用データや update 用データを出力するCTEでは、以下のようなWHERE条件が使用されます。

((
    snapshotted_data.`foo` != source_data.`foo`
    or
    (
        ((snapshotted_data.`foo` is null) and not (source_data.`foo` is null))
        or
        ((not snapshotted_data.`foo` is null) and (source_data.`foo` is null))
    ) 
    or
    snapshotted_data.`bar` != source_data.`bar`
    or
    (
        ((snapshotted_data.`bar` is null) and not (source_data.`bar` is null))
        or
        ((not snapshotted_data.`bar` is null) and (source_data.`bar` is null))
    ) 
    or
    snapshotted_data.`created_at` != source_data.`created_at`
    or
    (
        ((snapshotted_data.`created_at` is null) and not (source_data.`created_at` is null))
        or
        ((not snapshotted_data.`created_at` is null) and (source_data.`created_at` is null))
    )
    or
    snapshotted_data.`updated_at` != source_data.`updated_at`
    or
    (
        ((snapshotted_data.`updated_at` is null) and not (source_data.`updated_at` is null))
        or
        ((not snapshotted_data.`updated_at` is null) and (source_data.`updated_at` is null))
    )
))

この条件により、insert と update の対象となるレコードの抽出条件は次のようになります。

  • insert用データの抽出条件
( 宛先にユニークキーが存在しない ) 
OR
  ( 
    (宛先にユニークキーが存在する)
    AND 
    (ユニークキー以外のcheck_colsに指定したカラムが、宛先とソースで何かしら変化が発生している)
  )
  • update用データの抽出条件
(宛先にユニークキーが存在する)
AND
(ユニークキー以外のcheck_colsに指定したカラムが、宛先とソースで何かしら変化が発生している)

checkで指定されたカラムの変更をどのように追跡しているかを確認できました。

まとめ

今回はdbt snapshotの内部処理をdelete, update, insertの処理に分解して説明してみました。

公式ドキュメントで説明されている通りの処理が生成されるSQLによって行われていることが確認できました。

dbt snapshotを使用している際に、期待した挙動が得られない場合や何かしらエラーが発生したときに、この情報が役立てば幸いです!

We’re Hired

タイミーでは、一緒に働くメンバーを募集しています!!