Exercise 9.1 — Meter-to-Cash Mini-Pipeline
The capstone. A full slice through the utility data lifecycle, using the tools from every previous module. End state: raw AMI reads → validated silver → billed gold table, with governance, tests, and a query-profile audit. This is your interview demo project.
Scope: does NOT require real streaming infra — we'll simulate with a batch load that represents 30 days of AMI data. The point is the full pipeline shape, with every JD capability demonstrated.
Time: 2–3 hours. Cost: <$0.50 in Snowflake credits.
The data model
BRONZE (raw, immutable)
- bronze_ami_reads: exactly as received. Keyed (meter_id, read_ts). Append-only.
- bronze_cis_accounts: nightly full snapshot.
- bronze_gis_service_points: weekly full snapshot.

SILVER (cleaned, validated)
- silver_meter_reads: after VEE. quality_flag, is_estimated boolean, revision_num.
- silver_accounts: deduped, current state, with SCD-2 for history.

GOLD (business-ready)
- gold_monthly_bill: account × month, total kWh, total $, rate plan applied.
Step 1 — Seed the bronze tables
USE ROLE SYSADMIN; USE WAREHOUSE LEARN_WH;
CREATE SCHEMA IF NOT EXISTS LEARN_DB.M2C;
USE SCHEMA LEARN_DB.M2C;
CREATE OR REPLACE TABLE bronze_ami_reads (
meter_id STRING, read_ts TIMESTAMP_NTZ,
consumption_kwh NUMBER(10,3), quality_raw STRING, source_file STRING,
ingested_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
CREATE OR REPLACE TABLE bronze_cis_accounts (
account_id STRING, customer_id STRING, meter_id STRING,
rate_plan STRING, status STRING, snapshot_date DATE,
ingested_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Populate bronze_ami_reads with a realistic simulation:
-- 100 meters × ~96 reads/day (15-min) × 30 days = ~288k rows
INSERT INTO bronze_ami_reads (meter_id, read_ts, consumption_kwh, quality_raw, source_file)
SELECT
'MTR-' || LPAD(UNIFORM(1,100,RANDOM()),5,'0'),
DATEADD(minute, -UNIFORM(0, 60*24*30, RANDOM()), CURRENT_TIMESTAMP()),
ROUND(UNIFORM(0,5000,RANDOM())/1000.0, 3),
CASE WHEN UNIFORM(1,100,RANDOM()) < 5 THEN 'E'  -- estimated by the head-end
     WHEN UNIFORM(1,100,RANDOM()) < 2 THEN 'M'  -- missing/bad (each UNIFORM call is an independent draw)
     ELSE 'G' END,                              -- good read
'file_' || UNIFORM(1,30,RANDOM()) || '.parquet'
FROM TABLE(GENERATOR(rowcount => 288000));
-- Accounts: 100 accounts, paired 1:1 to the 100 meters, flat rate plan for simplicity.
-- Explicit column list: ingested_at has a DEFAULT, so we must not supply it positionally.
INSERT INTO bronze_cis_accounts (account_id, customer_id, meter_id, rate_plan, status, snapshot_date)
SELECT
'ACC-' || LPAD(SEQ4() + 1, 5, '0'),
'CUST-' || LPAD(SEQ4() + 1, 5, '0'),
'MTR-' || LPAD(SEQ4() + 1, 5, '0'),  -- +1 so meter IDs match the UNIFORM(1,100) range in the reads
CASE WHEN UNIFORM(1,10,RANDOM())<8 THEN 'RES-STD' ELSE 'RES-TOU' END,
'ACTIVE',
CURRENT_DATE()
FROM TABLE(GENERATOR(rowcount => 100));
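Before moving on, a quick sanity check on the seed is worth the few seconds — row counts, and whether every metered read will find an account. A sketch:

```sql
-- Row count and meter count for the reads (~288k rows, 100 distinct meters expected)
SELECT COUNT(*) AS total_reads, COUNT(DISTINCT meter_id) AS distinct_meters
FROM bronze_ami_reads;

-- Meters with reads but no account (should return zero rows
-- when the two inserts use the same meter-ID range)
SELECT r.meter_id
FROM (SELECT DISTINCT meter_id FROM bronze_ami_reads) r
LEFT JOIN bronze_cis_accounts a ON a.meter_id = r.meter_id
WHERE a.meter_id IS NULL;
```

If the orphan query returns rows, the gold join below will silently drop those meters' consumption.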
Step 2 — Silver layer (VEE)
CREATE OR REPLACE TABLE silver_meter_reads AS
SELECT
meter_id,
read_ts,
-- Validate: null out impossibly-high consumption (>50 kWh in one interval); flag for review
-- (the simulated data tops out at 5 kWh, so this guard is here to demonstrate the pattern)
CASE WHEN consumption_kwh > 50 THEN NULL ELSE consumption_kwh END AS consumption_kwh,
-- Estimated flag: raw 'M' (missing) reads are the ones we'd fill by estimation
quality_raw = 'M' AS is_estimated,
quality_raw AS quality_flag,
1 AS revision_num,
CURRENT_TIMESTAMP() AS silver_built_at
FROM bronze_ami_reads
-- Dedupe: keep highest ingested_at per (meter, ts)
QUALIFY ROW_NUMBER() OVER (PARTITION BY meter_id, read_ts ORDER BY ingested_at DESC) = 1;
SELECT COUNT(*), COUNT_IF(is_estimated), COUNT_IF(consumption_kwh IS NULL) FROM silver_meter_reads;
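The exercise only flags 'M' reads; a real VEE step would also fill them. A minimal estimation sketch, assuming the meter's own average good read is an acceptable stand-in (production VEE uses prior usage, neighboring intervals, weather, etc.):

```sql
-- Fill estimated ('M') reads with the meter's average good read; bump revision_num
UPDATE silver_meter_reads
SET consumption_kwh = m.avg_kwh,
    revision_num = silver_meter_reads.revision_num + 1
FROM (
    SELECT meter_id, ROUND(AVG(consumption_kwh), 3) AS avg_kwh
    FROM silver_meter_reads
    WHERE quality_flag = 'G'
    GROUP BY meter_id
) m
WHERE silver_meter_reads.meter_id = m.meter_id
  AND silver_meter_reads.is_estimated;
```

Bumping revision_num matters: downstream billing can then distinguish a first-pass read from a revised estimate.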
Step 3 — Gold: monthly bill
CREATE OR REPLACE TABLE gold_monthly_bill AS
SELECT
a.account_id,
a.customer_id,
DATE_TRUNC('month', r.read_ts)::DATE AS billing_month,
SUM(r.consumption_kwh) AS total_kwh,
SUM(r.consumption_kwh * CASE a.rate_plan WHEN 'RES-TOU' THEN 0.14 ELSE 0.12 END) AS total_dollars,
COUNT(*) AS reads_counted,
COUNT_IF(r.is_estimated) AS reads_estimated
FROM silver_meter_reads r
JOIN bronze_cis_accounts a ON a.meter_id = r.meter_id
WHERE r.consumption_kwh IS NOT NULL
GROUP BY a.account_id, a.customer_id, DATE_TRUNC('month', r.read_ts)::DATE;
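A quick reconciliation between layers catches join explosions early — under the same NOT NULL filter, total billed kWh should match billable kWh in silver (assuming every metered read joins to exactly one account):

```sql
-- Reconcile: billable kWh in silver vs billed kWh in gold (diff should be 0)
SELECT
    (SELECT SUM(consumption_kwh) FROM silver_meter_reads
     WHERE consumption_kwh IS NOT NULL) AS silver_kwh,
    (SELECT SUM(total_kwh) FROM gold_monthly_bill)  AS gold_kwh,
    silver_kwh - gold_kwh AS diff;  -- Snowflake allows reusing lateral column aliases
```

A nonzero diff means reads were dropped (meter with no account) or multiplied (meter joined to more than one account row).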
Step 4 — Data-quality tests
Real DE CI runs these on every merge. Run them manually now:
-- Test: no nulls in billing month
SELECT ASSERT(COUNT(*) = 0, 'nulls in gold_monthly_bill.billing_month')
FROM gold_monthly_bill WHERE billing_month IS NULL;
-- Test: uniqueness on (account_id, billing_month)
SELECT ASSERT(COUNT(*) = COUNT(DISTINCT account_id || '|' || billing_month),
'duplicate account-month pairs in gold_monthly_bill')
FROM gold_monthly_bill;
-- Test: no negative dollars
SELECT ASSERT(COUNT(*) = 0, 'negative dollars')
FROM gold_monthly_bill WHERE total_dollars < 0;
Snowflake has no built-in ASSERT function — read the blocks above as pseudocode for the pattern. In real dbt or Great Expectations projects the same checks are declarative tests; the intent is identical.
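If you want a check that actually fails a run on today's Snowflake, one common workaround is to force a division-by-zero error when the condition is violated. A sketch (the output alias is mine):

```sql
-- Returns 1 when the check passes; raises "Division by zero"
-- (failing the pipeline step) when it doesn't.
SELECT 1 / IFF(COUNT(*) = 0, 1, 0) AS no_null_billing_months
FROM gold_monthly_bill
WHERE billing_month IS NULL;
```

The same shape works for the uniqueness and negative-dollar checks: reduce each test to a COUNT that must be zero.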
Step 5 — Apply governance
-- Mask customer_id for non-compliance roles
CREATE OR REPLACE MASKING POLICY mask_customer_id AS (val STRING) RETURNS STRING ->
CASE WHEN CURRENT_ROLE() IN ('ACCOUNTADMIN','FR_COMPLIANCE') THEN val
ELSE SHA2(val, 256) END;
ALTER TABLE gold_monthly_bill MODIFY COLUMN customer_id SET MASKING POLICY mask_customer_id;
-- Verify: query as a non-compliance role (e.g. create a test role quickly)
SELECT * FROM gold_monthly_bill LIMIT 5; -- as ACCOUNTADMIN: real IDs
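To see the masked branch, a throwaway role works. A sketch — the role name FR_ANALYST and the grant set are assumptions, and depending on your account you may need SECURITYADMIN/ACCOUNTADMIN for the role and grants:

```sql
USE ROLE SECURITYADMIN;
CREATE ROLE IF NOT EXISTS FR_ANALYST;             -- hypothetical non-compliance role
GRANT USAGE ON DATABASE LEARN_DB TO ROLE FR_ANALYST;
GRANT USAGE ON SCHEMA LEARN_DB.M2C TO ROLE FR_ANALYST;
GRANT USAGE ON WAREHOUSE LEARN_WH TO ROLE FR_ANALYST;
GRANT SELECT ON TABLE LEARN_DB.M2C.gold_monthly_bill TO ROLE FR_ANALYST;
GRANT ROLE FR_ANALYST TO USER my_user;            -- substitute your own username
USE ROLE FR_ANALYST;
SELECT customer_id FROM LEARN_DB.M2C.gold_monthly_bill LIMIT 5;  -- SHA-256 hashes, not real IDs
```

Drop the role when you're done; it isn't covered by the schema cleanup in Step 8.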
Step 6 — Optimization audit
Open any of the queries above in query history and check the Query Profile for:
- Partitions scanned vs. total — should be reasonable on each scan
- Join explosion on silver × accounts → gold — any joins with unexpected row counts?
- Spills — at 288k rows, none expected on an XSMALL; if you see any, reflect on why
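The same numbers are also queryable in SQL via the ACCOUNT_USAGE share (it lags up to ~45 minutes and requires a role with access to the SNOWFLAKE database). A sketch:

```sql
-- Partition pruning and spill stats for recent queries on this warehouse
SELECT query_text,
       partitions_scanned,
       partitions_total,
       bytes_spilled_to_local_storage
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE warehouse_name = 'LEARN_WH'
  AND start_time > DATEADD(hour, -3, CURRENT_TIMESTAMP())
ORDER BY start_time DESC
LIMIT 20;
```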
Step 7 — Write your one-paragraph demo
Capture it in your notepad. This is a literal answer to the question you will get:
"Walk me through an end-to-end pipeline you've built."
Yours:
Step 8 — Cleanup
-- A masking policy can't be dropped while attached to a column, and it lives
-- inside M2C — so unset it, drop it, then drop the schema.
ALTER TABLE gold_monthly_bill MODIFY COLUMN customer_id UNSET MASKING POLICY;
DROP MASKING POLICY IF EXISTS mask_customer_id;
DROP SCHEMA IF EXISTS LEARN_DB.M2C CASCADE;
ALTER WAREHOUSE LEARN_WH SUSPEND;
What you just demonstrated
- ✓ r1 Snowflake: advanced SQL, bronze/silver/gold layering, masking policy
- ✓ r2 AWS-adjacent: the concept of an S3 landing zone (seeded in-warehouse here for simplicity)
- ✓ r3 Python/ELT: ELT shape, push-down, ready for Matillion translation
- ✓ r4 Git/Jenkins: the schemachange + test pipeline that would wrap this
- ✓ r5 Governance: masking policy applied inline
- ✓ r6 Streaming-ready: bronze_ami_reads is append-only; could be fed by Kinesis/Kafka
- ✓ r7 Optimization: pruning + partition-scan audit
- ✓ r9 Utility domain: explicit meter-to-cash, VEE, rate plans
That's every JD line in one pipeline. Interview-demo complete.