Circuit Breaker Pattern with Kafka: A Complete Guide
Part 2: Implementation and Real-World Scenarios
Series Navigation:
- Part 1: Understanding the Fundamentals
- Part 2: Implementation and Real-World Scenarios (You are here)
- Part 3: Challenges, Edge Cases, and Alternatives
- Part 4: Configuration, Testing, and Best Practices
Implementation Overview
The most common implementation approach uses Resilience4j with Spring Kafka. The key insight is to listen for circuit breaker state transitions and control the Kafka consumer accordingly.
Core Concept
Circuit Breaker State Change
             │
             ▼
┌───────────────────────────┐
│     State Transition      │
│      Event Handler        │
├───────────────────────────┤
│ CLOSED_TO_OPEN       ───► consumer.pause()
│ HALF_OPEN_TO_OPEN    ───► consumer.pause()
│ OPEN_TO_HALF_OPEN    ───► (keep paused or resume for test)
│ HALF_OPEN_TO_CLOSED  ───► consumer.resume()
└───────────────────────────┘
Implementation Approach 1: Resilience4j with Spring Kafka
This is the recommended approach for Java/Spring applications.
Step 1: Configure the Circuit Breaker
Define circuit breaker settings in your application configuration:
resilience4j:
  circuitbreaker:
    instances:
      externalApiBreaker:
        failure-rate-threshold: 50
        slow-call-rate-threshold: 80
        slow-call-duration-threshold: 2s
        sliding-window-type: COUNT_BASED
        sliding-window-size: 10
        minimum-number-of-calls: 5
        wait-duration-in-open-state: 30s
        permitted-number-of-calls-in-half-open-state: 3
        automatic-transition-from-open-to-half-open-enabled: true
Step 2: Create a Circuit Breaker Supervisor
The supervisor listens for state transitions and controls the consumer:
@Configuration
public class CircuitBreakerSupervisor {

    private static final Logger log =
        LoggerFactory.getLogger(CircuitBreakerSupervisor.class);

    private final KafkaListenerEndpointRegistry registry;
    private final CircuitBreaker circuitBreaker;

    public CircuitBreakerSupervisor(
            KafkaListenerEndpointRegistry registry,
            CircuitBreakerRegistry circuitBreakerRegistry) {
        this.registry = registry;
        this.circuitBreaker = circuitBreakerRegistry
            .circuitBreaker("externalApiBreaker");
        setupStateTransitionHandler();
    }

    private void setupStateTransitionHandler() {
        circuitBreaker.getEventPublisher()
            .onStateTransition(this::handleStateChange);
    }

    private void handleStateChange(CircuitBreakerOnStateTransitionEvent event) {
        CircuitBreaker.StateTransition transition = event.getStateTransition();
        switch (transition) {
            case CLOSED_TO_OPEN:
            case CLOSED_TO_FORCED_OPEN:
            case HALF_OPEN_TO_OPEN:
                pauseConsumer();
                break;
            case OPEN_TO_HALF_OPEN:
            case HALF_OPEN_TO_CLOSED:
            case FORCED_OPEN_TO_CLOSED:
            case FORCED_OPEN_TO_HALF_OPEN:
                resumeConsumer();
                break;
            default:
                // No action needed for other transitions
                break;
        }
    }

    private void pauseConsumer() {
        MessageListenerContainer container =
            registry.getListenerContainer("myListener");
        if (container != null && container.isRunning()) {
            container.pause();
            log.info("Kafka consumer paused - circuit breaker OPEN");
        }
    }

    private void resumeConsumer() {
        MessageListenerContainer container =
            registry.getListenerContainer("myListener");
        if (container != null && container.isPauseRequested()) {
            container.resume();
            log.info("Kafka consumer resumed - circuit breaker CLOSED");
        }
    }
}
Step 3: Wrap API Calls with Circuit Breaker
@Service
public class OrderProcessor {

    private final CircuitBreaker circuitBreaker;
    private final ExternalApiClient apiClient;

    public OrderProcessor(CircuitBreakerRegistry circuitBreakerRegistry,
                          ExternalApiClient apiClient) {
        this.circuitBreaker = circuitBreakerRegistry
            .circuitBreaker("externalApiBreaker");
        this.apiClient = apiClient;
    }

    public void processOrder(Order order) {
        // Circuit breaker wraps the external call
        Supplier<Response> decoratedCall = CircuitBreaker
            .decorateSupplier(circuitBreaker,
                () -> apiClient.submitOrder(order));

        try {
            Response response = decoratedCall.get();
            // Process successful response
        } catch (CallNotPermittedException e) {
            // Circuit is OPEN - request was not made
            throw new ServiceUnavailableException(
                "External service unavailable");
        }
    }
}
Step 4: Configure Kafka Consumer
@KafkaListener(
    id = "myListener",
    topics = "orders",
    containerFactory = "kafkaListenerContainerFactory"
)
public void consume(ConsumerRecord<String, Order> record,
                    Acknowledgment ack) {
    try {
        orderProcessor.processOrder(record.value());
        ack.acknowledge(); // Only commit on success
    } catch (ServiceUnavailableException e) {
        // Don't acknowledge - message will be reprocessed
        ack.nack(Duration.ZERO);
    }
}
Implementation Approach 2: Language-Agnostic Pseudocode
For other languages or custom implementations:
CONFIGURATION:
    MIN_CALLS = 5                    // minimum calls before the rate is evaluated
    FAILURE_RATE_THRESHOLD = 50%
    WAIT_DURATION = 30 seconds
    TEST_CALLS = 3
    WINDOW_SIZE = 60 seconds         // sliding window for the failure rate

STATE:
    circuit_state = CLOSED
    success_count = 0                // successful test calls while HALF_OPEN
    last_failure_time = null
    window = []                      // recent call outcomes: {time, failed}

FUNCTION on_message_received(message):
    IF circuit_state == OPEN:
        IF current_time - last_failure_time > WAIT_DURATION:
            circuit_state = HALF_OPEN
            success_count = 0
            log("Circuit entering HALF-OPEN state")
        ELSE:
            RETURN  // Message stays in Kafka

    TRY:
        result = call_external_api(message.payload)
        record_call(failed = false)
        IF circuit_state == HALF_OPEN:
            success_count++
            IF success_count >= TEST_CALLS:
                circuit_state = CLOSED
                kafka_consumer.resume()
                reset_counters()
                log("Circuit CLOSED - service recovered")
        commit_offset(message)
    CATCH error:
        record_call(failed = true)
        IF should_open_circuit():
            circuit_state = OPEN
            last_failure_time = current_time
            kafka_consumer.pause()
            log("Circuit OPEN - pausing consumer")
        // DO NOT commit offset

FUNCTION record_call(failed):
    window.append({time: current_time, failed: failed})
    // Remove old entries outside the sliding window
    window = window.filter(entry => current_time - entry.time < WINDOW_SIZE)

FUNCTION should_open_circuit():
    IF window.length < MIN_CALLS:
        RETURN false
    failure_rate = COUNT(entry IN window WHERE entry.failed) / window.length
    RETURN failure_rate >= FAILURE_RATE_THRESHOLD
Implementation Approach 3: Node.js with opossum
const CircuitBreaker = require('opossum')
const { Kafka } = require('kafkajs')

const kafka = new Kafka({ brokers: ['localhost:9092'] })
const consumer = kafka.consumer({ groupId: 'order-processor' })

// Configure circuit breaker
const breakerOptions = {
  timeout: 3000,                 // a call slower than 3s counts as a failure
  errorThresholdPercentage: 50,  // open once 50% of calls fail
  resetTimeout: 30000,           // try again (half-open) after 30s
  volumeThreshold: 5             // minimum calls before the breaker can trip
}
const apiBreaker = new CircuitBreaker(callExternalApi, breakerOptions)

// Handle state changes
apiBreaker.on('open', () => {
  console.log('Circuit OPEN - pausing Kafka consumer')
  consumer.pause([{ topic: 'orders' }])
})

apiBreaker.on('halfOpen', () => {
  console.log('Circuit HALF-OPEN - testing recovery')
})

apiBreaker.on('close', () => {
  console.log('Circuit CLOSED - resuming Kafka consumer')
  consumer.resume([{ topic: 'orders' }])
})

// Consumer setup
async function start() {
  await consumer.connect()
  await consumer.subscribe({ topic: 'orders' })
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      try {
        await apiBreaker.fire(JSON.parse(message.value.toString()))
        // Offset auto-committed on success
      } catch (error) {
        if (error.message === 'Breaker is open') {
          // Circuit is open, message will be reprocessed
          throw error
        }
        // Other errors - also don't commit
        throw error
      }
    }
  })
}
start()

async function callExternalApi(payload) {
  const response = await fetch('https://api.example.com/orders', {
    method: 'POST',
    body: JSON.stringify(payload)
  })
  if (!response.ok) throw new Error(`API error: ${response.status}`)
  return response.json()
}
Real-World Scenario 1: Database Outage
Your order processing service consumes from Kafka and writes to a database. The database becomes unavailable.
The Flow
Timeline:
──────────────────────────────────────────────────────────►
T0:    Database healthy, circuit CLOSED
       └── Messages processing normally
T1:    Database connection fails
       └── Circuit records failure (1/5)
T2-T4: More failures
       └── Circuit records failures (5/5)
T5:    Threshold exceeded
       ├── Circuit OPENS
       ├── Consumer PAUSES
       └── Messages accumulate in Kafka
T5-T35: Wait duration (30 seconds)
       ├── No processing
       └── Database team fixes issue
T35:   Wait expires
       ├── Circuit enters HALF-OPEN
       └── Test write to database
T36:   Test succeeds
       ├── Circuit CLOSES
       └── Consumer RESUMES
T37+:  Backlog processing
       └── All accumulated messages processed in order
Key Points
- Messages are never lost
- Order is preserved
- Recovery is automatic
- No manual replay needed
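Scenario 1 never shows the database call itself, so here is a minimal sketch of that piece, assuming a dedicated breaker instance named databaseBreaker and an OrderRepository abstraction (both illustrative names, not part of the configuration above). The write is wrapped exactly like the API call in Step 3, so connection failures count against the breaker and, via the supervisor, pause the consumer:

// Sketch only: wrap the database write with its own circuit breaker.
// "databaseBreaker" and OrderRepository are illustrative names.
@Service
public class OrderPersister {

    private final CircuitBreaker databaseBreaker;
    private final OrderRepository repository;

    public OrderPersister(CircuitBreakerRegistry registry,
                          OrderRepository repository) {
        this.databaseBreaker = registry.circuitBreaker("databaseBreaker");
        this.repository = repository;
    }

    public void persist(Order order) {
        // A failed save (e.g. connection refused) is recorded as a failure;
        // enough failures open the circuit and the supervisor pauses the consumer.
        databaseBreaker.executeRunnable(() -> repository.save(order));
    }
}

Because the offset is only committed after persist() returns, messages that arrive during the outage simply wait in the topic.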
Real-World Scenario 2: External API Rate Limiting
Your service calls a third-party API that has rate limits. When you exceed the limit, you receive 429 responses.
The Challenge
Rate limiting is different from outages:
- The service isn't "down" - it's protecting itself
- You need to wait for the rate limit window to reset
- Hammering it with retries makes things worse
The Solution
Configure the circuit breaker to trip on 429 responses:
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
    .recordExceptions(RateLimitExceededException.class)
    .ignoreExceptions(ValidationException.class)
    .waitDurationInOpenState(Duration.ofSeconds(60)) // Match rate limit window
    .build();
The Flow
1. Consumer processes messages, calling API
2. API returns 429 (rate limit exceeded)
3. Circuit breaker records as failure
4. After threshold, circuit OPENS
5. Consumer PAUSES for 60 seconds (rate limit window)
6. Circuit enters HALF-OPEN
7. Test call succeeds (rate limit reset)
8. Resume processing at sustainable rate
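For this flow to work, the client has to surface a 429 as the exception the breaker records. A minimal sketch of that mapping, assuming the ExternalApiClient from Step 3 is backed by the JDK's java.net.http client, and that RateLimitExceededException, ServiceException, toJson and parseResponse are illustrative types and helpers, not part of the original article:

// Sketch: map HTTP 429 to the exception recorded by recordExceptions(...)
public class ExternalApiClient {

    private final HttpClient httpClient = HttpClient.newHttpClient();

    public Response submitOrder(Order order) {
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("https://api.example.com/orders"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(toJson(order)))
            .build();
        try {
            HttpResponse<String> response =
                httpClient.send(request, HttpResponse.BodyHandlers.ofString());
            if (response.statusCode() == 429) {
                // Counted as a failure by the circuit breaker
                throw new RateLimitExceededException("Rate limit exceeded");
            }
            return parseResponse(response.body());
        } catch (IOException | InterruptedException e) {
            throw new ServiceException("Order API call failed", e);
        }
    }
}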
Real-World Scenario 3: Multiple Downstream Services
Your consumer calls three different APIs: Payment, Inventory, and Shipping. Only the Payment API is failing.
The Problem
With a single circuit breaker:
- Payment API fails β Circuit opens
- ALL processing stops
- Inventory and Shipping calls stop too (even though they're healthy)
Solution: Separate Circuit Breakers
@Service
public class OrderProcessor {

    private final CircuitBreaker paymentBreaker;
    private final CircuitBreaker inventoryBreaker;
    private final CircuitBreaker shippingBreaker;

    public OrderResult processOrder(Order order) {
        // Each call has its own circuit breaker
        PaymentResult payment = paymentBreaker
            .executeSupplier(() -> paymentApi.charge(order));

        InventoryResult inventory = inventoryBreaker
            .executeSupplier(() -> inventoryApi.reserve(order));

        ShippingResult shipping = shippingBreaker
            .executeSupplier(() -> shippingApi.schedule(order));

        return new OrderResult(payment, inventory, shipping);
    }
}
Consumer Pause Strategy
Now you have a choice:
Option A: Pause on ANY circuit open
// Conservative - stops processing if any downstream fails
if (paymentBreaker.getState() == CircuitBreaker.State.OPEN ||
        inventoryBreaker.getState() == CircuitBreaker.State.OPEN ||
        shippingBreaker.getState() == CircuitBreaker.State.OPEN) {
    consumer.pause();
}
Option B: Pause only on critical circuits
// Only pause for payment failures (critical path)
if (paymentBreaker.getState() == CircuitBreaker.State.OPEN) {
    consumer.pause();
}
// Inventory and shipping can use fallbacks
Option C: Separate topics for independent processing
orders-topic    ───►  order-consumer     ───►  payment-api (critical)
inventory-topic ───►  inventory-consumer ───►  inventory-api
shipping-topic  ───►  shipping-consumer  ───►  shipping-api
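A minimal sketch of Option C, assuming one listener per topic (listener ids and payload types are illustrative) and the per-service breakers from above. Each listener is supervised only by its own breaker, so a payment outage pauses orderListener while the other two keep draining their topics. Error handling and nack() are omitted for brevity; handle failures as in Scenario 4 below.

// Sketch: independent listeners, each paused/resumed only by "its" breaker
@KafkaListener(id = "orderListener", topics = "orders")
public void consumeOrder(Order order, Acknowledgment ack) {
    paymentBreaker.executeRunnable(() -> paymentApi.charge(order));
    ack.acknowledge();
}

@KafkaListener(id = "inventoryListener", topics = "inventory")
public void consumeInventory(InventoryRequest request, Acknowledgment ack) {
    inventoryBreaker.executeRunnable(() -> inventoryApi.reserve(request));
    ack.acknowledge();
}

@KafkaListener(id = "shippingListener", topics = "shipping")
public void consumeShipping(ShippingRequest request, Acknowledgment ack) {
    shippingBreaker.executeRunnable(() -> shippingApi.schedule(request));
    ack.acknowledge();
}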
Real-World Scenario 4: Handling Already-Polled Messages
When you pause a Kafka consumer, it doesn't stop immediately. Messages already fetched (in the local buffer) continue processing.
The Problem
1. Consumer polls batch of 500 messages
2. Starts processing message 1
3. Message 1 fails, circuit OPENS
4. consumer.pause() called
5. But messages 2-500 are already in memory!
6. They continue processing and failing
Solution: Use nack() to Reject Remaining Messages
@KafkaListener(id = "myListener", topics = "orders")
public void consume(ConsumerRecord<String, Order> record,
                    Acknowledgment ack) {
    // Check circuit state BEFORE processing
    if (circuitBreaker.getState() == State.OPEN) {
        // Reject this message - it will be reprocessed later
        ack.nack(Duration.ZERO);
        return;
    }

    try {
        orderProcessor.processOrder(record.value());
        ack.acknowledge();
    } catch (CallNotPermittedException e) {
        // Circuit just opened during our processing
        ack.nack(Duration.ZERO);
    } catch (ServiceException e) {
        // Downstream failure
        ack.nack(Duration.ZERO);
    }
}
Alternative: Reduce Batch Size
Smaller batches mean fewer "stranded" messages:
spring:
  kafka:
    consumer:
      max-poll-records: 10  # Instead of the default 500
Trade-off: Lower throughput during normal operation.
Real-World Scenario 5: Preventing Retry Storms
After an outage, when the circuit closes, all consumers resume simultaneously. This can overwhelm the recovering service.
The Thundering Herd Problem
T0: Service recovers
T1: 50 consumers detect recovery (HALF-OPEN β CLOSED)
T2: 50 consumers resume simultaneously
T3: Each has 10,000 message backlog
T4: 500,000 messages flood the recovering service
T5: Service crashes again
T6: Circuits reopen
Solutions
1. Staggered Resume
Add random jitter to resume timing:
private void resumeConsumer() {
    // Random delay between 0-5 seconds
    int delayMs = random.nextInt(5000);
    scheduler.schedule(() -> {
        container.resume();
    }, delayMs, TimeUnit.MILLISECONDS);
}
2. Rate Limiting After Resume
Limit processing rate during recovery:
private final RateLimiter rateLimiter =
    RateLimiter.create(100); // 100 messages per second

public void consume(Order order) {
    rateLimiter.acquire(); // Blocks if rate exceeded
    orderProcessor.processOrder(order);
}
3. Gradual Backlog Processing
Track circuit recovery time and gradually increase rate:
private Instant circuitClosedAt;

public void onCircuitClosed() {
    circuitClosedAt = Instant.now();
}

public int getCurrentRateLimit() {
    Duration sinceRecovery = Duration.between(circuitClosedAt, Instant.now());
    if (sinceRecovery.toMinutes() < 1) return 10;  // First minute: 10/sec
    if (sinceRecovery.toMinutes() < 5) return 50;  // Next 4 minutes: 50/sec
    return 200;                                    // Normal rate
}
Offset Commit Strategy
The offset commit strategy is critical. Never commit offsets for failed messages.
Correct Pattern
┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐
│ Receive │───►│ Process │───►│ API OK  │───►│ COMMIT  │
│ Message │    │         │    │         │    │ OFFSET  │
└─────────┘    └─────────┘    └─────────┘    └─────────┘
                    │
                    │ If API fails at any point:
                    ▼
┌──────────────────────────────────────────────────────┐
│                 DO NOT COMMIT OFFSET                  │
│             Message will be reprocessed               │
└──────────────────────────────────────────────────────┘
Configuration
spring:
  kafka:
    consumer:
      enable-auto-commit: false
    listener:
      ack-mode: MANUAL_IMMEDIATE
What's Next
In Part 3, we'll explore:
- Edge cases and failure modes
- Consumer group rebalancing challenges
- Distributed circuit breaker state
- Alternative approaches (DLQ, retry topics, bulkhead)
- When NOT to use this pattern