Building a Serverless Data Streaming Pipeline using Kinesis Data Streams and Firehose for Snowflake


One of the toughest challenges data professionals face today is streaming data for real-time analytics.
A major barrier to real-time insights is the complexity of the data itself: many companies lack the tools and infrastructure to ingest and process both structured and unstructured data.
Organizations need a data warehouse that can handle all data types and scale quickly as they grow.

Here is a complete pipeline covering the Velocity component of Big Data, in which I explain how to create a streaming pipeline from scratch.


Steps:
-----------
Step 1: Create the Lambda role
Step 2: Create the Lambda function to read the data from API Gateway & put it in the Kinesis Data Stream
Step 3: Create the API Gateway & integrate it with the AWS Lambda created in Step 2
Step 4: Create the Kinesis Data Stream to consume data from the AWS Lambda created in Step 2
Step 5: Create the Lambda for processing the data before the S3 dump
Step 6: Create the Firehose destination S3 bucket
Step 7: Create the Kinesis Firehose delivery stream
Step 8: Create the Snowflake role
(A boto3 provisioning sketch for Steps 4, 6 and 7 follows this list.)
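
For reference, here is a minimal boto3 sketch of the provisioning side of Steps 4, 6 and 7. In the video these resources are created through the AWS console; all names and ARNs below (bucket name, role ARNs, Lambda ARN) are placeholders, not values from the recording.

import boto3

kinesis = boto3.client('kinesis')
s3 = boto3.client('s3')
firehose = boto3.client('firehose')

# Step 4: Kinesis Data Stream that the API-facing Lambda writes into
kinesis.create_stream(StreamName='hellotesting', ShardCount=1)

# Step 6: destination bucket for Firehose (bucket name is a placeholder)
s3.create_bucket(Bucket='my-firehose-destination-bucket')

# Step 7: Firehose delivery stream reading from the Kinesis stream,
# transforming each record with the Step 5 Lambda, and landing in S3.
# All ARNs below are placeholders for the roles/functions you created.
firehose.create_delivery_stream(
    DeliveryStreamName='hellotesting-to-s3',
    DeliveryStreamType='KinesisStreamAsSource',
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': 'arn:aws:kinesis:us-east-1:111111111111:stream/hellotesting',
        'RoleARN': 'arn:aws:iam::111111111111:role/firehose-read-kinesis-role'
    },
    ExtendedS3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::111111111111:role/firehose-write-s3-role',
        'BucketARN': 'arn:aws:s3:::my-firehose-destination-bucket',
        'ProcessingConfiguration': {
            'Enabled': True,
            'Processors': [{
                'Type': 'Lambda',
                'Parameters': [{
                    'ParameterName': 'LambdaArn',
                    'ParameterValue': 'arn:aws:lambda:us-east-1:111111111111:function:firehose-transform'
                }]
            }]
        }
    }
)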


Lambda for Step 2:
-------------------------------
import json
import boto3

client = boto3.client('kinesis')

def lambda_handler(event, context):
    # The API Gateway integration delivers the request body in event['body']
    data = json.dumps(event['body'])
    client.put_record(StreamName="hellotesting", Data=data, PartitionKey="1")
    print("Data Inserted")
    return {'statusCode': 200, 'body': 'Data Inserted'}



Lambda for Firehose Transformation (Step 5):
-------------------------------------------------------------------------
import base64

def lambda_handler(event, context):
    # Build the output list inside the handler so records do not
    # accumulate across warm Lambda invocations
    output = []
    print(event)

    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        print('payload:', payload)

        # Append a newline so each record lands on its own line in the S3 object
        row_w_newline = payload + "\n"
        # Firehose expects the transformed data as a base64-encoded string
        row_w_newline = base64.b64encode(row_w_newline.encode('utf-8')).decode('utf-8')

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': row_w_newline
        }
        output.append(output_record)

    print('Processed {} records.'.format(len(event['records'])))

    return {'records': output}
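
To sanity-check the transformation before wiring it into Firehose, you can call the handler above locally with a hand-built event that mimics the shape Firehose sends (the recordId and payload values below are made up):

import base64

# Hand-built test event in the shape Firehose passes to a transformation Lambda
sample_payload = '{"Name": "Alice", "Age": 30}'
test_event = {
    'records': [
        {
            'recordId': 'test-record-1',
            'data': base64.b64encode(sample_payload.encode('utf-8')).decode('utf-8')
        }
    ]
}

result = lambda_handler(test_event, None)
# Decode the transformed record to confirm the trailing newline was added
print(repr(base64.b64decode(result['records'][0]['data']).decode('utf-8')))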


Snowflake Code:
---------------------------
--Specify the role
use role ACCOUNTADMIN;

drop database if exists s3_to_snowflake;

--Database Creation
create database if not exists s3_to_snowflake;

--Specify the active/current database for the session.
use s3_to_snowflake;


--Storage Integration Creation
create or replace storage integration s3_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = S3
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = '{}'
STORAGE_ALLOWED_LOCATIONS = ('s3://{}')
COMMENT = 'Testing Snowflake getting refresh or not';

--Describe the Integration Object (copy STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID from the output into the trust policy of the AWS IAM role)
DESC INTEGRATION s3_int;


--External Stage Creation

create stage mystage
url = 's3://{}'
storage_integration = s3_int;

list @mystage;

--File Format Creation
create or replace file format my_json_format
type = json;

--Table Creation
create or replace external table s3_to_snowflake.PUBLIC.Person with location = @mystage file_format = 'my_json_format';

--Query the table
select parse_json(VALUE):Age as Age, trim(parse_json(VALUE):Name, '"') as Name from s3_to_snowflake.PUBLIC.Person;
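
If you prefer to run the same query outside the Snowflake worksheet, here is a minimal sketch using the snowflake-connector-python package; the account, user, password and warehouse values are placeholders:

import snowflake.connector

# Placeholder connection details; substitute your own account identifier and credentials
conn = snowflake.connector.connect(
    account='your_account_identifier',
    user='your_user',
    password='your_password',
    role='ACCOUNTADMIN',
    warehouse='COMPUTE_WH',
    database='S3_TO_SNOWFLAKE',
    schema='PUBLIC'
)

cur = conn.cursor()
cur.execute(
    "select parse_json(VALUE):Age as Age, "
    "trim(parse_json(VALUE):Name, '\"') as Name "
    "from s3_to_snowflake.PUBLIC.Person"
)
for age, name in cur:
    print(age, name)
cur.close()
conn.close()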

Note:
----------
1) Please delete all AWS resources when you are done with them, otherwise they will keep generating charges!
2) As this is a POC, I granted broad access while creating the roles; when moving to production, make sure to grant only the access that is actually required.
3) parse_json in Snowflake interprets an input string as a JSON document, producing a VARIANT value. For example, for a record {"Name": "Alice", "Age": 30}, the query above returns Age = 30 and Name = Alice.


Check this playlist for more AWS projects in the Big Data domain:
   • Demystifying Data Engineering with Cl...  

Learn more about AWS Kinesis:
---------------------------------------------------
https://aws.amazon.com/kinesis/
