Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1079,7 +1079,7 @@ private void processNotifyLeaderAndIsrResponseReceivedEvent(
// server to acknowledge the leader change before proceeding to the next migration.
for (NotifyLeaderAndIsrResultForBucket notifyLeaderAndIsrResultForBucket :
notifyLeaderAndIsrResultForBuckets) {
tryToCompleteRebalanceTask(notifyLeaderAndIsrResultForBucket.getTableBucket());
tryToCompleteRebalanceTask(notifyLeaderAndIsrResultForBucket, serverId);
}
}

Expand Down Expand Up @@ -1491,7 +1491,10 @@ public void tryToExecuteRebalanceTask(RebalancePlanForBucket planForBucket) {
}

/** try to finish rebalance tasks after receive notify leader and isr response. */
private void tryToCompleteRebalanceTask(TableBucket tableBucket) {
private void tryToCompleteRebalanceTask(
NotifyLeaderAndIsrResultForBucket notifyLeaderAndIsrResultForBucket,
int responseServerId) {
TableBucket tableBucket = notifyLeaderAndIsrResultForBucket.getTableBucket();
RebalancePlanForBucket planForBucket =
rebalanceManager.getRebalancePlanForBucket(tableBucket);
if (planForBucket != null) {
Expand All @@ -1502,21 +1505,17 @@ private void tryToCompleteRebalanceTask(TableBucket tableBucket) {
if (planForBucket.isLeaderChanged() && !reassignment.isBeingReassigned()) {
LeaderAndIsr leaderAndIsr = zooKeeperClient.getLeaderAndIsr(tableBucket).get();
int currentLeader = leaderAndIsr.leader();
if (currentLeader == planForBucket.getNewLeader()) {
if (canCompleteLeaderOnlyRebalanceTask(
notifyLeaderAndIsrResultForBucket,
responseServerId,
planForBucket,
currentLeader)) {
// leader action finish.
rebalanceManager.finishRebalanceTask(
tableBucket, RebalanceStatus.COMPLETED);
}
} else {
boolean isReassignmentComplete =
isReassignmentComplete(tableBucket, reassignment);
if (isReassignmentComplete) {
LOG.info(
"Target replicas {} have all caught up with the leader for reassigning bucket {}",
reassignment.getTargetReplicas(),
tableBucket);
onBucketReassignment(tableBucket, reassignment, true);
}
} else if (notifyLeaderAndIsrResultForBucket.succeeded()) {
tryToCompleteReassignmentTask(tableBucket, reassignment);
}
} catch (Exception e) {
LOG.error(
Expand All @@ -1526,6 +1525,49 @@ private void tryToCompleteRebalanceTask(TableBucket tableBucket) {
}
}

private void tryToCompleteRebalanceTaskOnLeaderAndIsrChange(TableBucket tableBucket) {
RebalancePlanForBucket planForBucket =
rebalanceManager.getRebalancePlanForBucket(tableBucket);
if (planForBucket != null) {
ReplicaReassignment reassignment =
ReplicaReassignment.build(
planForBucket.getOriginReplicas(), planForBucket.getNewReplicas());
if (planForBucket.isLeaderChanged() && !reassignment.isBeingReassigned()) {
return;
}
try {
tryToCompleteReassignmentTask(tableBucket, reassignment);
} catch (Exception e) {
LOG.error(
"Failed to complete the reassignment for table bucket {}", tableBucket, e);
rebalanceManager.finishRebalanceTask(tableBucket, RebalanceStatus.FAILED);
}
}
}

private void tryToCompleteReassignmentTask(
TableBucket tableBucket, ReplicaReassignment reassignment) throws Exception {
boolean isReassignmentComplete = isReassignmentComplete(tableBucket, reassignment);
if (isReassignmentComplete) {
LOG.info(
"Target replicas {} have all caught up with the leader for reassigning bucket {}",
reassignment.getTargetReplicas(),
tableBucket);
onBucketReassignment(tableBucket, reassignment, true);
}
}

@VisibleForTesting
static boolean canCompleteLeaderOnlyRebalanceTask(
NotifyLeaderAndIsrResultForBucket notifyLeaderAndIsrResultForBucket,
int responseServerId,
RebalancePlanForBucket planForBucket,
int currentLeader) {
return notifyLeaderAndIsrResultForBucket.succeeded()
&& responseServerId == planForBucket.getNewLeader()
&& currentLeader == planForBucket.getNewLeader();
}

/**
* Reassigning replicas for a tableBucket goes through a few steps listed in the code.
*
Expand Down Expand Up @@ -1817,7 +1859,7 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
newLeaderAndIsrList.forEach(coordinatorContext::putBucketLeaderAndIsr);

// First, try to judge whether the bucket is in rebalance task when isr change.
newLeaderAndIsrList.keySet().forEach(this::tryToCompleteRebalanceTask);
newLeaderAndIsrList.keySet().forEach(this::tryToCompleteRebalanceTaskOnLeaderAndIsrChange);

// TODO update metadata for all alive tablet servers.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@
import org.apache.fluss.rpc.messages.NotifyLeaderAndIsrResponse;
import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest;
import org.apache.fluss.rpc.messages.UpdateMetadataRequest;
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.rpc.protocol.ApiKeys;
import org.apache.fluss.rpc.protocol.Errors;
import org.apache.fluss.server.coordinator.event.AccessContextEvent;
import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent;
Expand All @@ -59,6 +61,7 @@
import org.apache.fluss.server.entity.AdjustIsrResultForBucket;
import org.apache.fluss.server.entity.CommitKvSnapshotData;
import org.apache.fluss.server.entity.CommitRemoteLogManifestData;
import org.apache.fluss.server.entity.NotifyLeaderAndIsrResultForBucket;
import org.apache.fluss.server.entity.TablePropertyChanges;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
import org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore;
Expand Down Expand Up @@ -1483,13 +1486,13 @@ void testLeaderOnlyRebalanceExecutesSequentially() throws Exception {
// Set up controlled gateways that capture NotifyLeaderAndIsr calls.
// Gateways start in pass-through mode for table creation, then switch
// to controlled mode to verify sequential leader migration.
ConcurrentLinkedDeque<CompletableFuture<Void>> pendingTriggers =
ConcurrentLinkedDeque<ControlledNotifyTrigger> pendingTriggers =
new ConcurrentLinkedDeque<>();
int[] servers = zookeeperClient.getSortedTabletServerList();
Map<Integer, TabletServerGateway> gateways = new HashMap<>();
ControlledNotifyGateway[] controlledGateways = new ControlledNotifyGateway[servers.length];
for (int i = 0; i < servers.length; i++) {
ControlledNotifyGateway gw = new ControlledNotifyGateway(pendingTriggers);
ControlledNotifyGateway gw = new ControlledNotifyGateway(servers[i], pendingTriggers);
gateways.put(servers[i], gw);
controlledGateways[i] = gw;
}
Expand Down Expand Up @@ -1585,6 +1588,102 @@ void testLeaderOnlyRebalanceExecutesSequentially() throws Exception {
verifyIsr(tb2, 1, Arrays.asList(0, 1, 2));
}

@Test
void testLeaderOnlyRebalanceCompletionRequiresSuccessfulResponseFromNewLeader() {
TableBucket tableBucket = new TableBucket(1L, 0);
RebalancePlanForBucket planForBucket =
new RebalancePlanForBucket(
tableBucket, 0, 1, Arrays.asList(0, 1, 2), Arrays.asList(1, 0, 2));
NotifyLeaderAndIsrResultForBucket successResult =
new NotifyLeaderAndIsrResultForBucket(tableBucket);
NotifyLeaderAndIsrResultForBucket failedResult =
new NotifyLeaderAndIsrResultForBucket(
tableBucket, new ApiError(Errors.UNKNOWN_SERVER_ERROR, "failed"));

assertThat(
CoordinatorEventProcessor.canCompleteLeaderOnlyRebalanceTask(
successResult, 1, planForBucket, 1))
.isTrue();
assertThat(
CoordinatorEventProcessor.canCompleteLeaderOnlyRebalanceTask(
successResult, 0, planForBucket, 1))
.isFalse();
assertThat(
CoordinatorEventProcessor.canCompleteLeaderOnlyRebalanceTask(
failedResult, 1, planForBucket, 1))
.isFalse();
assertThat(
CoordinatorEventProcessor.canCompleteLeaderOnlyRebalanceTask(
successResult, 1, planForBucket, 0))
.isFalse();
}

@Test
void testLeaderOnlyRebalanceIgnoresSuccessResponseFromOldLeader() throws Exception {
ConcurrentLinkedDeque<ControlledNotifyTrigger> pendingTriggers =
new ConcurrentLinkedDeque<>();
int[] servers = zookeeperClient.getSortedTabletServerList();
Map<Integer, TabletServerGateway> gateways = new HashMap<>();
ControlledNotifyGateway[] controlledGateways = new ControlledNotifyGateway[servers.length];
for (int i = 0; i < servers.length; i++) {
ControlledNotifyGateway gw = new ControlledNotifyGateway(servers[i], pendingTriggers);
gateways.put(servers[i], gw);
controlledGateways[i] = gw;
}
testCoordinatorChannelManager.setGateways(gateways);

TablePath t1 = TablePath.of(defaultDatabase, "test_leader_rebalance_wait_new_leader");
Map<Integer, BucketAssignment> bucketAssignments = new HashMap<>();
bucketAssignments.put(0, BucketAssignment.of(0, 1, 2));
TableAssignment tableAssignment = new TableAssignment(bucketAssignments);
long t1Id =
metadataManager.createTable(t1, remoteDataDir, TEST_TABLE, tableAssignment, false);

TableBucket tb0 = new TableBucket(t1Id, 0);

verifyIsr(tb0, 0, Arrays.asList(0, 1, 2));

for (ControlledNotifyGateway gw : controlledGateways) {
gw.enableControlMode();
}
pendingTriggers.clear();

Map<TableBucket, RebalancePlanForBucket> rebalancePlan = new HashMap<>();
rebalancePlan.put(
tb0,
new RebalancePlanForBucket(
tb0, 0, 1, Arrays.asList(0, 1, 2), Arrays.asList(1, 0, 2)));

eventProcessor
.getRebalanceManager()
.registerRebalance(
"rebalance-wait-new-leader-response",
rebalancePlan,
RebalanceStatus.NOT_STARTED);

retry(
Duration.ofMinutes(1),
() -> assertThat(hasPendingNotifyTrigger(pendingTriggers, 0)).isTrue());
retry(
Duration.ofMinutes(1),
() -> assertThat(hasPendingNotifyTrigger(pendingTriggers, 1)).isTrue());
assertThat(countInProgressRebalanceTasks(tb0)).isEqualTo(1);

completePendingNotifyTrigger(pendingTriggers, 0);
fromCtx(ctx -> null);

assertThat(countInProgressRebalanceTasks(tb0)).isEqualTo(1);
assertThat(eventProcessor.getRebalanceManager().hasInProgressRebalance()).isTrue();

completePendingNotifyTrigger(pendingTriggers, 1);
retry(
Duration.ofMinutes(1),
() ->
assertThat(eventProcessor.getRebalanceManager().hasInProgressRebalance())
.isFalse());
verifyIsr(tb0, 1, Arrays.asList(0, 1, 2));
}

private void verifyIsr(TableBucket tb, int expectedLeader, List<Integer> expectedIsr)
throws Exception {
LeaderAndIsr leaderAndIsr =
Expand Down Expand Up @@ -1984,13 +2083,36 @@ private static List<TableBucket> allTableBuckets(
}

private static void drainPendingNotifyTriggers(
ConcurrentLinkedDeque<CompletableFuture<Void>> pendingTriggers) {
CompletableFuture<Void> trigger;
ConcurrentLinkedDeque<ControlledNotifyTrigger> pendingTriggers) {
ControlledNotifyTrigger trigger;
while ((trigger = pendingTriggers.poll()) != null) {
trigger.complete(null);
}
}

private static boolean hasPendingNotifyTrigger(
ConcurrentLinkedDeque<ControlledNotifyTrigger> pendingTriggers, int responseServerId) {
for (ControlledNotifyTrigger trigger : pendingTriggers) {
if (trigger.getResponseServerId() == responseServerId) {
return true;
}
}
return false;
}

private static void completePendingNotifyTrigger(
ConcurrentLinkedDeque<ControlledNotifyTrigger> pendingTriggers, int responseServerId) {
for (ControlledNotifyTrigger trigger : pendingTriggers) {
if (trigger.getResponseServerId() == responseServerId) {
assertThat(pendingTriggers.remove(trigger)).isTrue();
trigger.complete(null);
return;
}
}
throw new AssertionError(
"No pending NotifyLeaderAndIsr response for server " + responseServerId);
}

private int countInProgressRebalanceTasks(TableBucket... buckets) {
int count = 0;
for (TableBucket tb : buckets) {
Expand All @@ -2009,10 +2131,14 @@ private int countInProgressRebalanceTasks(TableBucket... buckets) {
*/
private static class ControlledNotifyGateway extends TestTabletServerGateway {
private volatile boolean controlMode = false;
private final ConcurrentLinkedDeque<CompletableFuture<Void>> pendingTriggers;
private final int responseServerId;
private final ConcurrentLinkedDeque<ControlledNotifyTrigger> pendingTriggers;

ControlledNotifyGateway(ConcurrentLinkedDeque<CompletableFuture<Void>> pendingTriggers) {
ControlledNotifyGateway(
int responseServerId,
ConcurrentLinkedDeque<ControlledNotifyTrigger> pendingTriggers) {
super(false, Collections.emptySet());
this.responseServerId = responseServerId;
this.pendingTriggers = pendingTriggers;
}

Expand All @@ -2029,9 +2155,30 @@ public CompletableFuture<NotifyLeaderAndIsrResponse> notifyLeaderAndIsr(
// Build the proper success response using parent's logic.
NotifyLeaderAndIsrResponse response = super.notifyLeaderAndIsr(request).join();
// Return a future that completes only when the test releases the trigger.
CompletableFuture<Void> trigger = new CompletableFuture<>();
ControlledNotifyTrigger trigger = new ControlledNotifyTrigger(responseServerId);
pendingTriggers.add(trigger);
return trigger.thenApply(v -> response);
return trigger.getFuture().thenApply(v -> response);
}
}

private static class ControlledNotifyTrigger {
private final int responseServerId;
private final CompletableFuture<Void> future = new CompletableFuture<>();

ControlledNotifyTrigger(int responseServerId) {
this.responseServerId = responseServerId;
}

int getResponseServerId() {
return responseServerId;
}

CompletableFuture<Void> getFuture() {
return future;
}

void complete(Void value) {
future.complete(value);
}
}

Expand Down
Loading