ETL Pipeline using Bigquery Apache Spark Stored Procedure and Cloud Function | Data Engineering

In this video, I will show how we can create an ETL pipeline that processes a CSV file uploaded to a GCS bucket, using a BigQuery Spark stored procedure and a Cloud Function.
This is a simple Data Engineering project use case.
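Once the Cloud Function and stored procedure below are deployed, dropping a CSV into the input bucket kicks off the whole flow. A minimal upload sketch using the google-cloud-storage client (the bucket and file names here are placeholders, not taken from the video):

from google.cloud import storage

client = storage.Client()
bucket = client.bucket("my-input-bucket")   # placeholder input bucket
blob = bucket.blob("customers.csv")         # object name the trigger will see
blob.upload_from_filename("customers.csv")  # local CSV to upload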


Cloud function code:

import functions_framework
from google.cloud import bigquery

PROJECT_ID = "your-project-id"  # replace with your GCP project ID

# Triggered by a change in a storage bucket
@functions_framework.cloud_event
def hello_gcs(cloud_event):
    data = cloud_event.data

    bucket = data["bucket"]
    name = data["name"]

    print(f"Bucket: {bucket}")
    print(f"File: {name}")

    # Call the Spark stored procedure with the bucket and file name.
    # client.query() submits the CALL; the procedure then runs server-side in BigQuery.
    client = bigquery.Client()
    query_string = (
        "CALL `{}.spark_proc_dataset.spark_proc`('{}', '{}')"
        .format(PROJECT_ID, bucket, name)
    )
    query_job = client.query(query_string)

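The function can also be exercised locally before deploying, by handing it a hand-built CloudEvent. This is a sketch using the cloudevents package (which functions_framework itself depends on); the bucket and object names are hypothetical, and the call still needs valid Google Cloud credentials because it really submits the CALL to BigQuery:

from cloudevents.http import CloudEvent

attributes = {
    "type": "google.cloud.storage.object.v1.finalized",
    "source": "//storage.googleapis.com/projects/_/buckets/my-input-bucket",
}
data = {"bucket": "my-input-bucket", "name": "customers.csv"}

event = CloudEvent(attributes, data)
hello_gcs(event)  # invokes the function above against the fake event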
-------------------------------------
APACHE SPARK STORED PROCEDURE

CREATE OR REPLACE PROCEDURE spark_proc_dataset.spark_proc(bucket STRING, file STRING)
WITH CONNECTION `{CONNECTION_ID}`
OPTIONS (engine="SPARK", runtime_version="2.1")
LANGUAGE PYTHON AS R'''
import json
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Procedure arguments are exposed as JSON-encoded environment variables
bucket_name = str(json.loads(os.environ["BIGQUERY_PROC_PARAM.bucket"]))
file = str(json.loads(os.environ["BIGQUERY_PROC_PARAM.file"]))

file_uri = "gs://{}/{}".format(bucket_name, file)

# Read the uploaded CSV, drop rows for Tuvalu, and count customers per country
customers = spark.read.csv(file_uri, inferSchema=True, header=True)
customers_filtered = customers.filter(~col("Country").isin(["Tuvalu"]))
customers_agg = customers_filtered.groupBy("Country").agg(count("Customer_Id").alias("Customer_count"))

# Append the aggregate to BigQuery, staging through a temporary GCS bucket
customers_agg.write.mode("append").format("bigquery") \
    .option("temporaryGcsBucket", "spark_bq_temp_321") \
    .save("output_dataset.customer_agg")
'''
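After the procedure finishes, the aggregated rows land in output_dataset.customer_agg. A quick way to sanity-check the result from Python (dataset, table, and column names are taken from the procedure above; the project defaults to the client's project):

from google.cloud import bigquery

client = bigquery.Client()
query = """
    SELECT Country, Customer_count
    FROM `output_dataset.customer_agg`
    ORDER BY Customer_count DESC
"""
for row in client.query(query).result():
    print(row.Country, row.Customer_count)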
