Exercise 8.1 โ€” Local Kafka + Python Producer/Consumer

Stand up Kafka in Docker Compose. Produce synthetic AMI meter reads from one Python script. Consume them with another. All local โ€” zero cost. Gets you fluent with the Kafka primitives in 45 minutes.

Cost: $0. Runs entirely on your laptop. Prereq: Docker installed. Given your Kubernetes history, assumed.

Step 1 โ€” docker-compose.yml

Create a project dir and drop this in:

version: '3'
services:
  kafka:
    image: bitnami/kafka:3.7
    ports:
      - "9092:9092"
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - ALLOW_PLAINTEXT_LISTENER=yes

Run: docker compose up -d. Give it 20 seconds.

Step 2 โ€” Create the topic

docker compose exec kafka kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic ami.meter_reads --partitions 3 --replication-factor 1

Three partitions โ€” matches the mental model from Lesson 8.1.

Step 3 โ€” producer.py

import json, random, time, datetime as dt
from kafka import KafkaProducer   # pip install kafka-python-ng

producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    key_serializer=lambda k: k.encode("utf-8"),
    acks="all",        # wait for full ISR ack; stronger durability
    linger_ms=50,       # small batch window
)

METERS = [f"MTR-{i:05d}" for i in range(1, 101)]
try:
    while True:
        mid = random.choice(METERS)
        record = {
            "meter_id": mid,
            "read_ts": dt.datetime.utcnow().isoformat(),
            "consumption_kwh": round(random.uniform(0, 5), 3),
        }
        # key = meter_id โ†’ same meter always lands on same partition โ†’ ordered reads per meter
        producer.send("ami.meter_reads", key=mid, value=record)
        print(f"โ†’ {record}")
        time.sleep(0.2)
except KeyboardInterrupt:
    producer.flush(); producer.close()

Step 4 โ€” consumer.py

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "ami.meter_reads",
    bootstrap_servers=["localhost:9092"],
    group_id="consumer-a",       # change to "consumer-b" in a second terminal to read independently
    auto_offset_reset="earliest",  # start from beginning if no offset yet
    enable_auto_commit=True,
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for msg in consumer:
    print(f"partition={msg.partition} offset={msg.offset} key={msg.key.decode()} value={msg.value}")

Step 5 โ€” Run

Terminal 1: pip install kafka-python-ng && python producer.py

Terminal 2: python consumer.py

You'll see consumer print the records coming in. Note the partition= values โ€” they'll be 0, 1, or 2. Meter MTR-00005 will always land on the same partition (because we keyed by meter_id).

Step 6 โ€” Experiment with consumer groups

This hands-on feel is the thing interviewers want you to have done. You've now done it.

Step 7 โ€” Inspect from the broker

# See your topic
docker compose exec kafka kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic ami.meter_reads

# See consumer-group offsets
docker compose exec kafka kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group consumer-a

Note "LAG" โ€” how far behind the consumer is. This is the #1 production metric you'll page on.

Step 8 โ€” Shut down

docker compose down -v

Reflection โ€” save to notepad