Exercise 8.1 – Local Kafka + Python Producer/Consumer
Stand up Kafka in Docker Compose. Produce synthetic AMI meter reads from one Python script and consume them with another. All local, zero cost, and it gets you fluent with the Kafka primitives in about 45 minutes.
Cost: $0. Runs entirely on your laptop. Prereq: Docker installed (a safe assumption given your Kubernetes background).
Step 1 – docker-compose.yml
Create a project dir and drop this in:
version: '3'
services:
  kafka:
    image: bitnami/kafka:3.7
    ports:
      - "9092:9092"
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - ALLOW_PLAINTEXT_LISTENER=yes
Run docker compose up -d and give it about 20 seconds to start.
Step 2 – Create the topic
docker compose exec kafka kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic ami.meter_reads --partitions 3 --replication-factor 1
Three partitions – matching the mental model from Lesson 8.1.
Step 3 – producer.py
import json, random, time, datetime as dt
from kafka import KafkaProducer  # pip install kafka-python-ng

producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    key_serializer=lambda k: k.encode("utf-8"),
    acks="all",    # wait for full ISR ack; stronger durability
    linger_ms=50,  # small batch window
)

METERS = [f"MTR-{i:05d}" for i in range(1, 101)]

try:
    while True:
        mid = random.choice(METERS)
        record = {
            "meter_id": mid,
            "read_ts": dt.datetime.now(dt.timezone.utc).isoformat(),
            "consumption_kwh": round(random.uniform(0, 5), 3),
        }
        # key = meter_id -> same meter always lands on the same partition -> ordered reads per meter
        producer.send("ami.meter_reads", key=mid, value=record)
        print(f"-> {record}")
        time.sleep(0.2)
except KeyboardInterrupt:
    producer.flush()
    producer.close()
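The property the key comment relies on can be demonstrated locally, with no broker. Kafka's default partitioner hashes the key bytes (murmur2) and takes the result mod the partition count; this simplified stand-in uses md5 instead, but it illustrates the same guarantee, that a given key always maps to the same partition:

```python
# Simplified stand-in for Kafka's default partitioner (Kafka uses murmur2, not md5):
# hash the key bytes, mod the number of partitions. Same key in -> same partition out.
import hashlib

def pick_partition(key: str, num_partitions: int = 3) -> int:
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

p = pick_partition("MTR-00005")
assert p == pick_partition("MTR-00005")  # deterministic: every send for this meter hits partition p
assert 0 <= p < 3
```

This is why per-meter ordering holds even though the topic as a whole is unordered across partitions.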
Step 4 – consumer.py
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "ami.meter_reads",
    bootstrap_servers=["localhost:9092"],
    group_id="consumer-a",         # change to "consumer-b" in a second terminal to read independently
    auto_offset_reset="earliest",  # start from the beginning if the group has no committed offset
    enable_auto_commit=True,
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for msg in consumer:
    print(f"partition={msg.partition} offset={msg.offset} key={msg.key.decode()} value={msg.value}")
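A quick sanity check, runnable without a broker, that the consumer's value_deserializer is the inverse of the producer's value_serializer from Step 3:

```python
import json

# The same lambdas used in the producer and consumer configs above
value_serializer = lambda v: json.dumps(v).encode("utf-8")
value_deserializer = lambda v: json.loads(v.decode("utf-8"))

record = {"meter_id": "MTR-00001", "read_ts": "2025-01-01T00:00:00+00:00", "consumption_kwh": 1.234}
assert value_deserializer(value_serializer(record)) == record  # round-trips cleanly
```

Kafka itself only moves bytes; serialization is entirely the clients' contract, so mismatched (de)serializers fail at the application layer, not at the broker.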
Step 5 – Run
Terminal 1: pip install kafka-python-ng && python producer.py
Terminal 2: python consumer.py
You'll see the consumer print the records coming in. Note the partition= values – they'll be 0, 1, or 2. Meter MTR-00005 always lands on the same partition, because we keyed by meter_id.
Step 6 – Experiment with consumer groups
- Open a 3rd terminal and run consumer.py again with the same group_id="consumer-a". Notice the broker assigns each consumer different partitions. Two consumers, same group – they split the work.
- Kill one of the consumers. Watch the rebalance happen – the surviving consumer picks up the abandoned partitions.
- Start a 4th consumer with group_id="consumer-b". It reads everything from the beginning because it's a new group with its own offsets.
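What the group coordinator does in these experiments can be modeled in a few lines. This is a deliberately simplified sketch of range-style assignment (the real assignors handle multiple topics, stickiness, and more); the consumer names c1 and c2 are hypothetical:

```python
# Simplified range-style assignment: sort the group's members, then hand out
# contiguous slices of partitions, with earlier members absorbing the remainder.
def range_assign(consumers, partitions):
    consumers = sorted(consumers)
    per, extra = divmod(len(partitions), len(consumers))
    out, i = {}, 0
    for idx, c in enumerate(consumers):
        count = per + (1 if idx < extra else 0)
        out[c] = partitions[i:i + count]
        i += count
    return out

print(range_assign(["c1", "c2"], [0, 1, 2]))  # two consumers split three partitions
print(range_assign(["c1"], [0, 1, 2]))        # kill c2: a rebalance hands c1 everything
```

It also makes the scaling ceiling obvious: with 3 partitions, a 4th consumer in the same group would be assigned nothing and sit idle.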
This hands-on experience is exactly what interviewers want you to have. Now you have it.
Step 7 – Inspect from the broker
# See your topic
docker compose exec kafka kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic ami.meter_reads
# See consumer-group offsets
docker compose exec kafka kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group consumer-a
Note "LAG" – how far behind the consumer is. This is the #1 production metric you'll page on.
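Conceptually, the LAG column is just arithmetic: per partition, the log-end offset minus the group's committed offset. With hypothetical numbers:

```python
# Hypothetical offsets for the three partitions of ami.meter_reads
end_offsets = {0: 1200, 1: 1180, 2: 1210}  # latest offset written per partition
committed = {0: 1200, 1: 1100, 2: 1210}    # consumer-a's committed offset per partition

lag = {p: end_offsets[p] - committed[p] for p in end_offsets}
print(lag)  # partition 1 is 80 records behind; that's the number you'd alert on
```

Growing lag on one partition while the others hold steady usually means a hot key or a stuck consumer, not broad overload.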
Step 8 – Shut down
docker compose down -v
Reflection – save to notepad
- What happens if consumer-a falls behind by 1 hour and the retention is 30 min?
- Why did we key by meter_id? What if we'd keyed by read_ts?
- How many partitions would you pick for a real utility with 1M meters reading every 15 min?
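A back-of-envelope starting point for the last question (assumed numbers, not an answer key – partition count also depends on consumer throughput, headroom, and key distribution):

```python
meters = 1_000_000
interval_s = 15 * 60               # one read per meter every 15 minutes
msgs_per_sec = meters / interval_s  # steady-state ingest rate
print(round(msgs_per_sec))          # roughly 1111 msg/s before any burst headroom
```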