
It’s September 2020. We’ve been running Kafka in production for a while now, and if you’re like me, you have a love-hate relationship with consumer rebalances. You love that Kafka automatically handles consumer failures and scaling, but you hate the “stop-the-world” pause that comes with it.

With the recent releases of Kafka 2.3, 2.4, and the polish in 2.5, there’s a new game in town: Incremental Cooperative Rebalancing. This isn’t just a minor tweak; it’s a fundamental shift in how the consumer group protocol works, driven by KIP-429.

A Brief Overview of Kafka Architecture

As developers, we need to understand what’s happening under the hood. Let’s examine the new protocol, look at the configuration, and check the code to see how it actually works.

The Old Way: Eager Rebalancing (Stop-the-World)#

Historically, Kafka used the Eager Rebalancing protocol. When a rebalance starts (e.g., a new consumer joins), every consumer in the group drops all their assigned partitions.

  1. Revoke All: onPartitionsRevoked() is called. Processing stops.
  2. JoinGroup: Consumers send an “I’m here” request to the coordinator.
  3. SyncGroup: The leader calculates a new assignment and sends it out.
  4. Assign: onPartitionsAssigned() is called. Processing resumes.

This is the “stop-the-world” effect. Even if you just added one consumer to a group of 100, all 100 consumers stop, revoke everything, and wait. It’s inefficient and causes massive latency spikes.
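To see that cost concretely, here is a minimal `ConsumerRebalanceListener` (the class name is mine) you could register via `consumer.subscribe(topics, listener)`. Under the eager protocol, `onPartitionsRevoked` receives the consumer’s entire assignment on every rebalance:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Under the eager protocol, onPartitionsRevoked fires with the *entire*
// current assignment on every rebalance -- even when the consumer is
// about to be handed the same partitions right back.
public class LoggingRebalanceListener implements ConsumerRebalanceListener {
    final List<TopicPartition> lastRevoked = new ArrayList<>();

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Processing for these partitions must stop here (commit offsets,
        // flush any local state) before the group can re-sync.
        lastRevoked.clear();
        lastRevoked.addAll(partitions);
        System.out.println("Revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Processing resumes only once the whole group has a new assignment.
        System.out.println("Assigned: " + partitions);
    }
}
```

Watching the `Revoked:` lines in your logs during a deploy makes the stop-the-world behavior very visible.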

The New Way: Cooperative Rebalancing (KIP-429)#

Enter KIP-429. The goal is simple: don’t revoke a partition unless you absolutely have to.

If a consumer holds partition topic-0 and is going to keep holding topic-0 after the rebalance, why revoke it? Why close the file handles, clear the caches, and stop processing?

1. Configuration#

To enable this, you need to switch your partition assignment strategy. The class is CooperativeStickyAssignor.

In your consumer.properties or Java config:

```properties
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
```

If you are doing a rolling upgrade from the old RangeAssignor or StickyAssignor, you can specify both to allow the group to transition smoothly:

```properties
partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor,org.apache.kafka.clients.consumer.CooperativeStickyAssignor
```
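If you configure in code instead, the same switch looks like this (the bootstrap servers and group id below are placeholders):

```java
import java.util.Properties;

public class CooperativeConsumerConfig {
    // Builds consumer properties with the cooperative assignor enabled.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "my-consumer-group");       // placeholder
        // The important line: switch the assignor to the cooperative one.
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        return props;
    }
}
```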

2. Protocol Mechanics: The Two-Phase Rebalance#

The Cooperative protocol splits the rebalance into two phases if partitions need to move.

Phase 1: The “Revoke Only What’s Needed” Phase

  1. JoinGroup: Consumers send their subscription and their currently owned partitions to the group coordinator. They do not revoke everything yet.
  2. Assignment (Leader Logic): The group leader (running the CooperativeStickyAssignor) calculates the new assignment. It sees that Consumer A owns p1 and p2, but needs to give p2 to Consumer B.
  3. SyncGroup: The leader sends the new assignment back.
    • Consumer A is told: “You are assigned p1.”
    • Consumer B is told: “You are assigned p3” (p3 is what B already owned; it doesn’t get p2 yet because A still owns it).
  4. Client Logic:
    • Consumer A compares its owned (p1, p2) with its assigned (p1).
    • It sees p2 is no longer assigned. Now it triggers onPartitionsRevoked([p2]).
    • Because it revoked a partition, it immediately triggers a second rebalance.

Phase 2: The “Assignment” Phase

  1. JoinGroup: Consumer A joins again. It now owns only p1. p2 is free.
  2. Assignment: The leader sees p2 is free and assigns it to Consumer B.
  3. SyncGroup:
    • Consumer B is told: “You are assigned p3, p2.”
    • Consumer B triggers onPartitionsAssigned([p2]).

Throughout this entire process, Consumer A kept processing p1 without interruption.
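The client-side diff in step 4 of Phase 1 boils down to simple set arithmetic. Here is a standalone sketch (plain Java with partition names as strings, not Kafka’s actual code):

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the cooperative client's diff: compare what the consumer
// owned before the rebalance with what the leader just assigned, and
// revoke only the difference.
public class CooperativeDiff {
    public static Set<String> toRevoke(Set<String> owned, Set<String> assigned) {
        Set<String> revoked = new HashSet<>(owned);
        revoked.removeAll(assigned); // e.g. owned {p1, p2}, assigned {p1} -> revoke {p2}
        return revoked;
    }

    public static Set<String> newlyAdded(Set<String> owned, Set<String> assigned) {
        Set<String> added = new HashSet<>(assigned);
        added.removeAll(owned); // partitions the consumer just gained
        return added;
    }
}
```

For Consumer A above, `toRevoke(Set.of("p1", "p2"), Set.of("p1"))` yields `{p2}`; it is exactly a non-empty result here that forces the follow-up rebalance.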

3. Under the Hood: ConsumerCoordinator.java#

If we look at the ConsumerCoordinator class in the Kafka client libraries (around version 2.4/2.5), we can see the logic distinguishing these modes.

In the onJoinComplete method, the coordinator handles the assignment received from the leader.

For Eager (the old way), it was simple: we already revoked everything in onJoinPrepare before sending the JoinGroup request.

For Cooperative, the logic changes. We haven’t revoked anything yet.

The key difference in onJoinPrepare (which runs before joining) is also stark:

  • Eager: revokedPartitions.addAll(subscriptions.assignedPartitions()); (Revoke everything)
  • Cooperative: Only revoke partitions that are no longer in the subscription (e.g., if you unsubscribed from a topic).
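That difference can be sketched as follows. This is a simplified illustration, not the real ConsumerCoordinator code; partition names like "orders-0" and the topic-from-partition parsing are my own convention for the sketch:

```java
import java.util.HashSet;
import java.util.Set;

// Simplified sketch of the onJoinPrepare difference. The topic is taken
// to be everything before the last '-' in the partition name.
public class JoinPrepareSketch {
    // Eager: revoke the entire current assignment before rejoining.
    static Set<String> eagerRevoke(Set<String> owned, Set<String> subscribed) {
        return new HashSet<>(owned);
    }

    // Cooperative: revoke only partitions of topics no longer subscribed to.
    static Set<String> cooperativeRevoke(Set<String> owned, Set<String> subscribed) {
        Set<String> revoked = new HashSet<>();
        for (String partition : owned) {
            String topic = partition.substring(0, partition.lastIndexOf('-'));
            if (!subscribed.contains(topic)) {
                revoked.add(partition);
            }
        }
        return revoked;
    }
}
```

With `owned = {"orders-0", "orders-1"}` and a subscription that still includes `orders`, the eager path revokes both partitions while the cooperative path revokes nothing.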

4. Handling Exceptions: RebalanceInProgressException#

One of the side effects of this architecture is that consumers can keep doing work during a rebalance.

In Kafka 2.5, a new optimization allows the consumer to return records even while a rebalance is pending. However, if you try to commit offsets for a partition that might be reassigned, you might run into RebalanceInProgressException.

This exception is a signal: “I can’t commit this right now because the group is stabilizing, but you still own the partition, so keep processing and try again.”

```java
import org.apache.kafka.common.errors.RebalanceInProgressException;

try {
    consumer.commitSync();
} catch (RebalanceInProgressException e) {
    // It's okay! The rebalance is in progress, so this commit is rejected,
    // but we still own these partitions: keep polling and retry the
    // commit on the next loop iteration.
}
```

Wrapping Up#

If you are running Kafka 2.4 or newer, I highly recommend testing this out. The CooperativeStickyAssignor solves one of the most persistent operational headaches we’ve had with Kafka consumer groups.

It doesn’t make rebalancing go away. You still need to handle it, but it stops it from being a disruptive event. For us, it meant we could finally deploy during the day without staring at latency graphs, hoping the group stabilizes before the lag gets too high.

Incremental Cooperative Rebalancing in Kafka
https://theanig.dev/blog/incremental-cooperative-rebalancing-kafka
Author Anirudh Ganesh
Published at September 4, 2020