Glueを用いてAuroraからBigQueryへのデータ転送処理を高速化した話

f:id:sumzap_engineer_blog:20200731210609p:plain

はじめに

ゲームアプリを運営していく上で、データの分析は必ず必要になります。
弊社には分析専門のチームがあって、各種ツールを使って分析をしているわけですが、その元となるデータは主にDBのデータです。
今回はゲームアプリのDBから分析用のデータを抽出、格納していく方法を改善した話を書いていこうと思います。

前提

ゲームアプリケーション側のDBはAmazon Web Services(以下、AWS)のAuroraを使用しています。
しかし分析チームが分析に使用するデータストアは、過去のプロジェクトを踏襲してGoogle Cloud Platform(以下、GCP)のBigQueryを使っています。
そのため、AuroraからBigQueryにデータを転送する必要があります。

データの取得は1日1回、深夜に前日分のデータを取得していますが、データの抽出方法は2パターンあります。

  • 追加レコード抽出
    レコードに対してupdateがかからないログデータの様なテーブルは、前日にinsertされたレコードのみ抽出します。
    抽出するデータは1日分のinsertされたレコードのみで良いので、容量的にはそれほど気になりません。

  • 全レコード抽出
    レコードに対してupdateがかかるテーブルについては、日ごとのデータの変化を分析するために、テーブル内のレコード全件を取得します。
    そのため容量は大きくなりますし、転送完了までの時間もかかります。

データの出力の際は、DBのテーブル定義に変更があった場合でもBigQuery側のテーブル定義も自動で変更されるようなデータ形式にしています。

ちなみにDBからデータを抽出する際には、RDSのAPIである restoreDBClusterToPointInTime を使ってDBのcloneを作成してselectしています。
cloneしたDBのテーブルは追加、更新等のデータの変化がないので、cloneした時点でのデータが取得できます。
万が一データ抽出バッチが途中で止まってしまった場合でも、残ったcloneに対してバッチを再実行すればclone時点のデータを再取得する事が出来ます。

これまでのデータ転送のやり方

これまでのBigQueryへのデータ転送は、以下のような形で行っていました。

  1. AWS BatchでPHPで書かれたデータ収集用バッチを実行し、DBからデータをjson形式で出力、圧縮

  2. 圧縮ファイルをGCPのGoogleCloudStorage(以下、GCS)に転送

  3. GCSからBigQueryにデータを取り込み

f:id:sumzap_engineer_blog:20200731205814p:plain
対策前のデータ収集処理フロー

この構成だとバッチ実行時間が12時間もかかってしまいました。
主な問題点は以下の2点です。

  • バッチ処理にメモリ食いすぎ
    バッチ処理はphpを使って動いています。
    DBのテーブルから任意のデータをselectしてファイル出力する処理を実行しています。
    ここで問題になったのがmemory_limitです。
    上記データ抽出パターンの「全レコード抽出」の処理で、件数が多いテーブルになるとバッチ処理内でmemory_limitを増やしてもmemory不足で処理が途中で止まってしまう事がありました。
    数十GBのmemoryを割り当てても止まってしまう事があったため、今後データが増え続けていくことを考えるとDBからのデータ取得をPHPのバッチ内で行う事が難しくなってきました。

  • 出力ファイルが大きすぎ
    前述のとおり出力ファイルをjson形式にしているため、csv等と比べると必然的にkey分のファイルサイズが大きくなります。
    ファイルサイズが大きくなるという事は、転送にも時間がかかりますし、BigQueryへの読み込みも時間がかかってしまう事になります。

これだけ時間がかかってしまうと、BigQueryを使った分析が出来ず、毎朝出している分析レポートも遅れてしまう事になります。
毎朝の分析レポートは、ユーザーの動向を知る上での重要なデータですので遅れるわけにはいきません。
一旦は必要最低限のテーブルを残して、対象となるテーブルを減らす事で対処しましたが、早急に対応が必要になりました。

対策

上記の問題を解決すべく、以下の対策を取りました。

  • DBからのデータ取得をAWSのGlueを使い、PHPでやっていたデータ抽出からファイル出力までをマネージドサービスに移行する

  • 出力形式をparquetにしてS3へファイル出力し、GCSにparquetファイルを転送して、GCSからBigQueryにデータを読み込む

f:id:sumzap_engineer_blog:20200731205511p:plain
対策後のデータ収集処理フロー

