Timee Product Team Blog

タイミー開発者ブログ

今後の機能開発を快適にするために検索機能をリファクタリングした

こちらはTimee Advent Calendar 2023の13日目の記事です。

タイミーでバックエンドエンジニアをしている @Juju_62q です。

記事内でワーカーさんや事業者さんに関して敬称を省略させていただきます。

タイミーは雇用者である事業者に求人を投稿してもらい、労働者であるワーカーが求人を選ぶという形でマッチングを実現しています。ワーカーが求人を選ぶためにはなんらかの形でワーカーが自分にあった求人を見つけられる必要があります。検索はワーカーが求人を見つけるために最もよく使われる経路です。今回はそんな検索機能において今後の開発をスムーズにするためのリファクタリングを実施した話を紹介します。

続きを読む

dbtに関連する運用の自動化

この記事は Timee Advent Calendar 2023 シリーズ 3 の12日目の記事です。

qiita.com

はじめに

DREグループでデータエンジニアをやっている西山です。

今回は、データ転送まわりの運用自動化について書きます。

タイミーのアプリログが分析できる状態になるまでのリードタイムが長く、効果検証や意思決定に遅れが出ていた問題に対して、dbtに関連する運用を自動化することで改善しました。

タイミーでのアプリログの転送について

タイミーではS3に貯まったサーバーログを定期的にデータ基盤(GCPのBigQuery)へ転送しており、ログがLake層へ追加されるとdbt(data build tool)によって型変換等の加工処理がなされてStaging層へと転送されます。

アプリの機能追加によって新しいログテーブルが追加された場合、dbtの処理追加が必要になるため、GitHub Actionsで定期的にLake層とStaging層でログテーブルの差異をチェックし、Lake層に新しく追加されたテーブルがあれば処理追加を促す通知をslackへ送信していました。

通知を確認したら、以下の対応を行います。

  1. 新しく追加されたテーブルのスキーマを確認
  2. dbtでmodelを作成

    1. 他のログテーブルのmodelをコピー
    2. カラム名やキャスト処理を修正
     {{
         config(
             alias={{ テーブル名を書く }}
         )
     }}
     {% set column_lists = [ {{ カラム名を書く }}] %}
     {{ production_logs_template({{ テーブル名を書く }}, column_lists) }}
     , result_record AS (
         SELECT
             {{ カラム名を書く }},
             dre_transfer_at,
             ROW_NUMBER() OVER (PARTITION BY {{ カラム名を書く }}) AS row_number
         FROM 
             records 
     ), stg_record AS (
         SELECT
             {{ 各カラムの変換処理を書く }},
         FROM
             result_record
         WHERE
             row_number = 1
     )
     SELECT
         *,
         {{ set_record_exported_at(column_lists) }} AS dre__record_exported_at,
     FROM
         stg_record
    
  3. チーム内レビュー

  4. dbtのジョブを実行してStging層にもテーブルを追加

基本的には通知を確認次第、上記の対応を行う方針でしたが、他に優先度の高い障害対応や他チームからの依頼があると後回しになってしまうことも多く、1週間以内にStaging層へ新規ログテーブルを転送する取り決めとなっていました。

しかし、これだとログが分析できるようになるまでのリードタイムが長く、プロダクト開発チームの効果検証や意思決定に遅れが出てしまいます。

そこで、今回はこのリードタイムの短縮を目指すことにします。

実装案

アプリのログに関しては、dbtで実装する加工処理の内容がほぼ決まりきっているため、modelの追加が自動化できそうです。

(例)

  • 末尾にidが付くカラムはINT型へキャスト
  • 末尾にatが付くカラムはJSTのDATETIME型へキャスト

これまで新しいログテーブルを検知していたGitHub Actionsのツール内で、dbtのmodel追加のPRを生成することにします。

実装してみた

大まかにやったこととしては以下の通りです。

  • GitHub AppsによるAPIリクエストの準備
  • pythonで書かれている監視ツールの修正
    • dbtのmodel(sqlファイル)とtest(ymlファイル)の生成処理追加
    • GitHubへのリクエスト処理追加
    • slackへの通知内容修正

GitHub AppsによるAPIリクエストの準備

今回はログテーブルの監視ツールとdbtプロジェクトが格納されているリポジトリが異なったため、GitHub Appsを作成してAPI操作ができるようにします。

  1. 以下の手順に沿ってAppを作成 https://docs.github.com/ja/apps/creating-github-apps/registering-a-github-app/registering-a-github-app
  2. Appの秘密鍵を取得 https://docs.github.com/ja/apps/creating-github-apps/authenticating-with-a-github-app/managing-private-keys-for-github-apps
  3. 認証に必要なINSTALLATION_IDを生成 https://docs.github.com/ja/apps/creating-github-apps/authenticating-with-a-github-app/authenticating-as-a-github-app-installation#generating-an-installation-access-token
  4. 3.で生成したINSTALLATION_IDを使ってinstallation access tokenを発行 今回は監視ツールに以下のクラスを追加しました。
import json
import os
from datetime import datetime, timedelta

import jwt
import requests

class GitHubAppTokenManager:
    def __init__(self) -> None:
        self.GITHUB_APP_ID = os.environ.get("APP_ID")
        self.GITHUB_APP_PRIVATE_KEY = os.environ.get("APP_PRIVATE_KEY")
        self.GITHUB_APP_INSTALLATION_ID = os.environ.get("APP_INSTALLATION_ID")

    def _generate_jwt(self) -> str:
        PEM = (
            self.GITHUB_APP_PRIVATE_KEY.replace("\\n", "\n")
            if self.GITHUB_APP_PRIVATE_KEY is not None
            else None
        )
        utcnow = datetime.utcnow()
        alg = "RS256"
        payload = {
            "typ": "JWT",
            "alg": alg,
            "iat": utcnow,
            "exp": utcnow + timedelta(seconds=60),
            "iss": self.GITHUB_APP_ID,
        }
        return jwt.encode(payload, PEM, algorithm=alg)

    def _generate_jwt_headers(self) -> dict:
        jwt_token = self._generate_jwt()
        return {
            "Authorization": f"Bearer {jwt_token}",
            "Accept": "application/vnd.github.machine-man-preview+json",
        }

    def _fetch_access_token(self) -> str:
        url = f"https://api.github.com/app/installations/{self.GITHUB_APP_INSTALLATION_ID}/access_tokens"
        response = requests.post(url, headers=self._generate_jwt_headers())
        response.raise_for_status()
        return json.loads(response.text).get("token")

    def generate_token_header(self) -> dict:
        token = self._fetch_access_token()
        return {
            "Authorization": f"token {token}",
            "Accept": "application/vnd.github.inertia-preview+json",
        }

ここまでできればAPIのリクエスト準備完了です。

dbtのmodelとtestの生成処理追加

GitHub APIをリクエストする前にコミット対象となるファイル生成の処理を追加します。

まずはmodelのsqlファイルとtestのymlファイルのテンプレートを用意します。

↓modelのテンプレートです。

dbtで使っているJinjaに反応して意図したところで値が置き換わらなくなるので、一部エスケープしています。

やっぱり多少見づらくなるので他に良い方法はないだろうか・・・

