
Snowflake's Data Engineering Workshop

Hop on the Journey #

I’m thrilled to share that I’ve completed Snowflake Badge 5: Data Engineering Workshop, an enriching journey through Snowflake’s ecosystem. The workshop is a fast-paced, interactive course that covers data engineering fundamentals on the Snowflake platform, including data loading with Snowpipe, data transformation with SQL, task scheduling, and more. The goal is to provide practical experience in data management for data-driven organizations, with a badge awarded upon completion.

The best thing about it is that it is a hands-on course, so you don’t have to worry if you are not familiar with Snowflake. It is also free at https://learn.snowflake.com/en/pages/hands-on-essentials-track/.

The journey runs from data ingestion all the way to a simple dashboard. In Snowflake, you can load data directly from cloud storage services like Amazon S3, Google Cloud Storage, and Microsoft Azure using either bulk or continuous loading.

However, freshly loaded raw data is not ready for users to consume. Before transforming the data, engineers need to understand the requirements and analyze the data in order to design the pipeline.

This workshop presents a scenario where the time zone is missing from the logs, even though it is critical geographic information about the user base. So after the data is fed into a raw table, it needs to be converted and merged with other tables that provide geographic information. These steps are what data engineering calls transformation.

Once the user requirements are fulfilled, the next step is to optimize the pipeline to reduce compute costs. Instead of checking for new data on a fixed schedule, it is better to load data only when a change happens at the source. In other words, the goal is to make the pipeline more responsive. This is where publish/subscribe services and message queue systems come in, replacing the original time-triggered data loading and transformation tasks.

Here are some key notes that I’ve learned from this course. If you are interested, take a look and have fun building better data pipelines!

MERGE vs. INSERT #

The core difference between INSERT and MERGE lies in how they handle existing data in the target table.

| Query | INSERT | MERGE |
| --- | --- | --- |
| Operation type | Unconditional append | Conditional update/insert |
| Data handling | Always adds new rows from the source query to the target table, even if the exact same event was already present. | Checks whether a record from the source query already exists in the target table based on specific join conditions. |
| Duplicate prevention | Does not prevent duplicates by itself. Running the task multiple times without clearing the target table would result in duplicate rows for the same game events. | Prevents duplicates for records that satisfy the ON condition (GAMER_NAME, GAME_EVENT_UTC, GAME_EVENT_NAME). |
| Target table state | Likely grows with duplicate data upon re-execution. | Remains clean; only truly new, unique records are added. |

Summary of Differences

  1. The INSERT Approach (Original)

A simple INSERT INTO ... SELECT statement always adds new rows from the source query to the target table, even if the exact same data is already present. This is problematic because the source table likely contains historical data that doesn’t get cleared immediately. Each time the task runs, it re-processes all rows from the source and inserts them again into the target table, creating many duplicate entries.

  2. The MERGE Approach (Second Query)

The second task uses MERGE to synchronize the two tables, checking whether a record from the source already exists in the target table based on the join conditions. MERGE is idempotent: it can be run multiple times without causing duplicates. Its WHEN NOT MATCHED THEN INSERT clause keeps the target table synchronized while preventing duplicates for records that satisfy the ON condition.
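
As a minimal sketch (using a throwaway two-column table, not the workshop’s schema), running each statement twice shows the contrast:

-- Throwaway table for illustration only; not part of the workshop schema
create or replace table demo_events (gamer_name string, game_event_utc timestamp_ntz);

-- Plain INSERT: running this twice leaves two identical rows
insert into demo_events
select 'player_one', '2025-01-01 10:00:00'::timestamp_ntz;

-- MERGE: running this twice still leaves exactly one row, because the second run matches
merge into demo_events e
using (select 'player_two' as gamer_name
            , '2025-01-01 11:00:00'::timestamp_ntz as game_event_utc) r
on r.gamer_name = e.gamer_name
and r.game_event_utc = e.game_event_utc
when not matched then
insert (gamer_name, game_event_utc)
values (r.gamer_name, r.game_event_utc);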

Conclusion

MERGE is the better choice for maintaining a clean, non-duplicated target table that acts as a historical record of events. A plain INSERT is generally appropriate only when the source data is guaranteed to be entirely new.

Use serverless compute to run tasks & task dependencies to increase stability #

TASK DEPENDENCIES #

At first glance, our pipeline design grants us full control over the data loading and transformation processes, but the loading and transformation tasks currently run on independent schedules. This independent execution introduces uncertainty about whether the preceding task has actually produced the data the next step needs. Implementing task dependencies eliminates this issue: replace the time-based SCHEDULE by configuring each transformation task to run only upon successful completion of its respective load task.
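
In task DDL terms, the change is just swapping the SCHEDULE parameter for an AFTER clause. A rough sketch with placeholder names (my_db, my_schema, my_wh, and both tasks are illustrative, not the workshop’s objects):

-- Time-driven: runs every 5 minutes whether or not the loader produced anything new
create or replace task my_db.my_schema.transform_task
    warehouse = my_wh
    schedule = '5 minutes'
    as select 1;  -- placeholder body

-- Dependency-driven: no SCHEDULE; runs only after the load task completes
create or replace task my_db.my_schema.transform_task
    warehouse = my_wh
    after my_db.my_schema.load_task
    as select 1;  -- placeholder body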

SERVERLESS COMPUTE #

The WAREHOUSE used to run the tasks has to spin up each time a task is triggered. And if it is set to auto-suspend after 5 minutes while the tasks run every 5 minutes, it will never actually suspend, because the next run starts before it has time to shut down. That burns a lot of compute credits and adds unnecessary cost.

Snowflake offers a different option called “SERVERLESS”. Serverless tasks don’t have to spin up a warehouse; instead, they can use a thread or two of a compute resource that is already running. Serverless compute is much more efficient for small tasks that don’t do very much, but do what they do quite often.

To use the SERVERLESS task mode, we need to grant the EXECUTE MANAGED TASK privilege to SYSADMIN.

Grant Serverless Task Management to SYSADMIN

use role accountadmin;
grant EXECUTE MANAGED TASK on account to SYSADMIN;

--switch back to sysadmin
use role sysadmin;
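
A quick way to verify the grant is in place before switching modes:

show grants to role sysadmin;  -- the list should now include EXECUTE MANAGED TASK on the account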

Combining these two improvements, all we need to do is:

  • Replace the WAREHOUSE Property in Your Tasks
  • Remove the SCHEDULE property and have LOAD_LOGS_ENHANCED run each time GET_NEW_FILES completes
create or replace task AGS_GAME_AUDIENCE.RAW.LOAD_LOGS_ENHANCED
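    -- no WAREHOUSE property: this parameter makes the task serverless, with a managed warehouse starting at XSMALL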
	USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE='XSMALL'
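    -- AFTER replaces SCHEDULE: the task now runs each time GET_NEW_FILES completes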
	after AGS_GAME_AUDIENCE.RAW.GET_NEW_FILES
	as MERGE INTO AGS_GAME_AUDIENCE.ENHANCED.LOGS_ENHANCED e
    USING (
        SELECT logs.ip_address
        , logs.user_login as GAMER_NAME
        , logs.user_event as GAME_EVENT_NAME
        , logs.datetime_iso8601 as GAME_EVENT_UTC
        , city
        , region
        , country
        , timezone as GAMER_LTZ_NAME
        , CONVERT_TIMEZONE('UTC', timezone, logs.datetime_iso8601) as game_event_ltz
        , DAYNAME(game_event_ltz) as DOW_NAME
        , tod_name
        from AGS_GAME_AUDIENCE.RAW.PL_LOGS logs
        JOIN IPINFO_GEOLOC.demo.location loc
        ON IPINFO_GEOLOC.public.TO_JOIN_KEY(logs.ip_address) = loc.join_key
        AND IPINFO_GEOLOC.public.TO_INT(logs.ip_address)
        BETWEEN start_ip_int AND end_ip_int
        JOIN ags_game_audience.raw.time_of_day_lu tod
        ON HOUR(game_event_ltz) = tod.hour
) r
ON r.gamer_name = e.GAMER_NAME
and r.game_event_utc = e.game_event_utc
and r.game_event_name = e.game_event_name
WHEN NOT MATCHED THEN
INSERT (IP_ADDRESS, GAMER_NAME, GAME_EVENT_NAME, GAME_EVENT_UTC, CITY, REGION, COUNTRY, GAMER_LTZ_NAME, GAME_EVENT_LTZ, DOW_NAME, TOD_NAME)
VALUES (IP_ADDRESS, GAMER_NAME, GAME_EVENT_NAME, GAME_EVENT_UTC, CITY, REGION, COUNTRY, GAMER_LTZ_NAME, GAME_EVENT_LTZ, DOW_NAME, TOD_NAME);
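
One operational note: CREATE OR REPLACE leaves a task in a suspended state, so both tasks need to be resumed before the chain will fire. Resuming the dependent task first keeps it from being skipped on the root task’s next run:

-- Tasks are suspended after CREATE OR REPLACE; resume the child, then the root
alter task AGS_GAME_AUDIENCE.RAW.LOAD_LOGS_ENHANCED resume;
alter task AGS_GAME_AUDIENCE.RAW.GET_NEW_FILES resume;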

From time-driven pipeline to event-driven pipeline #

Modern data pipeline building is about more than creating a task-driven or time-driven pipeline like the one pictured below. The tasks can be consolidated into fewer pieces for easier maintenance, and we can leverage cloud services offered by the major cloud providers to create a responsive, event-driven pipeline.

Original Pipeline Design

That being said, step 2 and step 3 are both essentially data loading tasks, so they can be integrated into a single loading task that extracts the basic information directly from the source data. Furthermore, running this task on a fixed schedule is neither smart nor efficient. Snowflake offers a service called Snowpipe that leverages cloud infrastructure objects to make continuous loading possible. The pipeline can then be built like the DAG below, which incorporates a publish/subscribe service and a message queueing system.

Event-based Pipeline Design

Publish and Subscribe services are based on a Hub and Spoke pattern. The HUB is a central controller that manages the receiving and sending of messages. The SPOKES are the solutions and services that either send or receive notifications from the HUB.

If a SPOKE is a PUBLISHER, that means they send messages to the HUB. If a SPOKE is a SUBSCRIBER, that means they receive messages from the HUB. A SPOKE can be both a publisher and a subscriber.

With messages flowing into and out of a Pub/Sub service from so many places, it could get confusing, fast. So Pub/Sub services have EVENT NOTIFICATIONS and TOPICS. A topic is a collection of event types. A SPOKE publishes NOTIFICATIONS to a TOPIC and subscribes to a TOPIC, which is a stream of NOTIFICATIONS.

Publish and Subscribe services and Message Queueing

To make the data loading task event-driven, all we need to do is create a PIPE that subscribes to the topic hosted on the cloud hub.

CREATE OR REPLACE PIPE PIPE_GET_NEW_FILES
auto_ingest=true
aws_sns_topic='arn:aws:sns:us-west-2:321463406630:dngw_topic'
AS
COPY INTO ED_PIPELINE_LOGS
FROM (
	...EXTRACT DATA
)
file_format = (format_name = ff_json_logs);
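
Once the pipe exists, a quick sanity check (using the pipe name from the statement above) confirms it is running and listening on its notification channel; a refresh can also backfill files that landed in the stage before the pipe was created:

-- Returns a JSON status including executionState, pendingFileCount, and the notification channel
select system$pipe_status('PIPE_GET_NEW_FILES');

-- Optionally queue up files already sitting in the stage from before the pipe existed
alter pipe PIPE_GET_NEW_FILES refresh;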

Change Data Capture #

Take a look back at the DAG we improved above. Step 4 is still a time-driven task, but the good news is that the pipeline now has two of the most important pipeline building blocks in Snowflake: tasks and pipes. To improve the data flow further, Snowflake provides an object called a STREAM that can make it more responsive and more efficient.

A STREAM does not replace the original task; it makes the task event-driven by using Change Data Capture. In the simplest use, streams appear very simple: they are just a running list of things that have changed in your table or view. But to use streams effectively in production, you will need to understand offsets, data retention periods, staleness, and stream types.

This workshop covers only the most basic use of a stream: one that handles record inserts and is not guaranteed to track and process every change.

Let STREAM do the work

Run the following command to create a STREAM under the RAW schema that detects data changes in AGS_GAME_AUDIENCE.RAW.ED_PIPELINE_LOGS.

create or replace stream ags_game_audience.raw.ed_cdc_stream
on table AGS_GAME_AUDIENCE.RAW.ED_PIPELINE_LOGS;
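
Before wiring the stream into a task, it helps to peek at what it captures. A stream is queried like a table and exposes extra METADATA$ columns describing each change; note that a plain SELECT does not consume the stream:

-- Captured rows plus change metadata (METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID)
select * from ags_game_audience.raw.ed_cdc_stream;

-- TRUE if the stream currently holds unconsumed changes (the same check the task below uses)
select system$stream_has_data('ags_game_audience.raw.ed_cdc_stream');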

The next step is to alter the original task that runs every 5 minutes to update the enhanced table for analysis. With the STREAM in place, the task no longer needs to scan the whole table on every run to see whether any data changed. Instead, a WHEN condition lets the STREAM tell the task whether there is anything new to process.

create or replace task AGS_GAME_AUDIENCE.RAW.CDC_LOAD_LOGS_ENHANCED
	USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE='XSMALL'
	SCHEDULE = '5 minutes'
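    -- the WHEN condition skips this run entirely when the stream has nothing new, saving compute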
WHEN
    system$stream_has_data('ed_cdc_stream')
	as
MERGE INTO AGS_GAME_AUDIENCE.ENHANCED.LOGS_ENHANCED e
USING (
        ... TRANSFORM DATA
      ) r
ON r.GAMER_NAME = e.GAMER_NAME
AND r.GAME_EVENT_UTC = e.GAME_EVENT_UTC
AND r.GAME_EVENT_NAME = e.GAME_EVENT_NAME
WHEN NOT MATCHED THEN
INSERT (IP_ADDRESS, GAMER_NAME, GAME_EVENT_NAME
        , GAME_EVENT_UTC, CITY, REGION
        , COUNTRY, GAMER_LTZ_NAME, GAME_EVENT_LTZ
        , DOW_NAME, TOD_NAME)
VALUES
        (IP_ADDRESS, GAMER_NAME, GAME_EVENT_NAME
        , GAME_EVENT_UTC, CITY, REGION
        , COUNTRY, GAMER_LTZ_NAME, GAME_EVENT_LTZ
        , DOW_NAME, TOD_NAME);

Data is Flowing #

With the PIPE and STREAM in place, all we need is a task at the end that pulls new data from the STREAM instead of from the RAW table. The data now makes two stops in the flow. The first stop is the ED_PIPELINE_LOGS table, loaded by the Snowpipe PIPE_GET_NEW_FILES. The second stop is the AGS_GAME_AUDIENCE.ENHANCED.LOGS_ENHANCED table, populated by the STREAM and the scheduled task.
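
A simple way to confirm data is flowing through both stops (table names as in the post) is to watch the row counts grow as new files arrive:

-- Both counts should climb over time if the pipe, stream, and task are all doing their jobs
select (select count(*) from AGS_GAME_AUDIENCE.RAW.ED_PIPELINE_LOGS)   as raw_rows
     , (select count(*) from AGS_GAME_AUDIENCE.ENHANCED.LOGS_ENHANCED) as enhanced_rows;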

What Happens if the Merge Fails? #

This is one of the reasons Streams can be very complex. Since the information disappears as you process it, when using a stream in production systems, you will likely want to have more protection in place to make sure you don’t lose information. For example, our old task that looks at EVERY row EVERY time could be run just once every 24 hours so that it could pick up anything missed by the task that processes the stream.

Final Thoughts #

Overall, I enjoyed this course; it gives a good overview of what is available in Snowflake. I also picked up more concepts for modern data engineering, which I’m eager to apply to a practical data engineering project. I encourage anyone interested in what Snowflake has to offer for data engineering to enroll in this course.