diff --git a/doc/rfc/submitqueue/workflow.md b/doc/rfc/submitqueue/workflow.md index 2b7b33ee..4e255312 100644 --- a/doc/rfc/submitqueue/workflow.md +++ b/doc/rfc/submitqueue/workflow.md @@ -79,13 +79,13 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate` | **buildsignal** | Build | speculate | Feed CI result back into speculation | | **merge** | BatchID | conclude, speculate | Merge the batch and advance the queue | | **conclude** | BatchID | — | Map terminal batch state to request state | -| **log** | RequestLog | — | Append-only sink for request log events | +| **log** | RequestLog | — | Gateway-owned sink: persists request log events to storage | ## DLQ reconciliation -Every primary pipeline topic above is paired with a `{topic}_dlq` subscription consumed by a dedicated DLQ controller. The consumer framework moves a message to its DLQ once the primary controller returns a non-retryable error or exhausts retries on a retryable one; without the DLQ side the affected request would stay in a non-terminal state forever and the gateway would still report it as "in progress". +Every *consumed* primary pipeline topic above is paired with a `{topic}_dlq` subscription consumed by a dedicated DLQ controller. The `log` topic is the exception: the orchestrator only publishes to it (the gateway is the sole consumer that persists the request log), so it has no orchestrator-side subscription and therefore no DLQ. The consumer framework moves a message to its DLQ once the primary controller returns a non-retryable error or exhausts retries on a retryable one; without the DLQ side the affected request would stay in a non-terminal state forever and the gateway would still report it as "in progress". -The DLQ controllers do not re-attempt the failed work. 
They decode the payload to recover the affected `RequestID` (start, validate, batch, cancel, log) or `BatchID` (score, speculate, build, buildsignal, merge, conclude) and drive the entity to a terminal failed state — `RequestStateError` for requests, `BatchStateFailed` for batches with fan-out to the member requests. State writes use the same optimistic-locking CAS as the primary pipeline, so a late primary-pipeline update wins cleanly and a version mismatch is asked back for redelivery. +The DLQ controllers do not re-attempt the failed work. They decode the payload to recover the affected `RequestID` (start, validate, batch, cancel) or `BatchID` (score, speculate, build, buildsignal, merge, conclude) and drive the entity to a terminal failed state — `RequestStateError` for requests, `BatchStateFailed` for batches with fan-out to the member requests. State writes use the same optimistic-locking CAS as the primary pipeline, so a late primary-pipeline update wins cleanly and a version mismatch causes the DLQ message to be redelivered. DLQ consumers are wired with `errs.AlwaysRetryableProcessor` and a very high `Retry.MaxAttempts`, with their own DLQ disabled. That combination makes reconciliation effectively non-droppable: any failure is forced retryable rather than escalating to a second-level dead-letter that nobody consumes. The trade-off is that a genuinely unprocessable DLQ message — typically a malformed payload — must be removed by an operator. 
diff --git a/example/submitqueue/docker-compose.yml b/example/submitqueue/docker-compose.yml index dc49e86f..1b3ed04b 100644 --- a/example/submitqueue/docker-compose.yml +++ b/example/submitqueue/docker-compose.yml @@ -55,6 +55,8 @@ services: - QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true # Path to YAML queue configuration baked into the image - QUEUE_CONFIG_PATH=/root/queues.yaml + # Stable subscriber name for the request-log consumer + - HOSTNAME=gateway-dev depends_on: mysql-app: condition: service_healthy diff --git a/example/submitqueue/gateway/server/BUILD.bazel b/example/submitqueue/gateway/server/BUILD.bazel index 86baf961..05c868bc 100644 --- a/example/submitqueue/gateway/server/BUILD.bazel +++ b/example/submitqueue/gateway/server/BUILD.bazel @@ -11,12 +11,17 @@ go_library( importpath = "github.com/uber/submitqueue/example/submitqueue/gateway/server", visibility = ["//visibility:private"], deps = [ + "//core/errs", + "//core/errs/generic", + "//core/errs/mysql", "//extension/counter/mysql", + "//extension/messagequeue", "//extension/messagequeue/mysql", "//submitqueue/core/consumer", "//submitqueue/extension/queueconfig/yaml", "//submitqueue/extension/storage/mysql", "//submitqueue/gateway/controller", + "//submitqueue/gateway/controller/log", "//submitqueue/gateway/protopb", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_uber_go_tally_v4//:tally", diff --git a/example/submitqueue/gateway/server/docker-compose.yml b/example/submitqueue/gateway/server/docker-compose.yml index 2b018f16..08a2f24d 100644 --- a/example/submitqueue/gateway/server/docker-compose.yml +++ b/example/submitqueue/gateway/server/docker-compose.yml @@ -55,6 +55,8 @@ services: - QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true # Path to YAML queue configuration baked into the image - QUEUE_CONFIG_PATH=/root/queues.yaml + # Stable subscriber name for the request-log consumer + - HOSTNAME=gateway-dev depends_on: mysql-app: 
condition: service_healthy diff --git a/example/submitqueue/gateway/server/main.go b/example/submitqueue/gateway/server/main.go index b2c814d8..c23a47c5 100644 --- a/example/submitqueue/gateway/server/main.go +++ b/example/submitqueue/gateway/server/main.go @@ -28,12 +28,17 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/errs" + genericerrs "github.com/uber/submitqueue/core/errs/generic" + mysqlerrs "github.com/uber/submitqueue/core/errs/mysql" mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql" + extqueue "github.com/uber/submitqueue/extension/messagequeue" queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql" "github.com/uber/submitqueue/submitqueue/core/consumer" yamlqueueconfig "github.com/uber/submitqueue/submitqueue/extension/queueconfig/yaml" mysqlstorage "github.com/uber/submitqueue/submitqueue/extension/storage/mysql" "github.com/uber/submitqueue/submitqueue/gateway/controller" + logctrl "github.com/uber/submitqueue/submitqueue/gateway/controller/log" pb "github.com/uber/submitqueue/submitqueue/gateway/protopb" "go.uber.org/zap" "google.golang.org/grpc" @@ -174,12 +179,35 @@ func run() error { zap.String("queue_dsn", queueDSN), ) - // Build a publish-only topic registry: gateway only feeds the start of the - // orchestrator pipeline (TopicKeyStart) and the cancel topic (TopicKeyCancel). - // No subscription is configured because the gateway never consumes from the queue. + // Subscriber name for the log-topic consumer. It must be unique per running + // instance: SubscriberName identifies a subscriber for partition leases, so + // two gateway processes on the same host (sharing HOSTNAME) would otherwise + // contend for the same lease. Append the PID to keep co-located instances + // distinct; the PID is stable for the life of the process. Offset tracking + // stays keyed on the shared ConsumerGroup ("gateway-log"), not this name. 
+ // Falls back to a time-seeded name when HOSTNAME is unset (e.g. local runs). + hostname := os.Getenv("HOSTNAME") + if hostname == "" { + hostname = fmt.Sprintf("gateway-%d", time.Now().Unix()) + } + subscriberName := fmt.Sprintf("%s-%d", hostname, os.Getpid()) + + // Build the topic registry. The gateway publishes to the start of the + // orchestrator pipeline (TopicKeyStart) and the cancel topic (TopicKeyCancel) — + // both publish-only. It additionally consumes the log topic (TopicKeyLog): + // the gateway is the sole writer of the request log, persisting entries that + // the orchestrator publishes there. registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ {Key: consumer.TopicKeyStart, Name: "start", Queue: mysqlQueue}, {Key: consumer.TopicKeyCancel, Name: "cancel", Queue: mysqlQueue}, + { + Key: consumer.TopicKeyLog, + Name: "log", + Queue: mysqlQueue, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "gateway-log", + ), + }, }) if err != nil { return fmt.Errorf("failed to create topic registry: %w", err) @@ -201,7 +229,8 @@ func run() error { // Initialize storage from the shared app database connection. The land // controller writes to this store directly; cancel/status use the request - // log store directly. + // log store directly. The log consumer (registered below) is the sole + // persister of request log entries published by the orchestrator. store, err := mysqlstorage.NewStorage(appDB, scope.SubScope("storage")) if err != nil { return fmt.Errorf("failed to create storage: %w", err) @@ -236,6 +265,31 @@ func run() error { // Register reflection service for debugging with grpcurl reflection.Register(grpcServer) + // Create the queue consumer and register the log controller. The gateway is + // the sole persister of the request log: the orchestrator publishes entries + // to the log topic and this consumer writes them to storage. 
+ logConsumer := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry, + errs.NewClassifierProcessor( + // Storage (extension/storage/mysql) and queue (extension/messagequeue/mysql) + // both run on the same MySQL driver, so a single classifier covers + // errors surfaced from either backend. + genericerrs.Classifier, + mysqlerrs.Classifier, + ), + ) + + logController := logctrl.NewController(logger.Sugar(), scope, store, consumer.TopicKeyLog, "gateway-log") + if err := logConsumer.Register(logController); err != nil { + return fmt.Errorf("failed to register log controller: %w", err) + } + + if err := logConsumer.Start(ctx); err != nil { + // The error can also be a result of a context cancellation due to SIGINT or SIGTERM. + // This is expected, just propagate it. + return fmt.Errorf("failed to start log consumer: %w", err) + } + logger.Info("log consumer started") + // Listen on configurable port port := os.Getenv("PORT") if port == "" { @@ -257,6 +311,8 @@ func run() error { // Wait for interrupt signal or server critical error // If interruption is signaled, gracefully stop the server + // If the server exits with an error, cancel the context to signal the consumer + // After this, stop the consumer // If an error happens during shutdown, return the actual error, not the context cancellation error var serverErr error select { @@ -273,10 +329,27 @@ func run() error { serverErr = <-serverErrCh case serverErr = <-serverErrCh: fmt.Println("Shutting down gateway server due to critical GRPC server error...") + + // Cancel the context to signal cancellation to the queue consumer + cancel() } if serverErr != nil { - err = fmt.Errorf("GRPC server exited with error: %w", serverErr) + serverErr = fmt.Errorf("GRPC server exited with error: %w", serverErr) + } + + // Stop the consumer with a 30s timeout; by this time the context should be + // cancelled and the processing threads may already be exiting; recollect them. 
+ errStop := logConsumer.Stop(30000) + if errStop != nil { + errStop = fmt.Errorf("failed to stop consumer: %w", errStop) + } + + if errStop != nil || serverErr != nil { + // Override context cancellation error with the shutdown error. The server + // error is the primary/root failure, so it leads; the consumer-stop error + // is secondary cleanup. + err = errors.Join(serverErr, errStop) } return err diff --git a/example/submitqueue/orchestrator/server/BUILD.bazel b/example/submitqueue/orchestrator/server/BUILD.bazel index 4ae928dd..d01cdb12 100644 --- a/example/submitqueue/orchestrator/server/BUILD.bazel +++ b/example/submitqueue/orchestrator/server/BUILD.bazel @@ -49,7 +49,6 @@ go_library( "//submitqueue/orchestrator/controller/cancel", "//submitqueue/orchestrator/controller/conclude", "//submitqueue/orchestrator/controller/dlq", - "//submitqueue/orchestrator/controller/log", "//submitqueue/orchestrator/controller/merge", "//submitqueue/orchestrator/controller/score", "//submitqueue/orchestrator/controller/speculate", diff --git a/example/submitqueue/orchestrator/server/main.go b/example/submitqueue/orchestrator/server/main.go index 7412843d..fc5eb294 100644 --- a/example/submitqueue/orchestrator/server/main.go +++ b/example/submitqueue/orchestrator/server/main.go @@ -68,7 +68,6 @@ import ( "github.com/uber/submitqueue/submitqueue/orchestrator/controller/cancel" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/conclude" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/dlq" - logctrl "github.com/uber/submitqueue/submitqueue/orchestrator/controller/log" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/merge" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/score" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/speculate" @@ -382,7 +381,6 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe {consumer.TopicKeyBuildSignal, "buildsignal", 
"orchestrator-buildsignal"}, {consumer.TopicKeyMerge, "merge", "orchestrator-merge"}, {consumer.TopicKeyConclude, "conclude", "orchestrator-conclude"}, - {consumer.TopicKeyLog, "log", "orchestrator-log"}, } configs := make([]consumer.TopicConfig, 0, 2*len(primaryTopics)) @@ -419,6 +417,16 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe }) } + // Publish-only: the orchestrator emits request log entries to the log + // topic but never persists them. The gateway is the sole consumer that + // writes the request log to storage, so the orchestrator registers no + // consuming subscription (and therefore no log DLQ) for this topic. + configs = append(configs, consumer.TopicConfig{ + Key: consumer.TopicKeyLog, + Name: "log", + Queue: q, + }) + return consumer.NewTopicRegistry(configs) } @@ -651,26 +659,13 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, } count++ - logController := logctrl.NewController( - logger, - scope, - store, - consumer.TopicKeyLog, - "orchestrator-log", - ) - if err := c.Register(logController); err != nil { - return count, fmt.Errorf("failed to register log controller: %w", err) - } - count++ - return count, nil } // registerDLQControllers creates one DLQ reconciler per primary stage and // registers them with the DLQ consumer. Each reconciler drives the affected // request or batch into a terminal Error/Failed state so the gateway stops -// reporting it as stuck-in-progress. The log DLQ is a metric-only no-op (log -// entries are observability, not pipeline state). +// reporting it as stuck-in-progress. 
func registerDLQControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, store storage.Storage) (int, error) { dlqScope := scope.SubScope("dlq") dlqRegs := []struct { @@ -687,7 +682,6 @@ func registerDLQControllers(c consumer.Consumer, logger *zap.SugaredLogger, scop {"buildsignal_dlq", dlq.NewDLQBuildSignalController(logger, dlqScope, store, dlq.TopicKey(consumer.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq")}, {"merge_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(consumer.TopicKeyMerge), "orchestrator-merge-dlq")}, {"conclude_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(consumer.TopicKeyConclude), "orchestrator-conclude-dlq")}, - {"log_dlq", dlq.NewDLQLogController(logger, dlqScope, dlq.TopicKey(consumer.TopicKeyLog), "orchestrator-log-dlq")}, } var count int for _, reg := range dlqRegs { diff --git a/submitqueue/gateway/README.md b/submitqueue/gateway/README.md index d739f424..5903f009 100644 --- a/submitqueue/gateway/README.md +++ b/submitqueue/gateway/README.md @@ -1 +1,25 @@ -SubmitQueue Gateway +# SubmitQueue Gateway + +The gateway is the RPC entry point to SubmitQueue. It accepts `Land`, `Cancel`, +`Status`, and `Ping` calls, validates them at the edge, and hands work off to the +orchestrator pipeline asynchronously via the message queue. + +## Request log ownership + +The gateway is the **sole owner of the request log** — the only service that +both writes and reads it. No other service persists or reads request log +entries: + +- For statuses it produces synchronously (`accepted` on `Land`, `cancelling` on + `Cancel`), the gateway writes directly to storage so the entry is visible the + moment the RPC returns. +- For statuses produced downstream, the orchestrator only *publishes* entries to + the `log` topic via `submitqueue/core/request.PublishLog`. The gateway runs a + consumer that drains the `log` topic and persists each entry to storage. 
+ +Reads are likewise gateway-only: the `Status` and `Cancel` RPCs read the request +log directly from storage. The orchestrator only *publishes* log entries and +never touches the request log store. + +This keeps a single service responsible for the request log while letting the +orchestrator remain free of storage writes for it. diff --git a/submitqueue/orchestrator/controller/log/BUILD.bazel b/submitqueue/gateway/controller/log/BUILD.bazel similarity index 90% rename from submitqueue/orchestrator/controller/log/BUILD.bazel rename to submitqueue/gateway/controller/log/BUILD.bazel index f700931c..60647c61 100644 --- a/submitqueue/orchestrator/controller/log/BUILD.bazel +++ b/submitqueue/gateway/controller/log/BUILD.bazel @@ -3,7 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "log", srcs = ["log.go"], - importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/log", + importpath = "github.com/uber/submitqueue/submitqueue/gateway/controller/log", visibility = ["//visibility:public"], deps = [ "//core/metrics", diff --git a/submitqueue/orchestrator/controller/log/log.go b/submitqueue/gateway/controller/log/log.go similarity index 92% rename from submitqueue/orchestrator/controller/log/log.go rename to submitqueue/gateway/controller/log/log.go index 3c6212dd..311681a0 100644 --- a/submitqueue/orchestrator/controller/log/log.go +++ b/submitqueue/gateway/controller/log/log.go @@ -29,6 +29,10 @@ import ( // Controller handles log queue messages. // It consumes request log entries and persists them to storage. // Implements consumer.Controller interface for integration with the consumer. +// +// The request log is written exclusively by the gateway: other services +// (e.g. the orchestrator) only publish log entries to the log topic, and this +// controller is the single consumer that persists them to storage. 
type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope @@ -40,7 +44,7 @@ type Controller struct { // Verify Controller implements consumer.Controller interface at compile time. var _ consumer.Controller = (*Controller)(nil) -// NewController creates a new log controller for the orchestrator. +// NewController creates a new log controller for the gateway. func NewController( logger *zap.SugaredLogger, scope tally.Scope, diff --git a/submitqueue/orchestrator/controller/log/log_test.go b/submitqueue/gateway/controller/log/log_test.go similarity index 99% rename from submitqueue/orchestrator/controller/log/log_test.go rename to submitqueue/gateway/controller/log/log_test.go index f8a1319d..c88fad1a 100644 --- a/submitqueue/orchestrator/controller/log/log_test.go +++ b/submitqueue/gateway/controller/log/log_test.go @@ -35,7 +35,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope - return NewController(logger, scope, store, consumer.TopicKeyLog, "orchestrator-log") + return NewController(logger, scope, store, consumer.TopicKeyLog, "gateway-log") } func TestController_Process(t *testing.T) { diff --git a/test/e2e/submitqueue/BUILD.bazel b/test/e2e/submitqueue/BUILD.bazel index b0db796e..01bb5273 100644 --- a/test/e2e/submitqueue/BUILD.bazel +++ b/test/e2e/submitqueue/BUILD.bazel @@ -17,6 +17,7 @@ go_test( "integration", ], deps = [ + "//submitqueue/entity", "//submitqueue/gateway/protopb", "//submitqueue/orchestrator/protopb", "//test/testutil", diff --git a/test/e2e/submitqueue/suite_test.go b/test/e2e/submitqueue/suite_test.go index 19a82787..91fb0fb7 100644 --- a/test/e2e/submitqueue/suite_test.go +++ b/test/e2e/submitqueue/suite_test.go @@ -27,10 +27,12 @@ import ( "database/sql" "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + 
"github.com/uber/submitqueue/submitqueue/entity" gatewaypb "github.com/uber/submitqueue/submitqueue/gateway/protopb" orchestratorpb "github.com/uber/submitqueue/submitqueue/orchestrator/protopb" "github.com/uber/submitqueue/test/testutil" @@ -54,6 +56,17 @@ func TestE2EIntegration(t *testing.T) { suite.Run(t, new(E2EIntegrationSuite)) } +// The gateway log consumer runs inside the gateway-service container, so this +// suite can only observe persistence black-box through the Status RPC — there is +// no in-process channel/HookSignal to wait on across the container boundary. A +// bounded poll is therefore the deterministic-enough analog: persistTimeout is a +// safety net (a failure here means something is genuinely stuck, not a timing +// race), and persistPollInterval bounds how often we re-query. +const ( + persistTimeout = 30 * time.Second + persistPollInterval = 500 * time.Millisecond +) + func (s *E2EIntegrationSuite) SetupSuite() { t := s.T() s.ctx = context.Background() @@ -168,6 +181,44 @@ func (s *E2EIntegrationSuite) TestLandRequest_SinglePR() { s.log.Logf("Land request (single PR) succeeded: sqid=%s", resp.Sqid) } +// TestLandRequest_PersistsStartedLogViaGatewayConsumer verifies the request-log +// ownership invariant end-to-end: the orchestrator only *publishes* request log +// entries to the log topic (it never writes the request log itself), and the +// gateway's log consumer drains that topic and persists them to storage. +// +// We observe this through the gateway Status RPC: immediately after Land the +// status is "accepted" (the gateway's synchronous direct write), and once the +// orchestrator's start controller publishes "started" to the log topic, the +// gateway consumer persists it and Status advances to "started". Seeing +// "started" therefore proves the publish→consume→persist path works across both +// services. 
+func (s *E2EIntegrationSuite) TestLandRequest_PersistsStartedLogViaGatewayConsumer() { + t := s.T() + + landResp, err := s.gatewayClient.Land(s.ctx, &gatewaypb.LandRequest{ + Queue: "e2e-test-queue", + Change: &gatewaypb.Change{Uris: []string{"github://uber/e2e-startlog/pull/4242/abcdef0123456789abcdef0123456789abcdef01"}}, + Strategy: gatewaypb.Strategy_REBASE, + }) + require.NoError(t, err, "Land request failed") + require.NotEmpty(t, landResp.Sqid, "SQID should not be empty") + sqid := landResp.Sqid + s.log.Logf("Land succeeded: sqid=%s; waiting for gateway consumer to persist 'started'", sqid) + + require.Eventually(t, func() bool { + resp, statusErr := s.gatewayClient.Status(s.ctx, &gatewaypb.StatusRequest{Sqid: sqid}) + if statusErr != nil { + s.log.Logf("Status(%s) not ready yet: %v", sqid, statusErr) + return false + } + s.log.Logf("Status(%s) = %q", sqid, resp.Status) + return resp.Status == string(entity.RequestStatusStarted) + }, persistTimeout, persistPollInterval, + "request %s should reach status %q via the gateway log consumer", sqid, entity.RequestStatusStarted) + + s.log.Logf("Gateway consumer persisted orchestrator-published 'started' log for sqid=%s", sqid) +} + // TestCancelRequest_InvalidSqid verifies the gateway rejects an empty sqid // synchronously before publishing anything to the cancel queue. 
func (s *E2EIntegrationSuite) TestCancelRequest_InvalidSqid() { diff --git a/test/integration/submitqueue/gateway/BUILD.bazel b/test/integration/submitqueue/gateway/BUILD.bazel index 6fe77784..da3d0d41 100644 --- a/test/integration/submitqueue/gateway/BUILD.bazel +++ b/test/integration/submitqueue/gateway/BUILD.bazel @@ -16,11 +16,17 @@ go_test( "integration", ], deps = [ + "//extension/messagequeue/mysql", + "//submitqueue/core/consumer", + "//submitqueue/core/request", + "//submitqueue/entity", "//submitqueue/gateway/protopb", "//test/testutil", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_stretchr_testify//suite", + "@com_github_uber_go_tally_v4//:tally", "@org_golang_google_grpc//:grpc", + "@org_uber_go_zap//:zap", ], ) diff --git a/test/integration/submitqueue/gateway/suite_test.go b/test/integration/submitqueue/gateway/suite_test.go index b8406262..1db104c8 100644 --- a/test/integration/submitqueue/gateway/suite_test.go +++ b/test/integration/submitqueue/gateway/suite_test.go @@ -30,12 +30,19 @@ import ( "database/sql" "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/uber-go/tally/v4" + queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql" + "github.com/uber/submitqueue/submitqueue/core/consumer" + corerequest "github.com/uber/submitqueue/submitqueue/core/request" + "github.com/uber/submitqueue/submitqueue/entity" pb "github.com/uber/submitqueue/submitqueue/gateway/protopb" "github.com/uber/submitqueue/test/testutil" + "go.uber.org/zap" "google.golang.org/grpc" ) @@ -53,6 +60,17 @@ func TestGatewayIntegration(t *testing.T) { suite.Run(t, new(GatewayIntegrationSuite)) } +// The log consumer runs inside the gateway-service container, so this suite can +// only observe persistence black-box through the Status RPC — there is no +// in-process channel/HookSignal to wait on across the 
container boundary. A +// bounded poll is therefore the deterministic-enough analog: persistTimeout is a +// safety net (a failure here means something is genuinely stuck, not a timing +// race), and persistPollInterval bounds how often we re-query. +const ( + persistTimeout = 30 * time.Second + persistPollInterval = 500 * time.Millisecond +) + func (s *GatewayIntegrationSuite) SetupSuite() { t := s.T() s.ctx = context.Background() @@ -144,3 +162,47 @@ func (s *GatewayIntegrationSuite) TestLandAPI() { s.log.Logf("Land API test passed: request stored and message published") } + +// TestRequestLogConsumer verifies the gateway's log-topic consumer in isolation: +// no orchestrator runs in this stack, so the test itself publishes a request log +// entry to the log topic exactly as the orchestrator does in production (via +// submitqueue/core/request.PublishLog). The gateway is the sole writer of the +// request log; this asserts its consumer drains the log topic and persists the +// entry to storage, observable through the Status RPC. +func (s *GatewayIntegrationSuite) TestRequestLogConsumer() { + t := s.T() + + // Build a publisher against the shared queue database. NewQueue only wires up + // stores; nothing consumes until a subscriber is started, so this publish-only + // use does not interfere with the gateway container's consumer. 
+ queue, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.queueDB, + Logger: zap.NewNop(), + MetricsScope: tally.NoopScope, + }) + require.NoError(t, err, "failed to create queue publisher") + defer queue.Close() + + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: consumer.TopicKeyLog, Name: "log", Queue: queue}, + }) + require.NoError(t, err, "failed to create topic registry") + + const sqid = "log-consumer-test/1" + logEntry := entity.NewRequestLog(sqid, entity.RequestStatusStarted, 1, "", nil) + require.NoError(t, corerequest.PublishLog(s.ctx, registry, logEntry, sqid), + "failed to publish request log to log topic") + + s.log.Logf("Published 'started' log for sqid=%s; waiting for gateway consumer to persist it", sqid) + + require.Eventually(t, func() bool { + resp, statusErr := s.client.Status(s.ctx, &pb.StatusRequest{Sqid: sqid}) + if statusErr != nil { + return false + } + return resp.Status == string(entity.RequestStatusStarted) + }, persistTimeout, persistPollInterval, + "gateway log consumer should persist the published request log for sqid=%s", sqid) + + s.log.Logf("Request log consumer test passed: entry persisted and readable via Status") +}