

Incremental Cooperative Rebalancing in Kafka
A deep dive into KIP-429 and Cooperative Rebalancing in Kafka 2.4+, how it solves the stop-the-world problem, and the two-phase rebalance mechanics.
It’s September 2020. We’ve been running Kafka in production long enough to develop a specific grudge: consumer rebalances. The automatic failure and scaling handling is the whole point of consumer groups, but every rebalance is a stop-the-world pause, and at our partition counts those pauses hurt.
Kafka 2.4 and later ship an alternative: Incremental Cooperative Rebalancing, defined by KIP-429. It changes the consumer group protocol so that most rebalances no longer require every consumer to drop every partition.
This post walks through the protocol mechanics, the configuration switch, and the relevant code in the Kafka client.
The Old Way: Eager Rebalancing (Stop-the-World)
Historically, Kafka used the Eager Rebalancing protocol. When a rebalance starts (e.g., a new consumer joins), every consumer in the group drops all their assigned partitions.
- Revoke All: onPartitionsRevoked() is called. Processing stops.
- JoinGroup: Consumers send an “I’m here” request to the coordinator.
- SyncGroup: The leader calculates a new assignment and sends it out.
- Assign: onPartitionsAssigned() is called. Processing resumes.
This is the “stop-the-world” effect. Even if you just added one consumer to a group of 100, all 100 consumers stop, revoke everything, and wait. It’s inefficient and causes massive latency spikes.
The New Way: Cooperative Rebalancing (KIP-429)
KIP-429’s design rule is straightforward: don’t revoke a partition unless you actually have to.
If a consumer holds partition topic-0 and is going to keep holding topic-0 after the rebalance, why revoke it? Why close the file handles, clear the caches, and stop processing?
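To put a number on that question, here is a minimal sketch in plain Java with no Kafka dependency. It models an assignment as a map from partition to owner and counts how many partitions each protocol takes away from its owner across one membership change. The class name RevocationCount, both helper methods, and the four-partition example are ours, invented for illustration. A real assignor would compute the "after" map; here we simply hard-code it.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical names): partition name -> consumer that owns it.
final class RevocationCount {

    // Eager: every consumer gives up every partition before rejoining.
    static int revokedEager(Map<String, String> before) {
        return before.size();
    }

    // Cooperative: only partitions whose owner changes are revoked.
    static int revokedCooperative(Map<String, String> before, Map<String, String> after) {
        int moved = 0;
        for (Map.Entry<String, String> e : before.entrySet()) {
            if (!e.getValue().equals(after.get(e.getKey()))) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        Map<String, String> before = new HashMap<>();
        before.put("topic-0", "A");
        before.put("topic-1", "A");
        before.put("topic-2", "B");
        before.put("topic-3", "B");

        // Assumed outcome: consumer C joins and takes over topic-1; nothing else moves.
        Map<String, String> after = new HashMap<>(before);
        after.put("topic-1", "C");

        System.out.println("eager revokes:       " + revokedEager(before));
        System.out.println("cooperative revokes: " + revokedCooperative(before, after));
    }
}
```

In this toy group, eager revokes all four partitions while cooperative revokes one. The gap widens with partition count, because the eager number grows with the size of the group and the cooperative number only with the size of the change.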
1. Configuration
To enable this, you need to switch your partition assignment strategy. The class is CooperativeStickyAssignor.
In your consumer.properties or Java config:

    partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

If you are doing a rolling upgrade from the old RangeAssignor or StickyAssignor, you can specify both to allow the group to transition smoothly:

    partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor,org.apache.kafka.clients.consumer.CooperativeStickyAssignor

Once every member has been restarted with both assignors listed, a second rolling restart removes the old assignor and leaves only CooperativeStickyAssignor.

2. Protocol Mechanics: The Two-Phase Rebalance
The Cooperative protocol splits the rebalance into two phases if partitions need to move.
Phase 1: The “Revoke Only What’s Needed” Phase
- JoinGroup: Consumers send their subscription and their currently owned partitions to the group coordinator. They do not revoke anything yet.
- Assignment (Leader Logic): The group leader (running the CooperativeStickyAssignor) calculates the new assignment. It sees that Consumer A owns p1 and p2, but needs to give p2 to Consumer B.
- SyncGroup: The leader sends the new assignment back.
  - Consumer A is told: “You are assigned p1.”
  - Consumer B is told: “You are assigned p3” (it doesn’t get p2 yet because A still owns it).
- Client Logic:
  - Consumer A compares its owned (p1, p2) with its assigned (p1).
  - It sees p2 is no longer assigned. Now it triggers onPartitionsRevoked([p2]).
  - Because it revoked a partition, it immediately triggers a second rebalance.
Phase 2: The “Assignment” Phase
- JoinGroup: Consumer A joins again. It now owns only p1. p2 is free.
- Assignment: The leader sees p2 is free and assigns it to Consumer B.
- SyncGroup:
  - Consumer B is told: “You are assigned p3, p2.”
  - Consumer B triggers onPartitionsAssigned([p2]).
Throughout this entire process, Consumer A kept processing p1 without interruption.
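The two phases above can be replayed with a small simulation. This is a sketch in plain Java, not the CooperativeStickyAssignor itself: the target assignment is hard-coded rather than computed by a sticky algorithm, and the names TwoPhaseRebalance, assignRound, and toRevoke are hypothetical. The only rule it borrows from the walkthrough is that a partition still owned by another member is withheld for one round.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Simulation of the walkthrough (hypothetical names, hard-coded target).
final class TwoPhaseRebalance {

    // One round as seen by the leader: give each member its target partitions,
    // except those that a different member still owns.
    static Map<String, Set<String>> assignRound(Map<String, Set<String>> owned,
                                                Map<String, Set<String>> target) {
        Map<String, String> currentOwner = new HashMap<>();
        owned.forEach((member, parts) -> parts.forEach(p -> currentOwner.put(p, member)));

        Map<String, Set<String>> assigned = new TreeMap<>();
        target.forEach((member, parts) -> {
            Set<String> granted = new TreeSet<>();
            for (String p : parts) {
                String owner = currentOwner.get(p);
                if (owner == null || owner.equals(member)) {
                    granted.add(p);
                }
            }
            assigned.put(member, granted);
        });
        return assigned;
    }

    // Client side: owned minus assigned is what the member must revoke.
    static Set<String> toRevoke(Set<String> owned, Set<String> assigned) {
        Set<String> revoked = new TreeSet<>(owned);
        revoked.removeAll(assigned);
        return revoked;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> owned = new TreeMap<>();
        owned.put("A", new TreeSet<>(Arrays.asList("p1", "p2")));
        owned.put("B", new TreeSet<>(Arrays.asList("p3")));

        Map<String, Set<String>> target = new TreeMap<>();
        target.put("A", new TreeSet<>(Arrays.asList("p1")));
        target.put("B", new TreeSet<>(Arrays.asList("p2", "p3")));

        Map<String, Set<String>> round1 = assignRound(owned, target);
        System.out.println("round 1 assignment: " + round1);
        System.out.println("A revokes: " + toRevoke(owned.get("A"), round1.get("A")));

        // After revoking, each member owns exactly its round 1 assignment and rejoins.
        Map<String, Set<String>> round2 = assignRound(round1, target);
        System.out.println("round 2 assignment: " + round2);
    }
}
```

Running main shows B receiving only p3 in round 1, A revoking p2, and B picking up p2 in round 2. At no point does p1 leave A's owned set.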
3. Under the Hood: ConsumerCoordinator.java
If we look at the ConsumerCoordinator class in the Kafka client libraries (around version 2.4/2.5), we can see the logic distinguishing these modes.
In the onJoinComplete method, the coordinator handles the assignment received from the leader.
For Eager (the old way), it was simple: we already revoked everything in onJoinPrepare before sending the JoinGroup request.
For Cooperative, the logic changes. We haven’t revoked anything yet.

    // Pseudo-code representation of ConsumerCoordinator.java logic
    if (protocol == RebalanceProtocol.COOPERATIVE) {
        // Calculate what we need to revoke based on the new assignment
        Set<TopicPartition> revokedPartitions = new HashSet<>(ownedPartitions);
        revokedPartitions.removeAll(assignedPartitions);

        if (!revokedPartitions.isEmpty()) {
            // Phase 1: Revoke only the partitions we are losing
            log.info("Revoking partitions {}", revokedPartitions);
            rebalanceListenerInvoker.invokePartitionsRevoked(revokedPartitions);

            // Trigger the second rebalance to release these partitions to the group
            requestRejoin("need to revoke partitions and re-join");
        }
    }
    // ...
    // Add partitions that are new to us
    Set<TopicPartition> addedPartitions = new HashSet<>(assignedPartitions);
    addedPartitions.removeAll(ownedPartitions);
    rebalanceListenerInvoker.invokePartitionsAssigned(addedPartitions);

The difference in onJoinPrepare (which runs before joining) is:
- Eager: revokedPartitions.addAll(subscriptions.assignedPartitions()); (Revoke everything)
- Cooperative: Only revoke partitions that are no longer in the subscription (e.g., if you unsubscribed from a topic).
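The two onJoinPrepare branches can be written out as one function. This is a sketch under our own assumptions, not the client's code: partitions are plain strings of the form "topic-N", the Protocol enum is a local stand-in for the client's RebalanceProtocol, and JoinPrepareSketch, topicOf, and revokedBeforeJoin are hypothetical names.

```java
import java.util.Set;
import java.util.TreeSet;

// Stand-in model of the pre-join revocation rule (hypothetical names).
final class JoinPrepareSketch {

    // Local stand-in for the client's rebalance protocol enum.
    enum Protocol { EAGER, COOPERATIVE }

    // Assumes partitions are named "topic-N"; the topic is everything
    // before the last dash.
    static String topicOf(String partition) {
        return partition.substring(0, partition.lastIndexOf('-'));
    }

    // Eager revokes every owned partition. Cooperative revokes only the
    // partitions whose topic is no longer in the subscription.
    static Set<String> revokedBeforeJoin(Protocol protocol,
                                         Set<String> owned,
                                         Set<String> subscribedTopics) {
        Set<String> revoked = new TreeSet<>();
        for (String p : owned) {
            if (protocol == Protocol.EAGER || !subscribedTopics.contains(topicOf(p))) {
                revoked.add(p);
            }
        }
        return revoked;
    }
}
```

With owned partitions orders-0, orders-1, and audit-0 and a subscription of just orders, the eager branch returns all three and the cooperative branch returns only audit-0.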
4. Handling Exceptions: RebalanceInProgressException
One of the side effects of this architecture is that consumers can keep doing work during a rebalance.
In Kafka 2.5, a new optimization allows the consumer to return records even while a rebalance is pending. However, if you try to commit offsets for a partition that might be reassigned, you might run into RebalanceInProgressException.
This exception is a signal: “I can’t commit this right now because the group is stabilizing, but you still own the partition, so keep processing and try again.”

    try {
        consumer.commitSync();
    } catch (RebalanceInProgressException e) {
        // It's okay! The rebalance is happening.
        // We can continue polling and try committing again in the next loop.
    }

Operational impact
CooperativeStickyAssignor does not eliminate rebalancing. Group membership changes still trigger the protocol; consumer offsets still need to be committed; you still need to handle RebalanceInProgressException from commitSync. What it removes is the stop-the-world pause for partitions that aren’t actually moving.
For us, that’s the difference between a deploy that nobody notices and a deploy that spikes our consumer lag for several minutes. We’ve been running CooperativeStickyAssignor since the 2.4 upgrade and have stopped scheduling rollouts around low-traffic windows for this reason.