{{ '{{' }}
    config(
        alias='{{ table_name }}',
    )
{{ '}}' }}
{{ '{%' }} set column_lists = {{ column_list }} {{ '%}' }}
{{ '{{' }} production_logs_template('{{ table_name }}', column_lists) {{ '}}' }}
, result_record AS (
    SELECT 
        {{ select_list }},
        dre_transfer_at,
        ROW_NUMBER() OVER (PARTITION BY {{ partition_column_list }}) AS row_number
    FROM 
        records 
), stg_record AS (
    SELECT 
        {{ jst_converted_select_list }},
        dre_transfer_at,
        {{ '{{' }} jst_now() {{ '}}' }} AS dre_delivered_at,
    FROM
        result_record
    WHERE
        row_number = 1
)
SELECT
    *,
    {{ '{{' }} set_record_exported_at(column_lists) {{ '}}' }} AS dre__record_exported_at,
FROM
    stg_record

↓testのテンプレートです。ログの一意性を担保するテストを追加します。

version: 2
models:
  - name: TABLE_NAME
    tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns: COLUMN_LISTS

次に以下のクラスを追加します。

BQから追加対象テーブルのカラム名を取得してgenerate_dbt_file_info関数に渡すことで、先ほど作成したテンプレートを元にファイルが生成されます。

import json
import os
import re
import tempfile
from typing import Any, Dict, List, Tuple

from jinja2 import Environment, FileSystemLoader
from ruamel.yaml import YAML

class DBTModelFileGenerator:
    def _convert_column_with_at(self, column_name: str) -> str:
        return f"TIMESTAMP({column_name})"

    def _convert_column_with_id(self, column_name: str) -> str:
        return f"CAST({column_name} AS INT)"

    def _convert_utc_to_jst(self, column_name: str) -> str:
        return f"{{{{ timestamp_utc2datetime_jst('{column_name}') }}}}"

    def _generate_select_list(self, column_list: List[str]) -> str:
        converted_columns = []

        for col in column_list:
            if col.endswith("_at"):
                converted_columns.append(
                    f"{self._convert_column_with_at(col)} as {col}"
                )
            elif re.search(r"(_id$|^id$)", col):
                converted_columns.append(
                    f"{self._convert_column_with_id(col)} as {col}"
                )
            else:
                converted_columns.append(col)

        return ",\n        ".join(converted_columns)

    def _generate_jst_converted_select_list(self, column_list: List[str]) -> str:
        converted_list = []
        for col in column_list:
            if col.endswith("_at"):
                converted_list.append(f"{self._convert_utc_to_jst(col)} as {col}")
            else:
                converted_list.append(col)
        return ",\n        ".join(converted_list)

    def _generate_staging_sql(self, table_name: str, column_list: List[str]) -> str:
        select_list = self._generate_select_list(column_list)
        partition_column_list = ", ".join(column_list)
        jst_converted_select_list = self._generate_jst_converted_select_list(
            column_list
        )

        # テンプレートのロード
        file_loader = FileSystemLoader("テンプレートファイルのパス")
        env = Environment(loader=file_loader)
        template = env.get_template("template_stg_production_logs.sql")

        # テンプレートのレンダリング
        sql = template.render(
            table_name=table_name,
            column_list=column_list,
            select_list=select_list,
            partition_column_list=partition_column_list,
            jst_converted_select_list=jst_converted_select_list,
        )

        # tempdirにファイルを保存
        output_path = os.path.join(
            tempfile.gettempdir(), f"{table_name}_stg_production_logs.sql"
        )
        with open(output_path, "w") as tmp:
            tmp.write(sql)

        return output_path

    def _generate_staging_test(self, table_name: str, column_list: List[str]) -> str:
        yaml = YAML()
        yaml.indent(sequence=4, offset=2)

        with open(
            "テンプレートファイルのパス",
            "r",
        ) as template_file:
            template_content = yaml.load(template_file)

        template_content["models"][0]["name"] = f"{table_name}_stg_production_logs"
        test_definition = template_content["models"][0]["tests"][0]
        test_definition["dbt_utils.unique_combination_of_columns"][
            "combination_of_columns"
        ] = column_list

        output_path = os.path.join(
            tempfile.gettempdir(), f"{table_name}_stg_production_logs.yml"
        )
        with open(output_path, "w") as tmp:
            yaml.dump(template_content, tmp)

        return output_path

    def generate_dbt_file_info(
        self, new_table_and_column_names: List[Dict[str, Any]]
    ) -> List[Tuple[str, str, str]]:
        files = []

        for record in new_table_and_column_names:
            table_name = record["table_name"]
            record_dict = json.loads(record["record"])
            column_list = list(record_dict.keys())

            sql_file_path = self._generate_staging_sql(table_name, column_list)
            test_file_path = self._generate_staging_test(table_name, column_list)

            files.append(
                (
                    f"追加先のdbt modelのパス/{table_name}_stg_production_logs.sql",
                    sql_file_path,
                    f"Add staging SQL for {table_name}",
                )
            )
            files.append(
                (
                    f"追加先のdbt testのパス/{table_name}_stg_production_logs.yml",
                    test_file_path,
                    f"Add staging test for {table_name}",
                )
            )

        return files

GitHubへのリクエスト処理追加

以下のクラスを追加し、create_pr関数に前段で生成したファイルの情報を渡すことでPRが生成されます。

認証にはAPIのリクエスト準備で発行したinstallation access tokenを使います。

import base64
import json
import re
from typing import Any, Dict, List, Tuple

import jwt
import requests

from deleted_logs.adapter.output.github_app_token_manager import GitHubAppTokenManager

class GitHubPRCreator:
    def __init__(self, session_start_time: str) -> None:
        self.OWNER = "XXX"
        self.REPO = "XXX"
        self.BASE_BRANCH_NAME = "main"
        numeric_only_session_start_time = re.sub(r"\D", "", session_start_time)
        self.NEW_BRANCH_NAME = f"feature/add_new_table_of_production_logs_to_staging_{numeric_only_session_start_time}"
        self.github_app_token_manager = GitHubAppTokenManager()

    def _is_branch_present(self) -> bool:
        headers = self.github_app_token_manager.generate_token_header()
        branch_url = f"https://api.github.com/repos/{self.OWNER}/{self.REPO}/git/ref/heads/{self.NEW_BRANCH_NAME}"
        response = requests.get(branch_url, headers=headers)

        if response.status_code == 200:
            return True
        elif response.status_code == 404:
            return False
        else:
            response.raise_for_status()
            return False

    def _create_branch(self) -> None:
        if self._is_branch_present():
            return

        headers = self.github_app_token_manager.generate_token_header()

        # ベースブランチのSHAを取得
        base_ref_url = f"https://api.github.com/repos/{self.OWNER}/{self.REPO}/git/ref/heads/{self.BASE_BRANCH_NAME}"
        response = requests.get(base_ref_url, headers=headers)
        response.raise_for_status()
        sha = response.json()["object"]["sha"]

        # ブランチ作成
        branch_ref_url = (
            f"https://api.github.com/repos/{self.OWNER}/{self.REPO}/git/refs"
        )
        data = {"ref": f"refs/heads/{self.NEW_BRANCH_NAME}", "sha": sha}
        response = requests.post(branch_ref_url, headers=headers, json=data)
        response.raise_for_status()

    def _upload_file_to_repository(
        self, git_file_path: str, local_file_path: str, message: str
    ) -> None:
        headers = self.github_app_token_manager.generate_token_header()

        with open(local_file_path, "r") as file:
            content = file.read()

        encoded_content = base64.b64encode(content.encode()).decode()

        url = f"https://api.github.com/repos/{self.OWNER}/{self.REPO}/contents/{git_file_path}"

        # 同様のファイルが既に存在するか確認
        response = requests.get(url, headers=headers)
        sha = None
        if response.status_code == 200:
            sha = response.json()["sha"]

        data = {
            "message": message,
            "content": encoded_content,
            "branch": self.NEW_BRANCH_NAME,
        }
        if sha:
            data["sha"] = sha

        response = requests.put(url, headers=headers, json=data)
        response.raise_for_status()

    def _push_files(self, files: List[Tuple[str, str, str]]) -> None:
        self._create_branch()

        for git_file_path, local_file_path, message in files:
            self._upload_file_to_repository(git_file_path, local_file_path, message)

    def _generate_pr_title(self, table_names: List[str]) -> str:
        return f"production_logs新規テーブル({','.join(table_names)})追加"

    def _generate_pr_body(self, table_and_column_names: List[Dict[str, Any]]) -> str:
        header = "このPRはdeleted_logsによって自動生成されたものです。\n\n以下のテーブルのステージング処理を追加しています:\n"
        lines = []
        for record in table_and_column_names:
            table_name = record["table_name"]
            record_dict = json.loads(record["record"])
            columns = ", ".join(record_dict.keys())
            lines.append(f"**{table_name}**\nColumns: {columns}\n")
        return header + "\n".join(lines)

    def create_pr(
        self,
        table_names: List[str],
        column_names: List[Dict[str, Any]],
        files: List[Tuple[str, str, str]],
    ) -> str:
        self._push_files(files)

        headers = self.github_app_token_manager.generate_token_header()
        url = f"https://api.github.com/repos/{self.OWNER}/{self.REPO}/pulls"

        title = self._generate_pr_title(table_names)
        body = self._generate_pr_body(column_names)

        data = {
            "title": title,
            "body": body,
            "head": self.NEW_BRANCH_NAME,
            "base": self.BASE_BRANCH_NAME,
        }

        response = requests.post(url, headers=headers, json=data)
        response.raise_for_status()

        return response.json()["html_url"]

