diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java index 537e2df63f..478fc35d73 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -1079,7 +1079,7 @@ private void processNotifyLeaderAndIsrResponseReceivedEvent( // server to acknowledge the leader change before proceeding to the next migration. for (NotifyLeaderAndIsrResultForBucket notifyLeaderAndIsrResultForBucket : notifyLeaderAndIsrResultForBuckets) { - tryToCompleteRebalanceTask(notifyLeaderAndIsrResultForBucket.getTableBucket()); + tryToCompleteRebalanceTask(notifyLeaderAndIsrResultForBucket, serverId); } } @@ -1491,7 +1491,10 @@ public void tryToExecuteRebalanceTask(RebalancePlanForBucket planForBucket) { } /** try to finish rebalance tasks after receive notify leader and isr response. */ - private void tryToCompleteRebalanceTask(TableBucket tableBucket) { + private void tryToCompleteRebalanceTask( + NotifyLeaderAndIsrResultForBucket notifyLeaderAndIsrResultForBucket, + int responseServerId) { + TableBucket tableBucket = notifyLeaderAndIsrResultForBucket.getTableBucket(); RebalancePlanForBucket planForBucket = rebalanceManager.getRebalancePlanForBucket(tableBucket); if (planForBucket != null) { @@ -1502,21 +1505,17 @@ private void tryToCompleteRebalanceTask(TableBucket tableBucket) { if (planForBucket.isLeaderChanged() && !reassignment.isBeingReassigned()) { LeaderAndIsr leaderAndIsr = zooKeeperClient.getLeaderAndIsr(tableBucket).get(); int currentLeader = leaderAndIsr.leader(); - if (currentLeader == planForBucket.getNewLeader()) { + if (canCompleteLeaderOnlyRebalanceTask( + notifyLeaderAndIsrResultForBucket, + responseServerId, + planForBucket, + currentLeader)) { // leader action finish. rebalanceManager.finishRebalanceTask( tableBucket, RebalanceStatus.COMPLETED); } - } else { - boolean isReassignmentComplete = - isReassignmentComplete(tableBucket, reassignment); - if (isReassignmentComplete) { - LOG.info( - "Target replicas {} have all caught up with the leader for reassigning bucket {}", - reassignment.getTargetReplicas(), - tableBucket); - onBucketReassignment(tableBucket, reassignment, true); - } + } else if (notifyLeaderAndIsrResultForBucket.succeeded()) { + tryToCompleteReassignmentTask(tableBucket, reassignment); } } catch (Exception e) { LOG.error( @@ -1526,6 +1525,49 @@ private void tryToCompleteRebalanceTask(TableBucket tableBucket) { } } + private void tryToCompleteRebalanceTaskOnLeaderAndIsrChange(TableBucket tableBucket) { + RebalancePlanForBucket planForBucket = + rebalanceManager.getRebalancePlanForBucket(tableBucket); + if (planForBucket != null) { + ReplicaReassignment reassignment = + ReplicaReassignment.build( + planForBucket.getOriginReplicas(), planForBucket.getNewReplicas()); + if (planForBucket.isLeaderChanged() && !reassignment.isBeingReassigned()) { + return; + } + try { + tryToCompleteReassignmentTask(tableBucket, reassignment); + } catch (Exception e) { + LOG.error( + "Failed to complete the reassignment for table bucket {}", tableBucket, e); + rebalanceManager.finishRebalanceTask(tableBucket, RebalanceStatus.FAILED); + } + } + } + + private void tryToCompleteReassignmentTask( + TableBucket tableBucket, ReplicaReassignment reassignment) throws Exception { + boolean isReassignmentComplete = isReassignmentComplete(tableBucket, reassignment); + if (isReassignmentComplete) { + LOG.info( + "Target replicas {} have all caught up with the leader for reassigning bucket {}", + reassignment.getTargetReplicas(), + tableBucket); + onBucketReassignment(tableBucket, reassignment, true); + } + } + + @VisibleForTesting + static boolean canCompleteLeaderOnlyRebalanceTask( + NotifyLeaderAndIsrResultForBucket notifyLeaderAndIsrResultForBucket, + int responseServerId, + RebalancePlanForBucket planForBucket, + int currentLeader) { + return notifyLeaderAndIsrResultForBucket.succeeded() + && responseServerId == planForBucket.getNewLeader() + && currentLeader == planForBucket.getNewLeader(); + } + /** * Reassigning replicas for a tableBucket goes through a few steps listed in the code. * @@ -1817,7 +1859,7 @@ private List tryProcessAdjustIsr( newLeaderAndIsrList.forEach(coordinatorContext::putBucketLeaderAndIsr); // First, try to judge whether the bucket is in rebalance task when isr change. - newLeaderAndIsrList.keySet().forEach(this::tryToCompleteRebalanceTask); + newLeaderAndIsrList.keySet().forEach(this::tryToCompleteRebalanceTaskOnLeaderAndIsrChange); // TODO update metadata for all alive tablet servers. diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java index 001a90f74e..e1989c002f 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java @@ -46,7 +46,9 @@ import org.apache.fluss.rpc.messages.NotifyLeaderAndIsrResponse; import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest; import org.apache.fluss.rpc.messages.UpdateMetadataRequest; +import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.rpc.protocol.ApiKeys; +import org.apache.fluss.rpc.protocol.Errors; import org.apache.fluss.server.coordinator.event.AccessContextEvent; import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent; import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent; @@ -59,6 +61,7 @@ import org.apache.fluss.server.entity.AdjustIsrResultForBucket; import org.apache.fluss.server.entity.CommitKvSnapshotData; import org.apache.fluss.server.entity.CommitRemoteLogManifestData; +import org.apache.fluss.server.entity.NotifyLeaderAndIsrResultForBucket; import org.apache.fluss.server.entity.TablePropertyChanges; import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; import org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore; @@ -1483,13 +1486,13 @@ void testLeaderOnlyRebalanceExecutesSequentially() throws Exception { // Set up controlled gateways that capture NotifyLeaderAndIsr calls. // Gateways start in pass-through mode for table creation, then switch // to controlled mode to verify sequential leader migration. - ConcurrentLinkedDeque> pendingTriggers = + ConcurrentLinkedDeque pendingTriggers = new ConcurrentLinkedDeque<>(); int[] servers = zookeeperClient.getSortedTabletServerList(); Map gateways = new HashMap<>(); ControlledNotifyGateway[] controlledGateways = new ControlledNotifyGateway[servers.length]; for (int i = 0; i < servers.length; i++) { - ControlledNotifyGateway gw = new ControlledNotifyGateway(pendingTriggers); + ControlledNotifyGateway gw = new ControlledNotifyGateway(servers[i], pendingTriggers); gateways.put(servers[i], gw); controlledGateways[i] = gw; } @@ -1585,6 +1588,102 @@ void testLeaderOnlyRebalanceExecutesSequentially() throws Exception { verifyIsr(tb2, 1, Arrays.asList(0, 1, 2)); } + @Test + void testLeaderOnlyRebalanceCompletionRequiresSuccessfulResponseFromNewLeader() { + TableBucket tableBucket = new TableBucket(1L, 0); + RebalancePlanForBucket planForBucket = + new RebalancePlanForBucket( + tableBucket, 0, 1, Arrays.asList(0, 1, 2), Arrays.asList(1, 0, 2)); + NotifyLeaderAndIsrResultForBucket successResult = + new NotifyLeaderAndIsrResultForBucket(tableBucket); + NotifyLeaderAndIsrResultForBucket failedResult = + new NotifyLeaderAndIsrResultForBucket( + tableBucket, new ApiError(Errors.UNKNOWN_SERVER_ERROR, "failed")); + + assertThat( + CoordinatorEventProcessor.canCompleteLeaderOnlyRebalanceTask( + successResult, 1, planForBucket, 1)) + .isTrue(); + assertThat( + CoordinatorEventProcessor.canCompleteLeaderOnlyRebalanceTask( + successResult, 0, planForBucket, 1)) + .isFalse(); + assertThat( + CoordinatorEventProcessor.canCompleteLeaderOnlyRebalanceTask( + failedResult, 1, planForBucket, 1)) + .isFalse(); + assertThat( + CoordinatorEventProcessor.canCompleteLeaderOnlyRebalanceTask( + successResult, 1, planForBucket, 0)) + .isFalse(); + } + + @Test + void testLeaderOnlyRebalanceIgnoresSuccessResponseFromOldLeader() throws Exception { + ConcurrentLinkedDeque pendingTriggers = + new ConcurrentLinkedDeque<>(); + int[] servers = zookeeperClient.getSortedTabletServerList(); + Map gateways = new HashMap<>(); + ControlledNotifyGateway[] controlledGateways = new ControlledNotifyGateway[servers.length]; + for (int i = 0; i < servers.length; i++) { + ControlledNotifyGateway gw = new ControlledNotifyGateway(servers[i], pendingTriggers); + gateways.put(servers[i], gw); + controlledGateways[i] = gw; + } + testCoordinatorChannelManager.setGateways(gateways); + + TablePath t1 = TablePath.of(defaultDatabase, "test_leader_rebalance_wait_new_leader"); + Map bucketAssignments = new HashMap<>(); + bucketAssignments.put(0, BucketAssignment.of(0, 1, 2)); + TableAssignment tableAssignment = new TableAssignment(bucketAssignments); + long t1Id = + metadataManager.createTable(t1, remoteDataDir, TEST_TABLE, tableAssignment, false); + + TableBucket tb0 = new TableBucket(t1Id, 0); + + verifyIsr(tb0, 0, Arrays.asList(0, 1, 2)); + + for (ControlledNotifyGateway gw : controlledGateways) { + gw.enableControlMode(); + } + pendingTriggers.clear(); + + Map rebalancePlan = new HashMap<>(); + rebalancePlan.put( + tb0, + new RebalancePlanForBucket( + tb0, 0, 1, Arrays.asList(0, 1, 2), Arrays.asList(1, 0, 2))); + + eventProcessor + .getRebalanceManager() + .registerRebalance( + "rebalance-wait-new-leader-response", + rebalancePlan, + RebalanceStatus.NOT_STARTED); + + retry( + Duration.ofMinutes(1), + () -> assertThat(hasPendingNotifyTrigger(pendingTriggers, 0)).isTrue()); + retry( + Duration.ofMinutes(1), + () -> assertThat(hasPendingNotifyTrigger(pendingTriggers, 1)).isTrue()); + assertThat(countInProgressRebalanceTasks(tb0)).isEqualTo(1); + + completePendingNotifyTrigger(pendingTriggers, 0); + fromCtx(ctx -> null); + + assertThat(countInProgressRebalanceTasks(tb0)).isEqualTo(1); + assertThat(eventProcessor.getRebalanceManager().hasInProgressRebalance()).isTrue(); + + completePendingNotifyTrigger(pendingTriggers, 1); + retry( + Duration.ofMinutes(1), + () -> + assertThat(eventProcessor.getRebalanceManager().hasInProgressRebalance()) + .isFalse()); + verifyIsr(tb0, 1, Arrays.asList(0, 1, 2)); + } + private void verifyIsr(TableBucket tb, int expectedLeader, List expectedIsr) throws Exception { LeaderAndIsr leaderAndIsr = @@ -1984,13 +2083,36 @@ private static List allTableBuckets( } private static void drainPendingNotifyTriggers( - ConcurrentLinkedDeque> pendingTriggers) { - CompletableFuture trigger; + ConcurrentLinkedDeque pendingTriggers) { + ControlledNotifyTrigger trigger; while ((trigger = pendingTriggers.poll()) != null) { trigger.complete(null); } } + private static boolean hasPendingNotifyTrigger( + ConcurrentLinkedDeque pendingTriggers, int responseServerId) { + for (ControlledNotifyTrigger trigger : pendingTriggers) { + if (trigger.getResponseServerId() == responseServerId) { + return true; + } + } + return false; + } + + private static void completePendingNotifyTrigger( + ConcurrentLinkedDeque pendingTriggers, int responseServerId) { + for (ControlledNotifyTrigger trigger : pendingTriggers) { + if (trigger.getResponseServerId() == responseServerId) { + assertThat(pendingTriggers.remove(trigger)).isTrue(); + trigger.complete(null); + return; + } + } + throw new AssertionError( + "No pending NotifyLeaderAndIsr response for server " + responseServerId); + } + private int countInProgressRebalanceTasks(TableBucket... buckets) { int count = 0; for (TableBucket tb : buckets) { @@ -2009,10 +2131,14 @@ private int countInProgressRebalanceTasks(TableBucket... buckets) { */ private static class ControlledNotifyGateway extends TestTabletServerGateway { private volatile boolean controlMode = false; - private final ConcurrentLinkedDeque> pendingTriggers; + private final int responseServerId; + private final ConcurrentLinkedDeque pendingTriggers; - ControlledNotifyGateway(ConcurrentLinkedDeque> pendingTriggers) { + ControlledNotifyGateway( + int responseServerId, + ConcurrentLinkedDeque pendingTriggers) { super(false, Collections.emptySet()); + this.responseServerId = responseServerId; this.pendingTriggers = pendingTriggers; } @@ -2029,9 +2155,30 @@ public CompletableFuture notifyLeaderAndIsr( // Build the proper success response using parent's logic. NotifyLeaderAndIsrResponse response = super.notifyLeaderAndIsr(request).join(); // Return a future that completes only when the test releases the trigger. - CompletableFuture trigger = new CompletableFuture<>(); + ControlledNotifyTrigger trigger = new ControlledNotifyTrigger(responseServerId); pendingTriggers.add(trigger); - return trigger.thenApply(v -> response); + return trigger.getFuture().thenApply(v -> response); + } + } + + private static class ControlledNotifyTrigger { + private final int responseServerId; + private final CompletableFuture future = new CompletableFuture<>(); + + ControlledNotifyTrigger(int responseServerId) { + this.responseServerId = responseServerId; + } + + int getResponseServerId() { + return responseServerId; + } + + CompletableFuture getFuture() { + return future; + } + + void complete(Void value) { + future.complete(value); } }