Cloud Functionsでpythonのランタイムで関数を作成しているが、処理や変数/定数が増えていくと1つのファイルが肥大化していく。
そのためファイルを分割して処理を見やすいものに変更した。
処理内容としてはStorageにアップロードしたファイル(csv)をトリガーにBigQueryのテーブルにインポートする処理を書いた。
requirements.txt
google-cloud-storage >= 1.14.0
google-cloud-bigquery >= 1.11.2
gcp_conf.py
from google.cloud import storage, bigquery
# project_id
PROJECT_ID = 'test_project'
# BQへの登録対象ファイル
TARGET_BUCKET = 'test_bucket'
TARGET_FILE = 'test.csv'
# BQ情報
DATASET_ID = 'test_dataset'
TARGET_TABLE = 'test_table'
TARGET_TABLE_SCHEMA = [
bigquery.SchemaField("id", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("name", "STRING", mode="NULLABLE"),
bigquery.SchemaField("create_date", "DATE", mode="NULLABLE"),
]
main.py
from google.cloud import storage, bigquery
from datetime import datetime
import sys
# テーブルなどの定義
import gcp_conf as GCP_CONF
def load_data(data, context):
# 'data' reference is here:
# https://cloud.google.com/storage/docs/json_api/v1/objects?hl=ja
# アップロードファイルの拡張子チェック
file_name = data['name']
file_ext = file_name.split('.')[-1]
if file_ext != 'csv':
print('Nothing To Do')
sys.exit
uri = 'gs://' + GCP_CONF.TARGET_BUCKET + '/' + file_name
# GCPクライアントを設定
client = bigquery.Client()
storage_client = storage.Client(GCP_CONF.PROJECT_ID)
table_ref = client.dataset(GCP_CONF.DATASET_ID).table(GCP_CONF.TARGET_TABLE)
job_config = bigquery.LoadJobConfig()
job_config.schema = GCP_CONF.TARGET_TABLE_SCHEMA
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND # 追記
job_config.skip_leading_rows = 0
# The source format defaults to CSV, so the line below is optional.
job_config.source_format = bigquery.SourceFormat.CSV
# API実行
load_job = client.load_table_from_uri(
uri, table_ref, job_config=job_config
)
print("Starting job {}".format(load_job.job_id))
try:
load_job.result()# Waits for table load to complete.
except Exception as e:
for error in load_job.errors:
print("Error detail [message]", error.get("message"))
raise e
print("Job finished.")
bigquery.WriteDisposition
に続く指定子でテーブルのレコードを追記するか上書きするかを指定できる。(WriteDisposition)