Timee Product Team Blog

タイミー開発者ブログ

Vertex AI Pipelinesを効率的に開発するための取り組み

こんにちは、タイミーのデータ統括部データサイエンス(以下DS)グループ所属の小関です。

今回はDSグループがMLパイプライン構築時に活用しているVertex AI Pipelinesを効率的に開発するための取り組みを紹介したいと思います!

Vertex AI Pipelinesとは

Vertex AI Pipelinesとは、Google Cloudが提供しているMLパイプラインをサーバーレスに構築・実行できるサービスです。

Vertex AI Pipelinesを活用することで、下記のようなデータをBigQeuryから取得し、特徴量の作成・データセットの分割後、モデルを学習するようなML パイプラインが比較的容易に構築できます。

Vertex AI Pipelinesで構築したMLパイプラインのサンプル

Vertex AI Pipelinesの活用事例と挙げられた改善点

タイミーのDSグループでは、下記のようなML パイプラインをVetex AI Pipelinesで開発・運用しています。

  • ワーカーに対して、おすすめの募集を出力するパイプライン
  • クライアントの離脱を予測するパイプライン
  • 各種KPIを予測するパイプライン

ML パイプラインを構築していく上で、以下のような改善点が挙げられていました。

  • パイプラインのリポジトリディレクトリ構成や、CI/CDを共通化したい
  • Google Cloud上での処理を共通化し、より使いやすい形で処理を呼び出せるようにしたい
  • 各パイプラインに必ず入れ込む必要があるKubeflow Pipelines*1の記述を共通化したい

Vertex AI Pipelinesを効率的に開発するための取り組み

挙げられた課題に対して、DSグループでは以下の3つの取り組みを行なっています。

1. Vertex AI Pipelines開発用のテンプレートリポジトリの構築

cookiecutterを使用して、パイプライン開発に関わるディレクトリや、CI/CDに用いるyaml, shell scriptを生成してくれるテンプレートを作成しました。パイプラインの開発開始時にこのテンプレートを利用しています。

下記のようにcookiecutterコマンドでプロジェクトを生成し、プロジェクト名・プロジェクトの説明・Pythonのバージョン・作成者を入力することで、開発用のテンプレートが生成されます。

$ cookiecutter [開発用のテンプレートリポジトリのパス]

> project_name [project_name]:
> project_description []:
> python_version [3.10.1]:
> author [timee-dev]:
# Vertex AI Pipelines開発用のテンプレート
.
├── .github
│   ├── PULL_REQUEST_TEMPLATE.md
│   └── workflows
│       ├── CI/CDのyamlファイル
├── .gitignore
├── Makefile
├── README.md
├── pyproject.toml
├── src
│   └── pipeline_template
│       ├── components
│       │   └── component
│       │       ├── パイプラインを構成するコンポーネントのソースコードとDockerfile
│       ├── pipelines
│       │   ├── パイプラインをコンパイル、実行するためのソースコード
│       ├── pyproject.toml
└── tests
    ├── テストコード

2. Google Cloudの処理を集約した社内ライブラリの構築

Google CloudのPythonライブラリをラップして、BigQueryでのクエリ実行・クエリ結果のDataFrame化・テーブルの書き込みや、Cloud StorageにおけるファイルのI/O処理などを行える社内ライブラリを構築しています。こちらの社内ライブラリは、DSグループ全体で保守・運用を行なっており、バージョン管理とデプロイの自動化をした上で、Artifact Registryにプライベートパッケージとして置いて利用しています*2。この社内ライブラリに関しては、Vertex AI Pipelinesに限らずVertex AI WorkbenchやCloud Runなど、他のGoogle Cloudのサービスでの実装でも活用されています。

簡単な利用例として、SQLファイルのクエリを実行して、pd.DataFrameとして取得する処理を紹介します。

# 社内ライブラリからBigQuery関連の処理をまとめているクラスをimport
from [社内ライブラリ名].bigquery import BigQueryClient

# project_idにGoogle Cloudのプロジェクト名、sql_dirにSQL fileを格納しているディレクトリ名を指定
bq_client = BigQueryClient(project_id=PROJECT_ID, sql_dir=SQL_DIR)

