From 802e3a1fa19bc417b16db1b068517c6a7a070a89 Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Wed, 17 Jun 2026 13:19:01 -0700 Subject: [PATCH] feat(api/runway): add external merge queue contract Add Runway's published, language-neutral merge queue contract. merge.proto defines MergeRequest/MergeResult, reusing the shared Change and Strategy types and the topic_keys option, generated into protopb and serialized as protobuf JSON so the queue keeps storing self-describing JSON. The Go helpers wrap protojson and expose the topic binding via reflection; topic keys are co-located with the contract. A drift test round-trips the payloads and verifies every topic key is bound to exactly one message. --- CLAUDE.md | 20 +- Makefile | 2 +- api/runway/messagequeue/BUILD.bazel | 31 ++ api/runway/messagequeue/README.md | 22 ++ api/runway/messagequeue/merge.go | 107 +++++ api/runway/messagequeue/merge_test.go | 133 +++++++ api/runway/messagequeue/proto/BUILD.bazel | 51 +++ api/runway/messagequeue/proto/merge.proto | 101 +++++ api/runway/messagequeue/protopb/BUILD.bazel | 15 + api/runway/messagequeue/protopb/merge.pb.go | 417 ++++++++++++++++++++ api/runway/messagequeue/topics.go | 42 ++ runway/core/topickey/BUILD.bazel | 9 - runway/core/topickey/topickey.go | 46 --- runway/entity/BUILD.bazel | 24 -- runway/entity/merge.go | 128 ------ runway/entity/merge_test.go | 88 ----- tool/proto/BUILD.bazel | 13 + 17 files changed, 947 insertions(+), 302 deletions(-) create mode 100644 api/runway/messagequeue/BUILD.bazel create mode 100644 api/runway/messagequeue/README.md create mode 100644 api/runway/messagequeue/merge.go create mode 100644 api/runway/messagequeue/merge_test.go create mode 100644 api/runway/messagequeue/proto/BUILD.bazel create mode 100644 api/runway/messagequeue/proto/merge.proto create mode 100644 api/runway/messagequeue/protopb/BUILD.bazel create mode 100644 api/runway/messagequeue/protopb/merge.pb.go create mode 100644 api/runway/messagequeue/topics.go delete mode 100644 runway/core/topickey/BUILD.bazel delete mode 100644 runway/core/topickey/topickey.go delete mode 100644 runway/entity/BUILD.bazel delete mode 100644 runway/entity/merge.go delete mode 100644 runway/entity/merge_test.go diff --git a/CLAUDE.md b/CLAUDE.md index 8b749b38..e0aad576 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -36,9 +36,10 @@ request.Version = newVersion ``` submitqueue/ # repo root (Go module github.com/uber/submitqueue) -├── api/ # Wire contracts (proto) by domain/service -│ ├── submitqueue/{gateway,orchestrator}/{proto,protopb}/ -│ └── stovepipe/{gateway,orchestrator}/{proto,protopb}/ +├── api/ # Published wire contracts (cross-domain/external) +│ ├── submitqueue/{gateway,orchestrator}/{proto,protopb}/ # RPC (proto) +│ ├── stovepipe/{gateway,orchestrator}/{proto,protopb}/ +│ └── runway/messagequeue/ # external queue contracts (proto + protojson) ├── platform/ # SHARED cross-domain packages — no domain deps │ ├── errs/, metrics/, consumer/, http/ │ ├── base/ # SHARED entities (change/, messagequeue/, …) @@ -68,7 +69,7 @@ submitqueue/ # repo root (Go module github.com/uber/submi The `platform/` tree holds code reused across domains (infrastructure, shared entities, shared extension contracts). Each **domain** (`submitqueue/`, `stovepipe/`, …) keeps the same internal layout (`gateway/`, `orchestrator/`, `entity/`, `extension/`, `core/`); a domain's own `core/` (e.g. `submitqueue/core/`) holds infra shared only between that domain's services. -The `api/` tree holds all wire contracts (proto definitions and their committed generated stubs), organized by `domain/service`: `api/{domain}/{service}/proto/` for `.proto` sources and `api/{domain}/{service}/protopb/` for generated Go. A service package may hold multiple `.proto` files — its RPC contract (`{service}.proto`) alongside messagequeue contracts (queue payload schemas) — all generating into the same `protopb/`. +The `api/` tree holds **published** wire contracts — those depended on from outside the owning domain. RPC contracts live at `api/{domain}/{service}/` (`proto/` for `.proto` sources, `protopb/` for committed generated Go); a service package may hold multiple `.proto` files, all generating into the same `protopb/`. External message-queue contracts live at `api/{domain}/messagequeue/` (see Message Queue Contracts below). Internal queue contracts do **not** go here — they live under `{domain}/core/messagequeue/`. ### Platform notes @@ -156,9 +157,10 @@ Paths follow the directory layout: shared packages live under `platform/` at the - RPC Controllers: `github.com/uber/submitqueue/{domain}/{service}/controller` (e.g. `.../submitqueue/gateway/controller`) - Queue Controllers: `github.com/uber/submitqueue/{domain}/{service}/controller/{step}` - Proto (generated): `github.com/uber/submitqueue/api/{domain}/{service}/protopb` +- Queue contracts: external `github.com/uber/submitqueue/api/{domain}/messagequeue`; internal `github.com/uber/submitqueue/{domain}/core/messagequeue` - Domain entities: `github.com/uber/submitqueue/{domain}/entity` (e.g. `.../submitqueue/entity`) - Domain extensions: `github.com/uber/submitqueue/{domain}/extension/{ext}[/{impl}]` (e.g. `.../submitqueue/extension/storage/mysql`) -- Cross-domain consumer framework: `github.com/uber/submitqueue/platform/consumer`; domain pipeline topic keys: `github.com/uber/submitqueue/{domain}/core/topickey` +- Cross-domain consumer framework: `github.com/uber/submitqueue/platform/consumer`; internal pipeline topic keys: `github.com/uber/submitqueue/{domain}/core/topickey` (external queue topic keys live with their contract package, e.g. `api/runway/messagequeue`) - Domain-internal infra: `github.com/uber/submitqueue/{domain}/core/{pkg}` (e.g. `.../submitqueue/core/request`) - Shared entities: `github.com/uber/submitqueue/platform/base/{pkg}` (e.g. `.../platform/base/messagequeue`) - Shared extensions: `github.com/uber/submitqueue/platform/extension/{ext}[/{impl}]` (e.g. `.../platform/extension/messagequeue/mysql`) @@ -182,7 +184,13 @@ Generated proto files are committed. When modifying `.proto` files: 2. `make proto` (generates `*.pb.go`, `*_grpc.pb.go`, `*.pb.yarpc.go` into `api/{domain}/{service}/protopb/`) 3. Commit all generated files -To add a new `.proto` to a service (e.g. messagequeue contracts), drop it in the service's `api/{domain}/{service}/proto/` dir, add it to that package's `srcs` in `api/{domain}/{service}/proto/BUILD.bazel` and its `exports_files`, then `make proto && make gazelle`. The codegen and `make proto` copy loop already handle multiple `.proto` files per package. +To add a new `.proto` to a service, drop it in the service's `api/{domain}/{service}/proto/` dir, add it to that package's `srcs` in `api/{domain}/{service}/proto/BUILD.bazel` and its `exports_files`, then `make proto && make gazelle`. The codegen and `make proto` copy loop already handle multiple `.proto` files per package. + +### Message Queue Contracts + +Queue payloads are defined in **proto3** (`.proto` under `proto/`, generated Go in `protopb/` as the binding) and serialized as **protobuf JSON** (protojson) so the queue keeps storing self-describing JSON. Location follows audience: external/cross-domain contracts go under `api/{domain}/messagequeue/`; internal contracts (used only within the owning domain) go under `{domain}/core/messagequeue/`. Bazel `visibility` enforces the split — internal targets are domain-scoped, `api/` targets are public. See [doc/rfc/messagequeue-contract.md](doc/rfc/messagequeue-contract.md). + +Thin `protojson` helpers (`MergeRequestToBytes`/`MergeRequestFromBytes`, …) own the wire conventions: `UseProtoNames` (snake_case fields), UPPER_SNAKE enum values, int64-as-string, unknown fields discarded on read (additive evolution). The topic(s) carrying a message are declared on the message via the `topics` proto option — a `google.protobuf.MessageOptions` extension defined in `api/base/messagequeue`, the proto-native replacement for `x-topics`; a `Topics(msg)` reflection helper reads it. It is contract metadata, not the hot path — publish/consume still routes on `consumer.TopicKey` + `TopicRegistry`. The contract package owns both halves: the proto payload and the `TopicKey` constants for its topics. A contract test round-trips the payloads and asserts every topic key is bound to exactly one message. Shared field types (`Change`, `Strategy`) are shared protos under `api/base/{change,mergestrategy}`. `api/runway/messagequeue/` is the reference example. ### Naming Conventions diff --git a/Makefile b/Makefile index 49c110bf..1038e61d 100644 --- a/Makefile +++ b/Makefile @@ -31,7 +31,7 @@ GOIMPORTS_VERSION ?= v0.33.0 # (the out_dir convention in tool/proto/BUILD.bazel) and copied back here. A # package may hold multiple .proto files (e.g. an RPC contract plus messagequeue # contracts); all generated stubs land in the same protopb/ dir. -PROTO_PACKAGES = api/base/change api/base/mergestrategy api/base/messagequeue api/submitqueue/gateway api/submitqueue/orchestrator api/stovepipe/gateway api/stovepipe/orchestrator +PROTO_PACKAGES = api/base/change api/base/mergestrategy api/base/messagequeue api/runway/messagequeue api/submitqueue/gateway api/submitqueue/orchestrator api/stovepipe/gateway api/stovepipe/orchestrator # Set REPO_ROOT for docker-compose export REPO_ROOT := $(shell pwd) diff --git a/api/runway/messagequeue/BUILD.bazel b/api/runway/messagequeue/BUILD.bazel new file mode 100644 index 00000000..301a0fb9 --- /dev/null +++ b/api/runway/messagequeue/BUILD.bazel @@ -0,0 +1,31 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "messagequeue", + srcs = [ + "merge.go", + "topics.go", + ], + importpath = "github.com/uber/submitqueue/api/runway/messagequeue", + visibility = ["//visibility:public"], + deps = [ + "//api/base/messagequeue/protopb", # keep + "//api/runway/messagequeue/protopb", # keep + "//platform/consumer", + "@org_golang_google_protobuf//encoding/protojson", + "@org_golang_google_protobuf//proto", + ], +) + +go_test( + name = "messagequeue_test", + srcs = ["merge_test.go"], + embed = [":messagequeue"], + deps = [ + "//api/base/change/protopb", + "//api/base/mergestrategy/protopb", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@org_golang_google_protobuf//proto", + ], +) diff --git a/api/runway/messagequeue/README.md b/api/runway/messagequeue/README.md new file mode 100644 index 00000000..f024ccfb --- /dev/null +++ b/api/runway/messagequeue/README.md @@ -0,0 +1,22 @@ +# Runway message queue contract + +The published, language-neutral contract for the merge queues Runway owns. A client — in any language — publishes a merge request and consumes the result without access to Runway's Go types or storage. See [the message queue contract RFC](../../../doc/rfc/messagequeue-contract.md) for the design. + +Payloads are defined as proto3 messages in [`proto/merge.proto`](proto/merge.proto) and generated into [`protopb/`](protopb); the proto is the authority and a non-Go client compiles against it directly. On the wire, payloads are serialized as protobuf JSON (`protojson`), so the queue keeps storing self-describing JSON. The Go helpers in this package (`MergeRequestToBytes`/`MergeRequestFromBytes` and the `MergeResult` counterparts) wrap `protojson` for Go callers; field names stay snake_case (`UseProtoNames`) and enums serialize as their UPPER_SNAKE value name. + +The shared field types `Change` and `MergeStrategy` come from `api/base/change` and `api/base/mergestrategy`, imported by the contract. + +## Topic keys + +The binding between a topic key and its payload lives in each message's `topic_keys` option (defined in `api/base/messagequeue`); `TopicKeys` reads it back by reflection. A topic key is a stable logical name, not a concrete wire topic — each implementer maps the key to whatever topic name its broker/queue requires. Our Go wiring maps it via `consumer.TopicRegistry`. + +| Message | Direction | Topic keys | +|---|---|---| +| `MergeRequest` | client → Runway | `merge-conflict-check`, `merge` | +| `MergeResult` | Runway → client | `merge-conflict-check-signal`, `merge-signal` | + +One message serves a queue pair because a merge-conflict check is a dry run of a merge: Runway applies the same ordered steps onto the same target branch, and the topic key the request arrives on decides whether it commits the result and reports the produced revisions. A request on `merge-conflict-check` is a dry run; a request on `merge` commits. + +## Evolution + +Contract changes are additive-only: add new fields; never remove, rename, repurpose, or retype an existing field, and never reuse a field number. protojson ignores unknown fields on read and omits zero-valued fields on write, so a new optional field is backward-compatible in both directions. diff --git a/api/runway/messagequeue/merge.go b/api/runway/messagequeue/merge.go new file mode 100644 index 00000000..83661e40 --- /dev/null +++ b/api/runway/messagequeue/merge.go @@ -0,0 +1,107 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package messagequeue holds Runway's external message-queue contract: the wire +// payloads for the merge queues Runway owns, defined by the proto files in +// proto/ and generated into protopb/. The proto is the language-neutral +// authority; the generated Go types in protopb are the binding for Go callers. +// +// Payloads are serialized as protobuf JSON (protojson), not binary, so the +// MySQL-backed queue keeps storing self-describing JSON. The topic key that +// carries each payload is declared on the message itself via the topic_keys +// proto option (see api/base/messagequeue); TopicKeys reads it back. +// +// One contract serves two queue pairs because a merge-conflict check is a dry +// run of a merge: Runway applies the same ordered steps onto the same target +// branch, and the only difference is whether it commits the result and reports +// the revisions it produced. The topic key a request arrives on encodes that choice +// — the merge-conflict-check pair for a dry run, the merge pair for a +// committing merge — so MergeRequest and MergeResult are identical on both. +package messagequeue + +import ( + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + + basemqpb "github.com/uber/submitqueue/api/base/messagequeue/protopb" + "github.com/uber/submitqueue/api/runway/messagequeue/protopb" +) + +// Wire payload types. These alias the generated protobuf bindings so callers +// reference the contract through this curated package rather than protopb. +type ( + // MergeRequest is the payload a client publishes to one of Runway's merge + // queues: the merge-conflict-check topic for a dry-run check, the merge + // topic for a committing merge. + MergeRequest = protopb.MergeRequest + // MergeStep is one step of an ordered merge: a single set of change(s) + // applied with a strategy. + MergeStep = protopb.MergeStep + // MergeResult is the payload Runway publishes to the corresponding signal + // queue once a request completes. + MergeResult = protopb.MergeResult + // StepResult reports what happened to a single MergeStep. + StepResult = protopb.StepResult +) + +// marshalOpts keeps the JSON field names identical to the proto field names +// (snake_case), so the wire shape matches the declared contract rather than +// protojson's default lowerCamelCase. Zero-valued fields are omitted. +var marshalOpts = protojson.MarshalOptions{UseProtoNames: true} + +// unmarshalOpts tolerates unknown fields so an additive contract change (a new +// field a producer sends but this consumer does not yet know) is ignored rather +// than rejected. +var unmarshalOpts = protojson.UnmarshalOptions{DiscardUnknown: true} + +// MergeRequestToBytes serializes a MergeRequest to protojson bytes for the queue +// payload. +func MergeRequestToBytes(r *MergeRequest) ([]byte, error) { + return marshalOpts.Marshal(r) +} + +// MergeRequestFromBytes deserializes a MergeRequest from protojson bytes. +func MergeRequestFromBytes(data []byte) (*MergeRequest, error) { + var req MergeRequest + err := unmarshalOpts.Unmarshal(data, &req) + return &req, err +} + +// MergeResultToBytes serializes a MergeResult to protojson bytes for the queue +// payload. +func MergeResultToBytes(r *MergeResult) ([]byte, error) { + return marshalOpts.Marshal(r) +} + +// MergeResultFromBytes deserializes a MergeResult from protojson bytes. +func MergeResultFromBytes(data []byte) (*MergeResult, error) { + var res MergeResult + err := unmarshalOpts.Unmarshal(data, &res) + return &res, err +} + +// TopicKeys returns the stable logical topic keys bound to a message via the +// topic_keys proto option — not concrete wire names; a caller maps each key to +// its backend's topic name. Returns nil for a message that declares no keys. +func TopicKeys(m proto.Message) []string { + opts := m.ProtoReflect().Descriptor().Options() + if opts == nil { + return nil + } + keys, ok := proto.GetExtension(opts, basemqpb.E_TopicKeys).([]string) + if !ok { + return nil + } + return keys +} diff --git a/api/runway/messagequeue/merge_test.go b/api/runway/messagequeue/merge_test.go new file mode 100644 index 00000000..4f5167e5 --- /dev/null +++ b/api/runway/messagequeue/merge_test.go @@ -0,0 +1,133 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package messagequeue + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + + changepb "github.com/uber/submitqueue/api/base/change/protopb" + strategypb "github.com/uber/submitqueue/api/base/mergestrategy/protopb" +) + +func TestMergeRequestRoundTrip(t *testing.T) { + req := &MergeRequest{ + Id: "queue-a/42", + QueueName: "queue-a", + Steps: []*MergeStep{ + { + StepId: "queue-a/1", + Changes: []*changepb.Change{{Uris: []string{"github://uber/repo/pull/1/0123456789abcdef0123456789abcdef01234567"}}}, + Strategy: strategypb.Strategy_REBASE, + }, + { + StepId: "queue-a/2", + Changes: []*changepb.Change{{Uris: []string{"github://uber/repo/pull/2/89abcdef0123456789abcdef0123456789abcdef"}}}, + Strategy: strategypb.Strategy_MERGE, + }, + }, + } + + data, err := MergeRequestToBytes(req) + require.NoError(t, err) + + got, err := MergeRequestFromBytes(data) + require.NoError(t, err) + assert.True(t, proto.Equal(req, got), "round-tripped MergeRequest should equal the original") +} + +func TestMergeResultRoundTrip(t *testing.T) { + // A committing merge reports the revisions each step produced on the target; + // a dry-run check leaves output_ids empty and reports a per-step reason on + // failure. Both shapes share the one MergeResult contract. + cases := map[string]*MergeResult{ + "merged with produced revisions": { + Id: "queue-a/42", + Success: true, + Steps: []*StepResult{ + {StepId: "queue-a/1", OutputIds: []string{"0123456789abcdef0123456789abcdef01234567"}}, + }, + }, + "failed with per-step reason": { + Id: "queue-a/42", + Success: false, + Reason: "conflict in foo.go", + Steps: []*StepResult{{StepId: "queue-a/2", Reason: "conflict in foo.go"}}, + }, + "minimal": { + Id: "queue-a/42", + Success: true, + }, + } + + for name, res := range cases { + t.Run(name, func(t *testing.T) { + data, err := MergeResultToBytes(res) + require.NoError(t, err) + + got, err := MergeResultFromBytes(data) + require.NoError(t, err) + assert.True(t, proto.Equal(res, got), "round-tripped MergeResult should equal the original") + }) + } +} + +// TestWireFormat locks the two protojson encoding decisions the contract relies +// on: snake_case field names (UseProtoNames) and proto-conventional UPPER_SNAKE +// enum values on the wire. +func TestWireFormat(t *testing.T) { + data, err := MergeRequestToBytes(&MergeRequest{ + Id: "queue-a/42", + QueueName: "queue-a", + Steps: []*MergeStep{{StepId: "queue-a/1", Strategy: strategypb.Strategy_SQUASH_REBASE}}, + }) + require.NoError(t, err) + + assert.Contains(t, string(data), `"queue_name"`, "fields must serialize as snake_case") + assert.Contains(t, string(data), `"SQUASH_REBASE"`, "enums must serialize as their UPPER_SNAKE value name") +} + +// TestTopicKeysBindEveryTopicKey is the topic-binding drift guard: every Runway +// topic key is carried by exactly one message's topic_keys option, and no +// topic_keys option names an unknown key. +func TestTopicKeysBindEveryTopicKey(t *testing.T) { + bound := map[string]int{} + for _, m := range []proto.Message{&MergeRequest{}, &MergeResult{}} { + keys := TopicKeys(m) + require.NotEmpty(t, keys, "message must declare a non-empty topic_keys option") + for _, key := range keys { + bound[key]++ + } + } + + keys := []TopicKey{ + TopicKeyMergeConflictCheck, + TopicKeyMergeConflictCheckSignal, + TopicKeyMerge, + TopicKeyMergeSignal, + } + + valid := map[string]bool{} + for _, k := range keys { + valid[k.String()] = true + assert.Equalf(t, 1, bound[k.String()], "topic key %q must be bound to exactly one message via the topic_keys option", k) + } + for key := range bound { + assert.Truef(t, valid[key], "topic_keys option names unknown key %q", key) + } +} diff --git a/api/runway/messagequeue/proto/BUILD.bazel b/api/runway/messagequeue/proto/BUILD.bazel new file mode 100644 index 00000000..56b0ce76 --- /dev/null +++ b/api/runway/messagequeue/proto/BUILD.bazel @@ -0,0 +1,51 @@ +load("@rules_go//go:def.bzl", "go_library") +load("@rules_go//proto:def.bzl", "go_proto_library") +load("@rules_proto//proto:defs.bzl", "proto_library") + +exports_files( + ["merge.proto"], + visibility = ["//tool/proto:__pkg__"], +) + +proto_library( + name = "mergepb_proto", + srcs = ["merge.proto"], + visibility = ["//visibility:public"], + deps = [ + "//api/base/change/proto:changepb_proto", + "//api/base/mergestrategy/proto:mergestrategypb_proto", + "//api/base/messagequeue/proto:messagequeuepb_proto", + ], +) + +# keep +go_proto_library( + name = "mergepb_go_proto", + compilers = [ + "@rules_go//proto:go_proto", + "@rules_go//proto:go_grpc_v2", + ], + importpath = "github.com/uber/submitqueue/api/runway/messagequeue/proto", + proto = ":mergepb_proto", + visibility = ["//visibility:public"], + # keep + deps = [ + "//api/base/change/proto:changepb_go_proto", + "//api/base/mergestrategy/proto:mergestrategypb_go_proto", + "//api/base/messagequeue/proto:messagequeuepb_go_proto", + ], +) + +go_library( + name = "proto", + embed = [":mergepb_go_proto"], + importpath = "github.com/uber/submitqueue/api/runway/messagequeue/proto", + visibility = ["//visibility:public"], +) + +go_library( + name = "protopb", + embed = [":mergepb_go_proto"], + importpath = "github.com/uber/submitqueue/api/runway/messagequeue/protopb", + visibility = ["//visibility:public"], +) diff --git a/api/runway/messagequeue/proto/merge.proto b/api/runway/messagequeue/proto/merge.proto new file mode 100644 index 00000000..316f4841 --- /dev/null +++ b/api/runway/messagequeue/proto/merge.proto @@ -0,0 +1,101 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package uber.runway.messagequeue; + +import "api/base/change/proto/change.proto"; +import "api/base/mergestrategy/proto/mergestrategy.proto"; +import "api/base/messagequeue/proto/messagequeue.proto"; + +option go_package = "github.com/uber/submitqueue/api/runway/messagequeue/protopb"; +option java_multiple_files = true; +option java_outer_classname = "MergeProto"; +option java_package = "com.uber.submitqueue.runway.messagequeue"; + +// MergeStep is one step of an ordered merge: a single set of change(s) applied +// with a strategy. Runway applies the steps of a request in order on top of the +// target branch; the ordering encodes the base-layering (earlier steps are the +// in-flight base, the last step is the candidate). +message MergeStep { + // step_id is an opaque, caller-assigned identifier for this step. Runway + // treats it as an attribution token only -- it echoes it back per-step in + // StepResult so a multi-step result is attributable -- and never interprets + // its contents. + string step_id = 1; + // changes are the code change(s) to apply for this step. + repeated uber.base.change.Change changes = 2; + // strategy is how this step's changes are integrated into the target branch. + uber.base.mergestrategy.Strategy strategy = 3; +} + +// MergeRequest is the payload a client publishes to one of Runway's merge +// queues: the merge-conflict-check topic for a dry-run check, the merge topic +// for a committing merge. The id is owned by the client so it can record the +// in-flight work before publishing and correlate the asynchronous result; +// Runway echoes it back unchanged. +message MergeRequest { + option (uber.base.messagequeue.topic_keys) = "merge-conflict-check"; + option (uber.base.messagequeue.topic_keys) = "merge"; + + // id is the client-owned correlation id for this request (one per request). + // Runway echoes it back on the result unchanged. + string id = 1; + // queue_name is the caller-provided queue name the request belongs to. + // Runway resolves the target branch and provider config per-queue from this + // name; no target ref is passed. + string queue_name = 2; + // steps is the ordered application sequence: in-flight steps first, the + // candidate last. A single-element list expresses "candidate vs target + // branch". + repeated MergeStep steps = 3; +} + +// StepResult reports what happened to a single MergeStep, so a multi-step result +// is attributable to the step that produced (or failed to produce) it. +message StepResult { + // step_id echoes the step_id of the step this result is for. + string step_id = 1; + // output_ids are the VCS-neutral identifiers of the revisions this step + // produced on the target branch -- a git commit SHA, a Mercurial changeset + // hash, a Subversion revision number, a Perforce changelist, and so on. + // Empty for a dry-run check, for a change already present on the target, or + // for a step that failed to apply. + repeated string output_ids = 2; + // reason is a human-readable explanation when the step failed to apply. + // Empty on success. + string reason = 3; +} + +// MergeResult is the payload Runway publishes to the corresponding signal queue +// (merge-conflict-check-signal for a check, merge-signal for a merge) once a +// request completes. +message MergeResult { + option (uber.base.messagequeue.topic_keys) = "merge-conflict-check-signal"; + option (uber.base.messagequeue.topic_keys) = "merge-signal"; + + // id echoes the client-owned correlation id from the request. + string id = 1; + // success is true if the whole ordered step sequence applied cleanly: + // mergeable for a dry-run check, merged for a committing merge. + bool success = 2; + // reason is a human-readable explanation when success is false. Empty on + // success. + string reason = 3; + // steps optionally reports per-step outcomes, in request order. A committing + // merge populates each step's output_ids with the revisions it produced; a + // dry-run check leaves them empty. + repeated StepResult steps = 4; +} diff --git a/api/runway/messagequeue/protopb/BUILD.bazel b/api/runway/messagequeue/protopb/BUILD.bazel new file mode 100644 index 00000000..838c33be --- /dev/null +++ b/api/runway/messagequeue/protopb/BUILD.bazel @@ -0,0 +1,15 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "protopb", + srcs = ["merge.pb.go"], + importpath = "github.com/uber/submitqueue/api/runway/messagequeue/protopb", + visibility = ["//visibility:public"], + deps = [ + "//api/base/change/protopb", + "//api/base/mergestrategy/protopb", + "//api/base/messagequeue/protopb", # keep + "@org_golang_google_protobuf//reflect/protoreflect", + "@org_golang_google_protobuf//runtime/protoimpl", + ], +) diff --git a/api/runway/messagequeue/protopb/merge.pb.go b/api/runway/messagequeue/protopb/merge.pb.go new file mode 100644 index 00000000..f1a47c1c --- /dev/null +++ b/api/runway/messagequeue/protopb/merge.pb.go @@ -0,0 +1,417 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.10 +// protoc v5.29.3 +// source: merge.proto + +package protopb + +import ( + reflect "reflect" + sync "sync" + unsafe "unsafe" + + protopb "github.com/uber/submitqueue/api/base/change/protopb" + protopb1 "github.com/uber/submitqueue/api/base/mergestrategy/protopb" + _ "github.com/uber/submitqueue/api/base/messagequeue/protopb" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// MergeStep is one step of an ordered merge: a single set of change(s) applied +// with a strategy. Runway applies the steps of a request in order on top of the +// target branch; the ordering encodes the base-layering (earlier steps are the +// in-flight base, the last step is the candidate). +type MergeStep struct { + state protoimpl.MessageState `protogen:"open.v1"` + // step_id is an opaque, caller-assigned identifier for this step. Runway + // treats it as an attribution token only -- it echoes it back per-step in + // StepResult so a multi-step result is attributable -- and never interprets + // its contents. + StepId string `protobuf:"bytes,1,opt,name=step_id,json=stepId,proto3" json:"step_id,omitempty"` + // changes are the code change(s) to apply for this step. + Changes []*protopb.Change `protobuf:"bytes,2,rep,name=changes,proto3" json:"changes,omitempty"` + // strategy is how this step's changes are integrated into the target branch. + Strategy protopb1.Strategy `protobuf:"varint,3,opt,name=strategy,proto3,enum=uber.base.mergestrategy.Strategy" json:"strategy,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *MergeStep) Reset() { + *x = MergeStep{} + mi := &file_merge_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *MergeStep) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MergeStep) ProtoMessage() {} + +func (x *MergeStep) ProtoReflect() protoreflect.Message { + mi := &file_merge_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MergeStep.ProtoReflect.Descriptor instead. +func (*MergeStep) Descriptor() ([]byte, []int) { + return file_merge_proto_rawDescGZIP(), []int{0} +} + +func (x *MergeStep) GetStepId() string { + if x != nil { + return x.StepId + } + return "" +} + +func (x *MergeStep) GetChanges() []*protopb.Change { + if x != nil { + return x.Changes + } + return nil +} + +func (x *MergeStep) GetStrategy() protopb1.Strategy { + if x != nil { + return x.Strategy + } + return protopb1.Strategy(0) +} + +// MergeRequest is the payload a client publishes to one of Runway's merge +// queues: the merge-conflict-check topic for a dry-run check, the merge topic +// for a committing merge. The id is owned by the client so it can record the +// in-flight work before publishing and correlate the asynchronous result; +// Runway echoes it back unchanged. +type MergeRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + // id is the client-owned correlation id for this request (one per request). + // Runway echoes it back on the result unchanged. + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + // queue_name is the caller-provided queue name the request belongs to. + // Runway resolves the target branch and provider config per-queue from this + // name; no target ref is passed. + QueueName string `protobuf:"bytes,2,opt,name=queue_name,json=queueName,proto3" json:"queue_name,omitempty"` + // steps is the ordered application sequence: in-flight steps first, the + // candidate last. A single-element list expresses "candidate vs target + // branch". + Steps []*MergeStep `protobuf:"bytes,3,rep,name=steps,proto3" json:"steps,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *MergeRequest) Reset() { + *x = MergeRequest{} + mi := &file_merge_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *MergeRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MergeRequest) ProtoMessage() {} + +func (x *MergeRequest) ProtoReflect() protoreflect.Message { + mi := &file_merge_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MergeRequest.ProtoReflect.Descriptor instead. +func (*MergeRequest) Descriptor() ([]byte, []int) { + return file_merge_proto_rawDescGZIP(), []int{1} +} + +func (x *MergeRequest) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *MergeRequest) GetQueueName() string { + if x != nil { + return x.QueueName + } + return "" +} + +func (x *MergeRequest) GetSteps() []*MergeStep { + if x != nil { + return x.Steps + } + return nil +} + +// StepResult reports what happened to a single MergeStep, so a multi-step result +// is attributable to the step that produced (or failed to produce) it. +type StepResult struct { + state protoimpl.MessageState `protogen:"open.v1"` + // step_id echoes the step_id of the step this result is for. + StepId string `protobuf:"bytes,1,opt,name=step_id,json=stepId,proto3" json:"step_id,omitempty"` + // output_ids are the VCS-neutral identifiers of the revisions this step + // produced on the target branch -- a git commit SHA, a Mercurial changeset + // hash, a Subversion revision number, a Perforce changelist, and so on. + // Empty for a dry-run check, for a change already present on the target, or + // for a step that failed to apply. + OutputIds []string `protobuf:"bytes,2,rep,name=output_ids,json=outputIds,proto3" json:"output_ids,omitempty"` + // reason is a human-readable explanation when the step failed to apply. + // Empty on success. + Reason string `protobuf:"bytes,3,opt,name=reason,proto3" json:"reason,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *StepResult) Reset() { + *x = StepResult{} + mi := &file_merge_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StepResult) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StepResult) ProtoMessage() {} + +func (x *StepResult) ProtoReflect() protoreflect.Message { + mi := &file_merge_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StepResult.ProtoReflect.Descriptor instead. +func (*StepResult) Descriptor() ([]byte, []int) { + return file_merge_proto_rawDescGZIP(), []int{2} +} + +func (x *StepResult) GetStepId() string { + if x != nil { + return x.StepId + } + return "" +} + +func (x *StepResult) GetOutputIds() []string { + if x != nil { + return x.OutputIds + } + return nil +} + +func (x *StepResult) GetReason() string { + if x != nil { + return x.Reason + } + return "" +} + +// MergeResult is the payload Runway publishes to the corresponding signal queue +// (merge-conflict-check-signal for a check, merge-signal for a merge) once a +// request completes. +type MergeResult struct { + state protoimpl.MessageState `protogen:"open.v1"` + // id echoes the client-owned correlation id from the request. + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + // success is true if the whole ordered step sequence applied cleanly: + // mergeable for a dry-run check, merged for a committing merge. + Success bool `protobuf:"varint,2,opt,name=success,proto3" json:"success,omitempty"` + // reason is a human-readable explanation when success is false. Empty on + // success. + Reason string `protobuf:"bytes,3,opt,name=reason,proto3" json:"reason,omitempty"` + // steps optionally reports per-step outcomes, in request order. A committing + // merge populates each step's output_ids with the revisions it produced; a + // dry-run check leaves them empty. + Steps []*StepResult `protobuf:"bytes,4,rep,name=steps,proto3" json:"steps,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *MergeResult) Reset() { + *x = MergeResult{} + mi := &file_merge_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *MergeResult) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MergeResult) ProtoMessage() {} + +func (x *MergeResult) ProtoReflect() protoreflect.Message { + mi := &file_merge_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MergeResult.ProtoReflect.Descriptor instead. +func (*MergeResult) Descriptor() ([]byte, []int) { + return file_merge_proto_rawDescGZIP(), []int{3} +} + +func (x *MergeResult) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *MergeResult) GetSuccess() bool { + if x != nil { + return x.Success + } + return false +} + +func (x *MergeResult) GetReason() string { + if x != nil { + return x.Reason + } + return "" +} + +func (x *MergeResult) GetSteps() []*StepResult { + if x != nil { + return x.Steps + } + return nil +} + +var File_merge_proto protoreflect.FileDescriptor + +const file_merge_proto_rawDesc = "" + + "\n" + + "\vmerge.proto\x12\x18uber.runway.messagequeue\x1a\"api/base/change/proto/change.proto\x1a0api/base/mergestrategy/proto/mergestrategy.proto\x1a.api/base/messagequeue/proto/messagequeue.proto\"\x97\x01\n" + + "\tMergeStep\x12\x17\n" + + "\astep_id\x18\x01 \x01(\tR\x06stepId\x122\n" + + "\achanges\x18\x02 \x03(\v2\x18.uber.base.change.ChangeR\achanges\x12=\n" + + "\bstrategy\x18\x03 \x01(\x0e2!.uber.base.mergestrategy.StrategyR\bstrategy\"\x9b\x01\n" + + "\fMergeRequest\x12\x0e\n" + + "\x02id\x18\x01 \x01(\tR\x02id\x12\x1d\n" + + "\n" + + "queue_name\x18\x02 \x01(\tR\tqueueName\x129\n" + + "\x05steps\x18\x03 \x03(\v2#.uber.runway.messagequeue.MergeStepR\x05steps:!\x8a\xb5\x18\x14merge-conflict-check\x8a\xb5\x18\x05merge\"\\\n" + + "\n" + + "StepResult\x12\x17\n" + + "\astep_id\x18\x01 \x01(\tR\x06stepId\x12\x1d\n" + + "\n" + + "output_ids\x18\x02 \x03(\tR\toutputIds\x12\x16\n" + + "\x06reason\x18\x03 \x01(\tR\x06reason\"\xbc\x01\n" + + "\vMergeResult\x12\x0e\n" + + "\x02id\x18\x01 \x01(\tR\x02id\x12\x18\n" + + "\asuccess\x18\x02 \x01(\bR\asuccess\x12\x16\n" + + "\x06reason\x18\x03 \x01(\tR\x06reason\x12:\n" + + "\x05steps\x18\x04 \x03(\v2$.uber.runway.messagequeue.StepResultR\x05steps:/\x8a\xb5\x18\x1bmerge-conflict-check-signal\x8a\xb5\x18\fmerge-signalBu\n" + + "(com.uber.submitqueue.runway.messagequeueB\n" + + "MergeProtoP\x01Z;github.com/uber/submitqueue/api/runway/messagequeue/protopbb\x06proto3" + +var ( + file_merge_proto_rawDescOnce sync.Once + file_merge_proto_rawDescData []byte +) + +func file_merge_proto_rawDescGZIP() []byte { + file_merge_proto_rawDescOnce.Do(func() { + file_merge_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_merge_proto_rawDesc), len(file_merge_proto_rawDesc))) + }) + return file_merge_proto_rawDescData +} + +var file_merge_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_merge_proto_goTypes = []any{ + (*MergeStep)(nil), // 0: uber.runway.messagequeue.MergeStep + (*MergeRequest)(nil), // 1: uber.runway.messagequeue.MergeRequest + (*StepResult)(nil), // 2: uber.runway.messagequeue.StepResult + (*MergeResult)(nil), // 3: uber.runway.messagequeue.MergeResult + (*protopb.Change)(nil), // 4: uber.base.change.Change + (protopb1.Strategy)(0), // 5: uber.base.mergestrategy.Strategy +} +var file_merge_proto_depIdxs = []int32{ + 4, // 0: uber.runway.messagequeue.MergeStep.changes:type_name -> uber.base.change.Change + 5, // 1: uber.runway.messagequeue.MergeStep.strategy:type_name -> uber.base.mergestrategy.Strategy + 0, // 2: uber.runway.messagequeue.MergeRequest.steps:type_name -> uber.runway.messagequeue.MergeStep + 2, // 3: uber.runway.messagequeue.MergeResult.steps:type_name -> uber.runway.messagequeue.StepResult + 4, // [4:4] is the sub-list for method output_type + 4, // [4:4] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name +} + +func init() { file_merge_proto_init() } +func file_merge_proto_init() { + if File_merge_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_merge_proto_rawDesc), len(file_merge_proto_rawDesc)), + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_merge_proto_goTypes, + DependencyIndexes: file_merge_proto_depIdxs, + MessageInfos: file_merge_proto_msgTypes, + }.Build() + File_merge_proto = out.File + file_merge_proto_goTypes = nil + file_merge_proto_depIdxs = nil +} diff --git a/api/runway/messagequeue/topics.go b/api/runway/messagequeue/topics.go new file mode 100644 index 00000000..062fca43 --- /dev/null +++ b/api/runway/messagequeue/topics.go @@ -0,0 +1,42 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package messagequeue + +import "github.com/uber/submitqueue/platform/consumer" + +// TopicKey is the typed identifier used to look up a queue backend, topic name, +// and subscription config in a consumer.TopicRegistry. The constants below are +// the wire topic names a client uses to publish to / consume from Runway's merge +// queues; they are the same strings each message lists in its topics option. +type TopicKey = consumer.TopicKey + +const ( + // TopicKeyMergeConflictCheck carries dry-run merge-conflict check requests. + // A client publishes a MergeRequest here; Runway attempts the merge without + // committing and reports only whether it was mergeable. + TopicKeyMergeConflictCheck TopicKey = "merge-conflict-check" + // TopicKeyMergeConflictCheckSignal carries merge-conflict check results. + // Runway publishes a MergeResult here (with no produced revisions); the + // requesting client consumes it. + TopicKeyMergeConflictCheckSignal TopicKey = "merge-conflict-check-signal" + // TopicKeyMerge carries committing merge requests. A client publishes a + // MergeRequest here; Runway applies the steps, commits the result, and + // reports the revisions it produced. + TopicKeyMerge TopicKey = "merge" + // TopicKeyMergeSignal carries committing merge results. Runway publishes a + // MergeResult here (with the produced revisions populated); the requesting + // client consumes it. + TopicKeyMergeSignal TopicKey = "merge-signal" +) diff --git a/runway/core/topickey/BUILD.bazel b/runway/core/topickey/BUILD.bazel deleted file mode 100644 index 94163f0d..00000000 --- a/runway/core/topickey/BUILD.bazel +++ /dev/null @@ -1,9 +0,0 @@ -load("@rules_go//go:def.bzl", "go_library") - -go_library( - name = "topickey", - srcs = ["topickey.go"], - importpath = "github.com/uber/submitqueue/runway/core/topickey", - visibility = ["//visibility:public"], - deps = ["//platform/consumer"], -) diff --git a/runway/core/topickey/topickey.go b/runway/core/topickey/topickey.go deleted file mode 100644 index ede0b83a..00000000 --- a/runway/core/topickey/topickey.go +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package topickey defines the Runway-owned queue identifiers. Runway owns the -// merge queues — a dry-run merge-conflict check pair and a committing merge -// pair, both carrying the shared entity.MergeRequest/MergeResult contract. -// Other services (e.g. SubmitQueue) import these keys to publish onto / consume -// from them. -package topickey - -import "github.com/uber/submitqueue/platform/consumer" - -// TopicKey is the shared pipeline stage identifier type. -type TopicKey = consumer.TopicKey - -const ( - // TopicKeyMergeConflictCheck is the runway-owned queue that carries dry-run - // merge-conflict check requests. A client publishes a full - // entity.MergeRequest here; runway consumes it, attempts the merge without - // committing, and reports only whether it was mergeable. - TopicKeyMergeConflictCheck TopicKey = "merge-conflict-checker" - // TopicKeyMergeConflictCheckSignal is the runway-owned queue that carries - // merge-conflict check results. Runway publishes a full entity.MergeResult - // here (with no produced revisions); the requesting client consumes it. - TopicKeyMergeConflictCheckSignal TopicKey = "merge-conflict-checker-signal" - // TopicKeyMerge is the runway-owned queue that carries committing merge - // requests. A client publishes a full entity.MergeRequest here; runway - // consumes it, applies the steps, commits the result, and reports the - // revisions it produced. - TopicKeyMerge TopicKey = "merger" - // TopicKeyMergeSignal is the runway-owned queue that carries committing - // merge results. Runway publishes a full entity.MergeResult here (with the - // produced revisions populated); the requesting client consumes it. - TopicKeyMergeSignal TopicKey = "merger-signal" -) diff --git a/runway/entity/BUILD.bazel b/runway/entity/BUILD.bazel deleted file mode 100644 index d8b169cb..00000000 --- a/runway/entity/BUILD.bazel +++ /dev/null @@ -1,24 +0,0 @@ -load("@rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "entity", - srcs = ["merge.go"], - importpath = "github.com/uber/submitqueue/runway/entity", - visibility = ["//visibility:public"], - deps = [ - "//platform/base/change", - "//platform/base/mergestrategy", - ], -) - -go_test( - name = "entity_test", - srcs = ["merge_test.go"], - embed = [":entity"], - deps = [ - "//platform/base/change", - "//platform/base/mergestrategy", - "@com_github_stretchr_testify//assert", - "@com_github_stretchr_testify//require", - ], -) diff --git a/runway/entity/merge.go b/runway/entity/merge.go deleted file mode 100644 index ebeffe3c..00000000 --- a/runway/entity/merge.go +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package entity holds Runway's domain entities, including the wire contract for -// the merge queues that Runway owns. The contract crosses a service boundary (a -// calling service cannot read Runway's storage and vice versa), so these -// payloads carry the full data needed to perform a merge attempt rather than -// opaque entity IDs. -// -// One contract serves two queue pairs because a merge-conflict check is a dry -// run of a merge: Runway applies the same ordered Steps onto the same target -// branch, and the only difference is whether it commits the result and reports -// the revisions it produced. The queue a request arrives on encodes that choice -// — the merge-conflict-checker pair for a dry run, the merger pair for a -// committing merge — so MergeRequest and MergeResult are identical on both. -package entity - -import ( - "encoding/json" - - "github.com/uber/submitqueue/platform/base/change" - "github.com/uber/submitqueue/platform/base/mergestrategy" -) - -// MergeStep is one step of an ordered merge: a single set of change(s) applied -// with a strategy. Runway applies the steps of a request in order on top of the -// target branch; the ordering encodes the base-layering (earlier steps are the -// in-flight base, the last step is the candidate). -type MergeStep struct { - // StepID is an opaque, caller-assigned identifier for this step. Runway - // treats it as an attribution token only — it echoes it back per-step in - // StepResult so a multi-step result is attributable — and never interprets - // its contents. (A caller might use, for example, its own request id here.) - StepID string `json:"step_id"` - // Changes are the code change(s) to apply for this step (provider URIs with - // head commit SHAs; see entity/change.Change). - Changes []change.Change `json:"changes"` - // Strategy is how this step's changes are integrated into the target branch. - Strategy mergestrategy.MergeStrategy `json:"strategy"` -} - -// MergeRequest is the payload a client publishes to one of Runway's merge -// queues: TopicKeyMergeConflictCheck for a dry-run check, TopicKeyMerge for a -// committing merge. The ID is owned by the client so it can record the -// in-flight work before publishing and correlate the asynchronous result; -// runway echoes it back unchanged. -type MergeRequest struct { - // ID is the client-owned correlation id for this request (one per request). - // Runway echoes it back on the result unchanged. - ID string `json:"id"` - // QueueName is the caller-provided queue name the request belongs to. Runway - // resolves the target branch and provider config per-queue from this name; - // no target ref is passed. - QueueName string `json:"queue_name"` - // Steps is the ordered application sequence: in-flight steps first, the - // candidate last. A single-element slice expresses "candidate vs target - // branch". - Steps []MergeStep `json:"steps"` -} - -// ToBytes serializes the MergeRequest to JSON bytes for the queue payload. -func (r MergeRequest) ToBytes() ([]byte, error) { - return json.Marshal(r) -} - -// MergeRequestFromBytes deserializes a MergeRequest from JSON bytes. -func MergeRequestFromBytes(data []byte) (MergeRequest, error) { - var req MergeRequest - err := json.Unmarshal(data, &req) - return req, err -} - -// StepResult reports what happened to a single MergeStep, so a multi-step result -// is attributable to the step that produced (or failed to produce) it. -type StepResult struct { - // StepID echoes the StepID of the step this result is for (see MergeStep.StepID). - StepID string `json:"step_id"` - // OutputIDs are the VCS-neutral identifiers of the revisions this step - // produced on the target branch — a git commit SHA, a Mercurial changeset - // hash, a Subversion revision number, a Perforce changelist, and so on — - // opaque to the caller. Empty for a dry-run check (which produces nothing), - // for a change already present on the target, or for a step that failed to - // apply. - OutputIDs []string `json:"output_ids,omitempty"` - // Reason is a human-readable explanation when the step failed to apply. - // Empty on success. - Reason string `json:"reason,omitempty"` -} - -// MergeResult is the payload runway publishes to the corresponding signal queue -// (TopicKeyMergeConflictCheckSignal for a check, TopicKeyMergeSignal for a -// merge) once a request completes. -type MergeResult struct { - // ID echoes the client-owned correlation id from the request. - ID string `json:"id"` - // Success is true if the whole ordered step sequence applied cleanly: - // mergeable for a dry-run check, merged for a committing merge. - Success bool `json:"success"` - // Reason is a human-readable explanation when Success is false. Empty on success. - Reason string `json:"reason,omitempty"` - // Steps optionally reports per-step outcomes, in request order. A committing - // merge populates each step's OutputIDs with the revisions it produced; a - // dry-run check leaves them empty. - Steps []StepResult `json:"steps,omitempty"` -} - -// ToBytes serializes the MergeResult to JSON bytes for the queue payload. -func (r MergeResult) ToBytes() ([]byte, error) { - return json.Marshal(r) -} - -// MergeResultFromBytes deserializes a MergeResult from JSON bytes. -func MergeResultFromBytes(data []byte) (MergeResult, error) { - var res MergeResult - err := json.Unmarshal(data, &res) - return res, err -} diff --git a/runway/entity/merge_test.go b/runway/entity/merge_test.go deleted file mode 100644 index 2865f7b6..00000000 --- a/runway/entity/merge_test.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package entity - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/uber/submitqueue/platform/base/change" - "github.com/uber/submitqueue/platform/base/mergestrategy" -) - -func TestMergeRequestRoundTrip(t *testing.T) { - req := MergeRequest{ - ID: "queue-a/42", - QueueName: "queue-a", - Steps: []MergeStep{ - { - StepID: "queue-a/1", - Changes: []change.Change{{URIs: []string{"github://uber/repo/pull/1/" + "0123456789abcdef0123456789abcdef01234567"}}}, - Strategy: mergestrategy.MergeStrategyRebase, - }, - { - StepID: "queue-a/2", - Changes: []change.Change{{URIs: []string{"github://uber/repo/pull/2/" + "89abcdef0123456789abcdef0123456789abcdef"}}}, - Strategy: mergestrategy.MergeStrategyMerge, - }, - }, - } - - data, err := req.ToBytes() - require.NoError(t, err) - - got, err := MergeRequestFromBytes(data) - require.NoError(t, err) - assert.Equal(t, req, got) -} - -func TestMergeResultRoundTrip(t *testing.T) { - // A committing merge reports the revisions each step produced on the target; - // a dry-run check leaves OutputIDs empty and reports a per-step reason on - // failure. Both shapes share the one MergeResult contract. - t.Run("merged with produced revisions", func(t *testing.T) { - res := MergeResult{ - ID: "queue-a/42", - Success: true, - Steps: []StepResult{ - {StepID: "queue-a/1", OutputIDs: []string{"0123456789abcdef0123456789abcdef01234567"}}, - }, - } - - data, err := res.ToBytes() - require.NoError(t, err) - - got, err := MergeResultFromBytes(data) - require.NoError(t, err) - assert.Equal(t, res, got) - }) - - t.Run("failed with per-step reason", func(t *testing.T) { - res := MergeResult{ - ID: "queue-a/42", - Success: false, - Reason: "conflict in foo.go", - Steps: []StepResult{{StepID: "queue-a/2", Reason: "conflict in foo.go"}}, - } - - data, err := res.ToBytes() - require.NoError(t, err) - - got, err := MergeResultFromBytes(data) - require.NoError(t, err) - assert.Equal(t, res, got) - }) -} diff --git a/tool/proto/BUILD.bazel b/tool/proto/BUILD.bazel index f7e9f0ba..9567ce10 100644 --- a/tool/proto/BUILD.bazel +++ b/tool/proto/BUILD.bazel @@ -23,6 +23,18 @@ go_proto_generated_files( out_dir = "api_base_messagequeue", ) +go_proto_generated_files( + name = "api_runway_messagequeue", + srcs = ["//api/runway/messagequeue/proto:merge.proto"], + gen_services = False, + imports = [ + "//api/base/change/proto:change.proto", + "//api/base/mergestrategy/proto:mergestrategy.proto", + "//api/base/messagequeue/proto:messagequeue.proto", + ], + out_dir = "api_runway_messagequeue", +) + go_proto_generated_files( name = "api_submitqueue_gateway", srcs = ["//api/submitqueue/gateway/proto:gateway.proto"], @@ -58,6 +70,7 @@ filegroup( ":api_base_change", ":api_base_mergestrategy", ":api_base_messagequeue", + ":api_runway_messagequeue", ":api_stovepipe_gateway", ":api_stovepipe_orchestrator", ":api_submitqueue_gateway",