An automated data pipeline using Lambda, S3 and Glue - Big Data with Cloud Computing

For more details, you can refer to the AWS Glue documentation:
https://docs.aws.amazon.com/glue/late...

Steps followed in this use case:
1) Create the source S3 bucket where the JSON files will land.
2) Create the destination S3 bucket where the CSV output will be written.
3) Create an IAM role for AWS Lambda with CloudWatch, S3, and Glue permissions.
4) Create the Lambda function that is triggered by the S3 object-create event and starts the Glue job; this Lambda uses the role from step 3 (a minimal wiring sketch follows this list).
5) Create an IAM role for the Glue job with CloudWatch and S3 full access.
6) Create the Glue job with the role created in step 5.
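
As a rough illustration of steps 1, 2 and 4, the sketch below creates the buckets and wires an S3 ObjectCreated notification to the Lambda function with boto3. The bucket names, Lambda ARN, and account/region values are placeholders, not the ones used in the video; adjust them to your setup.

import boto3

# Hypothetical resource names -- replace with your own buckets and function ARN.
SOURCE_BUCKET = "source-raw-json-bucket"
DEST_BUCKET = "destinationflattenjson"
LAMBDA_ARN = "arn:aws:lambda:us-east-1:123456789012:function:trigger_glue_job"

s3 = boto3.client("s3")
lambda_client = boto3.client("lambda")

# Steps 1 & 2: create the source and destination buckets
# (outside us-east-1 you must also pass CreateBucketConfiguration).
for bucket in (SOURCE_BUCKET, DEST_BUCKET):
    s3.create_bucket(Bucket=bucket)

# Allow S3 to invoke the Lambda function.
lambda_client.add_permission(
    FunctionName=LAMBDA_ARN,
    StatementId="s3-invoke",
    Action="lambda:InvokeFunction",
    Principal="s3.amazonaws.com",
    SourceArn="arn:aws:s3:::{}".format(SOURCE_BUCKET),
)

# Step 4: fire the Lambda whenever a .json object is created in the source bucket.
s3.put_bucket_notification_configuration(
    Bucket=SOURCE_BUCKET,
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": LAMBDA_ARN,
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {"Key": {"FilterRules": [{"Name": "suffix", "Value": ".json"}]}},
            }
        ]
    },
)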

Code for AWS Glue:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()


def read_nested_json(df):
    # One pass over the columns: explode array columns and pull struct fields up one level.
    column_list = []
    for column_name in df.schema.names:
        if isinstance(df.schema[column_name].dataType, ArrayType):
            df = df.withColumn(column_name, explode(column_name))
            column_list.append(column_name)
        elif isinstance(df.schema[column_name].dataType, StructType):
            for field in df.schema[column_name].dataType.fields:
                column_list.append(col(column_name + "." + field.name).alias(column_name + "_" + field.name))
        else:
            column_list.append(column_name)
    df = df.select(column_list)
    return df


def flatten(df):
    # Keep flattening until no array or struct columns remain.
    read_nested_json_flag = True
    while read_nested_json_flag:
        df = read_nested_json(df)
        read_nested_json_flag = False
        for column_name in df.schema.names:
            if isinstance(df.schema[column_name].dataType, (ArrayType, StructType)):
                read_nested_json_flag = True
    return df


def main():
    # Job parameters: VAL1 = object key, VAL2 = bucket name (passed by the Lambda function)
    args = getResolvedOptions(sys.argv, ["VAL1", "VAL2"])
    file_name = args['VAL1']
    bucket_name = args['VAL2']
    print("Bucket Name:", bucket_name)
    print("File Name:", file_name)
    input_file_path = "s3a://{}/{}".format(bucket_name, file_name)
    print("Input File Path:", input_file_path)

    df = spark.read.option("multiline", True).option("inferSchema", False).json(input_file_path)
    df1 = flatten(df)
    df1.coalesce(1).write.format("csv").option("header", "true").save(
        "s3a://destinationflattenjson/{}".format(file_name.split('.')[0]))


main()
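
To see what flatten() produces, here is a small local sketch (not from the video) that runs the helpers above on a hand-written nested record; the sample data and column names are made up for illustration and assume the functions above are defined in the same session.

# Hypothetical local test of read_nested_json()/flatten().
sample_json = '{"id": 1, "user": {"name": "alice", "tags": ["x", "y"]}}'
sample_df = spark.read.json(spark.sparkContext.parallelize([sample_json]))
flatten(sample_df).show()
# Expected result: columns id, user_name, user_tags with rows
# (1, "alice", "x") and (1, "alice", "y").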



Code for AWS Lambda:

import json
import boto3


def lambda_handler(event, context):
    # The S3 object-create event carries the bucket name and object key.
    file_name = event['Records'][0]['s3']['object']['key']
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    print("File Name:", file_name)
    print("Bucket Name:", bucket_name)

    # Start the Glue job, passing the key and bucket as job arguments.
    glue = boto3.client('glue')
    response = glue.start_job_run(
        JobName="s3_lambda_glue_s3",
        Arguments={"--VAL1": file_name, "--VAL2": bucket_name},
    )
    print("Started Glue job run:", response['JobRunId'])

Check this playlist for more AWS projects in the Big Data domain:
• Demystifying Data Engineering with Cl...