こんな感じのPRが生成されました。

※テストで作成したものなのでクローズしてます。

slackへの通知内容の修正

前段で生成したPRのURLを通知メッセージに追加します。

↓テストで飛ばした通知はこんな感じです。

ここで通知されたPRのレビューとマージを行うことで、Staging層のdbt modelが作られるようになりました。

まとめ

今回dbtのmodel生成を自動化したことで、2つの効果が得られました。

1つ目は、トイルの削減です。

これまで以下のフローで対応していましたが、

既存のdbt modelをコピペしてPR作成 → レビュー依頼 → マージ

自動化したことで、

PRを確認してマージ

だけででよくなりました。

元々大した工数はかかっていなかったものの、繰り返し発生する価値のない定型作業を減らすことができました。

そして2つ目は、Staging層へ新規ログテーブルが反映されるまでのラグ短縮です。

自動化したことで、他の対応でひっ迫している際に後回しにされることがなくなり、

これまではログ出力し始めてから分析可能になるまで最長7日かかっていたところ、導入後は大体1日以内で分析可能な状態になりました。

個人的には、タイミーにジョインするまでDevOps的なことがやれていなかったこともあり、運用は地味でつまらないイメージがあったのですが(すみません)、こうやって改善がまわせると運用も面白いなと最近思うようになりました。

We’re Hiring

DREグループではまだまだやっていきたいことがたくさんあるのですが、まだまだ手が足りておら

ず、ともに働くメンバーを募集しています!!

データに係る他のポジションやプロダクト開発などのポジションも絶賛募集中なのでこちらからご覧ください

定例会議の議事録をNotion DBで構造化して、いい感じにした話

この記事は "Timee Advent Calendar 2023" の11日目の記事です。

qiita.com

こんにちは、タイミーのデータ統括部データサイエンス(以下DS)グループ所属の菊地です。 今回は、定例会議の議事録をNotion DBで構造化して、いい感じにした話を紹介したいと思います!

前提

  • タイミーでは社内ドキュメントツールとしてNotionを採用しており、私が担当しているプロジェクトで週1回開催される定例では、議事録をNotion DBとして管理しています。当初は以下のような定例議事録用テンプレートを作成して運用していました。
  • 定例の内容としては、プロジェクト進行上同期的に議論すべきアジェンダを定例出席者が持ち寄って議論し、決定事項とToDoを記載していくような内容となっています。

旧定例議事録テンプレート

上記の定例議事録で感じていた課題

上記の定例議事録用テンプレートから定例の議事録を作成して運用していた際に、以下のような課題を感じていました。

アジェンダ」パート

  • アジェンダが多いと一覧性が悪く、優先度の高いアジェンダを先に話すなどの判断がしづらい
  • アジェンダに関する議論が複数回にわたる際に、議論内容が複数ページに渡り、情報が追いにくい
  • アジェンダが多く、その回の定例で話せず持ち越しになった場合に、次回の議事録に転記する必要がある

「決定事項」パート

  • 何のアジェンダについて、どんな意思決定が行われたのかが追いにくい
  • ある決定事項がどこに書かれていたかを調べたい際に、決定事項が書かれた会の議事録のページを探し出す必要がある

「ToDo」パート

  • 前回の定例で決まったToDoを確認するために、前回の議事録を参照or次回の議事録に転記する必要がある
  • 進行状況が分からない
  • 期限が不明瞭

やったこと

上記で感じていた課題を解消するために、下記のように「構成要素」と「構成要素間の関係」をNotion DBで整理(構造化)し、合わせてNotion DBテンプレートページの作成を行いました。

アジェンダ」パート

  • アジェンダを管理するNotion DB(Agendas)を作成し、議事録ページからはLinkded viewで参照する

「決定事項」パート

  • 決定事項を管理するNotion DB(Decisions)を作成し、議事録ページからはLinkded viewで参照する
  • Agendas DBとrelationsを持たせる。

「ToDo」パート

  • プロジェクトのタスク管理で使用しているNotion DB(ここでは「Tasks」とします)をそのまま流用し、議事録ページからはLinkded viewで参照する
    • 議事録から参照する際は、定例で作成されたToDoタスクだと判別できるようにtagをつけてフィルターを設定して絞り込む
  • Agendas DBとリレーションを持たせる

Notion DBのリレーション

Notion DBのリレーションは以下のような構成になっています。

  • Meetings: 議事録を管理するNotion DBで、Agendas、Decisions、TasksをLinked Viewで参照する
  • Agendas: 会議のアジェンダを管理するNotion DB
  • Decisions: 会議の決定事項を管理するNotion DB
  • Tasks: プロジェクトのタスクを管理するNotion DB
