Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions stovepipe/entity/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
load("@rules_go//go:def.bzl", "go_library")
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "entity",
srcs = ["entity.go"],
srcs = [
"change_event.go",
"entity.go",
],
importpath = "github.com/uber/submitqueue/stovepipe/entity",
visibility = ["//visibility:public"],
)

go_test(
name = "entity_test",
srcs = ["change_event_test.go"],
embed = [":entity"],
deps = [
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)
71 changes: 71 additions & 0 deletions stovepipe/entity/change_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package entity

import (
"encoding/json"
"fmt"
"net/url"
)

// ChangeEvent represents a change in the pipeline. URI represents the identity
// of the change.
//
// The repository-scoped ordering key is carried on the queue message envelope
// (Message.PartitionKey), stamped once at ingestion and propagated unchanged
// across stages, so it is deliberately not duplicated here.
type ChangeEvent struct {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should be able to `change.Change entity as is for this one

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main limitation of reusing change.Change is it supports multiple URIs, and in this flow, we had discussed having the gateway break them down and publish them to the queue as individual commit events. So that is the main difference, though we could also just reuse and always require a length of 1?

// URI represents the identity of the change. The scheme names the VCS; the rest
// is provider-specific (e.g. git://remote/repo/ref/commit_sha).
URI string `json:"uri"`
}

// ToBytes serializes the ChangeEvent to JSON bytes for queue message payload.
func (e ChangeEvent) ToBytes() ([]byte, error) {
return json.Marshal(e)
}

// Scheme returns the URI scheme identifying the VCS (e.g. "git"), or "" if the
// URI is empty or not scheme-qualified. It is the key used to select a resolver.
func (e ChangeEvent) Scheme() string {
u, err := url.Parse(e.URI)
if err != nil {
return ""
}
return u.Scheme
}

// Validate checks that the change event carries a scheme-qualified commit URI.
func (e ChangeEvent) Validate() error {
if e.URI == "" {
return fmt.Errorf("change event requires a commit URI")
}
if e.Scheme() == "" {
return fmt.Errorf("change event URI %q must be scheme-qualified", e.URI)
}
return nil
}

// ChangeEventFromBytes deserializes a ChangeEvent from JSON bytes.
func ChangeEventFromBytes(data []byte) (ChangeEvent, error) {
var event ChangeEvent
if err := json.Unmarshal(data, &event); err != nil {
return ChangeEvent{}, err
}
if err := event.Validate(); err != nil {
return ChangeEvent{}, err
}
return event, nil
}
59 changes: 59 additions & 0 deletions stovepipe/entity/change_event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package entity

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const testURI = "git://git.example.com/uber/monorepo/refs%2Fheads%2Fmain/abcdef0123456789abcdef0123456789abcdef01"

func TestChangeEvent_Validate(t *testing.T) {
t.Run("valid scheme-qualified URI", func(t *testing.T) {
require.NoError(t, ChangeEvent{URI: testURI}.Validate())
})

t.Run("rejects empty URI", func(t *testing.T) {
require.Error(t, ChangeEvent{}.Validate())
})

t.Run("rejects URI without scheme", func(t *testing.T) {
require.Error(t, ChangeEvent{URI: "not-a-uri"}.Validate())
})

// Validate is VCS-agnostic: a non-git but scheme-qualified URI passes here;
// rejecting an unsupported VCS is the resolver/wiring layer's job.
t.Run("accepts non-git scheme", func(t *testing.T) {
require.NoError(t, ChangeEvent{URI: "hg://example.com/repo/rev"}.Validate())
})
}

func TestChangeEvent_Scheme(t *testing.T) {
assert.Equal(t, "git", ChangeEvent{URI: testURI}.Scheme())
assert.Equal(t, "", ChangeEvent{URI: "not-a-uri"}.Scheme())
}

func TestChangeEventFromBytes(t *testing.T) {
original := ChangeEvent{URI: testURI}
data, err := original.ToBytes()
require.NoError(t, err)

got, err := ChangeEventFromBytes(data)
require.NoError(t, err)
assert.Equal(t, original.URI, got.URI)
}
35 changes: 35 additions & 0 deletions stovepipe/orchestrator/controller/start/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "start",
srcs = ["start.go"],
importpath = "github.com/uber/submitqueue/stovepipe/orchestrator/controller/start",
visibility = ["//visibility:public"],
deps = [
"//core/consumer",
"//core/metrics",
"//entity/messagequeue",
"//stovepipe/core/topickey",
"//stovepipe/entity",
"@com_github_uber_go_tally//:tally",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "start_test",
srcs = ["start_test.go"],
embed = [":start"],
deps = [
"//core/consumer",
"//entity/messagequeue",
"//extension/messagequeue/mock",
"//stovepipe/core/topickey",
"//stovepipe/entity",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_uber_go_tally//:tally",
"@org_uber_go_mock//gomock",
"@org_uber_go_zap//zaptest",
],
)
155 changes: 155 additions & 0 deletions stovepipe/orchestrator/controller/start/start.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package start

import (
"context"
"fmt"

"github.com/uber-go/tally"
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/core/metrics"
entityqueue "github.com/uber/submitqueue/entity/messagequeue"
"github.com/uber/submitqueue/stovepipe/core/topickey"
entity "github.com/uber/submitqueue/stovepipe/entity"
"go.uber.org/zap"
)

// Controller handles start queue messages. It is the pipeline entry point: it
// deserializes the inbound change event and forwards the commit reference to the
// validate stage using the partition key for ordering.
//
// Ordering key is decided once at ingestion and carried through the pipeline.
//
// Currently a forwarding stub. Per the Stovepipe workflow RFC, start will
// also record the Commit as `unknown` (keyed by SHA, making ingest idempotent across
// the webhook and poll producers) and emit status + log events.

var _ consumer.Controller = (*Controller)(nil)

type Controller struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
registry consumer.TopicRegistry
topicKey consumer.TopicKey
consumerGroup string
}

// Params are the parameters for creating a new start controller.
type Params struct {
Registry consumer.TopicRegistry
TopicKey consumer.TopicKey
ConsumerGroup string

Scope tally.Scope
Logger *zap.SugaredLogger
}

// NewController creates a new start controller for the orchestrator.
func NewController(p Params) *Controller {
return &Controller{
logger: p.Logger.Named("start_controller"),
metricsScope: p.Scope.SubScope("start_controller"),
registry: p.Registry,
topicKey: p.TopicKey,
consumerGroup: p.ConsumerGroup,
}
}

// Process deserializes the change event and forwards the commit to the validate stage.
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) {
const opName = "process"

op := metrics.Begin(c.metricsScope, opName)
defer func() { op.Complete(retErr) }()

msg := delivery.Message()

event, err := entity.ChangeEventFromBytes(msg.Payload)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should ideally be the whole Request that is sent to gateway

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If have the poller passing ChangeEvent (or Change - depending on what we decide above), isn't this already doing that? Or do you mean something else?

 poller -> Individual ordered ChangeEvents -> Ingest [gateway]
 queue
 ChangeEvent -> start [orchestrator]

if err != nil {
metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1)
return fmt.Errorf("failed to deserialize change event: %w", err)
}

// The ordering key lives on the message envelope, stamped by the producer at
// ingestion; the controller propagates it verbatim to the next stage.
partitionKey := msg.PartitionKey
if partitionKey == "" {
metrics.NamedCounter(c.metricsScope, opName, "missing_partition_key", 1)
return fmt.Errorf("change event for uri=%s is missing a partition key (must be stamped by the producer)", event.URI)
}

c.logger.Infow("received change event",
"uri", event.URI,
"attempt", delivery.Attempt(),
"partition_key", partitionKey,
)

// Core Logic to be added here:
// - Record the commit as `unknown` (keyed by SHA)
// - Emit status + log events

if err := c.publish(ctx, topickey.TopicKeyValidate, event, partitionKey); err != nil {
metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1)
return fmt.Errorf("failed to publish to validate: %w", err)
}

c.logger.Infow("published commit to validate",
"uri", event.URI,
"topic_key", topickey.TopicKeyValidate,
)

return nil
}

func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, event entity.ChangeEvent, partitionKey string) error {
payload, err := event.ToBytes()
if err != nil {
return fmt.Errorf("failed to serialize change event: %w", err)
}

msg := entityqueue.NewMessage(event.URI, payload, partitionKey, nil)

q, ok := c.registry.Queue(key)
if !ok {
return fmt.Errorf("no queue registered for topic key %s", key)
}

topicName, ok := c.registry.TopicName(key)
if !ok {
return fmt.Errorf("no topic name registered for topic key %s", key)
}

if err := q.Publisher().Publish(ctx, topicName, msg); err != nil {
return fmt.Errorf("failed to publish message: %w", err)
}

return nil
}

// Name returns the controller name for logging and metrics.
func (c *Controller) Name() string {
return "start"
}

// TopicKey returns the topic key this controller subscribes to.
func (c *Controller) TopicKey() consumer.TopicKey {
return c.topicKey
}

// ConsumerGroup returns the consumer group for offset tracking.
func (c *Controller) ConsumerGroup() string {
return c.consumerGroup
}
Loading
Loading