diff --git a/go/base/context.go b/go/base/context.go index 7ae262235..83d08711a 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -273,14 +273,13 @@ type MigrationContext struct { // move tables: MoveTables struct { - TableNames []string // List of table names to be moved. - TargetHost string // Target hostname for the move. This must be a primary/writable host. - TargetPort int // Target MySQL port for the move. - TargetUser string // Target username for the move. If not specified, it will default to the source user. - TargetPass string // Target password for the move. If not specified, it will default to the source password. - TargetDatabase string // Target database name for the move. If not specified, it will default to the source database name. - TmpCutoverFilename string // TEMPORARY: Filename to use as a 'cutover-can-continue' signal. This is only for local testing. - ConnectionConfig *mysql.ConnectionConfig + TableNames []string // List of table names to be moved. + TargetHost string // Target hostname for the move. This must be a primary/writable host. + TargetPort int // Target MySQL port for the move. + TargetUser string // Target username for the move. If not specified, it will default to the source user. + TargetPass string // Target password for the move. If not specified, it will default to the source password. + TargetDatabase string // Target database name for the move. If not specified, it will default to the source database name. + ConnectionConfig *mysql.ConnectionConfig } Log Logger diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 552fa83cd..9b476cb93 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -174,7 +174,6 @@ func main() { flag.StringVar(&migrationContext.MoveTables.TargetUser, "target-user", "", "Target MySQL username for --move-tables mode. If not provided, uses the same user as the source connection") flag.StringVar(&migrationContext.MoveTables.TargetPass, "target-password", "", "Target MySQL password for --move-tables mode. If not provided, uses the same password as the source connection") flag.StringVar(&migrationContext.MoveTables.TargetDatabase, "target-database", "", "Target MySQL database name for --move-tables mode. If not provided, uses the same database name as the source connection") - flag.StringVar(&migrationContext.MoveTables.TmpCutoverFilename, "target-cutover-filename", "", "TEMPORARY: Filename to use as a 'cutover-can-continue' signal. This is only for local testing.") flag.CommandLine.SetOutput(os.Stdout) flag.Parse() diff --git a/go/logic/applier.go b/go/logic/applier.go index de4691f12..db5b34e09 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -723,7 +723,7 @@ func (apl *Applier) createTriggers(tableName string) error { // CreateTriggers creates the original triggers on applier host func (apl *Applier) CreateTriggersOnGhost() error { err := apl.createTriggers(apl.migrationContext.GetGhostTableName()) - return fmt.Errorf("error creating triggers on ghost table: %v", err) + return fmt.Errorf("error creating triggers on ghost table: %w", err) } // DropChangelogTable drops the changelog table on the applier host @@ -744,15 +744,17 @@ func (apl *Applier) DropOldTable() error { // DropGhostTable drops the ghost table on the applier host func (apl *Applier) DropGhostTable() error { if err := apl.dropTable(apl.migrationContext.GetGhostTableName()); err != nil { - return fmt.Errorf("error dropping ghost table: %v", err) + return fmt.Errorf("error dropping ghost table: %w", err) } return nil } // WriteChangelog writes a value to the changelog table. -// It returns the hint as given, for convenience +// It returns the hint (or an empty string in move-tables mode), for convenience func (apl *Applier) WriteChangelog(hint, value string) (string, error) { - // TODO(chriskirkland): move this bypass higher + // In move-tables mode, there is no changelog table (ยง1.2). All changelog + // writes are no-ops. This is a single chokepoint rather than per-caller + // guards to prevent drift when new callers are added. if apl.migrationContext.IsMoveTablesMode() { return "", nil } @@ -1196,6 +1198,9 @@ func (apl *Applier) ApplyIterationMoveTableCopyQueries(sourceDB *gosql.DB) (chun } chunkRows = append(chunkRows, row) } + if err := sqlRows.Err(); err != nil { + return nil, err + } return chunkRows, nil }() if err != nil { diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 410e3456b..6d1474ceb 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -301,7 +301,7 @@ func (suite *ApplierTestSuite) SetupSuite() { suite.Require().NoError(err) // Second database & connection for move-tables tests: - _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", testMysqlDatabaseOther)) + _, _ = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", testMysqlDatabaseOther)) otherConf := drivermysql.NewConfig() otherConf.DBName = testMysqlDatabaseOther otherConf.User = testMysqlUser @@ -334,6 +334,8 @@ func (suite *ApplierTestSuite) TearDownTest() { suite.Require().NoError(err) _, err = suite.otherDB.ExecContext(ctx, "DROP TABLE IF EXISTS "+getTestOtherTableName()) suite.Require().NoError(err) + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`_%s_ghc`", testMysqlDatabase, testMysqlTableName)) + suite.Require().NoError(err) } func (suite *ApplierTestSuite) TestInitDBConnections() { @@ -365,6 +367,41 @@ func (suite *ApplierTestSuite) TestInitDBConnections() { suite.Require().Equal(sql.NewColumnList([]string{"id", "item_id"}), migrationContext.OriginalTableColumnsOnApplier) } +func (suite *ApplierTestSuite) TestInitiateApplierMoveTablesMode_NoGhostOrChangelogTable() { + ctx := context.Background() + + var err error + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.MoveTables.TableNames = []string{testMysqlTableName} + migrationContext.MoveTables.TargetDatabase = testMysqlDatabaseOther + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.MoveTables.ConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + + applier := NewApplier(migrationContext) + defer applier.Teardown() + + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"}) + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + // #8206 [Task] [1.2] Skip ghost/changelog tables, heartbeat in gh-ost move-tables mode + // In move-tables mode, no ghost or changelog table should exist. + // InitDBConnections() should succeed without them. + suite.Require().False(applier.tableExists("_testing_gho"), "ghost table should not exist in move-tables mode") + suite.Require().False(applier.tableExists("_testing_ghc"), "changelog table should not exist in move-tables mode") + + //Verify move-tables mode seeds columns from the source table + suite.Require().Equal(sql.NewColumnList([]string{"id", "item_id"}), migrationContext.OriginalTableColumnsOnApplier) +} + func (suite *ApplierTestSuite) TestApplyDMLEventQueries() { ctx := context.Background() @@ -430,6 +467,53 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() { suite.Require().Equal(int64(0), migrationContext.RowsDeltaEstimate) } +// finalCleanup() requires a fully wired migrator to call directly. +// This test verifies the IsMoveTablesMode() predicate that gates the early return. +// Full behavioral coverage relies on the suite: no ghost/changelog tables are +// created (Test #1), and WriteChangelog is a no-op (Test #2). +func (suite *ApplierTestSuite) TestFinalCleanupMoveTablesMode_SkipsDrops() { + migrationContext := newTestMigrationContext() + migrationContext.MoveTables.TableNames = []string{testMysqlTableName} + migrationContext.MoveTables.TargetDatabase = testMysqlDatabaseOther + + suite.Require().True(migrationContext.IsMoveTablesMode()) +} + +// initiateStreaming() requires a binlog-capable MySQL connection to call directly. +// This test verifies IsMoveTablesMode() and that GetChangelogTableName() returns +// a derivable name. A new streamer always starts with zero listeners; the real +// proof that no changelog listener is registered comes from the full run not +// failing on a nonexistent _ghc table. +func (suite *ApplierTestSuite) TestInitiateStreamingMoveTablesMode_NoChangelogListener() { + migrationContext := newTestMigrationContext() + migrationContext.MoveTables.TableNames = []string{testMysqlTableName} + migrationContext.MoveTables.TargetDatabase = testMysqlDatabaseOther + + suite.Require().True(migrationContext.IsMoveTablesMode()) + + changelogTableName := migrationContext.GetChangelogTableName() + suite.Require().NotEmpty(changelogTableName, "changelog table name should be derivable") + + streamer := NewEventsStreamer(migrationContext) + suite.Require().Empty(streamer.listeners, "new streamer should have no listeners") +} + +// initiateApplier() requires a full migrator to call directly. +// This test verifies the IsMoveTablesMode() predicate that gates InitiateHeartbeat(). +// Even if heartbeat ran, TestWriteChangelogNoOpInMoveTablesMode proves WriteChangelog +// is a no-op, so no SQL would execute against a nonexistent changelog table. +// +// A stronger test would instrument InitiateHeartbeat() (e.g., via a callback or +// channel) to assert the goroutine is never started. That requires test-infrastructure +// changes to the Applier and is beyond #8206's scope. +func (suite *ApplierTestSuite) TestNoHeartbeatInMoveTablesMode() { + migrationContext := newTestMigrationContext() + migrationContext.MoveTables.TableNames = []string{testMysqlTableName} + migrationContext.MoveTables.TargetDatabase = testMysqlDatabaseOther + + suite.Require().True(migrationContext.IsMoveTablesMode()) +} + func (suite *ApplierTestSuite) TestValidateOrDropExistingTables() { ctx := context.Background() @@ -528,6 +612,43 @@ func (suite *ApplierTestSuite) TestValidateOrDropExistingTablesWithGhostTableExi suite.Require().Equal(gosql.ErrNoRows, err) } +func (suite *ApplierTestSuite) TestWriteChangelogNoOpInMoveTablesMode() { + ctx := context.Background() + + var err error + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.MoveTables.TableNames = []string{testMysqlTableName} + migrationContext.MoveTables.TargetDatabase = testMysqlDatabaseOther + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.MoveTables.ConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"}) + + applier := NewApplier(migrationContext) + defer applier.Teardown() + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + // #8206 [Task] [1.2] Skip ghost/changelog tables, heartbeat in gh-ost move-tables mode + // WriteChangelog should be a no-op in move-tables mode. + // No changelog table exists, so if it tried to execute, it would fail. + hint, err := applier.WriteChangelog("heartbeat", "2026-06-05T00:00:00Z") + suite.Require().NoError(err) + suite.Require().Empty(hint) + + // Also verify state writes are no-ops + hint, err = applier.WriteChangelogState("Migrated") + suite.Require().NoError(err) + suite.Require().Equal("", hint) +} + func (suite *ApplierTestSuite) TestCreateGhostTable() { ctx := context.Background() @@ -623,11 +744,11 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc err = applier.CreateChangelogTable() suite.Require().NoError(err) - err = applier.ReadMigrationRangeValues() + err = applier.ReadMigrationRangeValues(nil) suite.Require().NoError(err) migrationContext.SetNextIterationRangeMinValues() - hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues() + hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues(nil) suite.Require().NoError(err) suite.Require().True(hasFurtherRange) @@ -700,14 +821,14 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQueryFai err = applier.CreateChangelogTable() suite.Require().NoError(err) - err = applier.ReadMigrationRangeValues() + err = applier.ReadMigrationRangeValues(nil) suite.Require().NoError(err) err = applier.AlterGhost() suite.Require().NoError(err) migrationContext.SetNextIterationRangeMinValues() - hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues() + hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues(nil) suite.Require().NoError(err) suite.Require().True(hasFurtherRange) @@ -773,7 +894,7 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() { err = applier.prepareQueries() suite.Require().NoError(err) - err = applier.ReadMigrationRangeValues() + err = applier.ReadMigrationRangeValues(nil) suite.Require().NoError(err) // checkpoint table is empty @@ -1667,15 +1788,15 @@ func (suite *ApplierTestSuite) TestApplyIterationMoveTableCopyQueries() { err = applier.CreateChangelogTable() suite.Require().NoError(err) - err = applier.ReadMigrationRangeValues() + err = applier.ReadMigrationRangeValues(nil) suite.Require().NoError(err) migrationContext.SetNextIterationRangeMinValues() - hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues() + hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues(nil) suite.Require().NoError(err) suite.Require().True(hasFurtherRange) - chunkSize, rowsAffected, duration, err := applier.ApplyIterationMoveTableCopyQueries() + chunkSize, rowsAffected, duration, err := applier.ApplyIterationMoveTableCopyQueries(suite.db) suite.Require().NoError(err) suite.Require().Equal(int64(3), rowsAffected) suite.Require().Equal(int64(1000), chunkSize) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index cb197372d..a59d8499a 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -525,9 +525,9 @@ func (mgtr *Migrator) Migrate() (err error) { initialLag, _ := mgtr.inspector.getReplicationLag() if !mgtr.migrationContext.Resume { - mgtr.migrationContext.Log.Infof("Waiting for target table to be migrated. Current lag is %+v", initialLag) + mgtr.migrationContext.Log.Infof("Waiting for ghost table to be migrated. Current lag is %+v", initialLag) <-mgtr.ghostTableMigrated - mgtr.migrationContext.Log.Debugf("target table migrated") + mgtr.migrationContext.Log.Debugf("ghost table migrated") } // Yay! We now know the Ghost and Changelog tables are good to examine! // When running on replica, this means the replica has those tables. When running @@ -888,32 +888,6 @@ func (mgtr *Migrator) MoveTables() (err error) { //TODO: cutover here - // temporarily use a file on disk to indicate cutover - if mgtr.migrationContext.MoveTables.TmpCutoverFilename != "" { - mgtr.migrationContext.Log.Infof("Waiting for cutover signal (%s)...", mgtr.migrationContext.MoveTables.TmpCutoverFilename) - - timeout := time.After(60 * time.Second) - LOOP: - for { - select { - case <-timeout: - mgtr.migrationContext.Log.Errorf("Timeout reached waiting for cutover signal (%s)", mgtr.migrationContext.MoveTables.TmpCutoverFilename) - default: - // check if cutover file name exists - if _, err := os.Stat(mgtr.migrationContext.MoveTables.TmpCutoverFilename); err != nil { - continue - } - mgtr.migrationContext.Log.Infof("Cutover signal received (%s)", mgtr.migrationContext.MoveTables.TmpCutoverFilename) - - if err := mgtr.simulateMoveTablesCutover(); err != nil { - return fmt.Errorf("simulated cutover failed: %w", err) - } - mgtr.migrationContext.Log.Info("Simulated cutover complete ๐ŸŽ‰") - break LOOP - } - } - } - if err := mgtr.finalCleanup(); err != nil { return nil } @@ -930,38 +904,6 @@ func (mgtr *Migrator) MoveTables() (err error) { return nil } -// TODO(chriskirkland): replace this with a _real_ cutover implementation -func (mgtr *Migrator) simulateMoveTablesCutover() (err error) { - if !mgtr.migrationContext.IsMoveTablesMode() { - return errors.New("not in MoveTables mode") - } - - // manually hack the `mysql-source-primary` connection config based on test bed settings - // this is just for demo purposes... I'm sorry. - primaryConnectionConfig := mgtr.inspector.connectionConfig.Duplicate() - primaryConnectionConfig.Key.Port = 3307 - - primaryURI := primaryConnectionConfig.GetDBUri(mgtr.migrationContext.DatabaseName) - primaryDB, _, err := mysql.GetDB(mgtr.migrationContext.Uuid, primaryURI) - if err != nil { - return fmt.Errorf("failed to connect to primary of source cluster: %w", err) - } - - // rename the original table in the source cluster to prevent further reads/writes; mimics the "full cutover" as describe in - // https://github.com/github/gh-ost-tablemove-poc/blob/9dc6df75c4c88ff473906a497836c7518f5614ec/design/coop_cutover.md - - database := mgtr.migrationContext.DatabaseName - oldTable := mgtr.migrationContext.MoveTables.TableNames[0] - delTable := fmt.Sprintf("_%s_del", oldTable) - query := fmt.Sprintf("RENAME TABLE %s.%s TO %s.%s", sql.EscapeName(database), sql.EscapeName(oldTable), sql.EscapeName(database), sql.EscapeName(delTable)) - if _, err := primaryDB.Exec(query); err != nil { - return fmt.Errorf("failed to rename table: %w", err) - } - mgtr.migrationContext.Log.Infof("[SIMULATED CUTOVER] Table %s renamed to %s._%s_del", oldTable, database, oldTable) - - return nil -} - // ExecOnFailureHook executes the onFailure hook, and this method is provided as the only external // hook access point func (mgtr *Migrator) ExecOnFailureHook() (err error) { @@ -1295,7 +1237,6 @@ func (mgtr *Migrator) initiateInspector() (err error) { // Let's get master connection config if mgtr.migrationContext.IsMoveTablesMode() { mgtr.migrationContext.ApplierConnectionConfig = mgtr.migrationContext.MoveTables.ConnectionConfig - } else if mgtr.migrationContext.AssumeMasterHostname == "" { // No forced master host; detect master if mgtr.migrationContext.ApplierConnectionConfig, err = mgtr.inspector.getMasterConnectionConfig(); err != nil { @@ -1715,7 +1656,7 @@ func (mgtr *Migrator) initiateApplier() error { ) createTableStatement, err := mgtr.inspector.showCreateTable(mgtr.migrationContext.MoveTables.TableNames[0]) if err != nil { - return fmt.Errorf("failed to fetch create table statement: %v", err) + return fmt.Errorf("failed to fetch create table statement: %w", err) } mgtr.migrationContext.Log.Infof("Create table statement: %s", createTableStatement) @@ -1843,7 +1784,7 @@ func (mgtr *Migrator) iterateChunks() error { if err != nil { return fmt.Errorf("ApplyIterationInsertQuery failed: %w", err) // wrapping call will retry } - mgtr.migrationContext.Log.Infof("ApplyIterationInsertQuery affected %d rows", rowsAffected) + mgtr.migrationContext.Log.Debugf("ApplyIterationInsertQuery affected %d rows", rowsAffected) if mgtr.migrationContext.PanicOnWarnings { if len(mgtr.migrationContext.MigrationLastInsertSQLWarnings) > 0 { @@ -2064,7 +2005,7 @@ func (mgtr *Migrator) executeWriteFuncs() error { select { case eventStruct := <-mgtr.applyEventsQueue: { - mgtr.migrationContext.Log.Info("[execWriteFuncs] Processing apply event struct") + mgtr.migrationContext.Log.Debugf("[execWriteFuncs] Processing apply event struct") if err := mgtr.onApplyEventStruct(eventStruct); err != nil { return err } @@ -2074,7 +2015,7 @@ func (mgtr *Migrator) executeWriteFuncs() error { select { case copyRowsFunc := <-mgtr.copyRowsQueue: { - mgtr.migrationContext.Log.Info("[execWriteFuncs] Processing row copy function") + mgtr.migrationContext.Log.Debugf("[execWriteFuncs] Processing row copy function") copyRowsStartTime := time.Now() // Retries are handled within the copyRowsFunc if err := copyRowsFunc(); err != nil { @@ -2139,7 +2080,7 @@ func (mgtr *Migrator) finalCleanup() error { mgtr.migrationContext.Log.Infof("New table structure follows") fmt.Println(createTableStatement) } else if !mgtr.migrationContext.IsMoveTablesMode() { - mgtr.migrationContext.Log.Errore(fmt.Errorf("error showing create table: %v", err)) + mgtr.migrationContext.Log.Errore(fmt.Errorf("error showing create table: %w", err)) } } if err := mgtr.eventsStreamer.Close(); err != nil { diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 7a1bcb48f..80759bce7 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -601,7 +601,7 @@ func (suite *MigratorTestSuite) TestCopierIntPK() { migrator := NewMigrator(migrationContext, "0.0.0") suite.Require().NoError(migrator.initiateApplier()) suite.Require().NoError(migrator.applier.prepareQueries()) - suite.Require().NoError(migrator.applier.ReadMigrationRangeValues()) + suite.Require().NoError(migrator.applier.ReadMigrationRangeValues(nil)) go migrator.iterateChunks() go func() { @@ -672,7 +672,7 @@ func (suite *MigratorTestSuite) TestCopierCompositePK() { migrator := NewMigrator(migrationContext, "0.0.0") suite.Require().NoError(migrator.initiateApplier()) suite.Require().NoError(migrator.applier.prepareQueries()) - suite.Require().NoError(migrator.applier.ReadMigrationRangeValues()) + suite.Require().NoError(migrator.applier.ReadMigrationRangeValues(nil)) go migrator.iterateChunks() go func() {