erDiagram
    Meetings
    Agendas ||--o{ Tasks: "1つのAgendaは0以上のTaskを持つ"
    Agendas ||--o{ Decisions: "1つのAgendaは0以上のDecisionsを持つ"
  

Notion DBテンプレートページ

定例議事録と、定例議事録内の各アジェンダのNotion DBテンプレートページを以下のように作成しました。

「Meetings」DBの定例議事録テンプレート

  • 「ToDo」パートの「Tasks」は、「ステータス」が「完了」以外を表示するようにフィルタを設定
  • アジェンダ」パートの「Agendas」は、「解決済み」が「チェックなし」のみを表示するようにフィルタを設定
  • 「決定事項」パートの「Decisions」は、決定事項が定例会議実施日のみに作成されるという想定の元、「作成日時」のフィルタ条件として、定例実施日に設定する運用にする

新定例議事録テンプレート

「Agendas」DBのアジェンダテンプレート

  • 「ToDo」パートの「Tasks」は、フィルタ条件「Agendas」をテンプレートページに設定しておくことで、テンプレートから作成されたページが自動で設定されるように
  • 「決定事項」パートの「Decisions」は、フィルタ条件「Agendas」をテンプレートページに設定しておくことで、テンプレートから作成されたページが自動で設定されるように
    アジェンダテンプレート

結果

定例議事録ページで感じていた課題は、議事録の「構成要素」と「構成要素間の関係」をNotion DBで整理(構造化)することで、以下のように解消することができました。

アジェンダ」パート

  • アジェンダの一覧性が悪く、優先度の高いアジェンダを先に話すなどの判断がしづらい
    • アジェンダがDBのページとして表現されることで、一覧として表示することができ、一覧性が向上
    • → DBは一覧の並び替えができるので、優先度の高いアジェンダを上に持ってくることが可能になった
  • アジェンダに関する議論が複数回にわたる際に、議論内容が複数ページに渡り、情報が追いにくい
  • アジェンダが多く、その回の定例で話せず持ち越しになった場合に、次回の議事録に転記する必要がある
    • → 解決していないアジェンダは自動的に次回に持ち越されるようになり、転記が不要になった

「決定事項」パート

  • 何のアジェンダについて、どんな意思決定が行われたのかが追いにくい
    • アジェンダに紐づけて決定事項をDBとして管理することで、意思決定が追いやすくなった
  • ある決定事項がどこに書かれていたかを調べたい際に、決定事項が書かれた会の議事録のページを探し出す必要がある
    • → 決定事項はDBにまとまっていて、アジェンダも紐づいているので、情報の検索性が向上し、素早く決定事項にたどり着けるようになった

「ToDo」パート

  • 前回の定例で決まったToDoを確認するために、前回の議事録を参照or次回の議事録に転記する必要がある
    • → ToDoはLinked viewとして次回定例に引き継がれるので、前回の議事録を参照する必要性も、次回の議事録に転記する必要性もなくなった
  • 進行状況が分からない
    • → DBとして表現することで進行状況をpropertyとして表現することが可能になり、進行状況を追えるようになった
  • 期限が不明瞭
    • → DBとして表現することで、期限をpropertyとして表現することが可能になり、期限が明確になった

まとめ

今回は、定例会議の議事録をNotion DBで構造化して、いい感じにした話を紹介しました。

タイミーではNotionをフルに活用しており、私が所属しているデータ統括部DSグループでも、スクラム運用や開発チケット管理など、さまざまな場面でNotionを活用して業務を効率化しています。

今後も定期的に、Notionの活用について発信していきたいと思います!

We’re Hiring!

タイミーのデータ統括部では、ともに働くメンバーを募集しています!!

現在募集中のポジションはこちらです!

「話を聞きたい」と思われた方は、是非一度カジュアル面談でお話ししましょう!

インシデントコマンダーになる前に経験したこと 〜 システム障害が起きて、それからどうする?

この記事は Timee Advent Calendar 2023 シリーズ 2 の10日目の記事です。

qiita.com

こんにちは! @lucky_pool です。 タイミーでプロダクトマネージャーをしています。

はじめに

何らかのシステム障害が起こったとき、サービスを利用するあらゆる人に影響が出て、普段通りにサービスを利用できなくなってしまいます。そんな状況になった際、 “なんとかする” しかありません。

私はプロダクトマネージャーという役割で働いていますが、サービスのコード修正をすることや、データ変更のオペレーションをすることはありません。また、過去や新規に開発された機能や仕様をすべて熟知しているわけでもありません。ですが、障害対応においてインシデントコマンダーを担うことが何度かありました。

そこで、私がタイミーでインシデントコマンダーをやった経験から、一般的に役立ちそうな内容ををここでは紹介したいと思います。

インシデントコマンダーは誰でもなれる

ここでは、障害対応をなんとかする人を「インシデントコマンダー」と呼びます。

PagerDuty Incident Response *1 *2を参照すれば、インシデントコマンダーの説明は以下のとおりです。※ この記事はとても良い内容でして、なんと邦訳版としても公開されております。*3 *4 ありがたい!

Keep the incident moving towards resolution.

(意訳) インシデントを解決に向かわせ続ける人

そして次のようにも説明があります。

You don't need to be a senior team member to become an IC, anyone can do it providing you have the requisite knowledge (yes, even an intern!)

(意訳) インシデントコマンダーになるには、シニアメンバーになる必要はなく、必要な知識があれば誰でもなれる(もちろん、インターンでも)

そうです、やり方さえ分かれば誰でもなれるとのことです。私も、タイミーでのインシデントコマンダーの経験上、これは一定「確かにそう」と思っています。

PagerDuty のドキュメントには Requisite knowledge (必要な知識, 予備知識) と説明がありますが、実際は所属するチームや会社によって、対応できる権限や関係者、関係するツールが異なります。方法論を知ることは必要ですが、それらの知識をベースにした “経験” が必須だと考えています。故に実行することができます。

実際に私がどんな経験をしたかをかいつまんで説明します。

事前に経験したこと

私が入社したのは2022年10月です。約1年ちょと前です。入社後から(今もですが)、システム障害等、何らか問題が起きたとき、その事象を “なんとかしたい” という気持ちだけは持っていました。

そんな気持ちからか、いつのまにか以下のことを経験していました。

①障害対応に何度もオブザーブする

タイミーでは、何らかのシステム障害が観測されたとき、Slackの WF にて報告されることになっています。そして @障害報_通知グループ なるグループに通知が飛ぶようになっています。

障害報WFで投稿された内容

  • この通知グループは、障害報告を知りたい人が多く入っており、例えば、プロダクト開発の関係者だけではなく、カスタマーサポート、広報、管掌役員なども入っています
  • また、メンション対象にいなくとも、投稿先のチャンネルは、全社への情報発信をする場所であり 数100人以上が見ているチャンネルだったりします

私はこの通知を受け取る一人となりました。WFによって自動的に Google Meet のURLを提示してくれるため、作業担当者が会話しはじめます。このGoogle Meet に入ることで状況を知るようにしました。

その過程で大まかな障害対応の手順がわかるようになっていきました。

例えば以下の通り。

  1. 影響範囲の調査
    • 誰に影響があるのか、何に影響があるのかを調べます
    • システム的な影響(xxx のエラーが xxx 件ある等)だけでなく、問合せ状況(xxxに関する問合せが、通常よりxxx件多い)、また手元での確認状況(xxxxの操作をするとxxxxになる)を把握するのに努めます
  2. 暫定対応方針の検討と実行
    • 顧客説明方針
      • 広範囲な障害であれば、なんらか顧客周知をすることを検討しなければなりませんが、そうではなく、社内一部業務フローのみの影響であれば、関係者に対し周知する方針にします
    • システム対応方針
      • できる限り早く、影響範囲を小さくできる方針を検討します
      • revert するほうが手っ取り早ければそうしますが、そうはいかない場合は、なんらかコード差分を作りデプロイが必要になるかもしれません
  3. 定期的な情報共有
    • 1および2の対応ステータスをリアルタイムに更新していきます、そうすることで、解消見込み時間などが分かり、安心する人たちもでてきます
  4. オンコール対応の収束
    • 暫定対応を講じ、影響範囲を少なくすることに成功した場合、チームは解散します
    • もし土日や深夜であれば、何らかの残対応があっても、翌営業日にすることがあるでしょう
  5. ポストモーテムの実施及び恒久対応の計画
    • 恒久的な再発防止策だけでなく、プロセスの改善の検討と実施を計画します

②障害対応時のインシデントコマンダーの補佐をする

障害対応状況をオブザーブしていると、インシデントコマンダーを補佐できるようなことがいくつかあります。例えば以下の対応ができます。

対応状況の記録を取る

  • タイミーの場合はドキュメントは Notion にまとめており、障害対応時においてもNotion ページに情報を集めていきます
  • インシデントコマンダーは話をすること、情報を整理することに集中するため、文字列に整理することは難しいことがあり、故にこの補佐をすることはとても頼りになります
  • また以下にも関連しますが、関係者にライブで状況を伝える役割にも一助になります

関係者にライブで状況を伝える

  • WF で生成されたスレッドに、今の対応状況を箇条書きで投下しました
    • 例えば
      • 影響範囲がわからず、その調査を開始した、解消は未定
      • 影響範囲は未確定だが規模として xxx が見込まれることがわかった
      • 対応方針を検討しているが、大きく xxxx と xxxx の方針がでている
      • xxx の方針を取ったため、 xxxx に解消が見込まれる
  • これらのただの書き殴りだとしても、関係者にとっては有用な情報になります
  • 障害対応時のタイムラインとして後で役に立ちます

関係者にメンションし呼び出してくる

  • 対応に必要な人が Meet に来ていない場合は、容赦なくメンションします
  • インシデントコマンダーや作業者が「xxxさん、xxxxチームに来て欲しい」という発言があれば、すぐに「じゃ、私が呼びますね」と対応します。
  • インシデントコマンダーや作業者が解決に向かうことに集中してもらうことに役立てます
  • また、作業者が複数チームに分かれる場合もあるため、両チームのハブとなるような動きをしてインシデントコマンダーの補佐ができます
    • 例:
      • A: 障害原因の調査および対処方針を検討するチーム
      • B: 顧客周知方針を検討するチーム

③ポストモーテムにオブザーブする

タイミーにおいて、障害対応をしたあとは関係者でポストモーテムを実施しています。もし、障害対応をオブザーブしていなかったり何らかの補佐をしていなくても、どのような障害があり、対応がされていたのかが理解できるため、ポストモーテムの参加からでも良い経験になると思います。

ポストモーテムにおいては、再発防止だけでなく、障害対応のプロセスの改善についても会話がされます。例えば、私が参加したポストモーテムでは以下のような会話がありました。

  • xxx さん(xxxチーム) にて早期に報告があったのはいい動きだった!
  • 以前に対応した xxx によって、今回の対応が早期に解決できてよかった!
  • 今回 xxx の対応ができたことによって、顧客問い合せが 0件 で影響範囲を少なくできた!

もちろん、このような良い会話だけではなく、 xxx をすることによって、もっと効果的にできるというような具体的なアクションにつながるものもあります。

システム障害、それは突然やってくる

誰もが対応できる時間帯にやってくるとは限りません。なんとかする間には、サービスのあらゆる利用者に対して真摯に説明する必要がありますし、可能な限りはやく解消するしかありません。故に、事前にこういった経験をしておくと焦らずにすむと思います。また、事前にある程度経験した上で、書籍*5も読むと体系だった知識としても理解しやすくなるかもしれません。

本稿が転ばぬ先の杖として、参考になれば幸いです。

BigQueryにおけるdbtの増分更新についてまとめてみた

はじめに

Timeeのカレンダー | Advent Calendar 2023 - Qiitaの12月8日分の記事です。

okodooooooonです

BigQueryの料金爆発。怖いですよね。

dbtでの開発が進んでたくさんのモデルを作るようになると、デイリーのビルドだけでも凄まじいお金が消えていったりします(僕はもう現職で数え切れないくらいやらかしてます)。

コストの対策として「パーティショニング」「クラスタリング」などが挙げられますが、今回は「増分更新」の観点で話せたらと思います。

「dbtのmaterialized=’incremental’って増分更新できておしゃれでかっこよくてコストもなんとなく軽くなりそう!」くらいの認識でさまざまな失敗を経てきた僕が、BigQueryにおけるincrementalの挙動を説明した上で、タイミーデータ基盤における増分更新の使い方についてまとめたいと思います。

※この記事はdbt + BigQueryの構成に限定した内容となります。BigQuery以外のデータウェアハウス環境では今回紹介する2つ以外のincremental_strategyが用いられるので、そちらご注意ください!

BigQueryの増分更新の挙動についておさらい

dbtはincremental処理において、BigQuery向けに mergeinsert_overwrite という2つのincremental_strategy(増分更新のやり方のオプション)を提供しています。

名前だけ見てもちんぷんかんぷんなのでそれぞれ解説していこうと思います。

MERGE戦略

MERGEはBigQueryにおけるデフォルトのincremental方針となります。

dbtのconfigブロックの定義は以下のようになります。

{{
 config(
        materialized='incremental'
        , unique_key='xx_key'
    )
}}

incremental_strategy = mergeの時に実際にBigQueryに発行されるMERGE文はこんな感じになります。

merge into `xxxx_project`.`xxx_dataset`.`atesaki_table` as DBT_INTERNAL_DEST
using (
  {{model_sql}}
) as DBT_INTERNAL_SOURCE
on (
  DBT_INTERNAL_SOURCE.ユニークキー1 = DBT_INTERNAL_DEST.ユニークキー1
    AND
  DBT_INTERNAL_SOURCE.ユニークキー2 = DBT_INTERNAL_DEST.ユニークキー2
  ...
  ...
)
-- ユニーク指定キーがマッチングした場合
when matched then update set
  ソース側の行でマッチングした宛先テーブルの行を上書き
-- 宛先テーブルにユニーク指定キーが存在しなかった場合
when not matched then insert
  ソース側の行を追加

この際のON句の中身が問題で

on (
  DBT_INTERNAL_SOURCE.ユニーク指定キー1 = DBT_INTERNAL_DEST.ユニーク指定キー1
    AND
  DBT_INTERNAL_SOURCE.ユニーク指定キー2 = DBT_INTERNAL_DEST.ユニーク指定キー2
  ...
)

といった式になっているので、宛先テーブルの全件を走査して、マッチする列がないかを確認しています。宛先テーブルが巨大になればなるほど、クエリコストが問題となってきます。

概念図で示すとこんな感じ

INSERT+OVERWRITE戦略

dbtのconfigブロックは以下のようになります。

{{config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partitions=var('last_31_days'), # dbt_project.ymlで定義した日付のリストが格納された変数
    partition_by={
        'field':'xx_datetime',
        'data_type': 'datetime',
        'granularity': 'day'
    }
)}}

発行されるMERGE文は以下のような形。参考:https://docs.getdbt.com/reference/resource-configs/bigquery-configs#the-insert_overwrite-strategy

create temporary table {{ model_name }}__dbt_tmp as (
  {{ model_sql }}
);

declare dbt_partitions_for_replacement array<date>;
-- ソーステーブルから上書きすべき日付の配列をdbt_partitions_for_replacementにセット
set (dbt_partitions_for_replacement) = (
    select as struct
        array_agg(distinct date(max_tstamp))
    from {{ model_name }}__dbt_tmp
);

merge into {{ destination_table }} AS DEST
using {{ model_name }}__dbt_tmp AS SRC
on FALSE -- on Falseにすることでキー同士のマッチングは発生しない

-- 宛先テーブルの該当パーティションを削除する。参考:https://cloud.google.com/bigquery/docs/using-dml-with-partitioned-tables?hl=ja#using_a_filter_in_the_search_condition_of_a_when_clause
when
    not matched by source 
  and
  {{ partition_column }} in unnest(dbt_partitions_for_replacement)
then delete

-- ↑で削除した後にソースをinsertする
when not matched then insert ...

指定したパーティションを削除した後に、ソーステーブルをinsertするような処理をしています。

ソースにも宛先にもfull scanが走らないので、クエリコストとしては軽くなります

概念図で示すとこんな感じ

増分更新ロジックの使い分けについて

上記のロジックを理解した上で増分更新についてまとめると以下のようになります。

  • incremental_strategy=mergeのユースケース
    • ソーステーブルが巨大で宛先テーブルが小さい場合の増分更新
      • ソーステーブルにフルスキャンを走らせることなくコストを抑えた状態で、ユニークを保って増分更新ができます
  • incremental_strategy=insert_overwriteのユースケース
    • 宛先テーブルが巨大な場合の増分更新
      • ソーステーブル, 宛先テーブルともにフルスキャンを走らせることなくコストを抑えた状態で増分更新できます

insert_overwriteの注意点

insert_overwriteではdbt_project.ymlにて以下のように静的なリストとしてパーティション指定のリスト変数を作成した後に

last_2_days : [
      current_date('Asia/Tokyo')
      , date_sub(current_date('Asia/Tokyo'), interval 1 day)
  ]

以下のように

{{config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partitions=var('last_2_days'),

partitionsに渡す必要があります

上の直近2日間のpartitions指定だと、もし何かしらの要因でJobが3日間転んだりしていたら、3日目に復旧したとしても

障害1日目:レコード無。障害2日目:レコード有。障害3日目:レコード有

といった形の歯抜けテーブルになってしまいます。障害を踏まえたバッファ分を余裕を持って指定しておくことが大切です。

insert_overwriteはパーティションをごっそり入れ替えるようなロジックであるため、ユニークがこのロジックだけでは担保されません。これによって生じる想定される不具合を見てみましょう。

例えば「ユーザーごとの月毎と日毎の両方のステータスを保持するようなテーブル」を想定してみます。

月単位のステータスが12/9で petapetapotepoteに変更されたとします。

この時に直近1週間をpartitionsに指定するinsert_overwriteで増分更新をすると仮定すると12/9の更新で以下のようになります。

このように直近1週間のパーティションをそのまま置き換えただけなので、12/1, 12/2の月単位のステータスがpetapetaのままになってしまい、同じ月に2つの月単位ステータスを持つ形になってしまいます。このようなケースを防ぐには、partitionsに「直近1ヶ月分の日付のリスト」を渡してあげる必要があります。

過去の値が変わりうるテーブル(月末の締め処理で金額が変動しうる会計処理とか?)に対してinsert_overwriteを適用するには、「どの期間をpartitionsに渡せば安全なのか」を考えることがとても重要になります!

タイミーの増分更新の対象について

挙動を正確に理解できたところでタイミーデータ基盤における増分更新のユースケースを見ていきましょう!

  • mergeを使用して増分更新しているケース
    • firebaseなどのログテーブルから一部種別のログだけを抽出するようなテーブル
    • 巨大なソーステーブルから集約した値を蓄積するようなテーブル
  • insert_overwriteを使用して増分更新しているケース
    • 宛先がソースと比較して膨大になるsnapshot factテーブル
    • その他ログテーブルなどから作られる巨大なfactテーブル

こんな形で当社では現状は使い分けています。 実装難度がinsert_overwriteの方が高いので、mergeで済むケースはmergeにしちゃってます。

おわりに

BigQueryにおけるincrementalの挙動と、それぞれのメリデメ、なぜコストが下がるのかあたりを解説しました。

incrementalに関する公式Docを見てもなんとなく言ってることわかりそうでわからない。

MERGE文などを通して公式Docなどで紹介されているが、普段SELECT文を書くことが主となってしまって、DMLにあたるMERGE文は読みづらいしわかりづらい。などといった方々の参考になれたら嬉しいです。

We’re Hiring

タイミーのデータ統括部はやることがまだまだいっぱいで仲間を募集しています!興味のある募集があればこちらから是非是非ご応募ください。

https://hrmos.co/pages/timee/jobs

私個人としてはこちらのアナリティクスエンジニアの募集への応募をとてもお待ちしております!

https://hrmos.co/pages/timee/jobs/16822514041183191415

RecBoleでサクッとレコメンドアルゴリズムの検証をしてみた

こんにちは、データ統括部データサイエンス(以下DS)グループ所属の小関 (@ozeshun)です。

本記事では、タイミーで取り組んでいるレコメンドに使用するアルゴリズムを検証する際に活用した、RecBoleでの実験方法について紹介したいと思います。

Timee Advent Calendar2023の12月8日分の記事です。

RecBoleとは

RecBoleとは、レコメンドアルゴリズムを統一されたインターフェースで提供する事を目的としたプロジェクトであり、後述のようにアルゴリズム間の比較を簡単に実現出来ます。2023/12/8現在、91種類のアルゴリズムが実装されており、Pythonのライブラリ*1として公開されています。 実装されているアルゴリズムは、Model Introductionから確認できます。

今回は、実装されているアルゴリズムの中でもexplicitなフィードバックを予測すること*2を目的とした、Context-aware RecommendationアルゴリズムをRecBoleを使用して検証する一連の流れを紹介したいと思います。

RecBoleを活用したアルゴリズムの実験手順

0. ディレクトリ構成

  • 今回は、以下のようなディレクトリ構成の元、ノートブック上で実験を進める手順を説明します。
├── notebook.ipynb
├── artifact
│   ├── saved # 学習したモデルの保存先
│   │   ├── AFM-%m-%d-%Y_%H-%M-%S.pth
│   │   ├── DeepFM-%m-%d-%Y_%H-%M-%S.pth
│   │   ├── FM-%m-%d-%Y_%H-%M-%S.pth
│   │   ├── NFM-%m-%d-%Y_%H-%M-%S.pth
│   └── train_data # 学習用のデータの保存先. pickle fileは、Atomic fileに変換する際のソース.
│       ├── interact.pkl
│       ├── items.pkl
│       ├── users.pkl
│       ├── train_dataset # RecBoleが学習で使用する、Atomic fileの保存先
│           ├── train_dataset.inter
│           ├── train_dataset.item
│           └── train_dataset.user
├── base_dataset.py # データセットのI/Oをコントロールするクラスのベース (詳細はStep.2に記述)
├── config
    ├── model.hyper # 探索したいハイパーパラメータと探索範囲を記述したファイル
    └── train.yaml # RecBoleで使用する、学習方法などを記述したconfig file

1. 学習データの準備とRecBoleで使用するconfig fileの用意

  • 学習用データの準備

    • 今回は、Context-awareなモデルを学習することを目的としているので、下記のように、user_id、item_id、explicitなフィードバックを表すカラム、ユーザー・アイテムの特徴量を含む pandas.DataFrame形式のオリジナルデータを用意します。

        user_id item_id target user_feature item_feature
      1 1 0 0 -1.0 2000
      2 1 2 1 0.5 3000
      3 2 1 1 -0.8 4000
      4 2 2 0 0.0 5000

  • RecBole用のconfig fileの用意

    • このファイルには、データの保存先などの環境の設定、使用するデータに関する情報、学習方法や評価方法の設定を記述します。
# config/train.yaml
# 使用するモデル名とデータセット名を指定-----------------------------------
model: FM
dataset: train_dataset

# Environment Settings-----------------------------------------------
# https://recbole.io/docs/user_guide/config/environment_settings.html
gpu_id: 0
use_gpu: False
seed: 2023
state: INFO
reproducibility: True
data_path: 'artifact/train_data/'
checkpoint_dir: 'artifact/saved/'
show_progress: True
save_dataset: True
save_dataloaders: False

# Data Settings------------------------------------------------------
# https://recbole.io/docs/user_guide/config/data_settings.html
# Atomic File Format
field_separator: "\t"
seq_separator: "@"

# Common Features
USER_ID_FIELD: user_id
ITEM_ID_FIELD: item_id
LABEL_FIELD: target

# Selectively Loading
load_col:
    # interaction
    inter: [user_id, item_id, target]
    # ユーザー特徴量
    user: [
        user_id,
        user_feature,
    ]
    # アイテム特徴量
    item: [
        item_id,
        item_feature,
    ]

# Preprocessing
# 標準化する特徴量を指定
normalize_field: [
    item_feature,
]

# Training Setting---------------------------------------------------
# https://recbole.io/docs/user_guide/config/training_settings.html
epochs: 100
train_batch_size: 1024
learner: 'adam'
train_neg_sample_args: ~
eval_step: 1
stopping_step: 3
loss_decimal_place: 4
weight_decay: 0

# Evaluation Settings------------------------------------------------
# https://recbole.io/docs/user_guide/config/evaluation_settings.html
eval_args:
    group_by: user
    split: {'RS': [0.8, 0.1, 0.1]}
    mode: labeled
repeatable: True
metrics: ['LogLoss', 'AUC']
topk: 20
valid_metric: LogLoss
eval_batch_size: 1024
metric_decimal_place: 4
eval_neg_sample_args: ~
  • 次のステップでAtomic fileに変換する際のソースとなるように、用意したDataFrameをpkl形式で保存

    • 下記のコードをノートブック上で実行すると、ユーザー×アイテムのインタラクション、ユーザーの特徴量、アイテムの特徴量を抽出したデータセットARTIFACT_PATH配下に保存されます。
import os
import yaml
import pandas as pd

# データの読み込み (データソースはなんでも良い)
train_df = pd.read_csv('/path/to/train.csv')

# yaml形式で書かれたRecBoleのcofig fileを読み込む
TRAIN_YAML_PATH = 'config/train.yaml'
with open(TRAIN_YAML_PATH, 'r') as yaml_file:
    train_config = yaml.safe_load(yaml_file)

# cofig fileから各データセットに使用するカラム名を抽出
INTERACTION_COLUMNS = train_config['load_col']['inter']
USER_COLUMNS = train_config['load_col']['user']
ITEM_COLUMNS = train_config['load_col']['item']
TOKEN_COLUMNS = [
    'item_id',
    'user_id',
]

# Atomic fileに変換する際のソースデータとしてpkl形式で保存
ARTIFACT_PATH = 'artifact/train_data/'
train_df[INTERACTION_COLUMNS].to_pickle(os.path.join(ARTIFACT_PATH, 'interact.pkl'))
train_df[ITEM_COLUMNS].to_pickle(os.path.join(ARTIFACT_PATH, 'items.pkl'))
train_df[USER_COLUMNS].to_pickle(os.path.join(ARTIFACT_PATH, 'users.pkl'))

2. 学習データをAtomic file *3 へ変換

  • 変換時に使用する、データセットのI/Oをコントロールするクラスをノートブック上に記述
    • このクラスには、インプット・アウトプット先や、使用するカラムの情報とそのデータ型を記述しておきます。
# https://github.com/RUCAIBox/RecSysDatasets/blob/master/conversion_tools/src/base_dataset.py をそのままimportして継承
from base_dataset import BaseDataset

class TrainDataset(BaseDataset):
    def __init__(self, input_path, output_path):
        super(TrainDataset, self).__init__(input_path, output_path)
        self.dataset_name = 'train_dataset'

        # input_path
        self.inter_file = os.path.join(self.input_path, 'interact.pkl')
        self.item_file = os.path.join(self.input_path, 'items.pkl')
        self.user_file = os.path.join(self.input_path, 'users.pkl')

        self.sep = ','

        # output_path
        output_files = self.get_output_files()
        self.output_inter_file = output_files[0]
        self.output_item_file = output_files[1]
        self.output_user_file = output_files[2]

        # selected feature fields
        inter_fields = {
            i: f'{c}:token' if c in TOKEN_COLUMNS else f'{c}:float' for i, c in enumerate(INTERACTION_COLUMNS)
        }

        item_fields = {i: f'{c}:token' if c in TOKEN_COLUMNS else f'{c}:float' for i, c in enumerate(OFFER_COLUMNS)}

        user_fields = {i: f'{c}:token' if c in TOKEN_COLUMNS else f'{c}:float' for i, c in enumerate(USER_COLUMNS)}

        self.inter_fields = inter_fields
        self.item_fields = item_fields
        self.user_fields = user_fields

    def load_inter_data(self):
        return pd.read_pickle(self.inter_file)

    def load_item_data(self):
        return pd.read_pickle(self.item_file)

    def load_user_data(self):
        return pd.read_pickle(self.user_file)
  • 下記コードをノートブック上で実行して、ARTIFACT_PATH配下に格納したデータセットをAtomic file形式に変換
ARTIFACT_PATH = 'artifact/train_data/'
# 前のステップで保存したデータセットの保存先
input_path = ARTIFACT_PATH
# Atomic fileの書き出し先
output_path = os.path.join(ARTIFACT_PATH, 'train_dataset')
i_o_args = [input_path, output_path]

# 前のステップで作成したI/Oクラスにinput,outputの情報を渡す
datasets = TrainDataset(*i_o_args)
# DatasetをAtomic fileに変換
datasets.convert_inter()
datasets.convert_item()
datasets.convert_user()

3. モデルの学習

  • モデルを学習する関数をノートブック上に定義
    • パラメータチューニングの各種設定については、コメントで記述したページに詳しく書いてあります。
from recbole.quick_start import objective_function, run_recbole
from recbole.trainer import HyperTuning

# (再掲) 事前に準備したRecBoleのcofig fileへのパス
TRAIN_YAML_PATH = 'config/train.yaml'
# 探索したいハイパーパラメータと探索範囲を記述したファイルへのパス
HYPER_PARAMS_PATH = 'config/model.hyper'


def train_model(model_name: str, config_file_list: str = TRAIN_YAML_PATH, params_file: str = HYPER_PARAMS_PATH) -> None:
    # ハイパーパラメータチューニングの条件を設定
    # 参考: https://recbole.io/docs/user_guide/usage/parameter_tuning.html
    hp = HyperTuning(
        objective_function=objective_function,
        algo='bayes',
        early_stop=3,
        max_evals=15,
        params_file=params_file,
        fixed_config_file_list=config_file_list,
    )
    # チューニングを実行
    hp.run()
    # print best parameters
    print('best params: ', hp.best_params)
    # print best result
    print('best result: ')
    print(hp.params2result[hp.params2str(hp.best_params)])

    # bestなパラメータを取得
    parameter_dict = {
        'train_neg_sample_args': None,
    } | hp.best_params

    # bestなパラメータでモデルを学習
    run_recbole(
        model=model_name,
        dataset='train_dataset',
        config_file_list=config_file_list,
        config_dict=parameter_dict,
    )
  • 探索したいハイパーパラメータとその探索範囲をmodel.hyperというファイルに記述して用意
# config/model.hyper (使用アルゴリズムがFMの場合)
learning_rate choice [0.1, 0.05, 0.01]
embedding_size choice [10, 16, 32]
  • 定義した学習用の関数に試したいアルゴリズム名を指定して、学習を実行するとtrain.yamlに記述したcheckpoint_dir配下に学習済のモデルが保存されます。
# FM
train_model('FM')
# NFM
train_model('NFM')
# AFM
train_model('AFM')
# DeepFM
train_model('DeepFM')

4. 学習したモデルの検証

  • 学習したモデルでテストデータに対する予測値を計算し、その評価指標を算出する関数をノートブック上に定義
import torch
from recbole.data.interaction import Interaction
from recbole.quick_start import load_data_and_model


def eval_model(
    model_file_name: str,
    model_saved_dir: str = train_config['checkpoint_dir'],
    target: str = train_config['LABEL_FIELD'],
    user_columns: list = USER_COLUMNS,
    item_columns: list = ITEM_COLUMNS,
    token_columns: list = TOKEN_COLUMNS,
):
    # 学習したモデルとテストデータを読み込み
    _, model, _, _, _, test_data = load_data_and_model(model_file=os.path.join(model_saved_dir, model_file_name))

    columns = user_columns + item_columns + [target]
    # テストデータをモデルが予測出来る形式に変換
    interactions = {}
    test_df = pd.DataFrame([])
    for c in columns:
        test_features = torch.tensor([])
        for data in test_data:
            test_features = torch.cat([test_features, data[0][c]])
        if c in token_columns:
            test_features = test_features.to(torch.int)
        interactions[c] = test_features
        if c in ['user_id'] + [target]:
            test_df[c] = test_features

    test_interaction_input = Interaction(interactions)

    # テストデータに対する予測結果を作成
    model.eval()
    with torch.no_grad():
        test_result = model.predict(test_interaction_input.to(model.device))
    test_df['pred'] = test_result

    # テストデータに対するランキングメトリクス、AUC, Loglossを算出する関数を実行 (今回は実装は割愛)
    # 現状のRecBoleの仕様だとmode: labeledで学習した場合、ランキングメトリクスを指定できないので、自前で計算する必要があります

    return test_df
  • 定義した検証用の関数に学習済のモデルのファイル名を入れて実行することで、テストデータに対する予測結果とメトリクスが計算されます。
# FM
test_df = eval_model('FM-%m-%d-%Y_%H-%M-%S.pth')
# NFM
test_df = eval_model('NFM-%m-%d-%Y_%H-%M-%S.pth')
# AFM
test_df = eval_model('AFM-%m-%d-%Y_%H-%M-%S.pth')
# DeepFM
test_df = eval_model('DeepFM-%m-%d-%Y_%H-%M-%S.pth')
  • 作成されたtest_dfにはランキングメトリクスが計算出来るようにuser_id、真値、予測値が書き込まれています。

      user_id target pred
    1 1 1 0.9
    2 1 1 0.7
    3 1 0 0.3
    4 1 0 0.1

  • test_dfを元にテストデータに対するメトリクスを計算することで、アルゴリズム間の比較検証が出来ます。

      ROC-AUC Logloss Recall@20 MAP@20 MRR@20
    FM 0.865 0.105 0.491 0.489 0.577
    NFM 0.843 0.110 0.502 0.470 0.540
    AFM 0.865 0.103 0.507 0.487 0.568
    DeepFM 0.862 0.101 0.523 0.522 0.598

おわりに

今回は、多様なレコメンドアルゴリズムを検証できるRecBoleを活用した実験の手順について紹介しました。

この記事が、レコメンドアルゴリズム構築に関わる方々の助けに少しでもなれたら嬉しいです!

We’re Hiring!

タイミーのデータ統括部では、ともに働くメンバーを募集しています!!

現在募集中のポジションはこちらです!

「話を聞きたい」と思われた方は、是非一度カジュアル面談でお話ししましょう!

*1:PyPI: https://pypi.org/project/recbole/

*2:例: CTR予測など

*3:RecBoleが学習に用いるデータ形式

メンバーの相互理解を深めるためにDSグループでやっていること

この記事はTimee Advent Calendar 2023の7日目の記事です。

qiita.com

こんにちは、データ統括部データサイエンス(以下DS)グループ所属の小栗です。

本記事では、メンバーの相互理解を深めるためにDSグループで取り組んでいる施策を紹介します。

そもそもの課題感

以下の要素により、DSメンバー間の相互理解が今後難しくなりそう…という課題感が当時あり、諸々の施策をスタートさせました。

  • フルリモート前提の働き方をしている
  • チームメンバーの数がすごい勢いで増えてきた(1年で2.5倍に)
  • メンバーがそれぞれ担当する部署横断PJがいくつも並行に走っており、逆にチーム内での接点が少なくなってきた

メンバーの相互理解を深めるためにやっていること

データ統括部やDSグループで取り組んでいる取り組みは他にもたくさんありますが、今回は以下の2つに絞って紹介します。

  1. スキルマップ共有会
  2. ストレングスファインダー共有会

スキルマップ共有会

スキルマップとは、業務に関係あるハードスキルとそれに対する各メンバーの習熟度を可視化するツールです。

スキルマップの作成・共有をする目的は主に2つあります。

  1. 各メンバーが持つハードスキルを把握し、相互理解に繋げる
  2. 業務に関係あるスキルと、そのスキルに自信がある人を把握できるようにする

具体的には、下図のようなスキルマップをスプレッドシートで簡単に作成しました。

スキルマップの一例

社内や社外のスキルマップを参考にしつつも、タイミーのDSグループに強く関係するスキルや技術を独自にピックアップしています。

具体的には、「データサイエンス」「データエンジニアリング」「ビジネス」「アカデミックドメイン」「業界ドメイン」の軸でそれぞれ10~20個ほど要素(≒ スキル)を選定し、軸ごとにスキルマップを作成しています。

また、スキルの習熟度とは別に、「今後伸ばしたい」スキルも可視化するようにしています。

工夫した点(というか難しい点)として、各スキルの習熟度に対して厳密な基準を設けない形にしています。

厳密な基準を設けるのは難しいこと、そして、相互理解が目的なのでメンバーそれぞれの各スキルに対する「自信度」がザックリわかればいい、といった理由から、現在の形で運用しています。

定期的に各メンバーはスキルマップへの記入を行います。

その後に共有会を開催し、1人ずつ自分のスキルを発表して、それに対する感想や質問をする、という形で運用しています。

共有会で各メンバーの保有スキルや今後伸ばしたいスキルが分かるため、それを通して相互理解が深まり仕事がしやすくなる、という想いで運用しています。

ストレングスファインダー共有会

ストレングスファインダーは、米国のギャラップ社が開発した「強みの診断」ツールです。

WEB上で診断を受けると、34の資質の中から自分の強みや資質を知ることができます。

スキルマップはハードスキルに着目していましたが、ストレングスファインダーではソフトスキルに焦点が当たっており、DSグループでは棲み分けをする形でどちらも運用しています。

新メンバーがオンボーディングの過程で診断を受けるフローになっており、メンバーが増えるごとに都度共有会を開いています。

共有会では、1人ずつ自分の強みを発表して、それに対する感想や質問をする、という形で賑やかに開催しています。

また、診断結果を盲目的に信じて決めつけたコミュニケーションをするのではなく「この診断結果は的確/そうでもない」といった会話も挟むなど、あくまで診断結果を踏み台にして生まれる会話によって相互理解を深めています。

ストレングスファインダーの診断結果を通して、普段の業務では把握できない各人の「強み」や「考え」がわかるのが魅力だと思っています。

※データ統括部BIチームのyuzukaさんが執筆したストレングスファインダーに関するアドベントカレンダー記事もあるため、そちらもぜひご一読ください!

おわりに

この記事では、DSグループが実施しているメンバー相互理解のための取り組みを紹介しました。

データ統括部およびDSグループは今後もメンバーを増員する予定なので、施策をさらにアップデートしていきたいと考えています!

We’re Hiring!

タイミーのデータ統括部では、ともに働くメンバーを募集しています!!

現在募集中のポジションはこちらです!

「話を聞きたい」と思われた方は、是非一度カジュアル面談でお話ししましょう!