Cloud WorkflowsとCloud Tasksを使ったバッチ処理
株式会社一休 / kentana20
開発部長 / EM / 従業員規模: 301名〜500名 / エンジニア組織: 51名〜100名
事業形態 |
---|
B to C |
事業形態 | B to C |
---|
アーキテクチャ
.png?disposition=inline)
アーキテクチャの意図・工夫
- Scheduler はトリガーのみ。Cloud Workflowsの呼び出し用エンドポイントがあるので、これを呼び出すだけにしています
- Workflows で API 呼び出しとCloud Tasksへのキュー登録をしています。YAMLで簡単にワークフローの定義を書けるので、Terraformに定義を書いて誰でもメンテナンス可能にしています
- Cloud Tasks ではホテル・旅館単位で並列実行して、スループットを確保しています
1つ1つの処理を小さく、疎結合にしたことでリトライや異常発生時の影響範囲を小さく保てています。
導入の背景・解決したかった問題
導入背景
一休.com 宿泊では業務に必要なデータ作成・更新を行うバッチ処理が数多く存在します。そのほとんどがASP.NET (C#/VB) で作られており
- 開発スピードが出ない
- バッチ処理の開発に慣れているメンバーが限られている
という課題を抱えていました。
この状況の中で新たに「ホテル・旅館の売れ筋プランを日次で洗替えしてデータ更新する」という処理を新たに作る必要が出てきました。
比較検討したサービス
検討案 | 主な構成 | 特徴 |
---|---|---|
① 新たにバッチ基盤を内製 | VM + ジョブスケジューラ | ゼロからバッチ処理基盤を準備する必要がある。 運用負荷が大きい。 |
② マネージドサービスを組み合わせる (採用したのはこちら) | Cloud Workflows + Cloud Tasks + Web API | サーバーレスで実装が最小。 並列処理が容易。 |
当初は「新しい開発基盤を作ってバッチ処理をスピーディに開発できるようにする」(つまり①)を考えていましたが、CTO に壁打ちをしたところ
- 新しい開発基盤を作る前に、そもそもこれはバッチで作るのがベストなのか
- 一休.com宿泊では、歴史的経緯*1から、オンライン処理できないものをほとんどバッチで作っている
- 現在では、そもそもバッチでまとめて処理せずに、非同期化・分散処理をする選択肢もある
- バッチで作るのが本当にベストなのか、ほかの選択肢も含めて検討したほうがよい
といったフィードバックをもらいました。 これを踏まえて
- 今回実施したい作業はシンプルな処理の組み合わせで実現可能であること
- 並列、分散処理を考えやすい要件であること(ホテル・旅館単位で処理しても問題ない)
といった理由から、最終的に②を選択しました。
*1:非同期ジョブキューの仕組みがない時代に作られたバッチが多く残っています
比較した軸
今回は
- ホテル、旅館ごとに売れ筋の商品(プラン)を更新する
- 更新頻度は日次、まとめて一括で更新する必要はない(ホテル・旅館単位で順次更新すればOK)
- 更新タイミングは比較的柔軟に決めて良い
という要件だったので
- マネージドなサービスを使うことで監視、エラー発生時のリトライなど運用コストを低く保てること
- ホテル・旅館単位で並列・分散処理がしやすいこと
を重視しました。
選定理由
- Cloud Workflows の YAML 定義だけで複雑な手続きを記述できること
- Cloud Tasks が自動リトライ・スロットリングを担い、並列化が簡単に行えること
- 一休.com 宿泊では、Google Cloudを使った事例が少なかった(積極的に活用していこうと考えていた)
といった点から導入を決めました。
Cloud Workflowsはワークフローに定義された処理順(ステップ)に従って
- Google Cloudのサービスを実行する
- 任意のHTTPエンドポイントにリクエストする
などを実行することができます。 公式ドキュメント にも日次のバッチジョブの例が載っており、バッチ処理がユースケースの1つとなっています。
ワークフローはYAML形式で実行したい内容(ステップ)を記述します。今回作ったワークフローのイメージは以下です。
main:
steps:
- init:
assign:
- queueName: "cloud-tasks-queue-name"
- getTargetHotels:
call: http.get
args:
url: "https://api.example.com/hotels"
auth:
type: OIDC
query:
target: true
result: hotelData
- createCloudTasks:
palallel:
for:
in: ${hotelData.body.hotels}
value: hotel
steps:
- createTask:
call: googleapis.cloudtasks.v2.projects.locations.queues.tasks.create
args:
parent: "projects/${sys.get_env('GOOGLE_CLOUD_PROJECT_ID')}/locations/${sys.get_env('LOCATION')}/queues/${queueName}"
body:
task:
httpRequest:
httpMethod: "PUT"
url: "https://api.example.com/hotels/${hotel.id}/popular"
headers:
Content-Type: "application/json"
oidcToken:
serviceAccountEmail: ${"application@" + projectId + ".iam.gserviceaccount.com"}
この例だと getTargetHotels
のステップで、Web APIへリクエストして対象のホテル・旅館を取得しています。 auth にOIDCを指定していますが、これによってWorkflowsからのAPIリクエストにAuthorizationヘッダが付与されるので、Web API側でID Tokenを検証することでWorkflowsからの正当なリクエストであることを保証できます。
導入の成果
キュー処理の設定を見直したあとは、ほぼエラー・リトライもなく安定して稼働しています。
項目 | Before | After |
---|---|---|
運用・障害対応 | バッチ処理が異常終了すると、多くの場合に手動で再実行が必要 | Cloud Tasks の自動リトライにより手動での再実行は不要 |
ログ出力 | 各バッチ処理でログ出力してモニタリングが必要 | Cloud Loggingで確認可能。Slack通知も簡単に連携可能。 |
導入時の苦労・悩み
Cloud Tasksの設定が適切ではなく、Web APIへの単位時間あたりのリクエスト数が多すぎてWeb APIのレスポンスが遅くなるという事象が発生しました。これに対して
- 最大ディスパッチ数
- 最大同時ディスパッチ数
などを調整しました。
導入に向けた社内への説明
上長・チームへの説明
一休では、細かに説明 → 承認などの重厚なプロセスはほとんどないので、説明の苦労みたいなものはありませんでした。前述したとおり、どう進めようかを考えているときにCTOに壁打ちしながら方針を決められた点は良かったと考えています。
また「どう説明したか」という観点からは少しズレますが、宿泊サービスに存在する多くのバッチ処理について、マネージドなサービスに切り替えるための1つの事例を作ることができたと思っています。
活用方法
- Cloud Schedulerで毎日夜間にワークフローを自動実行しています
- Workflowsが失敗したときには Cloud Logging → Slack通知を送るようにしているので、異常があったことを把握できるようにしています
Cloud Tasksでのリトライ機構もあり、ほぼ手動での対応をすることなく運用できています。
よく使う機能
Cloud Workflowsのコンソールから
- Workflowsの各ステップの処理結果
- 各処理のログ
を確認できるため、トラブルシューティングの際に利用しています。
ツールの良い点
- マネージドなサービスを組み合わせることで最小限のインフラで構成できています
- バッチ処理基盤を構築することなく YAML 設定のみで簡単なワークフローを組めます
- ID トークン(OIDC)を使うことで安全にAPI呼び出しやCloud Tasksの作成ができています
ツールの課題点
- 大規模なワークフローになると、Workflows のYAMLをメンテナンスするのが大変になると思います
- Cloud Workflowsの課題ではありませんが、Cloud Tasksのキュー設定によって過度なリクエストがAPIサーバに飛んでしまうリスクがあるので、キュー設定を適切に調整する必要があります
ツールを検討されている方へ
OIDC 認証を使う場合は、検証用のミドルウェアを先に用意しておくとスムーズだと思います。
今回は予めWeb APIでOIDCトークンの検証をするためのミドルウェアを Golang で実装しました。
以下はトークンを検証するための実装サンプルです。
package main
import (
"fmt"
"net/http"
"strings"
"google.golang.org/api/idtoken"
)
// IDトークンを検証するミドルウェア
func AuthMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Authorization ヘッダからBearerトークンを取得
authHeader := r.Header.Get("Authorization")
if authHeader == "" {
http.Error(w, "Authorization header is required", http.StatusUnauthorized)
return
}
token := strings.TrimPrefix(authHeader, "Bearer ")
// IDトークンの検証
_, err := idtoken.Validate(r.Context(), token, "")
if err != nil {
// トークンの検証に失敗した場合はエラーを返す
http.Error(w, "Invalid ID Token", http.StatusUnauthorized)
return
}
// トークンが有効であれば、次のハンドラーを呼び出す
next.ServeHTTP(w, r)
})
}
株式会社一休 / kentana20
開発部長 / EM / 従業員規模: 301名〜500名 / エンジニア組織: 51名〜100名
2006年5月に一休に入社。 2018年より宿泊事業の開発マネジメントとして一休宿泊サービスの開発、開発組織のマネジメント、採用などを担当しています。
株式会社一休 / kentana20
開発部長 / EM / 従業員規模: 301名〜500名 / エンジニア組織: 51名〜100名
2006年5月に一休に入社。 2018年...
レビューしているツール
目次
- アーキテクチャ
- 導入の背景・解決したかった問題
- 活用方法