In this video , I am going to show you how to implement Slowly Changing Dimension(SCD) Type 2 Using Insert and Update Commands in Snowflake.
⌚Timestamps:
00:00 Intro
00:16 SCD Type2
06:16 Demo on SCD Type2 - Day 1 file processing
17:13 Demo on SCD Type2 - Day 2 file processing
20:38 Demo on SCD Type2 - Day 3 file processing
23:05 Outro
use role accountadmin;
use database demo_db;
use schema public;
create or replace TRANSIENT TABLE STG_EMP (
HK_EMPID VARCHAR(16777216),
CHANGE_HK VARCHAR(16777216),
EMPID NUMBER(38,0),
NAME VARCHAR(30),
SALARY NUMBER(20,2),
DESIGNATION VARCHAR(16777216),
OFFICE VARCHAR(30),
INSERT_TS TIMESTAMP_NTZ(9),
FILE_NAME VARCHAR(200),
FILE_ROW_NUMBER NUMBER(38,0)
);
create or replace TABLE EMP (
hk_empid varchar,
Change_HK varchar,
empID NUMBER(38,0),
NAME VARCHAR(30),
SALARY NUMBER(20,2),
DESIGNATION VARCHAR(16777216),
OFFICE VARCHAR(30),
active_indicator varchar(1),
effective_start_date date,
effective_end_date date,
INSERT_TS TIMESTAMP_NTZ(9),
UPDATE_TS TIMESTAMP_NTZ(9),
FILE_NAME VARCHAR(200),
FILE_ROW_NUMBER NUMBER(38,0),
dss_version number
);
use role accountadmin;
use database demo_db;
use schema public;
remove @mycsvstage;
--put file://c:\data\EMP_20210929.csv @mycsvstage;
--put file://c:\data\EMP_20211001.csv @mycsvstage;
put file://c:\data\EMP_20211002.csv @mycsvstage;
truncate table STG_EMP;
COPY INTO STG_EMP(
empid
,name
,salary
,Designation
,office
,file_name
,file_row_number
,insert_ts
,hk_empid
,change_hk
)
FROM (
SELECT t.$1,t.$2,t.$3,t.$4,t.$5
,metadata$filename
,metadata$file_row_number
,CAST(current_timestamp as timestamp)
,CAST(MD5(
NVL(CAST($1 as varchar),'null') ) as varchar
) as hk_empid
,CAST(MD5(
NVL(CAST($2 as varchar),'null') || '||'||
NVL(CAST($3 as varchar),'null') || '||'||
NVL(CAST($4 as varchar),'null') || '||'||
NVL(CAST($5 as varchar),'null')
) as varchar
) as change_hk
from @mycsvstage/ t )
pattern = '.*EMP.*[.]csv.gz'
on_error = 'skip_file'
;
insert into EMP (
hk_empid
,change_hk
,empid
,name
,salary
,Designation
,office
,active_indicator
,effective_start_date
,effective_end_date
,file_name
,file_row_number
,insert_ts
,update_ts
,dss_version
)
SELECT
s.hk_empid
,s.change_hk
,s.empid
,s.name
,s.salary
,s.Designation
,s.office
,'Y'
,current_date()-1
,to_date('9999-12-31','YYYY-MM-DD')
,s.file_name
,s.file_row_number
,s.insert_ts
,update_ts
,NVL(current_rows.dss_version, 0) + 1 as dss_version
from (
SELECT
s.hk_empid
,s.change_hk
,s.empid
,s.name
,s.salary
,s.Designation
,s.office
,s.file_name
,s.file_row_number
,s.insert_ts
,CAST(current_timestamp as timestamp) update_ts
,row_number() over (partition by s.hk_empid order by s.file_row_number desc ) rn
FROM stg_emp s qualify rn=1
) s
LEFT OUTER JOIN (
select a.hk_empid
,max(a.effective_start_date) as effective_start_date
,max(dss_version) dss_version
from EMP a group by hk_empid
) as current_rows
on s.hk_empid = current_rows.hk_empid
where not exists(
select 1
from EMP e
where s.hk_empid = e.hk_empid
and s.change_hk = e.change_hk
and current_rows.effective_start_date = e.effective_start_date
);
update emp e
set
e.effective_end_date = current_date(),
e.active_indicator = 'N'
from (
SELECT
s.hk_empid
,s.change_hk
,s.empid
,s.name
,s.salary
,s.Designation
,s.office
,s.file_name
,s.file_row_number
,s.insert_ts
,s.update_ts
,dss_version
,row_number() over (partition by s.hk_empid order by s.dss_version desc ) rn
FROM emp s where active_indicator ='Y' qualify rn !=1
) S
where e.hk_empid = s.hk_empid
and e.change_hk = s.change_hk
and e.dss_version = s.dss_version
and e.effective_end_date = to_date('9999-12-31','YYYY-MM-DD')
;
#snowflake#datacloud#database#datawarehouse#vcklytech
Информация по комментариям в разработке