diff --git a/CLAUDE.md b/CLAUDE.md index e0aad576..cef6dc1a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -109,6 +109,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er Controllers receive `consumer.Delivery` (subset interface without Ack/Nack) to enforce separation of business logic from infrastructure. +**Queue payloads: IDs within a boundary, full payloads across one.** When producer and consumer share a store (same service — e.g. `build`→`buildsignal`, `batch`→`score`), put only the entity **ID** on the queue and reload from storage (the store is the source of truth, messages stay small, redelivery is idempotent). When a queue **crosses a service boundary** (the consumer cannot read the producer's store — e.g. orchestrator→runway), publish the **full payload** the consumer needs, and have the **client own the correlation ID** so it can match the async result back to the work it is tracking. The queue's **owner defines the wire contract and topic keys** (in its own domain package); the other side imports them. + ### Entities Domain objects live under each domain's `entity/` tree, or under `platform/base/` when shared across domains. Guidelines: diff --git a/doc/rfc/submitqueue/extension-contract.md b/doc/rfc/submitqueue/extension-contract.md index 1fae9134..f0cf5519 100644 --- a/doc/rfc/submitqueue/extension-contract.md +++ b/doc/rfc/submitqueue/extension-contract.md @@ -4,7 +4,7 @@ Design notes for what SubmitQueue's pluggable extensions accept: orchestrator ** ## Problem -Extension input granularity is inconsistent across the pipeline stages (see [workflow.md](workflow.md)). `conflict.Analyzer` takes identity (`entity.Batch`); `scorer`, `mergechecker`, `changeprovider`, `buildrunner`, `pusher` take controller-resolved `entity.Change`. The split caps what an extension can do: +Extension input granularity is inconsistent across the pipeline stages (see [workflow.md](workflow.md)). 
`conflict.Analyzer` takes identity (`entity.Batch`); `scorer`, `changeprovider`, `buildrunner`, `pusher` take controller-resolved `entity.Change`. The split caps what an extension can do: - `ConflictType` already names `target_overlap`, but a real target-overlap analyzer **cannot be written** — the batch controller hands it identity-level batches (no changed targets) and the contract has nowhere to put them. - `scorer` gets a URIs-only `Change`, so a heuristic scorer **cannot see** lines-changed / file-count. @@ -21,7 +21,7 @@ Both unblock with the shape `conflict` already uses: accept identity, resolve in | Stage | Loads | Resolves for the extension | Hands to the extension | |---|---|---|---| -| `validate` | `entity.Request` | nothing — `request.Change` is already in hand (the change-store reads here serve duplicate detection) | `request.Change` → `mergechecker`, `changeprovider` | +| `validate` | `entity.Request` | nothing — `request.Change` is already in hand (the change-store reads here serve duplicate detection) | `request.Change` → `changeprovider` | | `batch` | `entity.Request` + active `[]entity.Batch` | **nothing** — builds a batch whose `Contains` is `[requestID]` | `entity.Batch`, `[]entity.Batch` → `conflict` | | `score` | `entity.Batch`, then each `entity.Request` | batch → requests | `request.Change` per request, then multiplies the scores → `scorer` | | `build` | `entity.Batch`, then `collectChanges` | batch → requests → changes, **flattening batch boundaries** | base `[]Change`, head `[]Change` → `buildrunner` | @@ -35,13 +35,14 @@ Two facts this grounds: `conflict` already resolves nothing (the baseline), and |---|---|---|---|---|---| | `conflict.Analyzer` | batch | identity (`Batch`, `[]Batch`) | unchanged — **the baseline** | conflicting in-flight batches (`[]Conflict`, `BatchID`-tagged) — unchanged | request store + change provider | | `scorer.Scorer` | score | flat `Change`, per request | `entity.Batch` — resolve + reduce internally | one 
batch score (`float64`) — unchanged | request store + change provider | -| `mergechecker.MergeChecker` | validate | `Change` | `entity.Request` | mergeability (`Result`) — unchanged | none | | `changeprovider.ChangeProvider` | validate | `Change` | `entity.Request` | per-URI change info (`[]ChangeInfo`, `URI`-tagged) — unchanged | none — it *is* the resolver | | `buildrunner.BuildRunner` | build | base/head `[]Change` | base `[]entity.Batch` + head `entity.Batch` | build id, then status/cancel (`BuildID`, `BuildStatus`) — unchanged | request store + change provider | | `pusher.Pusher` | merge | `[]Change` | ordered `[]entity.Batch` | **per-batch** outcomes (`Result` grouped by `BatchID`) — **changed** | request store + change provider | | `storage`, `changestore`, `queueconfig` | — | keys + entities | unchanged — resolution targets | entities | — | -**Outputs are unchanged except `pusher`.** This RFC moves the *input* toward identity; five of the six return contracts — conflicts, score, mergeability, change info, build id/status — are exactly what they are today. `pusher` is the lone exception: because its input becomes a *list* of independently-landed batches, its result regroups per batch (`BatchID`-tagged, per-change commit detail kept underneath) so each batch's outcome stays correlatable — the "output mirrors the input unit" principle above. No other output shape changes. +**Outputs are unchanged except `pusher`.** This RFC moves the *input* toward identity; four of the five return contracts — conflicts, score, change info, build id/status — are exactly what they are today. `pusher` is the lone exception: because its input becomes a *list* of independently-landed batches, its result regroups per batch (`BatchID`-tagged, per-change commit detail kept underneath) so each batch's outcome stays correlatable — the "output mirrors the input unit" principle above. No other output shape changes. 
+ +The validate-time mergeability check runs **asynchronously and out-of-process** in runway rather than as an in-process extension: `validate` publishes a full check request directly to the runway-owned `merge-conflict-check` queue, and `mergeconflictsignal` consumes runway's result (see [workflow.md](workflow.md)). The in-process `mergechecker` package is unused on the validate path. Non-obvious points: diff --git a/doc/rfc/submitqueue/workflow.md b/doc/rfc/submitqueue/workflow.md index 52e1aff5..6711b72b 100644 --- a/doc/rfc/submitqueue/workflow.md +++ b/doc/rfc/submitqueue/workflow.md @@ -1,6 +1,6 @@ # Orchestrator Workflow -The orchestrator processes land requests through a queue-driven pipeline of small, single-purpose controllers. The gateway accepts a request over RPC and hands it off asynchronously; from there each controller consumes one topic, advances the request or batch, and publishes to the next topic. Most hops carry only an ID — the controller fetches the entity from storage — while a few entry points (`start`, `buildsignal`, `log`) carry the full payload because there is no row to fetch yet. +The orchestrator processes land requests through a queue-driven pipeline of small, single-purpose controllers. The gateway accepts a request over RPC and hands it off asynchronously; from there each controller consumes one topic, advances the request or batch, and publishes to the next topic. Most hops carry only an ID — the controller fetches the entity from storage — while a few entry points (`start`, `buildsignal`, `log`) carry the full payload because there is no row to fetch yet. A stage that crosses a service boundary is the exception: it publishes a full payload to the other service's queue and consumes a full payload back, because neither service can read the other's storage. 
(The `validate`→`mergeconflictsignal` hop is one such stage: `validate` hands a check to runway and `mergeconflictsignal` consumes the result.) See the queue-payload-boundary rule in [CLAUDE.md](../../../CLAUDE.md). The pipeline has two cycles: `speculate → build → buildsignal → speculate` (CI feedback loop) and `merge → speculate` (advance the next batch). `conclude` is the only stage that transitions a request to a terminal state; `log` is an append-only sink that any controller can publish to via `submitqueue/core/request.PublishLog`. @@ -21,7 +21,20 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate` │ ▼ │ ┌──────────────────────────────────┐ │ │ validate │ - │ │ Check mergeability + change meta │ + │ │ Dedup, fetch metadata, publish │ + │ │ check request to runway │ + │ └────────────────┬─────────────────┘ + │ MergeRequest + │ ▼ + │ ╔══════════════════════════════════╗ + │ ║ runway (separate service) ║ + │ ║ Attempt merge, emit result ║ + │ ╚════════════════┬═════════════════╝ + │ MergeResult + │ ▼ + │ ┌──────────────────────────────────┐ + │ │ mergeconflictsignal │ + │ │ Correlate result, gate request │ │ └────────────────┬─────────────────┘ │ │ RequestID │ ▼ @@ -71,7 +84,8 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate` |---|---|---|---| | **gateway/Land** | RPC | start | Accept request, mint ID, log Accepted, hand off async | | **start** | LandRequest | validate, log | Persist Request and emit Started log | -| **validate** | RequestID | batch | Check mergeability and fetch change metadata | +| **validate** | RequestID | merge-conflict-check (runway) | Dedup, fetch change metadata, claim changes, then publish the full check request to runway (keyed by the request id, the correlation id) | +| **mergeconflictsignal** | MergeResult | batch | Correlate runway's result; advance if mergeable, fail if conflicted | | **batch** | RequestID | score | Group request into a Batch with dependencies | | **score** | 
BatchID | speculate, log | Score the batch (∏ per-request scores), persist score | | **speculate** | BatchID | build, merge | (stub) Decide whether to verify via CI or land | @@ -85,7 +99,7 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate` Every *consumed* primary pipeline topic above is paired with a `{topic}_dlq` subscription consumed by a dedicated DLQ controller. The `log` topic is the exception: the orchestrator only publishes to it (the gateway is the sole consumer that persists the request log), so it has no orchestrator-side subscription and therefore no DLQ. The consumer framework moves a message to its DLQ once the primary controller returns a non-retryable error or exhausts retries on a retryable one; without the DLQ side the affected request would stay in a non-terminal state forever and the gateway would still report it as "in progress". -The DLQ controllers do not re-attempt the failed work. They decode the payload to recover the affected `RequestID` (start, validate, batch, cancel) or `BatchID` (score, speculate, build, buildsignal, merge, conclude) and drive the entity to a terminal failed state — `RequestStateError` for requests, `BatchStateFailed` for batches with fan-out to the member requests. State writes use the same optimistic-locking CAS as the primary pipeline, so a late primary-pipeline update wins cleanly and a version mismatch is asked back for redelivery. +The DLQ controllers do not re-attempt the failed work. They decode the payload to recover the affected request (`RequestID`) or batch (`BatchID`) and drive the entity to a terminal failed state — `RequestStateError` for requests, `BatchStateFailed` for batches, with fan-out to the member requests. A DLQ whose topic carries a full payload rather than a bare ID recovers the id from that payload instead — for example the `mergeconflictsignal` DLQ reads it from the runway `MergeResult` the producer echoed back. 
State writes use the same optimistic-locking CAS as the primary pipeline, so a late primary-pipeline update wins cleanly and a version mismatch is asked back for redelivery. DLQ consumers are wired with `errs.AlwaysRetryableProcessor` and a very high `Retry.MaxAttempts`, with their own DLQ disabled. That combination makes reconciliation effectively non-droppable: any failure is forced retryable rather than escalating to a second-level dead-letter that nobody consumes. The trade-off is that a genuinely unprocessable DLQ message — typically a malformed payload — must be removed by an operator. diff --git a/example/submitqueue/orchestrator/server/BUILD.bazel b/example/submitqueue/orchestrator/server/BUILD.bazel index c21e738e..829f8658 100644 --- a/example/submitqueue/orchestrator/server/BUILD.bazel +++ b/example/submitqueue/orchestrator/server/BUILD.bazel @@ -11,6 +11,7 @@ go_library( importpath = "github.com/uber/submitqueue/example/submitqueue/orchestrator/server", visibility = ["//visibility:private"], deps = [ + "//api/runway/messagequeue", "//api/submitqueue/orchestrator/protopb", "//platform/consumer", "//platform/errs", @@ -34,9 +35,6 @@ go_library( "//submitqueue/extension/conflict/fake", "//submitqueue/extension/conflict/fileoverlap", "//submitqueue/extension/conflict/none", - "//submitqueue/extension/mergechecker", - "//submitqueue/extension/mergechecker/fake", - "//submitqueue/extension/mergechecker/github", "//submitqueue/extension/pusher", "//submitqueue/extension/pusher/fake", "//submitqueue/extension/pusher/git", @@ -54,6 +52,7 @@ go_library( "//submitqueue/orchestrator/controller/conclude", "//submitqueue/orchestrator/controller/dlq", "//submitqueue/orchestrator/controller/merge", + "//submitqueue/orchestrator/controller/mergeconflictsignal", "//submitqueue/orchestrator/controller/score", "//submitqueue/orchestrator/controller/speculate", "//submitqueue/orchestrator/controller/start", diff --git a/example/submitqueue/orchestrator/server/main.go 
b/example/submitqueue/orchestrator/server/main.go index e62cfdce..1a388aa8 100644 --- a/example/submitqueue/orchestrator/server/main.go +++ b/example/submitqueue/orchestrator/server/main.go @@ -30,6 +30,7 @@ import ( "golang.org/x/oauth2" "github.com/uber-go/tally" + runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" pb "github.com/uber/submitqueue/api/submitqueue/orchestrator/protopb" "github.com/uber/submitqueue/platform/consumer" "github.com/uber/submitqueue/platform/errs" @@ -53,9 +54,6 @@ import ( conflictfake "github.com/uber/submitqueue/submitqueue/extension/conflict/fake" "github.com/uber/submitqueue/submitqueue/extension/conflict/fileoverlap" "github.com/uber/submitqueue/submitqueue/extension/conflict/none" - "github.com/uber/submitqueue/submitqueue/extension/mergechecker" - mcfake "github.com/uber/submitqueue/submitqueue/extension/mergechecker/fake" - githubchecker "github.com/uber/submitqueue/submitqueue/extension/mergechecker/github" "github.com/uber/submitqueue/submitqueue/extension/pusher" pushfake "github.com/uber/submitqueue/submitqueue/extension/pusher/fake" gitpusher "github.com/uber/submitqueue/submitqueue/extension/pusher/git" @@ -73,6 +71,7 @@ import ( "github.com/uber/submitqueue/submitqueue/orchestrator/controller/conclude" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/dlq" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/merge" + "github.com/uber/submitqueue/submitqueue/orchestrator/controller/mergeconflictsignal" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/score" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/speculate" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/start" @@ -239,7 +238,6 @@ func run() error { } // Per-extension factories all resolve against the registry by queue name. 
- mcf := mergeCheckerFactory{queues} cpf := changeProviderFactory{queues} pshf := pusherFactory{queues} brf := buildRunnerFactory{queues} @@ -247,7 +245,7 @@ func run() error { cof := analyzerFactory{queues} // Register controllers - primaryCount, err := registerPrimaryControllers(primaryConsumer, logger.Sugar(), scope, registry, mcf, cpf, pshf, brf, scf, cof, cnt, store) + primaryCount, err := registerPrimaryControllers(primaryConsumer, logger.Sugar(), scope, registry, cpf, pshf, brf, scf, cof, cnt, store) if err != nil { return err } @@ -377,6 +375,7 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe {topickey.TopicKeyStart, "start", "orchestrator-start"}, {topickey.TopicKeyCancel, "cancel", "orchestrator-cancel"}, {topickey.TopicKeyValidate, "validate", "orchestrator-validate"}, + {runwaymq.TopicKeyMergeConflictCheckSignal, "merge-conflict-check-signal", "orchestrator-mergeconflictsignal"}, {topickey.TopicKeyBatch, "batch", "orchestrator-batch"}, {topickey.TopicKeyScore, "score", "orchestrator-score"}, {topickey.TopicKeySpeculate, "speculate", "orchestrator-speculate"}, @@ -430,17 +429,35 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe Queue: q, }) + // Publish-only: the orchestrator hands merge-conflict check requests to + // runway via the runway-owned merge-conflict-check queue. Runway is the + // sole consumer, so the orchestrator registers no consuming subscription + // (and no DLQ) here — the inbound result arrives on the separate + // merge-conflict-check-signal queue, which is a consumed primary topic + // above. + configs = append(configs, consumer.TopicConfig{ + Key: runwaymq.TopicKeyMergeConflictCheck, + Name: "merge-conflict-check", + Queue: q, + }) + return consumer.NewTopicRegistry(configs) } // registerPrimaryControllers creates all pipeline controllers and registers // them with the primary consumer. 
Pipeline: // -// request → validate → batch → score → speculate → build → buildsignal ─┐ -// ↑ ↘ ↻ poll │ -// │ merge → conclude │ -// │ │ │ -// └────────┴───────────────────────┘ +// request → validate ⇢ (runway) ⇢ mergeconflictsignal → batch → score → speculate → build → buildsignal ─┐ +// ↑ ↘ ↻ poll │ +// │ merge → conclude │ +// │ │ │ +// └────────┴───────────────────────┘ +// +// The merge-conflict check is asynchronous and crosses a service boundary: +// validate publishes the full check request to the runway-owned +// merge-conflict-check queue (⇢); runway performs the merge attempt and +// publishes the result to merge-conflict-check-signal, which mergeconflictsignal +// consumes before fanning the request out to batch. // TODO(wiring abstraction): queueExtensions + queueRegistry currently live here // as example-local wiring. Evaluate promoting them into a defined abstraction in @@ -459,7 +476,6 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe // read as "for this queue, here are its scorer, analyzer, pusher, …", and lets // a queue profile start from a baseline and override only what differs. type queueExtensions struct { - mergeChecker mergechecker.MergeChecker changeProvider changeprovider.ChangeProvider pusher pusher.Pusher buildRunner buildrunner.BuildRunner @@ -486,12 +502,6 @@ func (r queueRegistry) get(queue string) queueExtensions { // The per-extension factories below are thin adapters: each satisfies its // extension's Factory contract by resolving the queue's profile from the // registry. All routing logic lives here in the wiring layer. 
-type mergeCheckerFactory struct{ reg queueRegistry } - -func (f mergeCheckerFactory) For(cfg mergechecker.Config) (mergechecker.MergeChecker, error) { - return f.reg.get(cfg.QueueName).mergeChecker, nil -} - type changeProviderFactory struct{ reg queueRegistry } func (f changeProviderFactory) For(cfg changeprovider.Config) (changeprovider.ChangeProvider, error) { @@ -522,7 +532,7 @@ func (f analyzerFactory) For(cfg conflict.Config) (conflict.Analyzer, error) { return f.reg.get(cfg.QueueName).analyzer, nil } -func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mcf mergechecker.Factory, cpf changeprovider.Factory, pshf pusher.Factory, brf buildrunner.Factory, scf scorer.Factory, cof conflict.Factory, cnt counter.Counter, store storage.Storage) (int, error) { +func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, cpf changeprovider.Factory, pshf pusher.Factory, brf buildrunner.Factory, scf scorer.Factory, cof conflict.Factory, cnt counter.Counter, store storage.Storage) (int, error) { var count int requestController := start.NewController( logger, @@ -555,8 +565,8 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope, store, registry, - mcf, cpf, + runwaymq.TopicKeyMergeConflictCheck, topickey.TopicKeyValidate, "orchestrator-validate", ) @@ -565,6 +575,19 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, } count++ + mergeconflictsignalController := mergeconflictsignal.NewController( + logger, + scope, + store, + registry, + runwaymq.TopicKeyMergeConflictCheckSignal, + "orchestrator-mergeconflictsignal", + ) + if err := c.Register(mergeconflictsignalController); err != nil { + return count, fmt.Errorf("failed to register mergeconflictsignal controller: %w", err) + } + count++ + batchController := batch.NewController( logger, scope, @@ -678,6 
+701,7 @@ func registerDLQControllers(c consumer.Consumer, logger *zap.SugaredLogger, scop {"start_dlq", dlq.NewDLQRequestController(logger, dlqScope, store, dlq.DecodeLandRequestID, dlq.TopicKey(topickey.TopicKeyStart), "orchestrator-start-dlq")}, {"cancel_dlq", dlq.NewDLQRequestController(logger, dlqScope, store, dlq.DecodeCancelRequestID, dlq.TopicKey(topickey.TopicKeyCancel), "orchestrator-cancel-dlq")}, {"validate_dlq", dlq.NewDLQRequestController(logger, dlqScope, store, dlq.DecodeRequestID, dlq.TopicKey(topickey.TopicKeyValidate), "orchestrator-validate-dlq")}, + {"mergeconflictsignal_dlq", dlq.NewDLQMergeConflictSignalController(logger, dlqScope, store, dlq.TopicKey(runwaymq.TopicKeyMergeConflictCheckSignal), "orchestrator-mergeconflictsignal-dlq")}, {"batch_dlq", dlq.NewDLQRequestController(logger, dlqScope, store, dlq.DecodeRequestID, dlq.TopicKey(topickey.TopicKeyBatch), "orchestrator-batch-dlq")}, {"score_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyScore), "orchestrator-score-dlq")}, {"speculate_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeySpeculate), "orchestrator-speculate-dlq")}, @@ -717,34 +741,6 @@ func parseTimeout(envVal string, defaultVal time.Duration) time.Duration { return defaultVal } -// newMergeChecker creates a MergeChecker for GitHub (github.com), configured via -// GITHUB_BASE_URL, GITHUB_TOKEN, and GITHUB_TIMEOUT. When GITHUB_TOKEN is unset -// it returns the fake merge checker (every change mergeable unless a URI carries -// a failure marker, see mergechecker/fake), keeping the example runnable without -// GitHub and letting e2e tests drive unmergeable scenarios via request payloads. 
-func newMergeChecker(logger *zap.Logger, scope tally.Scope) (mergechecker.MergeChecker, error) { - if os.Getenv("GITHUB_TOKEN") == "" { - logger.Warn("GITHUB_TOKEN not set; using fake merge checker (every change mergeable unless URI-marked)") - return mcfake.New(), nil - } - - client, err := http.NewClient(getEnv("GITHUB_BASE_URL", "https://api.github.com")) - if err != nil { - return nil, fmt.Errorf("failed to build GitHub HTTP client: %w", err) - } - - ts := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: os.Getenv("GITHUB_TOKEN")}) - client.Transport = &oauth2.Transport{Source: ts, Base: client.Transport} - - client.Timeout = parseTimeout(os.Getenv("GITHUB_TIMEOUT"), 30*time.Second) - - return githubchecker.NewMergeChecker(githubchecker.Params{ - HTTPClient: client, - Logger: logger.Sugar(), - MetricsScope: scope.SubScope("mergechecker"), - }), nil -} - // newChangeProvider creates a ChangeProvider for GitHub (github.com), configured // via GITHUB_BASE_URL, GITHUB_TOKEN, and GITHUB_TIMEOUT. When GITHUB_TOKEN is // unset it returns the fake change provider (one empty ChangeInfo per URI unless @@ -801,10 +797,6 @@ func newPusher(logger *zap.Logger, scope tally.Scope, resolver changeset.Resolve // baseline. This is the one place queue topology lives; extension packages stay // queue-agnostic. func newQueueRegistry(logger *zap.Logger, scope tally.Scope, resolver changeset.Resolver) (queueRegistry, error) { - mc, err := newMergeChecker(logger, scope) - if err != nil { - return queueRegistry{}, fmt.Errorf("failed to create merge checker: %w", err) - } cp, err := newChangeProvider(logger, scope) if err != nil { return queueRegistry{}, fmt.Errorf("failed to create change provider: %w", err) @@ -833,7 +825,6 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope, resolver changeset. // on a queue to exercise the analyzer error path, as e2e-conflict-error-queue // below does. 
base := queueExtensions{ - mergeChecker: mc, changeProvider: cp, pusher: psh, buildRunner: buildfake.New(resolver), diff --git a/submitqueue/orchestrator/controller/dlq/BUILD.bazel b/submitqueue/orchestrator/controller/dlq/BUILD.bazel index 2e2626f5..db85dd8a 100644 --- a/submitqueue/orchestrator/controller/dlq/BUILD.bazel +++ b/submitqueue/orchestrator/controller/dlq/BUILD.bazel @@ -7,11 +7,13 @@ go_library( "buildsignal.go", "dlq.go", "log.go", + "mergeconflictsignal.go", "request.go", ], importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/dlq", visibility = ["//visibility:public"], deps = [ + "//api/runway/messagequeue", "//platform/consumer", "//platform/metrics", "//submitqueue/entity", @@ -28,10 +30,12 @@ go_test( "buildsignal_test.go", "dlq_test.go", "log_test.go", + "mergeconflictsignal_test.go", "request_test.go", ], embed = [":dlq"], deps = [ + "//api/runway/messagequeue", "//platform/base/messagequeue", "//platform/consumer", "//platform/errs", diff --git a/submitqueue/orchestrator/controller/dlq/mergeconflictsignal.go b/submitqueue/orchestrator/controller/dlq/mergeconflictsignal.go new file mode 100644 index 00000000..1d75a4ce --- /dev/null +++ b/submitqueue/orchestrator/controller/dlq/mergeconflictsignal.go @@ -0,0 +1,109 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package dlq + +import ( + "context" + "fmt" + + "github.com/uber-go/tally" + runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/metrics" + "github.com/uber/submitqueue/submitqueue/extension/storage" + "go.uber.org/zap" +) + +// mergeConflictSignalController is the DLQ reconciler for the +// mergeconflictsignal topic. Its payload carries a runway +// MergeResult whose id is the request id echoed back, so +// reconciliation fails that request directly via failRequest. +type mergeConflictSignalController struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + store storage.Storage + topicKey consumer.TopicKey + consumerGroup string +} + +// Verify mergeConflictSignalController implements consumer.Controller at compile time. +var _ consumer.Controller = (*mergeConflictSignalController)(nil) + +// NewDLQMergeConflictSignalController builds a DLQ controller for the +// mergeconflictsignal topic. +func NewDLQMergeConflictSignalController( + logger *zap.SugaredLogger, + scope tally.Scope, + store storage.Storage, + topicKey consumer.TopicKey, + consumerGroup string, +) consumer.Controller { + name := string(topicKey) + "_controller" + return &mergeConflictSignalController{ + logger: logger.Named(name), + metricsScope: scope.SubScope(name), + store: store, + topicKey: topicKey, + consumerGroup: consumerGroup, + } +} + +// Process reconciles a single DLQ delivery for the mergeconflictsignal topic. 
+func (c *mergeConflictSignalController) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + msg := delivery.Message() + + result, err := runwaymq.MergeResultFromBytes(msg.Payload) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) + return fmt.Errorf("failed to decode merge conflict check result from dlq payload: %w", err) + } + + dmeta := delivery.Metadata() + c.logger.Warnw("dlq message received", + "request_id", result.Id, + "attempt", delivery.Attempt(), + "dlq_original_topic", dmeta["dlq.original_topic"], + "dlq_failure_count", dmeta["dlq.failure_count"], + "dlq_last_error", dmeta["dlq.last_error"], + ) + + if err := failRequest(ctx, c.store, c.logger, result.Id); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "reconcile_errors", 1) + return err + } + + metrics.NamedCounter(c.metricsScope, opName, "reconciled", 1) + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *mergeConflictSignalController) Name() string { + return string(c.topicKey) +} + +// TopicKey returns the topic key this controller subscribes to. +func (c *mergeConflictSignalController) TopicKey() consumer.TopicKey { + return c.topicKey +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *mergeConflictSignalController) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/submitqueue/orchestrator/controller/dlq/mergeconflictsignal_test.go b/submitqueue/orchestrator/controller/dlq/mergeconflictsignal_test.go new file mode 100644 index 00000000..01122385 --- /dev/null +++ b/submitqueue/orchestrator/controller/dlq/mergeconflictsignal_test.go @@ -0,0 +1,75 @@ +// Copyright (c) 2025 Uber Technologies, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dlq + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/submitqueue/entity" + storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +func TestDLQMergeConflictSignalController_InterfaceAndAccessors(t *testing.T) { + ctrl := gomock.NewController(t) + store := storagemock.NewMockStorage(ctrl) + + c := NewDLQMergeConflictSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(runwaymq.TopicKeyMergeConflictCheckSignal), "orchestrator-mergeconflictsignal-dlq") + + assert.Equal(t, "merge-conflict-check-signal_dlq", c.Name()) + assert.Equal(t, consumer.TopicKey("merge-conflict-check-signal_dlq"), c.TopicKey()) + assert.Equal(t, "orchestrator-mergeconflictsignal-dlq", c.ConsumerGroup()) +} + +func TestDLQMergeConflictSignalController_Process_ReconcilesRequest(t *testing.T) { + ctrl := gomock.NewController(t) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), "q/1").Return(entity.Request{ + ID: "q/1", Version: 1, State: entity.RequestStateProcessing, + }, nil) + requestStore.EXPECT().UpdateState(gomock.Any(), "q/1", int32(1), int32(2), 
entity.RequestStateError).Return(nil) + + logStore := storagemock.NewMockRequestLogStore(ctrl) + logStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() + + c := NewDLQMergeConflictSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(runwaymq.TopicKeyMergeConflictCheckSignal), "orchestrator-mergeconflictsignal-dlq") + + payload, err := runwaymq.MergeResultToBytes(&runwaymq.MergeResult{Id: "q/1", Success: false, Reason: "boom"}) + require.NoError(t, err) + + delivery := newMockDelivery(ctrl, payload) + require.NoError(t, c.Process(context.Background(), delivery)) +} + +func TestDLQMergeConflictSignalController_Process_MalformedPayloadFails(t *testing.T) { + ctrl := gomock.NewController(t) + + store := storagemock.NewMockStorage(ctrl) + c := NewDLQMergeConflictSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(runwaymq.TopicKeyMergeConflictCheckSignal), "orchestrator-mergeconflictsignal-dlq") + + delivery := newMockDelivery(ctrl, []byte("garbage")) + require.Error(t, c.Process(context.Background(), delivery)) +} diff --git a/submitqueue/orchestrator/controller/mergeconflictsignal/BUILD.bazel b/submitqueue/orchestrator/controller/mergeconflictsignal/BUILD.bazel new file mode 100644 index 00000000..b267d2d9 --- /dev/null +++ b/submitqueue/orchestrator/controller/mergeconflictsignal/BUILD.bazel @@ -0,0 +1,40 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "mergeconflictsignal", + srcs = ["mergeconflictsignal.go"], + importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/mergeconflictsignal", + visibility = ["//visibility:public"], + deps = [ + "//api/runway/messagequeue", + "//platform/base/messagequeue", + "//platform/consumer", + "//platform/metrics", + 
"//submitqueue/core/request", + "//submitqueue/core/topickey", + "//submitqueue/entity", + "//submitqueue/extension/storage", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "mergeconflictsignal_test", + srcs = ["mergeconflictsignal_test.go"], + embed = [":mergeconflictsignal"], + deps = [ + "//api/runway/messagequeue", + "//platform/base/messagequeue", + "//platform/consumer", + "//platform/extension/messagequeue/mock", + "//submitqueue/core/topickey", + "//submitqueue/entity", + "//submitqueue/extension/storage/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/submitqueue/orchestrator/controller/mergeconflictsignal/mergeconflictsignal.go b/submitqueue/orchestrator/controller/mergeconflictsignal/mergeconflictsignal.go new file mode 100644 index 00000000..5b99f7f0 --- /dev/null +++ b/submitqueue/orchestrator/controller/mergeconflictsignal/mergeconflictsignal.go @@ -0,0 +1,219 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package mergeconflictsignal consumes merge-conflict check results from runway's +// signal queue, correlates them to the request by the echoed id, and either +// advances the request to the batch stage (mergeable) or fails it (conflicted). 
+// Unlike buildsignal it is purely result-driven — runway pushes the result, so +// there is no poll loop or self-reschedule. +package mergeconflictsignal + +import ( + "context" + "fmt" + + "github.com/uber-go/tally" + runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/metrics" + corerequest "github.com/uber/submitqueue/submitqueue/core/request" + "github.com/uber/submitqueue/submitqueue/core/topickey" + "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/submitqueue/extension/storage" + "go.uber.org/zap" +) + +// Controller handles mergeconflictsignal queue messages. Implements consumer.Controller. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + store storage.Storage + registry consumer.TopicRegistry + topicKey consumer.TopicKey + consumerGroup string +} + +// Verify Controller implements consumer.Controller interface at compile time. +var _ consumer.Controller = (*Controller)(nil) + +// NewController creates a new mergeconflictsignal controller for the orchestrator. +func NewController( + logger *zap.SugaredLogger, + scope tally.Scope, + store storage.Storage, + registry consumer.TopicRegistry, + topicKey consumer.TopicKey, + consumerGroup string, +) *Controller { + return &Controller{ + logger: logger.Named("mergeconflictsignal_controller"), + metricsScope: scope.SubScope("mergeconflictsignal_controller"), + store: store, + registry: registry, + topicKey: topicKey, + consumerGroup: consumerGroup, + } +} + +// Process consumes a runway check result and advances or fails the request. +// Returns nil to ack, or error to nack/reject. +// +// A not-mergeable verdict is an expected outcome of the check, not a failure: +// the request is driven to terminal Error inline and the message is acked. 
Only +// infrastructure faults — deserialize, storage, the terminal transition, and the +// batch publish — return an error and reject to the DLQ, where the request is +// reconciled to Error. +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + msg := delivery.Message() + + // The runway result carries full data (it crosses the service boundary). Its + // id is the request id echoed back, so correlate straight to the request. + result, err := runwaymq.MergeResultFromBytes(msg.Payload) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) + return fmt.Errorf("failed to deserialize merge conflict check result: %w", err) + } + + request, err := c.store.GetRequestStore().Get(ctx, result.Id) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + return fmt.Errorf("failed to get request %s: %w", result.Id, err) + } + + c.logger.Infow("received mergeconflict signal", + "request_id", request.ID, + "mergeable", result.Success, + "attempt", delivery.Attempt(), + "partition_key", msg.PartitionKey, + ) + + // Short-circuit halted requests: the cancel path owns driving them terminal. + if entity.IsRequestStateHalted(request.State) { + metrics.NamedCounter(c.metricsScope, opName, "skipped_halted", 1) + c.logger.Infow("skipping mergeconflict signal for halted request", + "request_id", request.ID, + "state", string(request.State), + ) + return nil + } + + if !result.Success { + metrics.NamedCounter(c.metricsScope, opName, "not_mergeable", 1) + c.logger.Infow("request not mergeable", + "request_id", request.ID, + "reason", result.Reason, + ) + // Expected terminal outcome, not a failure: mark the request Error inline + // and ack. 
+ if err := c.failRequest(ctx, request, result.Reason); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "fail_errors", 1) + return fmt.Errorf("failed to fail request %s: %w", request.ID, err) + } + return nil + } + + if err := c.publishRequestID(ctx, topickey.TopicKeyBatch, request.ID, request.Queue); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) + return fmt.Errorf("failed to publish to batch: %w", err) + } + + c.logger.Infow("published request to batch", + "request_id", request.ID, + "topic_key", topickey.TopicKeyBatch, + ) + + return nil // Success - message will be acked +} + +// failRequest drives the request to terminal RequestStateError and records the +// conflict reason on the request log. A not-mergeable verdict is an expected +// terminal outcome of the check, so the request is concluded here directly. +// +// Idempotent under at-least-once delivery: a redelivery whose request is already +// in Error skips the state CAS but still publishes the log (so a prior attempt +// that flipped the state but failed before logging is repaired); a request that +// reached a different terminal state (e.g. a racing cancel) is left untouched. +func (c *Controller) failRequest(ctx context.Context, request entity.Request, reason string) error { + switch { + case request.State == entity.RequestStateError: + // Idempotent retry: a prior delivery already wrote Error. Fall through to + // the log publish. 
+ case entity.IsRequestStateTerminal(request.State): + c.logger.Warnw("request already in different terminal state, skipping fail", + "request_id", request.ID, + "state", string(request.State), + ) + return nil + default: + newVersion := request.Version + 1 + if err := c.store.GetRequestStore().UpdateState(ctx, request.ID, request.Version, newVersion, entity.RequestStateError); err != nil { + return fmt.Errorf("failed to update request %s state to error: %w", request.ID, err) + } + request.Version = newVersion + request.State = entity.RequestStateError + } + + logEntry := entity.NewRequestLog(request.ID, entity.RequestStatusError, request.Version, reason, nil) + if err := corerequest.PublishLog(ctx, c.registry, logEntry, request.ID); err != nil { + return fmt.Errorf("failed to publish request log for %s: %w", request.ID, err) + } + return nil +} + +// publishRequestID publishes a request ID to the given topic key, partitioned by queue. +func (c *Controller) publishRequestID(ctx context.Context, key consumer.TopicKey, requestID string, partitionKey string) error { + payload, err := entity.RequestID{ID: requestID}.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize request ID: %w", err) + } + + msg := entityqueue.NewMessage(requestID, payload, partitionKey, nil) + + q, ok := c.registry.Queue(key) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", key) + } + + topicName, ok := c.registry.TopicName(key) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", key) + } + + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *Controller) Name() string { + return "mergeconflictsignal" +} + +// TopicKey returns the topic key this controller subscribes to. 
+func (c *Controller) TopicKey() consumer.TopicKey { + return c.topicKey +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *Controller) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/submitqueue/orchestrator/controller/mergeconflictsignal/mergeconflictsignal_test.go b/submitqueue/orchestrator/controller/mergeconflictsignal/mergeconflictsignal_test.go new file mode 100644 index 00000000..3d63d6eb --- /dev/null +++ b/submitqueue/orchestrator/controller/mergeconflictsignal/mergeconflictsignal_test.go @@ -0,0 +1,169 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package mergeconflictsignal + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + queuemock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" + "github.com/uber/submitqueue/submitqueue/core/topickey" + "github.com/uber/submitqueue/submitqueue/entity" + storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +func resultPayload(t *testing.T, res runwaymq.MergeResult) []byte { + payload, err := runwaymq.MergeResultToBytes(&res) + require.NoError(t, err) + return payload +} + +func newDelivery(ctrl *gomock.Controller, msg entityqueue.Message) *queuemock.MockDelivery { + d := queuemock.NewMockDelivery(ctrl) + d.EXPECT().Message().Return(msg).AnyTimes() + d.EXPECT().Attempt().Return(1).AnyTimes() + return d +} + +const ( + testRequestID = "test-queue/1" + testQueue = "test-queue" +) + +func TestProcess_MergeablePublishesToBatch(t *testing.T) { + ctrl := gomock.NewController(t) + + reqStore := storagemock.NewMockRequestStore(ctrl) + reqStore.EXPECT().Get(gomock.Any(), testRequestID).Return( + entity.Request{ID: testRequestID, Queue: testQueue, State: entity.RequestStateStarted, Version: 1}, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() + + var gotTopic string + var gotPayload []byte + pub := queuemock.NewMockPublisher(ctrl) + pub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, topic string, msg entityqueue.Message) error { + gotTopic = topic + gotPayload = msg.Payload + return nil + }, + ) + q := queuemock.NewMockQueue(ctrl) + q.EXPECT().Publisher().Return(pub).AnyTimes() + 
registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: topickey.TopicKeyBatch, Name: "batch", Queue: q}}, + ) + require.NoError(t, err) + + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, registry, + runwaymq.TopicKeyMergeConflictCheckSignal, "orchestrator-mergeconflictsignal") + + res := runwaymq.MergeResult{Id: testRequestID, Success: true} + msg := entityqueue.NewMessage(testRequestID, resultPayload(t, res), testQueue, nil) + require.NoError(t, controller.Process(context.Background(), newDelivery(ctrl, msg))) + + assert.Equal(t, "batch", gotTopic) + rid, err := entity.RequestIDFromBytes(gotPayload) + require.NoError(t, err) + assert.Equal(t, testRequestID, rid.ID) +} + +func TestProcess_NotMergeableMarksRequestError(t *testing.T) { + ctrl := gomock.NewController(t) + + reqStore := storagemock.NewMockRequestStore(ctrl) + reqStore.EXPECT().Get(gomock.Any(), testRequestID).Return( + entity.Request{ID: testRequestID, Queue: testQueue, State: entity.RequestStateStarted, Version: 1}, nil) + // The request is driven to terminal Error inline (version 1 -> 2). + reqStore.EXPECT().UpdateState(gomock.Any(), testRequestID, int32(1), int32(2), entity.RequestStateError).Return(nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() + + // One publish is expected — the terminal log entry to the log topic. A publish + // to the batch topic would be a bug (gomock fails on the unexpected 2nd call). 
+ var gotTopic string + var gotPayload []byte + pub := queuemock.NewMockPublisher(ctrl) + pub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, topic string, msg entityqueue.Message) error { + gotTopic = topic + gotPayload = msg.Payload + return nil + }, + ) + q := queuemock.NewMockQueue(ctrl) + q.EXPECT().Publisher().Return(pub).AnyTimes() + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{ + {Key: topickey.TopicKeyBatch, Name: "batch", Queue: q}, + {Key: topickey.TopicKeyLog, Name: "log", Queue: q}, + }, + ) + require.NoError(t, err) + + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, registry, + runwaymq.TopicKeyMergeConflictCheckSignal, "orchestrator-mergeconflictsignal") + + res := runwaymq.MergeResult{Id: testRequestID, Success: false, Reason: "conflict in foo.go"} + msg := entityqueue.NewMessage(testRequestID, resultPayload(t, res), testQueue, nil) + // Not-mergeable is an expected terminal outcome, so Process acks (no error). + require.NoError(t, controller.Process(context.Background(), newDelivery(ctrl, msg))) + + // The single publish is the terminal log entry carrying the conflict reason. 
+ assert.Equal(t, "log", gotTopic) + logEntry, err := entity.RequestLogFromBytes(gotPayload) + require.NoError(t, err) + assert.Equal(t, entity.RequestStatusError, logEntry.Status) + assert.Equal(t, int32(2), logEntry.RequestVersion) + assert.Equal(t, "conflict in foo.go", logEntry.LastError) +} + +func TestProcess_HaltedRequestSkips(t *testing.T) { + ctrl := gomock.NewController(t) + + reqStore := storagemock.NewMockRequestStore(ctrl) + reqStore.EXPECT().Get(gomock.Any(), testRequestID).Return( + entity.Request{ID: testRequestID, Queue: testQueue, State: entity.RequestStateCancelled, Version: 4}, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() + + // No publish: gomock fails if a batch publish runs for a halted request. + pub := queuemock.NewMockPublisher(ctrl) + q := queuemock.NewMockQueue(ctrl) + q.EXPECT().Publisher().Return(pub).AnyTimes() + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: topickey.TopicKeyBatch, Name: "batch", Queue: q}}, + ) + require.NoError(t, err) + + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, registry, + runwaymq.TopicKeyMergeConflictCheckSignal, "orchestrator-mergeconflictsignal") + + res := runwaymq.MergeResult{Id: testRequestID, Success: true} + msg := entityqueue.NewMessage(testRequestID, resultPayload(t, res), testQueue, nil) + require.NoError(t, controller.Process(context.Background(), newDelivery(ctrl, msg))) +} diff --git a/submitqueue/orchestrator/controller/validate/BUILD.bazel b/submitqueue/orchestrator/controller/validate/BUILD.bazel index c3056bb3..c95570c2 100644 --- a/submitqueue/orchestrator/controller/validate/BUILD.bazel +++ b/submitqueue/orchestrator/controller/validate/BUILD.bazel @@ -6,14 +6,16 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/validate", visibility = ["//visibility:public"], deps = [ + "//api/base/change/protopb", + 
"//api/base/mergestrategy/protopb", + "//api/runway/messagequeue", + "//platform/base/mergestrategy", "//platform/base/messagequeue", "//platform/consumer", "//platform/errs", "//platform/metrics", - "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/changeprovider", - "//submitqueue/extension/mergechecker", "//submitqueue/extension/storage", "@com_github_uber_go_tally//:tally", "@org_uber_go_zap//:zap", @@ -25,6 +27,8 @@ go_test( srcs = ["validate_test.go"], embed = [":validate"], deps = [ + "//api/base/mergestrategy/protopb", + "//api/runway/messagequeue", "//platform/base/change", "//platform/base/mergestrategy", "//platform/base/messagequeue", @@ -34,8 +38,6 @@ go_test( "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/changeprovider/mock", - "//submitqueue/extension/mergechecker", - "//submitqueue/extension/mergechecker/mock", "//submitqueue/extension/storage", "//submitqueue/extension/storage/mock", "@com_github_stretchr_testify//assert", diff --git a/submitqueue/orchestrator/controller/validate/validate.go b/submitqueue/orchestrator/controller/validate/validate.go index 81f782f5..7d05406b 100644 --- a/submitqueue/orchestrator/controller/validate/validate.go +++ b/submitqueue/orchestrator/controller/validate/validate.go @@ -21,29 +21,32 @@ import ( "time" "github.com/uber-go/tally" + changepb "github.com/uber/submitqueue/api/base/change/protopb" + strategypb "github.com/uber/submitqueue/api/base/mergestrategy/protopb" + runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + "github.com/uber/submitqueue/platform/base/mergestrategy" entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" "github.com/uber/submitqueue/platform/consumer" "github.com/uber/submitqueue/platform/errs" coremetrics "github.com/uber/submitqueue/platform/metrics" - "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" 
"github.com/uber/submitqueue/submitqueue/extension/changeprovider" - "github.com/uber/submitqueue/submitqueue/extension/mergechecker" "github.com/uber/submitqueue/submitqueue/extension/storage" "go.uber.org/zap" ) // Controller handles validate queue messages. -// It consumes requests, performs validation checks (duplicate detection via the change store, -// merge conflicts, change metadata fetch), and publishes to the batch stage. Validation logic -// is extensible to support additional checks. Implements consumer.Controller. +// It consumes requests, performs local validation checks (duplicate detection via the change store +// and change metadata fetch), then kicks off the asynchronous merge-conflict check by publishing the +// full check request to runway's merge-conflict-check queue. Validation logic is extensible to +// support additional checks. Implements consumer.Controller. type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope store storage.Storage registry consumer.TopicRegistry - mergeCheckers mergechecker.Factory changeProviders changeprovider.Factory + runwayTopicKey consumer.TopicKey topicKey consumer.TopicKey consumerGroup string } @@ -52,13 +55,15 @@ type Controller struct { var _ consumer.Controller = (*Controller)(nil) // NewController creates a new validate controller for the orchestrator. +// runwayTopicKey is the runway-owned topic the merge-conflict check request is +// published to (TopicKeyMergeConflictCheck). 
func NewController( logger *zap.SugaredLogger, scope tally.Scope, store storage.Storage, registry consumer.TopicRegistry, - mergeCheckers mergechecker.Factory, changeProviders changeprovider.Factory, + runwayTopicKey consumer.TopicKey, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { @@ -67,15 +72,16 @@ func NewController( metricsScope: scope.SubScope("validate_controller"), store: store, registry: registry, - mergeCheckers: mergeCheckers, changeProviders: changeProviders, + runwayTopicKey: runwayTopicKey, topicKey: topicKey, consumerGroup: consumerGroup, } } // Process processes a validate delivery from the queue. -// Runs duplicate detection, merge-conflict check, change metadata fetch, then publishes to batch. +// Runs duplicate detection, change metadata fetch, and change claiming, then kicks off the +// asynchronous merge-conflict check by publishing the full check request to runway. // Returns nil to ack (success or non-retryable rejection), error to nack (retry). 
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { op := coremetrics.Begin(c.metricsScope, "process") @@ -135,27 +141,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return errs.NewUserError(fmt.Errorf("request %s is a duplicate of in-flight request %s", request.ID, dupID)) } - // Merge conflict check - mergeChecker, err := c.mergeCheckers.For(mergechecker.Config{QueueName: request.Queue}) - if err != nil { - coremetrics.NamedCounter(c.metricsScope, "process", "merge_check_errors", 1) - return fmt.Errorf("failed to build merge checker for queue %s: %w", request.Queue, err) - } - mergeResult, err := mergeChecker.Check(ctx, request) - if err != nil { - coremetrics.NamedCounter(c.metricsScope, "process", "merge_check_errors", 1) - return fmt.Errorf("merge check failed: %w", err) - } - if !mergeResult.Mergeable { - c.logger.Infow("request not mergeable", - "request_id", request.ID, - "queue", request.Queue, - "reason", mergeResult.Reason, - ) - coremetrics.NamedCounter(c.metricsScope, "process", "not_mergeable", 1) - return errs.NewUserError(fmt.Errorf("request %s is not mergeable: %s", request.ID, mergeResult.Reason)) - } - // Fetch change metadata changeProvider, err := c.changeProviders.For(changeprovider.Config{QueueName: request.Queue}) if err != nil { @@ -184,15 +169,31 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return fmt.Errorf("failed to claim change records for request %s: %w", request.ID, err) } - // Publish to batch topic - if err := c.publish(ctx, topickey.TopicKeyBatch, request.ID, request.Queue); err != nil { + // Kick off the asynchronous merge-conflict check: hand the full check request + // to runway via its merge-conflict-check queue, keyed by the request id (the + // client-owned correlation id) so a redelivery republishes the same id and the + // result correlates straight back. 
At validate time the check is a single step + // (candidate vs target branch). + req := &runwaymq.MergeRequest{ + Id: request.ID, + QueueName: request.Queue, + Steps: []*runwaymq.MergeStep{ + { + StepId: request.ID, + Changes: []*changepb.Change{{Uris: request.Change.URIs}}, + Strategy: toProtoStrategy(request.LandStrategy), + }, + }, + } + if err := c.publishMergeCheck(ctx, req); err != nil { coremetrics.NamedCounter(c.metricsScope, "process", "publish_errors", 1) - return fmt.Errorf("failed to publish to batch: %w", err) + // Retryable: the hand-off to runway is what keeps this check alive. + return errs.NewRetryableError(fmt.Errorf("failed to publish to runway merge-conflict-check: %w", err)) } - c.logger.Infow("published request to batch", + c.logger.Infow("published merge conflict check to runway", "request_id", request.ID, - "topic_key", topickey.TopicKeyBatch, + "topic_key", c.runwayTopicKey, ) return nil // Success - message will be acked @@ -241,24 +242,24 @@ func (c *Controller) checkDuplicate(ctx context.Context, request entity.Request) return "", nil } -// publish publishes a request ID to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, requestID string, partitionKey string) error { - rid := entity.RequestID{ID: requestID} - payload, err := rid.ToBytes() +// publishMergeCheck serializes the runway check request and publishes it to the +// runway merge-conflict-check topic, partitioned by queue. 
+func (c *Controller) publishMergeCheck(ctx context.Context, req *runwaymq.MergeRequest) error { + payload, err := runwaymq.MergeRequestToBytes(req) if err != nil { - return fmt.Errorf("failed to serialize request ID: %w", err) + return fmt.Errorf("failed to serialize merge conflict check request: %w", err) } - msg := entityqueue.NewMessage(requestID, payload, partitionKey, nil) + msg := entityqueue.NewMessage(req.Id, payload, req.QueueName, nil) - q, ok := c.registry.Queue(key) + q, ok := c.registry.Queue(c.runwayTopicKey) if !ok { - return fmt.Errorf("no queue registered for topic key %s", key) + return fmt.Errorf("no queue registered for topic key %s", c.runwayTopicKey) } - topicName, ok := c.registry.TopicName(key) + topicName, ok := c.registry.TopicName(c.runwayTopicKey) if !ok { - return fmt.Errorf("no topic name registered for topic key %s", key) + return fmt.Errorf("no topic name registered for topic key %s", c.runwayTopicKey) } if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { @@ -268,6 +269,22 @@ func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request return nil } +// toProtoStrategy maps the shared mergestrategy.MergeStrategy entity to the proto +// Strategy enum carried on the wire. An unknown strategy maps to DEFAULT, letting +// runway apply the queue's configured default. +func toProtoStrategy(s mergestrategy.MergeStrategy) strategypb.Strategy { + switch s { + case mergestrategy.MergeStrategyRebase: + return strategypb.Strategy_REBASE + case mergestrategy.MergeStrategySquashRebase: + return strategypb.Strategy_SQUASH_REBASE + case mergestrategy.MergeStrategyMerge: + return strategypb.Strategy_MERGE + default: + return strategypb.Strategy_DEFAULT + } +} + // claimChanges persists one ChangeRecord per fetched ChangeInfo, capturing the // provider details at claim time. 
The record's identity (queue, uri, request_id) // and its Details are written together in a single immutable Create — there is no diff --git a/submitqueue/orchestrator/controller/validate/validate_test.go b/submitqueue/orchestrator/controller/validate/validate_test.go index 5580b7e4..48ceba4c 100644 --- a/submitqueue/orchestrator/controller/validate/validate_test.go +++ b/submitqueue/orchestrator/controller/validate/validate_test.go @@ -22,6 +22,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" + strategypb "github.com/uber/submitqueue/api/base/mergestrategy/protopb" + runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" "github.com/uber/submitqueue/platform/base/change" "github.com/uber/submitqueue/platform/base/mergestrategy" entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" @@ -31,8 +33,6 @@ import ( "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" changeprovidermock "github.com/uber/submitqueue/submitqueue/extension/changeprovider/mock" - "github.com/uber/submitqueue/submitqueue/extension/mergechecker" - mergecheckermock "github.com/uber/submitqueue/submitqueue/extension/mergechecker/mock" "github.com/uber/submitqueue/submitqueue/extension/storage" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" @@ -66,13 +66,6 @@ func (m *mockChangeProvider) Get(ctx context.Context, request entity.Request) ([ }, nil } -// newMergeableMock returns a mock MergeChecker that always returns mergeable. -func newMergeableMock(ctrl *gomock.Controller) *mergecheckermock.MockMergeChecker { - mc := mergecheckermock.NewMockMergeChecker(ctrl) - mc.EXPECT().Check(gomock.Any(), gomock.Any()).Return(entity.MergeResult{Mergeable: true}, nil).AnyTimes() - return mc -} - // newMockStorage creates a MockStorage with a MockRequestStore that returns the given request on Get. 
// The returned MockRequestStore is exposed so individual tests can layer additional Get expectations. func newMockStorage(ctrl *gomock.Controller, request entity.Request) (*storagemock.MockStorage, *storagemock.MockRequestStore) { @@ -100,7 +93,6 @@ func newTestController( ctrl *gomock.Controller, store *storagemock.MockStorage, cs *storagemock.MockChangeStore, - mc mergechecker.MergeChecker, publishErr error, ) *Controller { logger := zaptest.NewLogger(t).Sugar() @@ -119,23 +111,19 @@ func newTestController( mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: topickey.TopicKeyBatch, Name: "batch", Queue: mockQ}}, + []consumer.TopicConfig{{Key: runwaymq.TopicKeyMergeConflictCheck, Name: "merge-conflict-check", Queue: mockQ}}, ) require.NoError(t, err) cp := &mockChangeProvider{} - - mcFactory := mergecheckermock.NewMockFactory(ctrl) - mcFactory.EXPECT().For(gomock.Any()).Return(mc, nil).AnyTimes() cpFactory := changeprovidermock.NewMockFactory(ctrl) cpFactory.EXPECT().For(gomock.Any()).Return(cp, nil).AnyTimes() - return NewController(logger, scope, store, registry, mcFactory, cpFactory, topickey.TopicKeyValidate, "orchestrator-validate") + return NewController(logger, scope, store, registry, cpFactory, runwaymq.TopicKeyMergeConflictCheck, topickey.TopicKeyValidate, "orchestrator-validate") } func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) request := entity.Request{ ID: "test-queue/123", Queue: "test-queue", @@ -145,7 +133,7 @@ func TestNewController(t *testing.T) { Version: 1, } store, _ := newMockStorage(ctrl, request) - controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), mc, nil) + controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), nil) require.NotNil(t, controller) assert.Equal(t, topickey.TopicKeyValidate, controller.TopicKey()) @@ -155,7 +143,6 @@ func TestNewController(t *testing.T) { 
func TestController_Process_Success(t *testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) request := entity.Request{ ID: "test-queue/123", @@ -166,7 +153,7 @@ func TestController_Process_Success(t *testing.T) { Version: 1, } store, _ := newMockStorage(ctrl, request) - controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), mc, nil) + controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), nil) msg := entityqueue.NewMessage("test-queue/123", requestIDPayload(t, request.ID), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -176,12 +163,71 @@ func TestController_Process_Success(t *testing.T) { require.NoError(t, controller.Process(context.Background(), delivery)) } +// TestController_Process_PublishesCheckToRunway verifies the full merge-conflict +// check request is published to runway's merge-conflict-check queue (keyed by +// the request id, the client-owned correlation id) on the happy path. +func TestController_Process_PublishesCheckToRunway(t *testing.T) { + ctrl := gomock.NewController(t) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: change.Change{URIs: []string{"github://uber/service/pull/456/abcdef0123456789abcdef0123456789abcdef01"}}, + LandStrategy: mergestrategy.MergeStrategyRebase, + State: entity.RequestStateStarted, + Version: 1, + } + store, _ := newMockStorage(ctrl, request) + store.EXPECT().GetChangeStore().Return(newMockChangeStore(ctrl)).AnyTimes() + + logger := zaptest.NewLogger(t).Sugar() + + var gotTopic string + var gotPayload []byte + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, topic string, msg entityqueue.Message) error { + gotTopic = topic + gotPayload = msg.Payload + return nil + }, + ) + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + registry, err := 
consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: runwaymq.TopicKeyMergeConflictCheck, Name: "merge-conflict-check", Queue: mockQ}}, + ) + require.NoError(t, err) + cpFactory := changeprovidermock.NewMockFactory(ctrl) + cpFactory.EXPECT().For(gomock.Any()).Return(&mockChangeProvider{}, nil).AnyTimes() + + controller := NewController(logger, tally.NoopScope, store, registry, cpFactory, runwaymq.TopicKeyMergeConflictCheck, topickey.TopicKeyValidate, "orchestrator-validate") + + msg := entityqueue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + require.NoError(t, controller.Process(context.Background(), delivery)) + + // Full payload published to runway, keyed by the request id (the correlation id). + assert.Equal(t, "merge-conflict-check", gotTopic) + got, err := runwaymq.MergeRequestFromBytes(gotPayload) + require.NoError(t, err) + assert.Equal(t, request.ID, got.Id) + assert.Equal(t, request.Queue, got.QueueName) + require.Len(t, got.Steps, 1) + assert.Equal(t, request.ID, got.Steps[0].StepId) + require.Len(t, got.Steps[0].Changes, 1) + assert.Equal(t, request.Change.URIs, got.Steps[0].Changes[0].Uris) + assert.Equal(t, strategypb.Strategy_REBASE, got.Steps[0].Strategy) +} + // TestController_Process_ClaimsChangeRecordsWithDetails verifies that, on the happy // path, validate creates a change record per fetched change, capturing the provider // details in a single immutable Create. func TestController_Process_ClaimsChangeRecordsWithDetails(t *testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) // The request's URI matches the URI the mock change provider returns, so the // claim carries that change's details. 
@@ -214,7 +260,7 @@ func TestController_Process_ClaimsChangeRecordsWithDetails(t *testing.T) { }, ) - controller := newTestController(t, ctrl, store, cs, mc, nil) + controller := newTestController(t, ctrl, store, cs, nil) msg := entityqueue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -226,14 +272,13 @@ func TestController_Process_ClaimsChangeRecordsWithDetails(t *testing.T) { func TestController_Process_StorageFailure(t *testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) mockReqStore := storagemock.NewMockRequestStore(ctrl) mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(entity.Request{}, fmt.Errorf("db connection lost")) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() - controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), mc, nil) + controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), nil) msg := entityqueue.NewMessage("test-queue/123", requestIDPayload(t, "test-queue/123"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -247,7 +292,6 @@ func TestController_Process_StorageFailure(t *testing.T) { func TestController_Process_PublishFailure(t *testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) request := entity.Request{ ID: "test-queue/123", @@ -259,80 +303,29 @@ func TestController_Process_PublishFailure(t *testing.T) { } store, _ := newMockStorage(ctrl, request) - controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), mc, fmt.Errorf("publish failed")) + controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), fmt.Errorf("publish failed")) msg := entityqueue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() 
delivery.EXPECT().Attempt().Return(1).AnyTimes() - assert.Error(t, controller.Process(context.Background(), delivery)) + // The hand-off to runway is retryable: a transient enqueue blip should replay + // rather than strand the request. + err := controller.Process(context.Background(), delivery) + require.Error(t, err) + assert.True(t, errs.IsRetryable(err)) } func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) request := entity.Request{ID: "test-queue/123", Queue: "test-queue"} store, _ := newMockStorage(ctrl, request) - controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), mc, nil) + controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), nil) var _ consumer.Controller = controller } -func TestController_Process_NotMergeable(t *testing.T) { - ctrl := gomock.NewController(t) - - mc := mergecheckermock.NewMockMergeChecker(ctrl) - mc.EXPECT().Check(gomock.Any(), gomock.Any()).Return(entity.MergeResult{Mergeable: false}, nil) - - request := entity.Request{ - ID: "test-queue/123", - Queue: "test-queue", - Change: change.Change{URIs: []string{"github://uber/repo/pull/1/abcdef0123456789abcdef0123456789abcdef01"}}, - LandStrategy: mergestrategy.MergeStrategyRebase, - State: entity.RequestStateStarted, - Version: 1, - } - store, _ := newMockStorage(ctrl, request) - controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), mc, nil) - - msg := entityqueue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() - - err := controller.Process(context.Background(), delivery) - require.Error(t, err) - assert.True(t, errs.IsUserError(err)) -} - -func TestController_Process_MergeCheckError(t *testing.T) { - ctrl := gomock.NewController(t) - - mc := mergecheckermock.NewMockMergeChecker(ctrl) 
- mc.EXPECT().Check(gomock.Any(), gomock.Any()).Return(entity.MergeResult{}, fmt.Errorf("merge check failed")) - - request := entity.Request{ - ID: "test-queue/123", - Queue: "test-queue", - Change: change.Change{URIs: []string{"github://uber/repo/pull/1/abcdef0123456789abcdef0123456789abcdef01"}}, - LandStrategy: mergestrategy.MergeStrategyRebase, - State: entity.RequestStateStarted, - Version: 1, - } - store, _ := newMockStorage(ctrl, request) - controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), mc, nil) - - msg := entityqueue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() - - err := controller.Process(context.Background(), delivery) - require.Error(t, err) - assert.False(t, errs.IsRetryable(err)) -} - func TestController_Process_DuplicateDetection(t *testing.T) { const ( queueName = "test-queue" @@ -356,7 +349,7 @@ func TestController_Process_DuplicateDetection(t *testing.T) { wantUnexpected bool }{ { - name: "no overlap proceeds to merge check", + name: "no overlap proceeds", byURI: map[string][]entity.ChangeRecord{uriA: nil}, }, { @@ -445,7 +438,6 @@ func TestController_Process_DuplicateDetection(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) uris := tt.requestURIs if uris == nil { @@ -485,7 +477,7 @@ func TestController_Process_DuplicateDetection(t *testing.T) { // and claims each fetched change via Create. Accept any Create. 
cs.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - controller := newTestController(t, ctrl, store, cs, mc, nil) + controller := newTestController(t, ctrl, store, cs, nil) msg := entityqueue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -510,7 +502,6 @@ func TestController_Process_DuplicateDetection(t *testing.T) { func TestController_Process_ChangeStoreQueryFailure(t *testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) request := entity.Request{ ID: "test-queue/123", @@ -525,7 +516,7 @@ func TestController_Process_ChangeStoreQueryFailure(t *testing.T) { cs := storagemock.NewMockChangeStore(ctrl) cs.EXPECT().GetByURI(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("change store down")) - controller := newTestController(t, ctrl, store, cs, mc, nil) + controller := newTestController(t, ctrl, store, cs, nil) msg := entityqueue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -540,8 +531,8 @@ func TestController_Process_ChangeStoreQueryFailure(t *testing.T) { // A request already in a terminal state (e.g. cancelled while the validate // message was in flight) must be short-circuited before any extension is // touched and before any publish happens. We verify this by registering a -// merge checker and change store with NO expectations — gomock fails the test -// if either is called — and a publisher that returns an error if invoked. +// change store with NO expectations — gomock fails the test if it is called — +// and a publisher that returns an error if invoked. 
func TestController_Process_TerminalShortCircuit(t *testing.T) { for _, state := range []entity.RequestState{ entity.RequestStateCancelled, @@ -559,13 +550,12 @@ func TestController_Process_TerminalShortCircuit(t *testing.T) { } store, _ := newMockStorage(ctrl, request) - // No EXPECTs on merge checker or change store: gomock will fail if either is called. - mc := mergecheckermock.NewMockMergeChecker(ctrl) + // No EXPECTs on change store: gomock will fail if it is called. cs := storagemock.NewMockChangeStore(ctrl) // Sentinel publish error: if Process publishes, it returns a non-nil err, // which the require.NoError below will catch. - controller := newTestController(t, ctrl, store, cs, mc, fmt.Errorf("should not publish")) + controller := newTestController(t, ctrl, store, cs, fmt.Errorf("should not publish")) msg := entityqueue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl)