[Python]Cloud Functionsでファイルを分割する

Cloud Functionsでpythonのランタイムで関数を作成しているが、処理や変数/定数が増えていくと1つのファイルが肥大化していく。
そのためファイルを分割して処理を見やすいものに変更した。

処理内容としてはStorageにアップロードしたファイル(csv)をトリガーにBigQueryのテーブルにインポートする処理を書いた。

requirements.txt

google-cloud-storage >= 1.14.0
google-cloud-bigquery >= 1.11.2

gcp_conf.py

from google.cloud import storage, bigquery
# project_id
PROJECT_ID = 'test_project'
# BQへの登録対象ファイル
TARGET_BUCKET = 'test_bucket'
TARGET_FILE = 'test.csv'
# BQ情報
DATASET_ID = 'test_dataset'
TARGET_TABLE = 'test_table'
TARGET_TABLE_SCHEMA = [
    bigquery.SchemaField("id", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("name", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("create_date", "DATE", mode="NULLABLE"),
]

main.py

from google.cloud import storage, bigquery
from datetime import datetime
import sys
# テーブルなどの定義
import gcp_conf as GCP_CONF

def load_data(data, context):
    # 'data' reference is here:
    # https://cloud.google.com/storage/docs/json_api/v1/objects?hl=ja    

    # アップロードファイルの拡張子チェック
    file_name = data['name']
    file_ext = file_name.split('.')[-1]
    if file_ext != 'csv':
        print('Nothing To Do')
        sys.exit

    uri = 'gs://' + GCP_CONF.TARGET_BUCKET + '/' + file_name
    # GCPクライアントを設定
    client = bigquery.Client()
    storage_client = storage.Client(GCP_CONF.PROJECT_ID)
    table_ref = client.dataset(GCP_CONF.DATASET_ID).table(GCP_CONF.TARGET_TABLE)

    job_config = bigquery.LoadJobConfig()
    job_config.schema = GCP_CONF.TARGET_TABLE_SCHEMA
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND # 追記
    job_config.skip_leading_rows = 0
    # The source format defaults to CSV, so the line below is optional.
    job_config.source_format = bigquery.SourceFormat.CSV
    # API実行
    load_job = client.load_table_from_uri(
        uri, table_ref, job_config=job_config
    )  
    print("Starting job {}".format(load_job.job_id))
    try:
        load_job.result()# Waits for table load to complete.
    except Exception as e:
        for error in load_job.errors:
            print("Error detail [message]", error.get("message"))
        raise e
    print("Job finished.")

bigquery.WriteDispositionに続く指定子でテーブルのレコードを追記するか上書きするかを指定できる。(WriteDisposition)

参考

簡単なPythonのパッケージを作る方法

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です

three + nine =

このサイトはスパムを低減するために Akismet を使っています。コメントデータの処理方法の詳細はこちらをご覧ください