Event Driven Cloud Function: Load GCS Files to BigQuery with Eventarc
Unlocking Real-Time Data Processing: A Comprehensive Guide to GCS to BigQuery Integration with Eventarc and Cloud Functions
Introduction:
In this blog post, we will walk through a comprehensive example of triggering a Cloud Function with Eventarc whenever a file is uploaded to Google Cloud Storage (GCS). The primary goal of this Cloud Function is to load the uploaded file into a BigQuery table, following an event-driven pattern.
This scenario is a common use case in the Google Cloud community, and we aim to provide a real-world example along with useful links to assist the community further.

Use Case Overview:
Here's a breakdown of the use case presented in this article:
A JSON file representing job failure data is uploaded to Google Cloud Storage.
A Cloud Function is triggered in real-time.
This Cloud Function saves the Cloud Storage file into BigQuery using the Python BigQuery client.
To achieve this sequencing, we deploy the Cloud Function with Eventarc and configure the following filters:
Bucket: The name of the bucket where the file is uploaded.
Type: The event type that triggers the Cloud Function only after the file has finished uploading to the bucket.
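For reference, the event that reaches the function for a google.cloud.storage.object.v1.finalized trigger carries the Cloud Storage object metadata in its data payload. Here is a minimal sketch of that payload with purely illustrative values (the real event contains additional object fields):
# Illustrative cloud_event.data payload for a finalized GCS object (values are examples only)
sample_event_data = {
    "bucket": "event-arc-trigger-function",
    "name": "input_failures.json",
    "contentType": "application/json",
    "size": "2048",
    "timeCreated": "2023-01-01T10:00:00.000Z",
}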
Project Structure:
The project is organized as follows:
Project Name: event-arc-trigger-function
Functions Folder: Contains various Cloud Functions.
saving_job_failures_bq Folder: Houses the logic for the Cloud Function discussed in this article.
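Assuming that layout, the repository might look roughly like this (a sketch; the input_files folder used for the upload test later is assumed to sit at the project root):
event-arc-trigger-function/
├── functions/
│   └── saving_job_failures_bq/
│       ├── main.py
│       ├── requirements.txt
│       └── schema/
│           └── job_failure.json
└── input_files/
    └── input_failures.json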
Exploring the Cloud Function Code and Logic:
The Python Cloud Function follows the usual convention at the root of its folder:
main.py: Contains the function code logic.
requirements.txt: Lists the required PyPI packages for the function.
Here's the code from main.py:
# Import necessary libraries
import pathlib
from typing import Dict

import functions_framework
from google.cloud import bigquery


@functions_framework.cloud_event
def save_gcs_file_to_bq_function(cloud_event):
    data: Dict = cloud_event.data
    event_id = cloud_event["id"]
    event_type = cloud_event["type"]
    bucket = data["bucket"]
    name = data["name"]
    table_id = "gb-poc-373711.monitoring.job_failure"

    # Print event details
    print(f"Event ID: {event_id}")
    print(f"Event type: {event_type}")
    print(f"Bucket: {bucket}")
    print(f"File: {name}")

    # Initialize BigQuery client
    client = bigquery.Client()

    # Define the path to the table schema file
    current_directory = pathlib.Path(__file__).parent
    schema_path = str(current_directory / "schema/job_failure.json")

    # Load the schema from the JSON file
    schema = client.schema_from_json(schema_path)

    # Configure the BigQuery load job
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    )

    uri = f"gs://{bucket}/{name}"

    # Load the GCS file into the BigQuery table
    load_job = client.load_table_from_uri(
        uri, table_id, job_config=job_config
    )
    load_job.result()  # Wait for the load job to complete

    print("#######The GCS file was correctly loaded to the BigQuery table#######")
The save_gcs_file_to_bq_function function serves as the entry point and is based on Google's functions_framework and its cloud_event decorator. Here's a summary of its functionality:
It takes cloud_event as a parameter.
Event attributes are retrieved using cloud_event["field"], for example cloud_event["id"] and cloud_event["type"].
The payload is obtained through cloud_event.data, which contains the GCS event data (bucket and object name).
The code uses the Python BigQuery client to load the GCS file into BigQuery.
The JSON schema used for the load job is located at functions/saving_job_failures_bq/schema/job_failure.json.
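The exact fields in schema/job_failure.json depend on the job failure data you want to store. As a purely hypothetical illustration, a BigQuery JSON schema file for this table could look like this:
[
  {"name": "job_name", "type": "STRING", "mode": "REQUIRED", "description": "Hypothetical example field"},
  {"name": "failure_time", "type": "TIMESTAMP", "mode": "NULLABLE", "description": "Hypothetical example field"},
  {"name": "error_message", "type": "STRING", "mode": "NULLABLE", "description": "Hypothetical example field"}
]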
Requirements:
The requirements.txt file lists the necessary packages:
functions-framework==3.3.0
google-cloud-bigquery==3.5.0
functions-framework is required because the Cloud Function is based on this framework.
google-cloud-bigquery is needed to use the Python BigQuery client.
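Before deploying, you can exercise the entry point locally by constructing a CloudEvent by hand and calling the function directly. Below is a minimal test sketch; it assumes the cloudevents package (a dependency of functions-framework) is available, that your environment has Google Cloud credentials, and that the referenced bucket, file, and BigQuery table actually exist:
# Minimal local test sketch (hypothetical file: test_main.py)
from cloudevents.http import CloudEvent

from main import save_gcs_file_to_bq_function

# Attributes and data mimic a google.cloud.storage.object.v1.finalized event
attributes = {
    "id": "1234567890",
    "type": "google.cloud.storage.object.v1.finalized",
    "source": "//storage.googleapis.com/projects/_/buckets/event-arc-trigger-function",
}
data = {
    "bucket": "event-arc-trigger-function",
    "name": "input_failures.json",
}

# Build the event and invoke the function as the framework would
event = CloudEvent(attributes, data)
save_gcs_file_to_bq_function(event)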
Deploying the Cloud Function with Eventarc:
Before deploying the Cloud Function, ensure your gcloud SDK is up-to-date:
gcloud components update
Creating a Service Account:
Create a user-managed Service Account for the function and the trigger (instead of using the Default Compute Service Account), and assign it the necessary roles (typically permissions to receive Eventarc events, invoke the function's underlying Cloud Run service, read the GCS objects, and run BigQuery load jobs).
In addition, the project's Cloud Storage service agent must be able to publish the events consumed by Eventarc. The following commands retrieve this service agent and grant it the Pub/Sub Publisher role:
export PROJECT_ID=my-project-id
SERVICE_ACCOUNT="$(gsutil kms serviceaccount -p ${PROJECT_ID})"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:${SERVICE_ACCOUNT}" \
  --role='roles/pubsub.publisher'

Deploying the Function:
At the root of your project, deploy the Cloud Function from your local machine using the following command:
gcloud functions deploy saving-job-failures-bq \
--gen2 \
--region=europe-west1 \
--runtime=python310 \
--source=functions/saving_job_failures_bq \
--entry-point=save_gcs_file_to_bq_function \
--run-service-account=sa-cloud-functions-dev@gb-poc-373711.iam.gserviceaccount.com \
--trigger-event-filters="type=google.cloud.storage.object.v1.finalized" \
--trigger-event-filters="bucket=event-arc-trigger-function" \
--trigger-location=europe-west1 \
--trigger-service-account=sa-cloud-functions-dev@gb-poc-373711.iam.gserviceaccount.com
Here's a breakdown of the command:
gen2: Deploys a Cloud Functions (2nd gen) function; Eventarc is used by default for 2nd gen triggers.
region: The region where the function runs (europe-west1 here).
runtime: Indicates the Python runtime (Python 3.10 in this case).
source: Specifies the root folder of the function.
entry-point: Represents the entry point of the function (save_gcs_file_to_bq_function).
run-service-account: Uses the user Service Account created earlier to run the Cloud Function.
trigger-event-filters: Configures the event filters (event type and bucket name) that will trigger the function.
trigger-location and trigger-service-account: Set the location of the Eventarc trigger and the Service Account it uses.
After the function is successfully created, it appears in the Cloud Functions page of the Google Cloud Console.

Uploading Files and Triggering the Function:
To test the Cloud Function, upload a JSON file containing job failure data to the specified GCS bucket (event-arc-trigger-function in this case). You can use the following command to copy the file:
gsutil cp input_files/input_failures.json gs://event-arc-trigger-function
The file format should be NEWLINE_DELIMITED_JSON (one JSON object per line), which is supported by BigQuery.
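If you prefer to trigger the function from Python rather than gsutil, an equivalent upload could look like this sketch (it assumes the google-cloud-storage package is installed and that input_files/input_failures.json exists locally):
# Upload the test file with the Python client instead of gsutil
from google.cloud import storage

client = storage.Client()
bucket = client.bucket("event-arc-trigger-function")
blob = bucket.blob("input_failures.json")
blob.upload_from_filename("input_files/input_failures.json")
print(f"Uploaded gs://{bucket.name}/{blob.name}")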

This action will trigger the Cloud Function, and you can access the logs to monitor the process.
The name of the BigQuery table where the data is loaded is monitoring.job_failure.
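Beyond the logs, you can confirm the load by querying the destination table directly; here is a minimal sketch, assuming your credentials can read the monitoring dataset:
# Count the rows loaded into the destination table
from google.cloud import bigquery

client = bigquery.Client()
query = "SELECT COUNT(*) AS row_count FROM `gb-poc-373711.monitoring.job_failure`"
for row in client.query(query).result():
    print(f"Rows in monitoring.job_failure: {row.row_count}")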
Finally, note that the Cloud Function can also be deployed with Terraform instead of the gcloud command shown earlier.
Useful Links
Documentation:
Code: All the code shared in this article is accessible in my GitHub repo.
Conclusion
This article demonstrated how to create an end-to-end event-driven Cloud Function with Eventarc. This setup provides a powerful way to trigger functions in response to actions in Google Cloud Storage.
While the official documentation is comprehensive, it can be scattered. We hope this real-world, end-to-end use case provides additional help to the community.