Exercise 3.1 – Python → S3 → Snowflake End-to-End

A real pipeline run on your laptop: generate meter reads locally, upload them to S3, then COPY INTO Snowflake. This proves end-to-end fluency with the three JD tools that chain together most often.

Prereq: Finished Exercise 2.1 (Snowflake trial + LEARN_WH) and Exercise 4.1 (AWS free-tier account).

Time: 45 min · Cost: <$0.01 S3 + <$0.10 Snowflake credits.

What you'll build

 generate.py
        │
        │ writes 100k rows, 3 columns
        ▼
   reads.parquet
        │
        │ boto3.upload_file
        ▼
 s3://ude-learner-<rand>/reads/2026-04-22/reads.parquet
        │
        │ Snowflake COPY INTO from external stage
        ▼
   LEARN_DB.UTILITY.METER_READS_FROM_S3

Step 1 – Project skeleton

mkdir ude-py-s3-snowflake && cd ude-py-s3-snowflake
uv venv && source .venv/bin/activate            # or python -m venv .venv && source .venv/bin/activate
uv pip install pandas pyarrow boto3 snowflake-connector-python

Step 2 – Generate meter reads (generate.py)

import datetime as dt

import numpy as np
import pandas as pd

np.random.seed(42)                     # reproducible runs
N = 100_000
# naive UTC timestamp (matches TIMESTAMP_NTZ); utcnow() is deprecated since Python 3.12
now = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)

df = pd.DataFrame({
    "meter_id": [f"MTR-{i:05d}" for i in np.random.randint(1, 1000, N)],
    "read_ts": [now - dt.timedelta(minutes=int(m)) for m in np.random.randint(0, 60*24*30, N)],
    "consumption_kwh": np.round(np.random.uniform(0, 5, N).astype("float32"), 3),
}).astype({"meter_id": "string"})

df.to_parquet("reads.parquet", index=False)
print(f"Wrote {len(df):,} rows, {df.memory_usage(deep=True).sum()/1e6:.2f} MB in-memory")

Run: python generate.py. The output should report 100,000 rows and roughly 5 MB in memory.

Step 3 – Upload to S3 (upload.py)

First create a bucket (AWS Console โ†’ S3 โ†’ Create bucket) named something globally unique like ude-learner-<random-suffix>. Defaults are fine. Region: us-west-2 (match whatever region your Snowflake trial is in).
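If you prefer to stay in Python, the bucket can be created with boto3 too. A hedged sketch; the ude-learner prefix and us-west-2 region are just this exercise's conventions:

```python
import secrets

def unique_bucket_name(prefix="ude-learner"):
    # 6 random hex chars -> negligible collision risk for a learning bucket
    return f"{prefix}-{secrets.token_hex(3)}"

def create_learning_bucket(name, region="us-west-2"):
    import boto3  # local import keeps the name helper usable without AWS creds
    s3 = boto3.client("s3", region_name=region)
    # CreateBucketConfiguration is required in every region except us-east-1
    s3.create_bucket(
        Bucket=name,
        CreateBucketConfiguration={"LocationConstraint": region},
    )
    return name

print(unique_bucket_name())  # e.g. ude-learner-a1b2c3
# create_learning_bucket(unique_bucket_name())  # uncomment once credentials are configured
```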

import boto3, os, datetime as dt

BUCKET = "ude-learner-XXXXXX"     # your bucket name
KEY = f"reads/{dt.date.today()}/reads.parquet"

s3 = boto3.client("s3", region_name="us-west-2")
s3.upload_file("reads.parquet", BUCKET, KEY)
print(f"Uploaded s3://{BUCKET}/{KEY}")

You'll need AWS credentials locally: aws configure, or AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables. If Exercise 4.1 set up a named profile, boto3.Session(profile_name="ude") works too.
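A small helper that supports both credential styles; the "ude" profile name comes from Exercise 4.1, so adjust it to whatever you configured:

```python
def make_s3_client(region="us-west-2", profile=None):
    """Return an S3 client from a named profile, or the default credential chain."""
    import boto3
    if profile:
        session = boto3.Session(profile_name=profile, region_name=region)
        return session.client("s3")
    # no profile: boto3 falls back to env vars, ~/.aws/credentials, instance roles, ...
    return boto3.client("s3", region_name=region)

# s3 = make_s3_client(profile="ude")
```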

Step 4 – Create the Snowflake target & stage

In Snowsight:

USE WAREHOUSE LEARN_WH;
USE DATABASE LEARN_DB;
USE SCHEMA UTILITY;

CREATE OR REPLACE TABLE meter_reads_from_s3 (
  meter_id        STRING,
  read_ts         TIMESTAMP_NTZ,
  consumption_kwh NUMBER(10,3)
);

-- External stage that points to YOUR bucket.
-- For simplicity we use an access-key-based stage; in a real deployment use Storage Integration.
CREATE OR REPLACE STAGE ude_s3_stage
  URL='s3://ude-learner-XXXXXX/reads/'
  CREDENTIALS = (AWS_KEY_ID='AKIA...' AWS_SECRET_KEY='...')
  FILE_FORMAT = (TYPE = PARQUET);

LIST @ude_s3_stage;   -- should show your parquet file

Keys-in-SQL warning: embedding AWS keys in a stage is fine for a learning exercise, not for production. Production uses a Storage Integration (IAM role trust), which we cover in deep mode.
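For reference, the production pattern looks roughly like this. All names and the role ARN are placeholders, and this sketch omits the IAM trust-policy handshake that Snowflake's Storage Integration docs walk through:

```sql
CREATE STORAGE INTEGRATION ude_s3_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<account-id>:role/<snowflake-access-role>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://ude-learner-XXXXXX/reads/');

-- the stage then references the integration instead of raw keys
CREATE OR REPLACE STAGE ude_s3_stage
  URL = 's3://ude-learner-XXXXXX/reads/'
  STORAGE_INTEGRATION = ude_s3_int
  FILE_FORMAT = (TYPE = PARQUET);
```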

Step 5 – COPY INTO

COPY INTO meter_reads_from_s3
  FROM @ude_s3_stage
  FILE_FORMAT = (TYPE = PARQUET)
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
  ON_ERROR = CONTINUE;

SELECT COUNT(*) FROM meter_reads_from_s3;      -- expect 100,000
SELECT * FROM meter_reads_from_s3 LIMIT 10;  -- sanity check

Step 6 – Now do it with write_pandas (no S3 round-trip)

For pipelines where the data lives in Python anyway, you can skip the S3 hop:

import os

import pandas as pd
from snowflake.connector import connect
from snowflake.connector.pandas_tools import write_pandas

df = pd.read_parquet("reads.parquet")
df.columns = [c.upper() for c in df.columns]   # Snowflake identifiers default to UPPER

conn = connect(
    user=os.environ["SF_USER"],
    password=os.environ["SF_PASSWORD"],
    account=os.environ["SF_ACCOUNT"],   # e.g. xy12345.us-west-2.aws
    warehouse="LEARN_WH", database="LEARN_DB", schema="UTILITY",
)
try:
    success, chunks, rows, _ = write_pandas(
        conn, df, "METER_READS_FROM_PANDAS",
        auto_create_table=True, chunk_size=50_000,
    )
    print(f"write_pandas loaded {rows:,} rows")
finally:
    conn.close()

Step 7 – Cleanup (important)

DROP TABLE IF EXISTS meter_reads_from_s3;
DROP TABLE IF EXISTS meter_reads_from_pandas;
DROP STAGE IF EXISTS ude_s3_stage;
ALTER WAREHOUSE LEARN_WH SUSPEND;

Delete the S3 objects too if you don't want the ~5 MB counting toward your free tier (it won't meaningfully).
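If you'd rather clean up from Python than the console, a hedged sketch; it assumes the bucket and reads/ prefix from Step 3, and the paginator handles the (here irrelevant) 1,000-key page limit:

```python
def delete_prefix(bucket, prefix="reads/", region="us-west-2"):
    """Delete every object under prefix in bucket; returns the number deleted."""
    import boto3
    s3 = boto3.client("s3", region_name=region)
    deleted = 0
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        keys = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
        if keys:
            s3.delete_objects(Bucket=bucket, Delete={"Objects": keys})
            deleted += len(keys)
    return deleted

# delete_prefix("ude-learner-XXXXXX")  # run once you're done with the exercise
```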

Reflection – save to notepad

Use the 📓 notepad to capture for interview-time recall: