-
Notifications
You must be signed in to change notification settings - Fork 2
feat(stovepipe): add start controller for the change ingress flow #229
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,21 @@ | ||
| load("@rules_go//go:def.bzl", "go_library") | ||
| load("@rules_go//go:def.bzl", "go_library", "go_test") | ||
|
|
||
| go_library( | ||
| name = "entity", | ||
| srcs = ["entity.go"], | ||
| srcs = [ | ||
| "change_event.go", | ||
| "entity.go", | ||
| ], | ||
| importpath = "github.com/uber/submitqueue/stovepipe/entity", | ||
| visibility = ["//visibility:public"], | ||
| ) | ||
|
|
||
| go_test( | ||
| name = "entity_test", | ||
| srcs = ["change_event_test.go"], | ||
| embed = [":entity"], | ||
| deps = [ | ||
| "@com_github_stretchr_testify//assert", | ||
| "@com_github_stretchr_testify//require", | ||
| ], | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
| // Copyright (c) 2025 Uber Technologies, Inc. | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| package entity | ||
|
|
||
| import ( | ||
| "encoding/json" | ||
| "fmt" | ||
| "net/url" | ||
| ) | ||
|
|
||
// ChangeEvent represents a change in the pipeline. URI represents the identity
// of the change.
//
// The repository-scoped ordering key is carried on the queue message envelope
// (Message.PartitionKey), stamped once at ingestion and propagated unchanged
// across stages, so it is deliberately not duplicated here.
type ChangeEvent struct {
	// URI represents the identity of the change. The scheme names the VCS; the rest
	// is provider-specific (e.g. git://remote/repo/ref/commit_sha).
	URI string `json:"uri"`
}

// ToBytes serializes the ChangeEvent to JSON bytes for queue message payload.
func (e ChangeEvent) ToBytes() ([]byte, error) {
	return json.Marshal(e)
}

// Scheme returns the URI scheme identifying the VCS (e.g. "git"). It returns ""
// if the URI is empty, is not scheme-qualified, or cannot be parsed at all.
// It is the key used to select a resolver.
func (e ChangeEvent) Scheme() string {
	u, err := url.Parse(e.URI)
	if err != nil {
		return ""
	}
	return u.Scheme
}

// Validate checks that the change event carries a scheme-qualified commit URI.
//
// It returns an error when the URI is empty, when it cannot be parsed (the
// parse error is wrapped so callers can inspect the cause), or when it parses
// but has no scheme. It returns nil otherwise.
func (e ChangeEvent) Validate() error {
	if e.URI == "" {
		return fmt.Errorf("change event requires a commit URI")
	}

	// Parse here rather than going through Scheme: Scheme collapses a parse
	// failure into "", which would misreport a malformed but scheme-qualified
	// URI as missing its scheme.
	u, err := url.Parse(e.URI)
	if err != nil {
		return fmt.Errorf("change event URI %q is malformed: %w", e.URI, err)
	}
	if u.Scheme == "" {
		return fmt.Errorf("change event URI %q must be scheme-qualified", e.URI)
	}
	return nil
}

// ChangeEventFromBytes deserializes a ChangeEvent from JSON bytes and validates
// it. On any failure it returns the zero ChangeEvent together with the error.
func ChangeEventFromBytes(data []byte) (ChangeEvent, error) {
	var event ChangeEvent
	if err := json.Unmarshal(data, &event); err != nil {
		return ChangeEvent{}, err
	}
	if err := event.Validate(); err != nil {
		return ChangeEvent{}, err
	}
	return event, nil
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| // Copyright (c) 2025 Uber Technologies, Inc. | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| package entity | ||
|
|
||
| import ( | ||
| "testing" | ||
|
|
||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| const testURI = "git://git.example.com/uber/monorepo/refs%2Fheads%2Fmain/abcdef0123456789abcdef0123456789abcdef01" | ||
|
|
||
| func TestChangeEvent_Validate(t *testing.T) { | ||
| t.Run("valid scheme-qualified URI", func(t *testing.T) { | ||
| require.NoError(t, ChangeEvent{URI: testURI}.Validate()) | ||
| }) | ||
|
|
||
| t.Run("rejects empty URI", func(t *testing.T) { | ||
| require.Error(t, ChangeEvent{}.Validate()) | ||
| }) | ||
|
|
||
| t.Run("rejects URI without scheme", func(t *testing.T) { | ||
| require.Error(t, ChangeEvent{URI: "not-a-uri"}.Validate()) | ||
| }) | ||
|
|
||
| // Validate is VCS-agnostic: a non-git but scheme-qualified URI passes here; | ||
| // rejecting an unsupported VCS is the resolver/wiring layer's job. | ||
| t.Run("accepts non-git scheme", func(t *testing.T) { | ||
| require.NoError(t, ChangeEvent{URI: "hg://example.com/repo/rev"}.Validate()) | ||
| }) | ||
| } | ||
|
|
||
| func TestChangeEvent_Scheme(t *testing.T) { | ||
| assert.Equal(t, "git", ChangeEvent{URI: testURI}.Scheme()) | ||
| assert.Equal(t, "", ChangeEvent{URI: "not-a-uri"}.Scheme()) | ||
| } | ||
|
|
||
| func TestChangeEventFromBytes(t *testing.T) { | ||
| original := ChangeEvent{URI: testURI} | ||
| data, err := original.ToBytes() | ||
| require.NoError(t, err) | ||
|
|
||
| got, err := ChangeEventFromBytes(data) | ||
| require.NoError(t, err) | ||
| assert.Equal(t, original.URI, got.URI) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| load("@rules_go//go:def.bzl", "go_library", "go_test") | ||
|
|
||
| go_library( | ||
| name = "start", | ||
| srcs = ["start.go"], | ||
| importpath = "github.com/uber/submitqueue/stovepipe/orchestrator/controller/start", | ||
| visibility = ["//visibility:public"], | ||
| deps = [ | ||
| "//core/consumer", | ||
| "//core/metrics", | ||
| "//entity/messagequeue", | ||
| "//stovepipe/core/topickey", | ||
| "//stovepipe/entity", | ||
| "@com_github_uber_go_tally//:tally", | ||
| "@org_uber_go_zap//:zap", | ||
| ], | ||
| ) | ||
|
|
||
| go_test( | ||
| name = "start_test", | ||
| srcs = ["start_test.go"], | ||
| embed = [":start"], | ||
| deps = [ | ||
| "//core/consumer", | ||
| "//entity/messagequeue", | ||
| "//extension/messagequeue/mock", | ||
| "//stovepipe/core/topickey", | ||
| "//stovepipe/entity", | ||
| "@com_github_stretchr_testify//assert", | ||
| "@com_github_stretchr_testify//require", | ||
| "@com_github_uber_go_tally//:tally", | ||
| "@org_uber_go_mock//gomock", | ||
| "@org_uber_go_zap//zaptest", | ||
| ], | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,155 @@ | ||
| // Copyright (c) 2025 Uber Technologies, Inc. | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| package start | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
|
|
||
| "github.com/uber-go/tally" | ||
| "github.com/uber/submitqueue/core/consumer" | ||
| "github.com/uber/submitqueue/core/metrics" | ||
| entityqueue "github.com/uber/submitqueue/entity/messagequeue" | ||
| "github.com/uber/submitqueue/stovepipe/core/topickey" | ||
| entity "github.com/uber/submitqueue/stovepipe/entity" | ||
| "go.uber.org/zap" | ||
| ) | ||
|
|
||
| // Controller handles start queue messages. It is the pipeline entry point: it | ||
| // deserializes the inbound change event and forwards the commit reference to the | ||
| // validate stage using the partition key for ordering. | ||
| // | ||
| // Ordering key is decided once at ingestion and carried through the pipeline. | ||
| // | ||
| // Currently a forwarding stub. Per the Stovepipe workflow RFC, start will | ||
| // also record the Commit as `unknown` (keyed by SHA, making ingest idempotent across | ||
| // the webhook and poll producers) and emit status + log events. | ||
|
|
||
| var _ consumer.Controller = (*Controller)(nil) | ||
|
|
||
| type Controller struct { | ||
| logger *zap.SugaredLogger | ||
| metricsScope tally.Scope | ||
| registry consumer.TopicRegistry | ||
| topicKey consumer.TopicKey | ||
| consumerGroup string | ||
| } | ||
|
|
||
| // Params are the parameters for creating a new start controller. | ||
| type Params struct { | ||
| Registry consumer.TopicRegistry | ||
| TopicKey consumer.TopicKey | ||
| ConsumerGroup string | ||
|
|
||
| Scope tally.Scope | ||
| Logger *zap.SugaredLogger | ||
| } | ||
|
|
||
| // NewController creates a new start controller for the orchestrator. | ||
| func NewController(p Params) *Controller { | ||
| return &Controller{ | ||
| logger: p.Logger.Named("start_controller"), | ||
| metricsScope: p.Scope.SubScope("start_controller"), | ||
| registry: p.Registry, | ||
| topicKey: p.TopicKey, | ||
| consumerGroup: p.ConsumerGroup, | ||
| } | ||
| } | ||
|
|
||
| // Process deserializes the change event and forwards the commit to the validate stage. | ||
| func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { | ||
| const opName = "process" | ||
|
|
||
| op := metrics.Begin(c.metricsScope, opName) | ||
| defer func() { op.Complete(retErr) }() | ||
|
|
||
| msg := delivery.Message() | ||
|
|
||
| event, err := entity.ChangeEventFromBytes(msg.Payload) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This should ideally be the whole Request that is sent to the gateway.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If we have the poller passing ChangeEvent (or Change, depending on what we decide above), isn't this already doing that? Or do you mean something else? |
||
| if err != nil { | ||
| metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) | ||
| return fmt.Errorf("failed to deserialize change event: %w", err) | ||
| } | ||
|
|
||
| // The ordering key lives on the message envelope, stamped by the producer at | ||
| // ingestion; the controller propagates it verbatim to the next stage. | ||
| partitionKey := msg.PartitionKey | ||
| if partitionKey == "" { | ||
| metrics.NamedCounter(c.metricsScope, opName, "missing_partition_key", 1) | ||
| return fmt.Errorf("change event for uri=%s is missing a partition key (must be stamped by the producer)", event.URI) | ||
| } | ||
|
|
||
| c.logger.Infow("received change event", | ||
| "uri", event.URI, | ||
| "attempt", delivery.Attempt(), | ||
| "partition_key", partitionKey, | ||
| ) | ||
|
|
||
| // Core Logic to be added here: | ||
| // - Record the commit as `unknown` (keyed by SHA) | ||
| // - Emit status + log events | ||
|
|
||
| if err := c.publish(ctx, topickey.TopicKeyValidate, event, partitionKey); err != nil { | ||
| metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) | ||
| return fmt.Errorf("failed to publish to validate: %w", err) | ||
| } | ||
|
|
||
| c.logger.Infow("published commit to validate", | ||
| "uri", event.URI, | ||
| "topic_key", topickey.TopicKeyValidate, | ||
| ) | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, event entity.ChangeEvent, partitionKey string) error { | ||
| payload, err := event.ToBytes() | ||
| if err != nil { | ||
| return fmt.Errorf("failed to serialize change event: %w", err) | ||
| } | ||
|
|
||
| msg := entityqueue.NewMessage(event.URI, payload, partitionKey, nil) | ||
|
|
||
| q, ok := c.registry.Queue(key) | ||
| if !ok { | ||
| return fmt.Errorf("no queue registered for topic key %s", key) | ||
| } | ||
|
|
||
| topicName, ok := c.registry.TopicName(key) | ||
| if !ok { | ||
| return fmt.Errorf("no topic name registered for topic key %s", key) | ||
| } | ||
|
|
||
| if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { | ||
| return fmt.Errorf("failed to publish message: %w", err) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // Name returns the controller name for logging and metrics. | ||
| func (c *Controller) Name() string { | ||
| return "start" | ||
| } | ||
|
|
||
| // TopicKey returns the topic key this controller subscribes to. | ||
| func (c *Controller) TopicKey() consumer.TopicKey { | ||
| return c.topicKey | ||
| } | ||
|
|
||
| // ConsumerGroup returns the consumer group for offset tracking. | ||
| func (c *Controller) ConsumerGroup() string { | ||
| return c.consumerGroup | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should be able to use the `change.Change` entity as is for this one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main limitation of reusing change.Change is that it supports multiple URIs, and in this flow we had discussed having the gateway break them down and publish them to the queue as individual commit events. That is the main difference, though we could also just reuse it and always require a length of 1.