• Gavin In The Cloud
  • Posts
  • Event Driven Cloud Function: Load GCS Files to BigQuery with Event Arc

Event Driven Cloud Function: Load GCS Files to BigQuery with Event Arc

Unlocking Real-Time Data Processing: A Comprehensive Guide to GCS to BigQuery Integration with Event Arc and Cloud Functions

Event Driven Cloud Function: Load GCS Files to BigQuery with Event Arc

Introduction:

In this blog, we will walk through a comprehensive example of triggering a Cloud Function using Event Arc whenever a file is uploaded to Google Cloud Storage (GCS). The primary goal of this Cloud Function is to load the uploaded file into a BigQuery table, following an event-driven pattern.

This scenario is a common use case in the Google Cloud community, and we aim to provide a real-world example along with valuable links to assist the community further.

Use Case Overview:

Here's a breakdown of the use case presented in this article:

  1. A JSON file representing job failure data is uploaded to Google Cloud Storage.

  2. A Cloud Function is triggered in real-time.

  3. This Cloud Function saves the Cloud Storage file into BigQuery using the Python BigQuery client.

To achieve this sequencing, we deploy the Cloud Function with Event Arc and configure the following filters:

  • Bucket: The name of the bucket where the file is uploaded.

  • Type: The event type that triggers the Cloud Function only after the file has finished uploading to the bucket.

Project Structure:

The project is organized as follows:

  • Project Name: event-arc-trigger-function

  • Functions Folder: Contains various Cloud Functions.

  • saving_job_failures_bq Folder: Houses the logic for the Cloud Function discussed in this article.

Exploring the Cloud Function Code and Logic:

The Python Cloud Function follows this convention at the root folder:

  • main.py: Contains the function code logic.

  • requirements.txt: Lists the required PyPi packages for the function.

Here's the code from main.py:

# Import necessary libraries
import pathlib
from typing import Dict
import functions_framework
from google.cloud import bigquery

@functions_framework.cloud_event
def save_gcs_file_to_bq_function(cloud_event):
    data: Dict = cloud_event.data
    event_id = cloud_event["id"]
    event_type = cloud_event["type"]
    bucket = data["bucket"]
    name = data["name"]
    table_id = "gb-poc-373711.monitoring.job_failure"
    
    # Print event details
    print(f"Event ID: {event_id}")
    print(f"Event type: {event_type}")
    print(f"Bucket: {bucket}")
    print(f"File: {name}")

    # Initialize BigQuery client
    client = bigquery.Client()

    # Define schema path
    current_directory = pathlib.Path(__file__).parent
    schema_path = str(current_directory / "schema/job_failure.json")

    # Load schema from JSON
    schema = client.schema_from_json(schema_path)

    # Configure BigQuery job
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    )

    uri = f"gs://{bucket}/{name}"

    # Load file into BigQuery table
    load_job = client.load_table_from_uri(
        uri, table_id, job_config=job_config
    )

    load_job.result()

    print("#######The GCS file was correctly loaded to the BigQuery table#######")

The save_gcs_file_to_bq_function function serves as the entry point and is based on the Google functions_framework and cloud_event. Here's a summary of its functionality:

  • It takes cloud_event as a parameter.

  • Event fields are retrieved using cloud_event["field"].

  • The payload is obtained through cloud_event.data, which contains GCS event data.

  • The code uses the Python BigQuery client to load the GCS file into BigQuery.

  • The JSON schema for loading is located at save_gcs_file_to_bq_function/schema/job_failure.json.

Requirements:

The requirements.txt file lists the necessary packages:

functions-framework==3.3.0
google-cloud-bigquery==3.5.0
  • functions-framework is required because the Cloud Function is based on this framework.

  • google-cloud-bigquery is needed to use the Python BigQuery client.

Deploying the Cloud Function with Event Arc:

Before deploying the Cloud Function, ensure your gcloud SDK is up-to-date:

gcloud components update

Creating a Service Account:

Create a User Service Account for the function and the trigger (instead of using the Default Compute Service Account).

export PROJECT_ID=my-project-id

SERVICE_ACCOUNT="$(gsutil kms serviceaccount -p ${PROJECT_ID})"

gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member="serviceAccount:${SERVICE_ACCOUNT}" \
--role='roles/pubsub.publisher'

Assign the necessary roles to this Service Account.

Deploying the Function:

At the root of your project, deploy the Cloud Function from your local machine using the following command:

gcloud functions deploy saving-job-failures-bq \
  --gen2 \
  --region=europe-west1 \
  --runtime=python310 \
  --source=functions/saving_job_failures_bq \
  --entry-point=save_gcs_file_to_bq_function \
  --run-service-account=sa-cloud-functions-dev@gb-poc-373711.iam.gserviceaccount.com \
  --trigger-event-filters="type=google.cloud.storage.object.v1.finalized" \
  --trigger-event-filters="bucket=event-arc-trigger-function" \
  --trigger-location=europe-west1 \
  --trigger-service-account=sa-cloud-functions-dev@gb-poc-373711.iam.gserviceaccount.com

Here's a breakdown of the command:

  • gen2: Allows using Cloud Function V2, and Event Arc is used by default for V2.

  • runtime: Indicates the Python runtime (Python 3.10 in this case).

  • source: Specifies the root folder of the function.

  • entry-point: Represents the entry point of the function (save_gcs_file_to_bq_function).

  • run-service-account: Uses the User Service Account created earlier to run the Cloud Function.

  • trigger-event-filters: Configures event filters based on the event type and bucket name that will trigger the function.

After successfully creating the function, it will appear in the dedicated menu in the Google Cloud Console.

Uploading Files and Triggering the Function:

To test the Cloud Function, upload a JSON file containing job failure data to the specified GCS bucket (event-arc-trigger-function in this case). You can use the following command to copy the file:

gsutil cp input_files/input_failures.json gs://event-arc-trigger-function

The file format should be NEWLINE_DELIMITED_JSON, which is supported by BigQuery.

This action will trigger the Cloud Function, and you can access the logs to monitor the process.

The name of the BigQuery table where the data is loaded is monitoring.job_failures.

Finally, deploy the Cloud Function using Terraform or the gcloud command as shown earlier.

Useful Links

Documentation:
Code:

All the code shared in this article is accessible in my GitHub-Repo

Conclusion

This article demonstrated how to create an end-to-end Event Driven Cloud Function with Event Arc. This system provides a powerful way to trigger functions following actions in Google Cloud Storage.

While the documentation is comprehensive, it can be scattered. We hope this real and end-to-end use case provides additional help to the community.