# Incremental Cooperative Rebalancing in Kafka
A deep dive into KIP-429 and Cooperative Rebalancing in Kafka 2.4+, how it solves the stop-the-world problem, and the two-phase rebalance mechanics.
It’s September 2020. We’ve been running Kafka in production for a while now, and if you’re like me, you have a love-hate relationship with consumer rebalances. You love that Kafka automatically handles consumer failures and scaling, but you hate the “stop-the-world” pause that comes with it.
With the recent releases of Kafka 2.3, 2.4, and the polish in 2.5, there’s a new game in town: Incremental Cooperative Rebalancing. This isn’t just a minor tweak; it’s a fundamental shift in how the consumer group protocol works, driven by KIP-429.
As developers, we need to understand what’s happening under the hood. Let’s examine the new protocol, look at the configuration, and check the code to see how it actually works.
## The Old Way: Eager Rebalancing (Stop-the-World)
Historically, Kafka used the Eager Rebalancing protocol. When a rebalance starts (e.g., a new consumer joins), every consumer in the group drops all their assigned partitions.
1. **Revoke All**: `onPartitionsRevoked()` is called. Processing stops.
2. **JoinGroup**: Consumers send an “I’m here” request to the coordinator.
3. **SyncGroup**: The leader calculates a new assignment and sends it out.
4. **Assign**: `onPartitionsAssigned()` is called. Processing resumes.
This is the “stop-the-world” effect. Even if you just added one consumer to a group of 100, all 100 consumers stop, revoke everything, and wait. It’s inefficient and causes massive latency spikes.
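To make the stop-the-world behavior concrete, here is a minimal, self-contained simulation (plain Java, no broker; `String` names stand in for Kafka’s `TopicPartition`). Even though Consumer A’s assignment does not change at all, the eager protocol still revokes and re-assigns everything:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

public class EagerRebalanceSketch {
    public static void main(String[] args) {
        // Consumer A owns p0 and p1 before the rebalance.
        Set<String> owned = new HashSet<>(Arrays.asList("p0", "p1"));
        // After the rebalance it is assigned p0 and p1 again -- nothing moved.
        Set<String> newAssignment = new HashSet<>(Arrays.asList("p0", "p1"));

        // Eager protocol: revoke EVERYTHING before rejoining, unconditionally.
        Set<String> revoked = new TreeSet<>(owned);
        System.out.println("onPartitionsRevoked(" + revoked + ")");
        owned.clear(); // processing stops here, for every partition

        // ...JoinGroup / SyncGroup round-trip happens here...

        owned.addAll(newAssignment);
        System.out.println("onPartitionsAssigned(" + new TreeSet<>(owned) + ")");
    }
}
```

The full revoke happens even when the new assignment is identical to the old one, which is exactly the waste the cooperative protocol removes.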
## The New Way: Cooperative Rebalancing (KIP-429)
Enter KIP-429. The goal is simple: don’t revoke a partition unless you absolutely have to.
If a consumer holds partition `topic-0` and is going to keep holding `topic-0` after the rebalance, why revoke it? Why close the file handles, clear the caches, and stop processing?
### 1. Configuration
To enable this, you need to switch your partition assignment strategy. The class is `CooperativeStickyAssignor`.

In your `consumer.properties` or Java config:
```properties
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
```

If you are doing a rolling upgrade from the old `RangeAssignor` or `StickyAssignor`, you can specify both to allow the group to transition smoothly:

```properties
partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor,org.apache.kafka.clients.consumer.CooperativeStickyAssignor
```

### 2. Protocol Mechanics: The Two-Phase Rebalance
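For Java clients, the same setting goes into the consumer properties. A minimal sketch using plain string keys (the broker address and group id below are placeholders; `ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG` is the equivalent constant if you prefer it):

```java
import java.util.Properties;

public class CooperativeConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "payments-consumers");      // hypothetical group id
        // The switch that enables cooperative rebalancing:
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        System.out.println(props.getProperty("partition.assignment.strategy"));
    }
}
```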
The Cooperative protocol splits the rebalance into two phases if partitions need to move.
**Phase 1: The “Revoke Only What’s Needed” Phase**

1. **JoinGroup**: Consumers send their subscription and their currently owned partitions to the group coordinator. They do not revoke everything yet.
2. **Assignment (Leader Logic)**: The group leader (running the `CooperativeStickyAssignor`) calculates the new assignment. It sees that Consumer A owns `p1` and `p2`, but needs to give `p2` to Consumer B.
3. **SyncGroup**: The leader sends the new assignment back.
   - Consumer A is told: “You are assigned `p1`.”
   - Consumer B is told: “You are assigned `p3`” (it doesn’t get `p2` yet because A still owns it).
4. **Client Logic**:
   - Consumer A compares its owned partitions (`p1`, `p2`) with its assigned partitions (`p1`).
   - It sees `p2` is no longer assigned, so it triggers `onPartitionsRevoked([p2])`.
   - Because it revoked a partition, it immediately triggers a second rebalance.
**Phase 2: The “Assignment” Phase**

1. **JoinGroup**: Consumer A joins again. It now owns only `p1`; `p2` is free.
2. **Assignment**: The leader sees `p2` is free and assigns it to Consumer B.
3. **SyncGroup**:
   - Consumer B is told: “You are assigned `p3`, `p2`.”
   - Consumer B triggers `onPartitionsAssigned([p2])`.

Throughout this entire process, Consumer A kept processing `p1` without interruption.
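On each client, the two phases boil down to set arithmetic: compare what you own with what you were just assigned, revoke only the difference, and rejoin if that difference is non-empty. A self-contained sketch of that logic (plain Java; `String` names stand in for `TopicPartition`):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class CooperativeDeltaSketch {
    public static void main(String[] args) {
        // Phase 1, from Consumer A's point of view:
        // it owns p1 and p2, but the leader has assigned it only p1.
        Set<String> owned = new HashSet<>(Arrays.asList("p1", "p2"));
        Set<String> assigned = new HashSet<>(Collections.singletonList("p1"));

        // Revoke only what we are losing.
        Set<String> revoked = new HashSet<>(owned);
        revoked.removeAll(assigned);                 // -> {p2}

        // Pick up only what is new to us (nothing, in A's case).
        Set<String> added = new HashSet<>(assigned);
        added.removeAll(owned);                      // -> {}

        // A non-empty revoked set is what forces the second rebalance (Phase 2).
        boolean mustRejoin = !revoked.isEmpty();

        System.out.println("revoked=" + revoked + " added=" + added
                + " rejoin=" + mustRejoin);
    }
}
```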
### 3. Under the Hood: ConsumerCoordinator.java

If we look at the `ConsumerCoordinator` class in the Kafka client libraries (around version 2.4/2.5), we can see the logic distinguishing these modes.

In the `onJoinComplete` method, the coordinator handles the assignment received from the leader.

For Eager (the old way), it was simple: we already revoked everything in `onJoinPrepare` before sending the JoinGroup request.

For Cooperative, the logic changes. We haven’t revoked anything yet.
```java
// Pseudo-code representation of ConsumerCoordinator.java logic
if (protocol == RebalanceProtocol.COOPERATIVE) {
    // Calculate what we need to revoke based on the new assignment
    Set<TopicPartition> revokedPartitions = new HashSet<>(ownedPartitions);
    revokedPartitions.removeAll(assignedPartitions);

    if (!revokedPartitions.isEmpty()) {
        // Phase 1: Revoke only the partitions we are losing
        log.info("Revoking partitions {}", revokedPartitions);
        rebalanceListenerInvoker.invokePartitionsRevoked(revokedPartitions);

        // Trigger the second rebalance to release these partitions to the group
        requestRejoin("need to revoke partitions and re-join");
    }
}
// ...
// Add partitions that are new to us
Set<TopicPartition> addedPartitions = new HashSet<>(assignedPartitions);
addedPartitions.removeAll(ownedPartitions);
rebalanceListenerInvoker.invokePartitionsAssigned(addedPartitions);
```

The key difference in `onJoinPrepare` (which runs before joining) is also stark:
- **Eager**: `revokedPartitions.addAll(subscriptions.assignedPartitions());` (revoke everything)
- **Cooperative**: Only revoke partitions that are no longer in the subscription (e.g., if you unsubscribed from a topic).
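Those two branches can be sketched with plain Java sets (`String` names stand in for `TopicPartition`, and `topicOf` is a hypothetical helper for this illustration):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class OnJoinPrepareSketch {
    // Hypothetical helper: extract the topic from a "topic-partition" name like "orders-0".
    static String topicOf(String partition) {
        return partition.substring(0, partition.lastIndexOf('-'));
    }

    public static void main(String[] args) {
        // We own a partition of "orders" and one of "audit",
        // but have since unsubscribed from the "audit" topic.
        Set<String> owned = new HashSet<>(Arrays.asList("orders-0", "audit-0"));
        Set<String> subscribedTopics = Collections.singleton("orders");

        // Eager onJoinPrepare: revoke everything we own, always.
        Set<String> eagerRevoked = new HashSet<>(owned);

        // Cooperative onJoinPrepare: revoke only partitions whose topic
        // is no longer in our subscription.
        Set<String> coopRevoked = new HashSet<>();
        for (String p : owned) {
            if (!subscribedTopics.contains(topicOf(p))) {
                coopRevoked.add(p);
            }
        }

        System.out.println("eager=" + eagerRevoked + " cooperative=" + coopRevoked);
    }
}
```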
### 4. Handling Exceptions: RebalanceInProgressException

One of the side effects of this architecture is that consumers can keep doing work during a rebalance.

In Kafka 2.5, a new optimization allows the consumer to return records even while a rebalance is pending. However, if you try to commit offsets for a partition that might be reassigned, you can run into `RebalanceInProgressException`.

This exception is a signal: “I can’t commit this right now because the group is stabilizing, but you still own the partition, so keep processing and try again.”
```java
while (running) { // `running` and `process` are placeholders for your own loop control and logic
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    process(records);
    try {
        consumer.commitSync();
    } catch (RebalanceInProgressException e) {
        // It's okay! A rebalance is in flight, but we still own our partitions,
        // so we keep polling and retry the commit on the next iteration.
    }
}
```

## Wrapping Up
If you are running Kafka 2.4 or newer, I highly recommend testing this out. The `CooperativeStickyAssignor` solves one of the most persistent operational headaches we’ve had with Kafka consumer groups.
It doesn’t make rebalancing go away. You still need to handle it, but it stops it from being a disruptive event. For us, it meant we could finally deploy during the day without staring at latency graphs, hoping the group stabilizes before the lag gets too high.