-
Notifications
You must be signed in to change notification settings - Fork 290
gcs: keep bridge alive across live-migration transport swap #2771
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,6 +13,7 @@ import ( | |
| "io" | ||
| "net" | ||
| "sync" | ||
| "sync/atomic" | ||
| "time" | ||
|
|
||
| "github.com/sirupsen/logrus" | ||
|
|
@@ -48,16 +49,20 @@ type bridge struct { | |
| // Timeout is the time a synchronous RPC must respond within. | ||
| Timeout time.Duration | ||
|
|
||
| mu sync.Mutex | ||
| nextID int64 | ||
| rpcs map[int64]*rpc | ||
| conn io.ReadWriteCloser | ||
| rpcCh chan *rpc | ||
| notify notifyFunc | ||
| closed bool | ||
| log *logrus.Entry | ||
| brdgErr error | ||
| waitCh chan struct{} | ||
| mu sync.Mutex | ||
| nextID int64 | ||
| rpcs map[int64]*rpc | ||
| // conn is the transport carrying messages to and from the guest. | ||
| // Held atomically because the send path reads it while a migration swaps it. | ||
| conn atomic.Value | ||
| rpcCh chan *rpc | ||
| notify notifyFunc | ||
| closed bool | ||
| log *logrus.Entry | ||
| brdgErr error | ||
| waitCh chan struct{} | ||
| migrating atomic.Bool | ||
| resumeCh chan struct{} | ||
| } | ||
|
|
||
| var ErrBridgeClosed = fmt.Errorf("bridge closed: %w", net.ErrClosed) | ||
|
|
@@ -73,15 +78,17 @@ type notifyFunc func(*prot.ContainerNotification) error | |
| // notification message arrives from the guest. It logs transport errors and | ||
| // traces using `log`. | ||
| func newBridge(conn io.ReadWriteCloser, notify notifyFunc, log *logrus.Entry) *bridge { | ||
| return &bridge{ | ||
| conn: conn, | ||
| rpcs: make(map[int64]*rpc), | ||
| rpcCh: make(chan *rpc), | ||
| waitCh: make(chan struct{}), | ||
| notify: notify, | ||
| log: log, | ||
| Timeout: bridgeFailureTimeout, | ||
| brdg := &bridge{ | ||
| rpcs: make(map[int64]*rpc), | ||
| rpcCh: make(chan *rpc), | ||
| waitCh: make(chan struct{}), | ||
| resumeCh: make(chan struct{}, 1), | ||
| notify: notify, | ||
| log: log, | ||
| Timeout: bridgeFailureTimeout, | ||
| } | ||
| brdg.conn.Store(conn) | ||
| return brdg | ||
| } | ||
|
|
||
| // Start begins the bridge send and receive goroutines. | ||
|
|
@@ -111,7 +118,9 @@ func (brdg *bridge) kill(err error) { | |
| } else { | ||
| brdg.log.Debug("bridge terminating") | ||
| } | ||
| brdg.conn.Close() | ||
| if c, ok := brdg.conn.Load().(io.ReadWriteCloser); ok { | ||
| _ = c.Close() | ||
| } | ||
| close(brdg.waitCh) | ||
| } | ||
|
|
||
|
|
@@ -129,6 +138,56 @@ func (brdg *bridge) Wait() error { | |
| return brdg.brdgErr | ||
| } | ||
|
|
||
| // SetMigrating toggles tolerance of transport-level failures around a | ||
| // live-migration blackout. Explicit [bridge.Close] and the RPC timeout | ||
| // kill still tear the bridge down. | ||
| func (brdg *bridge) SetMigrating(migrating bool) { | ||
| brdg.migrating.Store(migrating) | ||
| } | ||
|
|
||
| // ResumeOnConn swaps the bridge transport onto conn and wakes the recv | ||
| // loop without dropping outstanding RPCs. | ||
| // | ||
| // Resume is only legal inside a migration window, so it is gated on the | ||
| // migrating flag. The flag is set before the blackout begins; during the | ||
| // blackout the old transport is gone, so a send may still be dequeued and | ||
| // attempts to write that simply fails and is tolerated. The transport is held | ||
| // atomically, so that send's read of the conn and this swap never race. The migration | ||
| // flag stays set across this swap and is cleared only after the new transport | ||
| // is in place, so RPCs resume only once a healthy conn is installed. | ||
| func (brdg *bridge) ResumeOnConn(newConn io.ReadWriteCloser) (err error) { | ||
| brdg.mu.Lock() | ||
| defer brdg.mu.Unlock() | ||
|
|
||
| defer func() { | ||
| if err != nil { | ||
| // Not adopting newConn; close it so the accepted socket does not leak. | ||
| _ = newConn.Close() | ||
| } | ||
| }() | ||
|
|
||
| if brdg.closed { | ||
| return ErrBridgeClosed | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a risk of socket/handle leak if bridge closes between accept and swap, since we are not closing the passed connection before returning?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for catching that. Fixed it in last commit. |
||
| } | ||
|
|
||
| if !brdg.migrating.Load() { | ||
| return fmt.Errorf("bridge resume requires an active migration") | ||
| } | ||
|
|
||
| if c, ok := brdg.conn.Load().(io.ReadWriteCloser); ok { | ||
| // Force any in-progress recvLoop off the stale conn so it can restart on the new one. | ||
| _ = c.Close() | ||
| } | ||
|
|
||
| brdg.conn.Store(newConn) | ||
|
|
||
| select { | ||
| case brdg.resumeCh <- struct{}{}: | ||
| default: | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // AsyncRPC sends an RPC request to the guest but does not wait for a response. | ||
| // If the message cannot be sent before the context is done, then an error is | ||
| // returned. | ||
|
|
@@ -239,7 +298,23 @@ func (brdg *bridge) RPC(ctx context.Context, proc prot.RPCProc, req requestMessa | |
| } | ||
|
|
||
| func (brdg *bridge) recvLoopRoutine() { | ||
| brdg.kill(brdg.recvLoop()) | ||
| for { | ||
| err := brdg.recvLoop() | ||
|
|
||
| if !brdg.migrating.Load() { | ||
| brdg.kill(err) | ||
| break | ||
| } | ||
| // Park until [bridge.ResumeOnConn] swaps the conn or [bridge.Close] fires. | ||
| brdg.log.WithError(err).Info("bridge transport down during migration; awaiting resume or close") | ||
| select { | ||
| case <-brdg.resumeCh: | ||
| continue | ||
| case <-brdg.waitCh: | ||
| } | ||
| break | ||
| } | ||
|
|
||
| // Fail any remaining RPCs. | ||
| brdg.mu.Lock() | ||
| rpcs := brdg.rpcs | ||
|
|
@@ -286,7 +361,7 @@ func isLocalDisconnectError(err error) bool { | |
| } | ||
|
|
||
| func (brdg *bridge) recvLoop() error { | ||
| br := bufio.NewReader(brdg.conn) | ||
| br := bufio.NewReader(brdg.conn.Load().(io.ReadWriteCloser)) | ||
| for { | ||
| id, typ, b, err := readMessage(br) | ||
| if err != nil { | ||
|
|
@@ -365,6 +440,11 @@ func (brdg *bridge) sendLoop() { | |
| case call := <-brdg.rpcCh: | ||
| err := brdg.sendRPC(&buf, enc, call) | ||
| if err != nil { | ||
| if brdg.migrating.Load() { | ||
| // Blackout drop: sendRPC already failed this call; hold the bridge open. | ||
| brdg.log.WithError(err).Debug("bridge send failed during migration; bridge held open") | ||
| continue | ||
| } | ||
| brdg.kill(err) | ||
| return | ||
| } | ||
|
|
@@ -412,7 +492,7 @@ func (brdg *bridge) writeMessage(buf *bytes.Buffer, enc *json.Encoder, typ prot. | |
| } | ||
|
|
||
| // Write the message. | ||
| _, err = buf.WriteTo(brdg.conn) | ||
| _, err = buf.WriteTo(brdg.conn.Load().(io.ReadWriteCloser)) | ||
| if err != nil { | ||
| return fmt.Errorf("bridge write: %w", err) | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems fine but I dont get why its necessary. Why would we want transport level tolerance only when migrating? I get that this is a local loopback connection so in practice it likely never disconnects but doesnt it seem reasonable to just implement the bridge such that on disconnect its auto paused, and on reconnect it continues? No policy needed ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don’t want to do that under normal circumstances. This is because our shim depends on the invariant that if the bridge collapses then it’s a fatal error and all the Waits are released and thereafter, the workflow goes into teardown mode.
Just during migration, we avoid the same, so that in case of restore on rollback, we can resume over a fresh socket connection.