Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ request.Version = newVersion

```
submitqueue/ # repo root (Go module github.com/uber/submitqueue)
├── api/ # Wire contracts (proto) by domain/service
│ ├── submitqueue/{gateway,orchestrator}/{proto,protopb}/
│ └── stovepipe/{gateway,orchestrator}/{proto,protopb}/
├── api/ # Published wire contracts (cross-domain/external)
│ ├── submitqueue/{gateway,orchestrator}/{proto,protopb}/ # RPC (proto)
│ ├── stovepipe/{gateway,orchestrator}/{proto,protopb}/
│ └── runway/messagequeue/ # external queue contracts (proto + protojson)
├── platform/ # SHARED cross-domain packages — no domain deps
│ ├── errs/, metrics/, consumer/, http/
│ ├── base/ # SHARED entities (change/, messagequeue/, …)
Expand Down Expand Up @@ -68,7 +69,7 @@ submitqueue/ # repo root (Go module github.com/uber/submi

The `platform/` tree holds code reused across domains (infrastructure, shared entities, shared extension contracts). Each **domain** (`submitqueue/`, `stovepipe/`, …) keeps the same internal layout (`gateway/`, `orchestrator/`, `entity/`, `extension/`, `core/`); a domain's own `core/` (e.g. `submitqueue/core/`) holds infra shared only between that domain's services.

The `api/` tree holds all wire contracts (proto definitions and their committed generated stubs), organized by `domain/service`: `api/{domain}/{service}/proto/` for `.proto` sources and `api/{domain}/{service}/protopb/` for generated Go. A service package may hold multiple `.proto` files — its RPC contract (`{service}.proto`) alongside messagequeue contracts (queue payload schemas) — all generating into the same `protopb/`.
The `api/` tree holds **published** wire contracts — those depended on from outside the owning domain. RPC contracts live at `api/{domain}/{service}/` (`proto/` for `.proto` sources, `protopb/` for committed generated Go); a service package may hold multiple `.proto` files, all generating into the same `protopb/`. External message-queue contracts live at `api/{domain}/messagequeue/` (see Message Queue Contracts below). Internal queue contracts do **not** go here — they live under `{domain}/core/messagequeue/`.

### Platform notes

Expand Down Expand Up @@ -156,9 +157,10 @@ Paths follow the directory layout: shared packages live under `platform/` at the
- RPC Controllers: `github.com/uber/submitqueue/{domain}/{service}/controller` (e.g. `.../submitqueue/gateway/controller`)
- Queue Controllers: `github.com/uber/submitqueue/{domain}/{service}/controller/{step}`
- Proto (generated): `github.com/uber/submitqueue/api/{domain}/{service}/protopb`
- Queue contracts: external `github.com/uber/submitqueue/api/{domain}/messagequeue`; internal `github.com/uber/submitqueue/{domain}/core/messagequeue`
- Domain entities: `github.com/uber/submitqueue/{domain}/entity` (e.g. `.../submitqueue/entity`)
- Domain extensions: `github.com/uber/submitqueue/{domain}/extension/{ext}[/{impl}]` (e.g. `.../submitqueue/extension/storage/mysql`)
- Cross-domain consumer framework: `github.com/uber/submitqueue/platform/consumer`; domain pipeline topic keys: `github.com/uber/submitqueue/{domain}/core/topickey`
- Cross-domain consumer framework: `github.com/uber/submitqueue/platform/consumer`; internal pipeline topic keys: `github.com/uber/submitqueue/{domain}/core/topickey` (external queue topic keys live with their contract package, e.g. `api/runway/messagequeue`)
- Domain-internal infra: `github.com/uber/submitqueue/{domain}/core/{pkg}` (e.g. `.../submitqueue/core/request`)
- Shared entities: `github.com/uber/submitqueue/platform/base/{pkg}` (e.g. `.../platform/base/messagequeue`)
- Shared extensions: `github.com/uber/submitqueue/platform/extension/{ext}[/{impl}]` (e.g. `.../platform/extension/messagequeue/mysql`)
Expand All @@ -182,7 +184,13 @@ Generated proto files are committed. When modifying `.proto` files:
2. `make proto` (generates `*.pb.go`, `*_grpc.pb.go`, `*.pb.yarpc.go` into `api/{domain}/{service}/protopb/`)
3. Commit all generated files

To add a new `.proto` to a service (e.g. messagequeue contracts), drop it in the service's `api/{domain}/{service}/proto/` dir, add it to that package's `srcs` in `api/{domain}/{service}/proto/BUILD.bazel` and its `exports_files`, then `make proto && make gazelle`. The codegen and `make proto` copy loop already handle multiple `.proto` files per package.
To add a new `.proto` to a service, drop it in the service's `api/{domain}/{service}/proto/` dir, add it to that package's `srcs` in `api/{domain}/{service}/proto/BUILD.bazel` and its `exports_files`, then `make proto && make gazelle`. The codegen and `make proto` copy loop already handle multiple `.proto` files per package.

### Message Queue Contracts

Queue payloads are defined in **proto3** (`.proto` under `proto/`, generated Go in `protopb/` as the binding) and serialized as **protobuf JSON** (protojson) so the queue keeps storing self-describing JSON. Location follows audience: external/cross-domain contracts go under `api/{domain}/messagequeue/`; internal contracts (used only within the owning domain) go under `{domain}/core/messagequeue/`. Bazel `visibility` enforces the split — internal targets are domain-scoped, `api/` targets are public. See [doc/rfc/messagequeue-contract.md](doc/rfc/messagequeue-contract.md).

Thin `protojson` helpers (`MergeRequestToBytes`/`MergeRequestFromBytes`, …) own the wire conventions: `UseProtoNames` (snake_case fields), UPPER_SNAKE enum values, int64-as-string, unknown fields discarded on read (additive evolution). The topic(s) carrying a message are declared on the message via the `topics` proto option — a `google.protobuf.MessageOptions` extension defined in `api/base/messagequeue`, the proto-native replacement for `x-topics`; a `Topics(msg)` reflection helper reads it. It is contract metadata, not the hot path — publish/consume still routes on `consumer.TopicKey` + `TopicRegistry`. The contract package owns both halves: the proto payload and the `TopicKey` constants for its topics. A contract test round-trips the payloads and asserts every topic key is bound to exactly one message. Shared field types (`Change`, `Strategy`) are shared protos under `api/base/{change,mergestrategy}`. `api/runway/messagequeue/` is the reference example.

### Naming Conventions

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ GOIMPORTS_VERSION ?= v0.33.0
# (the out_dir convention in tool/proto/BUILD.bazel) and copied back here. A
# package may hold multiple .proto files (e.g. an RPC contract plus messagequeue
# contracts); all generated stubs land in the same protopb/ dir.
PROTO_PACKAGES = api/base/change api/base/mergestrategy api/base/messagequeue api/submitqueue/gateway api/submitqueue/orchestrator api/stovepipe/gateway api/stovepipe/orchestrator
PROTO_PACKAGES = api/base/change api/base/mergestrategy api/base/messagequeue api/runway/messagequeue api/submitqueue/gateway api/submitqueue/orchestrator api/stovepipe/gateway api/stovepipe/orchestrator

# Set REPO_ROOT for docker-compose
export REPO_ROOT := $(shell pwd)
Expand Down
31 changes: 31 additions & 0 deletions api/runway/messagequeue/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "messagequeue",
srcs = [
"merge.go",
"topics.go",
],
importpath = "github.com/uber/submitqueue/api/runway/messagequeue",
visibility = ["//visibility:public"],
deps = [
"//api/base/messagequeue/protopb", # keep
"//api/runway/messagequeue/protopb", # keep
"//platform/consumer",
"@org_golang_google_protobuf//encoding/protojson",
"@org_golang_google_protobuf//proto",
],
)

go_test(
name = "messagequeue_test",
srcs = ["merge_test.go"],
embed = [":messagequeue"],
deps = [
"//api/base/change/protopb",
"//api/base/mergestrategy/protopb",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_protobuf//proto",
],
)
22 changes: 22 additions & 0 deletions api/runway/messagequeue/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Runway message queue contract

The published, language-neutral contract for the merge queues Runway owns. A client — in any language — publishes a merge request and consumes the result without access to Runway's Go types or storage. See [the message queue contract RFC](../../../doc/rfc/messagequeue-contract.md) for the design.

Payloads are defined as proto3 messages in [`proto/merge.proto`](proto/merge.proto) and generated into [`protopb/`](protopb); the proto is the authority and a non-Go client compiles against it directly. On the wire, payloads are serialized as protobuf JSON (`protojson`), so the queue keeps storing self-describing JSON. The Go helpers in this package (`MergeRequestToBytes`/`MergeRequestFromBytes` and the `MergeResult` counterparts) wrap `protojson` for Go callers; field names stay snake_case (`UseProtoNames`) and enums serialize as their UPPER_SNAKE value name.

The shared field types `Change` and `MergeStrategy` come from `api/base/change` and `api/base/mergestrategy`, imported by the contract.

## Topics

The binding between a queue topic and its payload lives in each message's `topics` option (defined in `api/base/messagequeue`); `Topics` reads it back by reflection.

| Message | Direction | Topics |
|---|---|---|
| `MergeRequest` | client → Runway | `merge-conflict-checker`, `merger` |
| `MergeResult` | Runway → client | `merge-conflict-checker-signal`, `merger-signal` |

One message serves a queue pair because a merge-conflict check is a dry run of a merge: Runway applies the same ordered steps onto the same target branch, and the topic the request arrives on decides whether it commits the result and reports the produced revisions. A request on `merge-conflict-checker` is a dry run; a request on `merger` commits.

## Evolution

Contract changes are additive-only: add new fields; never remove, rename, repurpose, or retype an existing field, and never reuse a field number. protojson ignores unknown fields on read and omits zero-valued fields on write, so a new optional field is backward-compatible in both directions.
106 changes: 106 additions & 0 deletions api/runway/messagequeue/merge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package messagequeue holds Runway's external message-queue contract: the wire
// payloads for the merge queues Runway owns, defined by the proto files in
// proto/ and generated into protopb/. The proto is the language-neutral
// authority; the generated Go types in protopb are the binding for Go callers.
//
// Payloads are serialized as protobuf JSON (protojson), not binary, so the
// MySQL-backed queue keeps storing self-describing JSON. The topic that carries
// each payload is declared on the message itself via the topics proto option
// (see api/base/messagequeue); Topics reads it back.
//
// One contract serves two queue pairs because a merge-conflict check is a dry
// run of a merge: Runway applies the same ordered steps onto the same target
// branch, and the only difference is whether it commits the result and reports
// the revisions it produced. The topic a request arrives on encodes that choice
// — the merge-conflict-checker pair for a dry run, the merger pair for a
// committing merge — so MergeRequest and MergeResult are identical on both.
package messagequeue

import (
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"

basemqpb "github.com/uber/submitqueue/api/base/messagequeue/protopb"
"github.com/uber/submitqueue/api/runway/messagequeue/protopb"
)

// Wire payload types. These alias the generated protobuf bindings so callers
// reference the contract through this curated package rather than protopb.
type (
// MergeRequest is the payload a client publishes to one of Runway's merge
// queues: the merge-conflict-checker topic for a dry-run check, the merger
// topic for a committing merge.
MergeRequest = protopb.MergeRequest
// MergeStep is one step of an ordered merge: a single set of change(s)
// applied with a strategy.
MergeStep = protopb.MergeStep
// MergeResult is the payload Runway publishes to the corresponding signal
// queue once a request completes.
MergeResult = protopb.MergeResult
// StepResult reports what happened to a single MergeStep.
StepResult = protopb.StepResult
)

// marshalOpts keeps the JSON field names identical to the proto field names
// (snake_case), so the wire shape matches the declared contract rather than
// protojson's default lowerCamelCase. Zero-valued fields are omitted.
var marshalOpts = protojson.MarshalOptions{UseProtoNames: true}

// unmarshalOpts tolerates unknown fields so an additive contract change (a new
// field a producer sends but this consumer does not yet know) is ignored rather
// than rejected.
var unmarshalOpts = protojson.UnmarshalOptions{DiscardUnknown: true}

// MergeRequestToBytes serializes a MergeRequest to protojson bytes for the queue
// payload.
func MergeRequestToBytes(r *MergeRequest) ([]byte, error) {
return marshalOpts.Marshal(r)
}

// MergeRequestFromBytes deserializes a MergeRequest from protojson bytes.
func MergeRequestFromBytes(data []byte) (*MergeRequest, error) {
var req MergeRequest
err := unmarshalOpts.Unmarshal(data, &req)
return &req, err
}

// MergeResultToBytes serializes a MergeResult to protojson bytes for the queue
// payload.
func MergeResultToBytes(r *MergeResult) ([]byte, error) {
return marshalOpts.Marshal(r)
}

// MergeResultFromBytes deserializes a MergeResult from protojson bytes.
func MergeResultFromBytes(data []byte) (*MergeResult, error) {
var res MergeResult
err := unmarshalOpts.Unmarshal(data, &res)
return &res, err
}

// Topics returns the canonical wire topic names bound to a message via the
// topics proto option. It returns nil for a message that declares no topics.
func Topics(m proto.Message) []string {
opts := m.ProtoReflect().Descriptor().Options()
if opts == nil {
return nil
}
topics, ok := proto.GetExtension(opts, basemqpb.E_Topics).([]string)
if !ok {
return nil
}
return topics
}
133 changes: 133 additions & 0 deletions api/runway/messagequeue/merge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package messagequeue

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

changepb "github.com/uber/submitqueue/api/base/change/protopb"
strategypb "github.com/uber/submitqueue/api/base/mergestrategy/protopb"
)

func TestMergeRequestRoundTrip(t *testing.T) {
req := &MergeRequest{
Id: "queue-a/42",
QueueName: "queue-a",
Steps: []*MergeStep{
{
StepId: "queue-a/1",
Changes: []*changepb.Change{{Uris: []string{"github://uber/repo/pull/1/0123456789abcdef0123456789abcdef01234567"}}},
Strategy: strategypb.Strategy_REBASE,
},
{
StepId: "queue-a/2",
Changes: []*changepb.Change{{Uris: []string{"github://uber/repo/pull/2/89abcdef0123456789abcdef0123456789abcdef"}}},
Strategy: strategypb.Strategy_MERGE,
},
},
}

data, err := MergeRequestToBytes(req)
require.NoError(t, err)

got, err := MergeRequestFromBytes(data)
require.NoError(t, err)
assert.True(t, proto.Equal(req, got), "round-tripped MergeRequest should equal the original")
}

func TestMergeResultRoundTrip(t *testing.T) {
// A committing merge reports the revisions each step produced on the target;
// a dry-run check leaves output_ids empty and reports a per-step reason on
// failure. Both shapes share the one MergeResult contract.
cases := map[string]*MergeResult{
"merged with produced revisions": {
Id: "queue-a/42",
Success: true,
Steps: []*StepResult{
{StepId: "queue-a/1", OutputIds: []string{"0123456789abcdef0123456789abcdef01234567"}},
},
},
"failed with per-step reason": {
Id: "queue-a/42",
Success: false,
Reason: "conflict in foo.go",
Steps: []*StepResult{{StepId: "queue-a/2", Reason: "conflict in foo.go"}},
},
"minimal": {
Id: "queue-a/42",
Success: true,
},
}

for name, res := range cases {
t.Run(name, func(t *testing.T) {
data, err := MergeResultToBytes(res)
require.NoError(t, err)

got, err := MergeResultFromBytes(data)
require.NoError(t, err)
assert.True(t, proto.Equal(res, got), "round-tripped MergeResult should equal the original")
})
}
}

// TestWireFormat locks the two protojson encoding decisions the contract relies
// on: snake_case field names (UseProtoNames) and proto-conventional UPPER_SNAKE
// enum values on the wire.
func TestWireFormat(t *testing.T) {
data, err := MergeRequestToBytes(&MergeRequest{
Id: "queue-a/42",
QueueName: "queue-a",
Steps: []*MergeStep{{StepId: "queue-a/1", Strategy: strategypb.Strategy_SQUASH_REBASE}},
})
require.NoError(t, err)

assert.Contains(t, string(data), `"queue_name"`, "fields must serialize as snake_case")
assert.Contains(t, string(data), `"SQUASH_REBASE"`, "enums must serialize as their UPPER_SNAKE value name")
}

// TestTopicsBindEveryTopicKey is the topic-binding drift guard: every Runway
// topic key is carried by exactly one message's topics option, and no topics
// option names an unknown topic.
func TestTopicsBindEveryTopicKey(t *testing.T) {
bound := map[string]int{}
for _, m := range []proto.Message{&MergeRequest{}, &MergeResult{}} {
topics := Topics(m)
require.NotEmpty(t, topics, "message must declare a non-empty topics option")
for _, topic := range topics {
bound[topic]++
}
}

keys := []TopicKey{
TopicKeyMergeConflictCheck,
TopicKeyMergeConflictCheckSignal,
TopicKeyMerge,
TopicKeyMergeSignal,
}

valid := map[string]bool{}
for _, k := range keys {
valid[k.String()] = true
assert.Equalf(t, 1, bound[k.String()], "topic key %q must be bound to exactly one message via the topics option", k)
}
for topic := range bound {
assert.Truef(t, valid[topic], "topics option names unknown topic %q", topic)
}
}
Loading
Loading