# test.sql内でJinjaテンプレートで定義されているパラメーターをquery_paramsで受け取り、クエリの実行結果をpd.DataFrameとして受け取る
test_df = bq_client.read_gbq_by_file(
    file_name='test.sql',
    query_params=dict(loading_start_date=LOADING_START_DATE, loading_end_date=LOADING_END_DATE),
)
-- SQL_DIR/test.sql
DECLARE LOADING_START_DATE DEFAULT '{{ loading_start_date }}';
DECLARE LOADING_END_DATE DEFAULT '{{ loading_end_date }}';

SELECT
  *
FROM
  `project_id.dataset_id.table_id`
WHERE
  event_data BETWEEN LOADING_START_DATE AND LOADING_END_DATE

3. Kubeflow Pipelinesにおいて共通化できる処理を集約した社内ライブラリの構築

コンポーネント間のアーティファクトの受け渡し・Cloud StargeへのI/O処理や、yamlで定義されたコンポーネントの情報を取得してくる処理*3などを行える社内ライブラリを構築しています。こちらもDSグループ全体で保守・運用を行なっており、バージョン管理とデプロイの自動化をした上で、Artifact Registryにプライベートパッケージとして置いて利用しています。

利用例として、学習データを受け取り、それを特徴量とターゲットに分割するコンポーネントにおけるアーティファクトの受け渡し・Cloud StargeへのI/O処理の実装を紹介します。

# pipeline_name/components/component_name/src/main.py
from dataclasses import dataclass

import pandas as pd
from [社内ライブラリ名].artifacts import Artifacts
from [社内ライブラリ名].io import df_to_pickle


@dataclass
class ComponentArguments:

    train_dataset_path: str

@dataclass
class OutputDestinations:

    x_train_path: str
    y_train_path: str


def main(args: ComponentArguments) -> pd.DataFrame:
    train_dataset = pd.read_pickle(args.train_dataset_path)
    x_cols = ['x1', 'x2']
    y_col = ['y']
    X_train, y_train = train_dataset[x_cols], train_dataset[y_col]

    return X_train, y_train


if __name__ == '__main__':
    # アーティファクトのパスを取得
    artifacts = Artifacts.from_args(ComponentArguments, OutputDestinations)

    # インプットとなるアーティファクトのパスを受け取り、main関数を実行
    X_train, y_train = main(artifacts.component_arguments)

    # パイプラインのアーティファクトを管理するCloud Storageのバケットへpickle形式でX_train, y_trainを書き込む
    df_to_pickle(artifacts.output_destinations.x_train_path, X_train)
    df_to_pickle(artifacts.output_destinations.y_train_path, y_train)

取り組みから感じたメリット

上で挙げた取り組みを通じて、グループ全体で感じている主なメリットを紹介していきます。

  • 開発のスピードが上がる。特にパイプラインのテンプレートが用意されている事で、開発の初動が大幅に速くなりました。

  • テンプレートやライブラリを通して、ファイル構成や処理に共通知があるので、メンバー間でのレビューがしやすくなっている。

  • 個別に開発した処理を社内ライブラリに追加していくことで、グループ全体の資産として蓄積している。

  • 属人化されているコードが減っていくので、新規メンバーのキャッチアップがし易くなる。

今回紹介した取り組み以外にも、MLパイプラインのソースコードのモノリポ化などDSグループでは常に社内のMLOps基盤を強固にしていく活動を続けています!

We’re Hiring!

タイミーのデータ統括部では、ともに働くメンバーを募集しています!!

現在募集中のポジションはこちらです!

少しでも気になった方はお気軽にカジュアル面談に申し込みいただけると嬉しいです!

*1:Kubeflow Pipelines SDKで実装したパイプラインをVertex AI Pipelinesで動作させています

*2:Artifact RegistryのプライベートパッケージをPoetryで扱う方法はこちらが参考になります

*3:Kubeflow Pipelinesにおけるパイプラインの定義ファイルで使用します