Exercise 9.1 — Meter-to-Cash Mini-Pipeline

The capstone. A full slice through the utility data lifecycle, using the tools from every previous module. End state: raw AMI reads → validated silver → billed gold table, with governance, tests, and a query-profile audit. This is your interview demo project.

Scope: does NOT require real streaming infrastructure — we'll simulate with a batch insert that represents "one month of AMI data." The point is the full pipeline shape, with every JD capability demonstrated.

Time: 2–3 hours. Cost: under $0.50 in Snowflake credits.

The data model

 BRONZE (raw, immutable)
 ─ bronze_ami_reads   : exactly as received. Keyed (meter_id, read_ts). Append-only.
 ─ bronze_cis_accounts: nightly full snapshot.
 ─ bronze_gis_service_points: weekly full snapshot.

 SILVER (cleaned, validated)
 ─ silver_meter_reads : after VEE. quality_flag, estimated boolean, revision_num.
 ─ silver_accounts    : deduped, current state, with SCD-2 for history.

 GOLD (business-ready)
 ─ gold_monthly_bill  : account × month, total kWh, total $, rate plan applied.

Step 1 — Seed the bronze tables

USE ROLE SYSADMIN; USE WAREHOUSE LEARN_WH;
CREATE SCHEMA IF NOT EXISTS LEARN_DB.M2C;
USE SCHEMA LEARN_DB.M2C;

CREATE OR REPLACE TABLE bronze_ami_reads (
  meter_id STRING, read_ts TIMESTAMP_NTZ,
  consumption_kwh NUMBER(10,3), quality_raw STRING, source_file STRING,
  ingested_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

CREATE OR REPLACE TABLE bronze_cis_accounts (
  account_id STRING, customer_id STRING, meter_id STRING,
  rate_plan STRING, status STRING, snapshot_date DATE,
  ingested_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- Populate bronze_ami_reads with a realistic simulation:
-- 100 meters × 96 reads/day (15-min) × 30 days = 288,000 rows
INSERT INTO bronze_ami_reads (meter_id, read_ts, consumption_kwh, quality_raw, source_file)
SELECT
  'MTR-' || LPAD(UNIFORM(1,100,RANDOM()),5,'0'),
  DATEADD(minute, -15 * UNIFORM(0, 4*24*30, RANDOM()), DATE_TRUNC('hour', CURRENT_TIMESTAMP())),  -- random slot on the 15-minute grid
  ROUND(UNIFORM(0,5000,RANDOM())/1000.0, 3),
  CASE WHEN UNIFORM(1,100,RANDOM())<5 THEN 'E'   -- ~4% estimated by the head-end
       WHEN UNIFORM(1,100,RANDOM())<2 THEN 'M'   -- ~1% missing/bad (independent draw)
       ELSE 'G' END,
  'file_' || UNIFORM(1,30,RANDOM()) || '.parquet'
FROM TABLE(GENERATOR(rowcount => 288000));

-- Accounts: 100 accounts paired 1:1 to the 100 meters, two flat-rate plans
INSERT INTO bronze_cis_accounts
SELECT
  'ACC-'  || LPAD(rn, 5, '0'),
  'CUST-' || LPAD(rn, 5, '0'),
  'MTR-'  || LPAD(rn, 5, '0'),   -- 1-based, matching UNIFORM(1,100,...) above; SEQ4() starts at 0
  CASE WHEN UNIFORM(1,10,RANDOM())<8 THEN 'RES-STD' ELSE 'RES-TOU' END,
  'ACTIVE',
  CURRENT_DATE()
FROM (SELECT ROW_NUMBER() OVER (ORDER BY SEQ4()) AS rn
      FROM TABLE(GENERATOR(rowcount => 100)));
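Before moving on, two quick sanity checks: every meter that has reads should have exactly one account (the seed data pairs them 1:1), and volumes should land near the comment's estimate.

```sql
-- Reads whose meter has no matching account (expect 0 rows)
SELECT DISTINCT r.meter_id
FROM bronze_ami_reads r
LEFT JOIN bronze_cis_accounts a ON a.meter_id = r.meter_id
WHERE a.meter_id IS NULL;

-- Volume check: 288k reads spread across 100 meters
SELECT COUNT(*) AS total_reads, COUNT(DISTINCT meter_id) AS meters
FROM bronze_ami_reads;
```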

Step 2 — Silver layer (VEE)

CREATE OR REPLACE TABLE silver_meter_reads AS
SELECT
  meter_id,
  read_ts,
  -- Validate: reject impossibly-high consumption; flag for review
  CASE WHEN consumption_kwh > 50 THEN NULL ELSE consumption_kwh END AS consumption_kwh,
  -- Estimated flag: if raw was missing, we'll estimate
  CASE WHEN quality_raw = 'M' THEN TRUE ELSE FALSE END AS is_estimated,
  quality_raw AS quality_flag,
  1 AS revision_num,
  CURRENT_TIMESTAMP() AS silver_built_at
FROM bronze_ami_reads
-- Dedupe: keep highest ingested_at per (meter, ts)
QUALIFY ROW_NUMBER() OVER (PARTITION BY meter_id, read_ts ORDER BY ingested_at DESC) = 1;
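The silver build above flags estimated reads but leaves their values untouched. A minimal sketch of the E step of VEE, assuming a trailing-average fill (the 8-read window is an arbitrary choice for illustration, not a standard):

```sql
-- Fill flagged reads with the meter's average over its previous 8 reads.
UPDATE silver_meter_reads
SET consumption_kwh = s.est_kwh,
    revision_num    = revision_num + 1
FROM (
  SELECT meter_id, read_ts,
         AVG(consumption_kwh) OVER (
           PARTITION BY meter_id ORDER BY read_ts
           ROWS BETWEEN 8 PRECEDING AND 1 PRECEDING
         ) AS est_kwh
  FROM silver_meter_reads
) s
WHERE silver_meter_reads.meter_id = s.meter_id
  AND silver_meter_reads.read_ts  = s.read_ts
  AND silver_meter_reads.is_estimated
  AND s.est_kwh IS NOT NULL;   -- a meter's first reads have no window to average
```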

SELECT COUNT(*), COUNT_IF(is_estimated), COUNT_IF(consumption_kwh IS NULL) FROM silver_meter_reads;
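The data model also lists silver_accounts with SCD-2 history, which this step doesn't build. A minimal sketch of the pattern, assuming nightly snapshots land in bronze_cis_accounts and using the usual two-pass approach (close out changed rows first, then insert their new versions):

```sql
CREATE TABLE IF NOT EXISTS silver_accounts (
  account_id STRING, customer_id STRING, meter_id STRING,
  rate_plan STRING, status STRING,
  valid_from DATE, valid_to DATE, is_current BOOLEAN
);

-- Pass 1: close out changed rows and land brand-new accounts.
MERGE INTO silver_accounts t
USING (SELECT * FROM bronze_cis_accounts
       WHERE snapshot_date = (SELECT MAX(snapshot_date) FROM bronze_cis_accounts)) s
  ON t.account_id = s.account_id AND t.is_current
WHEN MATCHED AND (t.rate_plan <> s.rate_plan OR t.status <> s.status) THEN
  UPDATE SET valid_to = s.snapshot_date, is_current = FALSE
WHEN NOT MATCHED THEN
  INSERT (account_id, customer_id, meter_id, rate_plan, status,
          valid_from, valid_to, is_current)
  VALUES (s.account_id, s.customer_id, s.meter_id, s.rate_plan, s.status,
          s.snapshot_date, NULL, TRUE);

-- Pass 2 (omitted): insert the new versions of the rows just closed out.
```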

Step 3 — Gold: monthly bill

CREATE OR REPLACE TABLE gold_monthly_bill AS
SELECT
  a.account_id,
  a.customer_id,
  DATE_TRUNC('month', r.read_ts)::DATE AS billing_month,
  SUM(r.consumption_kwh) AS total_kwh,
  SUM(r.consumption_kwh * CASE a.rate_plan WHEN 'RES-TOU' THEN 0.14 ELSE 0.12 END) AS total_dollars,
  COUNT(*) AS reads_counted,
  COUNT_IF(r.is_estimated) AS reads_estimated
FROM silver_meter_reads r
JOIN bronze_cis_accounts a ON a.meter_id = r.meter_id
WHERE r.consumption_kwh IS NOT NULL
GROUP BY a.account_id, a.customer_id, DATE_TRUNC('month', r.read_ts)::DATE;
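A classic meter-to-cash control is reconciling billed energy back to the source. A quick three-way comparison, assuming the tables above; the totals won't match exactly, because silver nulls out >50 kWh reads and deduplicates, and gold drops nulls:

```sql
-- Energy-balance check across the three layers
SELECT
  (SELECT SUM(consumption_kwh) FROM bronze_ami_reads)   AS bronze_kwh,
  (SELECT SUM(consumption_kwh) FROM silver_meter_reads) AS silver_kwh,
  (SELECT SUM(total_kwh)       FROM gold_monthly_bill)  AS gold_kwh;
```

In a real shop the acceptable bronze-to-gold delta is a tracked KPI, not just a spot check.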

Step 4 — Data-quality tests

Real DE CI runs these on every merge. Run them manually now:

-- Test: no nulls in billing month (expect failures = 0)
SELECT COUNT(*) AS failures
FROM gold_monthly_bill
WHERE billing_month IS NULL;

-- Test: uniqueness on (account_id, billing_month) (expect no rows)
SELECT account_id, billing_month, COUNT(*) AS dupes
FROM gold_monthly_bill
GROUP BY account_id, billing_month
HAVING COUNT(*) > 1;

-- Test: no negative dollars (expect failures = 0)
SELECT COUNT(*) AS failures
FROM gold_monthly_bill
WHERE total_dollars < 0;

Snowflake has no general-purpose ASSERT function, so these follow the dbt convention: a test is a query that returns the failing rows, and zero rows (or a zero count) means pass. In dbt or Great Expectations the same checks are declared once and run automatically; the intent is identical.
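If you want a single pass/fail readout instead of separate result sets, a union of labeled failure counts works as a poor man's test runner; a sketch, assuming the gold table above:

```sql
-- One row per test; every row should read failures = 0
SELECT 'null_billing_month' AS test, COUNT(*) AS failures
FROM gold_monthly_bill WHERE billing_month IS NULL
UNION ALL
SELECT 'dup_account_month', COUNT(*)
FROM (
  SELECT account_id, billing_month
  FROM gold_monthly_bill
  GROUP BY account_id, billing_month
  HAVING COUNT(*) > 1
)
UNION ALL
SELECT 'negative_dollars', COUNT(*)
FROM gold_monthly_bill WHERE total_dollars < 0;
```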

Step 5 — Apply governance

-- Mask customer_id for non-compliance roles
CREATE OR REPLACE MASKING POLICY mask_customer_id AS (val STRING) RETURNS STRING ->
  CASE WHEN CURRENT_ROLE() IN ('ACCOUNTADMIN','FR_COMPLIANCE') THEN val
       ELSE SHA2(val, 256) END;

ALTER TABLE gold_monthly_bill MODIFY COLUMN customer_id SET MASKING POLICY mask_customer_id;

-- Verify: query as a non-compliance role (e.g. create a test role quickly)
SELECT * FROM gold_monthly_bill LIMIT 5;   -- as ACCOUNTADMIN: real IDs
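To actually verify the mask, a throwaway role helps. A sketch, assuming SECURITYADMIN access; ANALYST_TEST is a hypothetical role name and the user-name placeholder is yours to fill in:

```sql
USE ROLE SECURITYADMIN;
CREATE ROLE IF NOT EXISTS ANALYST_TEST;
GRANT USAGE  ON DATABASE LEARN_DB                    TO ROLE ANALYST_TEST;
GRANT USAGE  ON SCHEMA LEARN_DB.M2C                  TO ROLE ANALYST_TEST;
GRANT SELECT ON TABLE LEARN_DB.M2C.gold_monthly_bill TO ROLE ANALYST_TEST;
GRANT USAGE  ON WAREHOUSE LEARN_WH                   TO ROLE ANALYST_TEST;
GRANT ROLE ANALYST_TEST TO USER <your_user>;   -- swap in your own user name

USE ROLE ANALYST_TEST;
-- customer_id should now come back as a SHA-256 hex string, not a real ID
SELECT customer_id FROM LEARN_DB.M2C.gold_monthly_bill LIMIT 5;
```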

Step 6 — Optimization audit

Open any of the queries above in query history and check its Query Profile for:

 ─ Partitions scanned vs. partitions total: is pruning kicking in, or is the table too small to matter?
 ─ Bytes spilled to local or remote storage: a sign the warehouse is undersized for the workload.
 ─ The most expensive operator: here, usually the Join or the TableScan.
 ─ Join output rows vs. input rows: watch for accidental row explosion.
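If you prefer SQL to clicking through the UI, GET_QUERY_OPERATOR_STATS exposes the same operator-level numbers; a sketch, run immediately after the query you want to audit:

```sql
-- Operator-level profile of the previous query in this session
SELECT *
FROM TABLE(GET_QUERY_OPERATOR_STATS(LAST_QUERY_ID()));
```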

Step 7 — Write your one-paragraph demo

Capture it in the 📓 notepad. This is a literal answer to the question you will get:

"Walk me through an end-to-end pipeline you've built."

Yours:

Step 8 — Cleanup

-- The masking policy lives in M2C, so the CASCADE drop removes it
-- (and detaches it from the table) along with everything else.
DROP SCHEMA IF EXISTS LEARN_DB.M2C CASCADE;
ALTER WAREHOUSE LEARN_WH SUSPEND;

What you just demonstrated

Ingestion, medallion modeling, VEE, dedupe with QUALIFY, a rated bill, failure-mode tests, column masking, and a query-profile audit: that's every JD line in one pipeline. Interview-demo complete.