以下詳しく説明していきます。
なお、今回の対応では特に時間がかかっている前述の「全レコード抽出」のみを対象としました。

AWS Glueの利用

AWS GlueはフルマネージドのETL(抽出、変換、ロード)サービスで、データ抽出、変換、ジョブスケジューリングなどの時間のかかる作業を自動化してくれます。
Glueのジョブでは、対象となるテーブルのデータをSQLで取得し、S3にファイル出力、圧縮するまでの処理をpythonで書いています。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark.sql.functions import lit
import json

arg_keys =  ['JOB_NAME', 'json_str']
args = getResolvedOptions(sys.argv, arg_keys)

(job_name, json_str) = [args[k] for k in arg_keys]

json_data = json.loads(json_str)

# 各種変数の各種変数の代入
bucket_root = json_data['bucket_root']
bucket_dir = json_data['bucket_dir']
jdbc_url = json_data['jdbc_url']
db_user = json_data['db_user']
db_password = json_data['db_password']
db_name = json_data['db_name']
table_list = json_data['table_list']

# 初期化処理
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(job_name, args)

# 1テーブルずつ抽出してS3にparquet形式で出力
for table_name in table_list:
    ds = glueContext.create_dynamic_frame_from_options('mysql', connection_options={
            "url": jdbc_url, "user": db_user, "password": db_password, "dbtable": db_name + '.' + table_name
        })
    df = ds.toDF()

    s3path = bucket_root + "/" + bucket_dir + "/" + db_name + "/" + table_name + "/"
    new_df.write.mode("overwrite").format("parquet").option("compression", "gzip").mode("overwrite").save(s3path)

Glueを使うことにより、ジョブの実行に必要なリソースはGlue側で自動で設定され、問題だった「メモリ食いすぎ問題」は気にせずに済むようになり、1テーブル当たりのファイル出力時間を短くする事が出来ます。
またGlueでDBからS3へ直接ファイル出力するため、AWS Batchで使用するインスタンスのディスク容量も気にする必要がなくなります。

ファイルの出力、取り込み形式変更

出力するファイル形式は、json形式と同じようにDBのテーブル定義の追加があった場合にもBigquery側で自動でカラムが追加され、出力ファイルサイズが抑えられるparquet形式に変更します。
これはparquet形式の方がBigQueryにデータを読み込む際に速度が速いという記事を参考にさせて頂きました。 ([Cloud OnAir] BigQuery へデータを読み込む 2019年3月14日 放送 スライド15ページ目)

またGlueはS3にファイルを出力するので、出力されたファイルはS3からGCSに転送してBigQueryに読み込むようにします。
転送するファイルのサイズは、転送時間が短く済みますし、読み込み速度も速くなるので小さい方が望ましいです。
json形式は各値にkeyが含まれるのでファイルサイズが大きくなってしまいますが、parquet形式は列方向にデータのみを保存しているため、ファイルサイズを小さくする事ができます。

対策した結果

対策前後でバッチのトータル時間をテストデータで比較したところ、大幅に時間短縮する事が出来ました。
その要因は以下の2点です。

  • Glueを使うことで、データ抽出とファイル出力部分が速くなった
    データ抽出に必要なメモリー容量やスケーリングをGlueが自動でやってくれることで、PHPで処理するよりも実行時間が格段に速くなりました。

  • 出力ファイル形式をjson形式からparquet形式にしたことで、GCSからBigQueryへのデータロードが速くなった
    parquet形式にしたことでファイルサイズが小さくなり、BigQueryへの読み込みが以前より格段に速くなりました。

結果としてバッチの処理時間は、対策前に比べて約60 ~ 70%短縮する事できました。
一時的に対象から外していたテーブルを元に戻してもトータルの処理時間に余裕ができ、毎朝の分析データを出すタイミングにも間に合うようになりました。
またバッチ時間に余裕が出来たことで、今後分析に必要なテーブルが増えても随時追加することができるようになりました。

今後の課題

特にバッチ実行時間が長くかかっていた「全レコード抽出」タイプのテーブルのみを対象にして対応しました。
Glueではスクリプト内でSQLを指定する事が出来るので、SQLをパラメータにしてGlueジョブに渡すことで抽出条件を指定したデータ出力も可能になります。
「追加レコード抽出」タイプについては、「レコードの作成日時が前日」という抽出条件でGlueジョブを実行すれば同じバッチでデータ取得する事が可能になりますので、今後はこのような方法で対応していく予定です。