Replay-Resistant Event Pipelines: Building Idempotent Guards Into Kafka Consumers



Hook: When a Retry Storm Double-Charged Your Customers
The billing service went sideways after a network hiccup. Your Kafka consumer lag spiked, so the ops team reset the consumer group's offsets to earliest, hoping to drain the backlog. The consumer dutifully rewound, replayed the last 20 minutes of events, and your invoice projection emitted duplicate ChargeCreated records. Stripe idempotency keys caught some, but dozens of corporate customers wound up double-charged because the downstream ledger key used order_id only. Fraud detection pinged you, product escalated, and you spent the night writing a script to refund everyone manually.
Replays are not accidents; they are an inevitability. Brokers retry, operators misconfigure offsets, and sometimes adversaries craft duplicate events to probe your system. Developers often focus on schema evolution and forget to bake idempotency into consumers. This post shows how to implement durable replay defenses using Go and Kotlin examples, how to track state safely without destroying throughput, and how to verify that your guards hold under chaos testing. By the end, you will have patterns you can drop into your consumer library, plus tests that prove your ledger will not explode when the next replay storm hits.
The Problem Deep Dive
Event-driven systems process messages at least once by design. Kafka does not guarantee exactly-once unless you opt into transactional writes and idempotent producers, and even then, consumer bugs leak duplicates. Common developer mistakes include:
- Using natural keys that are not globally unique. Charging by order_id works until a customer reorders the same product.
- Treating offsets as a durability signal. Offsets only track broker position. Manual rewinds reprocess events without consumer awareness.
- Lack of replay detection. Consumers often write to multiple stores (DB plus cache). Deduplicating in the database but not the cache causes eventual inconsistency.
Here is a representative Kotlin consumer:
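fun onChargeCreated(event: ChargeCreated) {
    // Natural key: orderId is not unique per event.
    val ledgerId = event.orderId
    // Check-then-act: the lookup and the save are not atomic.
    if (ledgerRepository.existsById(ledgerId)) return
    ledgerRepository.save(ledgerId, event.amount)
    cache.put(ledgerId, event.amount)
}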
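This works until the topic replays. The existsById check and the save are separate steps, so two consumers replaying the same range during a rebalance can both pass the guard. If the process crashes after the ledger write but before cache.put, the replay returns early at the guard and the cache is never filled, so readers see inconsistent data. Worse, if the ledger write is part of a batch and a crash happens mid-batch, the consumer reprocesses the whole batch without knowing which writes already succeeded.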
Technical Solutions
Quick Patch: External Idempotency Table
Add a table keyed by event ID:
CREATE TABLE charge_dedupe (
    event_id UUID PRIMARY KEY,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
Then update the consumer:
fun onChargeCreated(event: ChargeCreated) {
    // tryInsert returns false when event.id was already recorded.
    if (!idempotencyStore.tryInsert(event.id)) return
    ledgerRepository.save(event.id, event.amount)
    cache.put(event.id, event.amount)
}
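tryInsert runs INSERT ... ON CONFLICT DO NOTHING and reports whether a row was actually inserted. This prevents duplicates but centralizes state in the database, which can become a bottleneck at high throughput. It also assumes every event carries a unique ID. The guard is only safe when the dedupe insert and the ledger write commit in the same transaction; otherwise a crash between the two marks the event as processed without ever posting it.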
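For the Go consumers later in this post, the same helper might look like the sketch below. It assumes the charge_dedupe table above; the execer interface, the TryInsert name, and the in-memory fakeDB are illustrative stand-ins, not part of any library. Both *sql.DB and *sql.Tx satisfy execer, so you can pass the transaction that also writes the ledger row.

```go
package main

import (
	"context"
	"database/sql"
	"database/sql/driver"
)

// execer is the subset of *sql.DB and *sql.Tx that the helper needs.
type execer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

// TryInsert records eventID in charge_dedupe and reports whether this call
// was the first to do so. Pass a *sql.Tx to share a transaction with the
// ledger write.
func TryInsert(ctx context.Context, db execer, eventID string) (bool, error) {
	res, err := db.ExecContext(ctx,
		`INSERT INTO charge_dedupe (event_id) VALUES ($1) ON CONFLICT DO NOTHING`,
		eventID)
	if err != nil {
		return false, err
	}
	n, err := res.RowsAffected()
	if err != nil {
		return false, err
	}
	return n == 1, nil
}

// fakeDB is an in-memory stand-in for Postgres (an assumption of this
// sketch) so the helper can be exercised without a database.
type fakeDB struct{ seen map[string]bool }

func (f *fakeDB) ExecContext(_ context.Context, _ string, args ...any) (sql.Result, error) {
	id := args[0].(string)
	if f.seen[id] {
		return driver.RowsAffected(0), nil // conflict: nothing inserted
	}
	f.seen[id] = true
	return driver.RowsAffected(1), nil
}

// deliverTwice simulates a replay by delivering the same event ID twice.
func deliverTwice(id string) (first, second bool, err error) {
	db := &fakeDB{seen: map[string]bool{}}
	ctx := context.Background()
	if first, err = TryInsert(ctx, db, id); err != nil {
		return
	}
	second, err = TryInsert(ctx, db, id)
	return
}
```

The first delivery reports true and the replay reports false, which is the signal the consumer uses to skip the ledger write.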
Durable Fix: Layered Idempotency with Distributed Cache
Implement a two-layer guard: fast in-memory dedupe plus durable storage.
Go example using Redis and Postgres:
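func processCharge(ctx context.Context, evt ChargeCreated) error {
	// Fast path: the key exists only for events that already committed.
	// A Redis error counts as a miss; Postgres remains the source of truth.
	if n, err := redisClient.Exists(ctx, dedupeKey(evt.ID)).Result(); err == nil && n > 0 {
		return nil
	}
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit has succeeded
	// Claim the event ID first; zero affected rows means a duplicate.
	res, err := tx.ExecContext(ctx,
		`INSERT INTO charge_dedupe (event_id) VALUES ($1) ON CONFLICT DO NOTHING`,
		evt.ID,
	)
	if err != nil {
		return err
	}
	if n, err := res.RowsAffected(); err != nil || n == 0 {
		return err
	}
	if _, err := tx.ExecContext(ctx,
		`INSERT INTO ledger (id, amount, status) VALUES ($1,$2,'posted') ON CONFLICT DO NOTHING`,
		evt.ID, evt.Amount,
	); err != nil {
		return err
	}
	if err := tx.Commit(); err != nil {
		return err
	}
	// Mark the event in Redis only after the commit, so a failed or crashed
	// transaction never leaves a key that would swallow the retry.
	redisClient.Set(ctx, dedupeKey(evt.ID), 1, 30*time.Minute)
	cache.Set(evt.ID, LedgerEntry{Amount: evt.Amount})
	return nil
}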
Redis guards the hot path, the database guarantees durability. If Redis loses the entry, the database still blocks duplicates. Use an expiry on Redis to prevent memory leaks.
Broker-Level Tools
Enable Kafka idempotent producers and transactions if feasible:
enable.idempotence=true
acks=all
transactional.id=ledger-service-tx
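Kafka transactions cover only Kafka reads and writes; they cannot include a Postgres write. For a database sink, disable auto-commit and commit offsets only after the database transaction succeeds, or store the offsets in the same database transaction as the ledger row. Replays then occur only when the consumer cannot confirm success. Consumers of transactional topics should also set isolation.level=read_committed so aborted writes are never delivered.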
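The commit-after-write ordering can be sketched as a small loop. Everything here is an assumption for illustration: Message, Source, and sliceSource are hypothetical types, not a client library's API. With kafka-go, Reader.FetchMessage and Reader.CommitMessages have a similar shape, so the loop should map over with little change.

```go
package main

import (
	"context"
	"errors"
)

// Message is a stand-in for a broker record (hypothetical type).
type Message struct {
	Offset int64
	Value  []byte
}

// Source abstracts the consumer; it is an assumption of this sketch.
type Source interface {
	Fetch(ctx context.Context) (Message, error)
	Commit(ctx context.Context, m Message) error
}

var errDone = errors.New("no more messages")

// consume commits an offset only after handle (the database write) succeeds.
// A failed handle leaves the offset uncommitted, so the message is redelivered
// and the idempotency guard has to absorb the replay.
func consume(ctx context.Context, src Source, handle func(context.Context, Message) error) error {
	for {
		m, err := src.Fetch(ctx)
		if errors.Is(err, errDone) {
			return nil
		}
		if err != nil {
			return err
		}
		if err := handle(ctx, m); err != nil {
			return err // do not commit
		}
		if err := src.Commit(ctx, m); err != nil {
			return err
		}
	}
}

// sliceSource is an in-memory Source used only to exercise consume.
type sliceSource struct {
	msgs      []Message
	next      int
	committed int64 // last committed offset, -1 if none
}

func (s *sliceSource) Fetch(context.Context) (Message, error) {
	if s.next >= len(s.msgs) {
		return Message{}, errDone
	}
	m := s.msgs[s.next]
	s.next++
	return m, nil
}

func (s *sliceSource) Commit(_ context.Context, m Message) error {
	s.committed = m.Offset
	return nil
}

// runUntilFailure processes offsets 0..2, failing the write at failAt
// (-1 never fails), and returns the last committed offset.
func runUntilFailure(failAt int64) int64 {
	src := &sliceSource{msgs: []Message{{Offset: 0}, {Offset: 1}, {Offset: 2}}, committed: -1}
	_ = consume(context.Background(), src, func(_ context.Context, m Message) error {
		if m.Offset == failAt {
			return errors.New("database write failed")
		}
		return nil
	})
	return src.committed
}
```

When the write at offset 1 fails, only offset 0 is committed, so a restart redelivers offset 1 instead of skipping it.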
Handling Partial Failures
Wrap side effects in the same transaction or compensate. If you update the ledger and emit a notification, ensure they share the same consistency guard. One approach is to emit outbox events from the same transaction and let a worker dispatch them.
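A minimal outbox sketch, assuming an outbox table with a unique event_id column (the table, its columns, and the postWithOutbox name are hypothetical). Both statements run through the same handle, so with a *sql.Tx they commit or roll back together; recordingTx is a test double that only records what ran.

```go
package main

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"errors"
	"strings"
)

// execer is the subset of *sql.Tx that the helper needs.
type execer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

// postWithOutbox writes the ledger row and the pending notification through
// the same transaction handle. A separate worker would read the outbox and
// dispatch the notification after commit.
func postWithOutbox(ctx context.Context, tx execer, eventID string, amount int64) error {
	if _, err := tx.ExecContext(ctx,
		`INSERT INTO ledger (id, amount, status) VALUES ($1,$2,'posted') ON CONFLICT DO NOTHING`,
		eventID, amount); err != nil {
		return err
	}
	_, err := tx.ExecContext(ctx,
		`INSERT INTO outbox (event_id, kind) VALUES ($1,'charge_posted') ON CONFLICT DO NOTHING`,
		eventID)
	return err
}

// recordingTx is an in-memory stand-in for a transaction.
type recordingTx struct {
	failOn string   // substring of the statement that should fail
	ran    []string // statements that executed successfully
}

func (r *recordingTx) ExecContext(_ context.Context, q string, _ ...any) (sql.Result, error) {
	if r.failOn != "" && strings.Contains(q, r.failOn) {
		return nil, errors.New("simulated failure")
	}
	r.ran = append(r.ran, q)
	return driver.RowsAffected(1), nil
}

// statementsRun reports how many statements succeeded before any failure.
func statementsRun(failOn string) (int, error) {
	tx := &recordingTx{failOn: failOn}
	err := postWithOutbox(context.Background(), tx, "evt-1", 500)
	return len(tx.ran), err
}
```

In a real consumer the caller would roll back on any returned error, so a failed outbox insert also discards the ledger row.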
Observability Hooks
Track dedupe hit rates and replay attempts. Emit metrics like charge_dedupe.hit and alert when it spikes. Spikes indicate either a replay or a bug in your producer. Keep logs short but informative: log.With("event_id", evt.ID).Warn("duplicate charge ignored").
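One way to back those metrics is a pair of atomic counters. The DedupeMetrics type and the threshold check are hypothetical; in practice you would export the counters through whatever metrics client you already run.

```go
package main

import "sync/atomic"

// DedupeMetrics counts guard outcomes. Safe for concurrent use.
type DedupeMetrics struct {
	hits   int64 // duplicates rejected by the guard
	misses int64 // first-time events that were processed
}

// Record notes one guard decision.
func (m *DedupeMetrics) Record(duplicate bool) {
	if duplicate {
		atomic.AddInt64(&m.hits, 1)
		return
	}
	atomic.AddInt64(&m.misses, 1)
}

// HitRate returns hits / (hits + misses), or 0 before any event is seen.
func (m *DedupeMetrics) HitRate() float64 {
	h := atomic.LoadInt64(&m.hits)
	s := atomic.LoadInt64(&m.misses)
	if h+s == 0 {
		return 0
	}
	return float64(h) / float64(h+s)
}

// ShouldAlert reports whether the hit rate exceeds threshold.
func (m *DedupeMetrics) ShouldAlert(threshold float64) bool {
	return m.HitRate() > threshold
}
```

A steady-state consumer should sit near zero, so even a low threshold will catch a replay without paging on the occasional broker retry.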
Alprina Integration
Set up Alprina policies to flag consumers that write to multiple stores without referencing the idempotency helper. Scan for cache.put without preceding idempotencyStore.tryInsert.
Testing & Verification
Write property tests that simulate duplicates. The simplest case delivers one event twice. In Go:
func TestProcessChargeIdempotent(t *testing.T) {
	ctx := context.Background()
	evt := ChargeCreated{ID: uuid.New(), Amount: 500}
	// Deliver the same event twice, as a replay would.
	require.NoError(t, processCharge(ctx, evt))
	require.NoError(t, processCharge(ctx, evt))
	entry, err := ledgerRepo.Get(ctx, evt.ID)
	require.NoError(t, err)
	require.EqualValues(t, 500, entry.Amount)
}
Add chaos tests that simulate replay storms by rewinding consumer offsets in a test cluster. Use kafka-go or testcontainers to run local brokers. Assert dedupe metrics stay within acceptable ranges.
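Before wiring up a test cluster, you can model the replay storm in memory. The sketch below is not a substitute for a broker-level chaos test; the ledger type is a hypothetical stand-in for the guarded store, and the seed only makes the shuffle reproducible.

```go
package main

import "math/rand"

// ChargeCreated mirrors the event used throughout this post.
type ChargeCreated struct {
	ID     string
	Amount int64
}

// ledger is an in-memory stand-in for the guarded ledger.
type ledger struct {
	entries map[string]int64
	hits    int // duplicates rejected
}

// apply posts the charge unless its ID was already seen.
func (l *ledger) apply(evt ChargeCreated) {
	if _, dup := l.entries[evt.ID]; dup {
		l.hits++
		return
	}
	l.entries[evt.ID] = evt.Amount
}

// total sums every posted amount.
func (l *ledger) total() int64 {
	var sum int64
	for _, amount := range l.entries {
		sum += amount
	}
	return sum
}

// replayStorm delivers every event once, then redelivers the whole stream
// `replays` more times in shuffled order.
func replayStorm(events []ChargeCreated, replays int, seed int64) *ledger {
	l := &ledger{entries: map[string]int64{}}
	rng := rand.New(rand.NewSource(seed))
	for _, e := range events {
		l.apply(e)
	}
	for i := 0; i < replays; i++ {
		shuffled := append([]ChargeCreated(nil), events...)
		rng.Shuffle(len(shuffled), func(a, b int) {
			shuffled[a], shuffled[b] = shuffled[b], shuffled[a]
		})
		for _, e := range shuffled {
			l.apply(e)
		}
	}
	return l
}
```

The invariant to assert is that the ledger total equals the sum of the distinct events, no matter how many replays you add or how they are ordered.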
In CI, run migration checks ensuring the dedupe table exists and has appropriate retention jobs. Also run performance benchmarks to confirm Redis latency stays below your SLO when dedupe keys accumulate.
Common Questions & Edge Cases
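What if events lack IDs? Derive one from the payload. Combine stable fields (order ID, customer ID, created timestamp) with an unambiguous encoding and hash them with SHA-256. Make sure the combination is truly unique per event; two legitimate charges with identical fields would otherwise collapse into one.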
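A sketch of such a derived ID, assuming the three fields named above are available and stable across redeliveries. The DerivedEventID name and the Unix-nanosecond timestamp parameter are choices made for this example. Each field is length-prefixed so that adjacent fields cannot blur into each other.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strconv"
)

// DerivedEventID hashes stable fields into a deterministic 64-character
// hex ID. Length prefixes keep ("ab", "c") distinct from ("a", "bc").
func DerivedEventID(orderID, customerID string, createdAtUnixNano int64) string {
	h := sha256.New()
	fields := []string{orderID, customerID, strconv.FormatInt(createdAtUnixNano, 10)}
	for _, f := range fields {
		fmt.Fprintf(h, "%d:%s|", len(f), f)
	}
	return hex.EncodeToString(h.Sum(nil))
}
```

Avoid hashing fields the producer may rewrite on retry, such as a send timestamp, or every redelivery will look like a new event.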
How do we expire dedupe entries? Use retention policies. Keep durable dedupe records for the SLA window during which duplicates matter. Beyond that, rely on downstream reconciliation.
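Does exactly-once Kafka make this unnecessary? No. Exactly-once semantics apply to Kafka-to-Kafka processing; writes to external systems such as a database or a payment API fall outside the transaction. Producer bugs, cross-topic workflows, and manual offset rewinds still cause duplicates.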
How do we prevent replay abuse by attackers? Rate-limit duplicates per source account and trace them. Suspicious spikes might indicate someone replaying webhook payloads.
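A per-account sliding window is one way to put a budget on duplicates. This is a sketch under assumptions: the DuplicateLimiter type, the limit of three per minute in the demo, and the burstFlagged helper are all hypothetical, and a production version would also evict idle accounts.

```go
package main

import (
	"sync"
	"time"
)

// DuplicateLimiter tracks recent duplicates per source account.
type DuplicateLimiter struct {
	limit  int
	window time.Duration
	mu     sync.Mutex
	seen   map[string][]time.Time // duplicate timestamps per account
}

func NewDuplicateLimiter(limit int, window time.Duration) *DuplicateLimiter {
	return &DuplicateLimiter{limit: limit, window: window, seen: map[string][]time.Time{}}
}

// Allow records one duplicate from account at time now and reports whether
// the account is still within its budget for the window.
func (l *DuplicateLimiter) Allow(account string, now time.Time) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	cutoff := now.Add(-l.window)
	kept := l.seen[account][:0]
	for _, t := range l.seen[account] {
		if t.After(cutoff) {
			kept = append(kept, t)
		}
	}
	kept = append(kept, now)
	l.seen[account] = kept
	return len(kept) <= l.limit
}

// burstFlagged sends n duplicates from one account, spacingSeconds apart,
// against a budget of 3 per minute and returns how many were flagged.
func burstFlagged(n, spacingSeconds int) int {
	lim := NewDuplicateLimiter(3, time.Minute)
	start := time.Unix(0, 0)
	flagged := 0
	for i := 0; i < n; i++ {
		at := start.Add(time.Duration(i*spacingSeconds) * time.Second)
		if !lim.Allow("acct-1", at) {
			flagged++
		}
	}
	return flagged
}
```

Flagged duplicates are still rejected by the idempotency guard; the limiter only decides when to trace or throttle the source.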
Will Redis become a bottleneck? Possibly. Consider sharding Redis by event ID prefix or putting a local LRU cache keyed by event ID in front of it. Measure before optimizing.
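A local LRU in front of Redis can be as small as the sketch below, built on the standard library's container/list. The SeenCache type is hypothetical and not safe for concurrent use without a mutex. Treat a hit as authoritative only if IDs are added after the durable write commits; a miss must always fall through to Redis or the database.

```go
package main

import "container/list"

// SeenCache is a fixed-size LRU set of event IDs.
type SeenCache struct {
	capacity int
	order    *list.List               // front = most recently seen
	items    map[string]*list.Element // event ID -> node in order
}

func NewSeenCache(capacity int) *SeenCache {
	return &SeenCache{
		capacity: capacity,
		order:    list.New(),
		items:    make(map[string]*list.Element),
	}
}

// CheckAndAdd reports whether id was already cached and records it either
// way, evicting the least recently seen ID when the cache is full.
func (c *SeenCache) CheckAndAdd(id string) bool {
	if el, ok := c.items[id]; ok {
		c.order.MoveToFront(el)
		return true
	}
	c.items[id] = c.order.PushFront(id)
	if c.order.Len() > c.capacity {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(string))
	}
	return false
}
```

Because replays tend to arrive in bursts over a recent offset range, even a modest capacity can absorb most of a storm before it reaches Redis.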
Conclusion
Replays will happen, whether from flaky networks or operators flipping the wrong switch. Build layered idempotency: fast caches, durable tables, transactional writes, and metrics that tell you when the guardrails fire. The next time Kafka rewinds, you will see a spike in dedupe hits, not a spike in refund tickets.