Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions example/stovepipe/orchestrator/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//stovepipe/core/topickey",
"//stovepipe/orchestrator/controller",
"//stovepipe/orchestrator/controller/start",
"//stovepipe/orchestrator/controller/validate",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_uber_go_tally//:tally",
"@org_golang_google_grpc//:grpc",
Expand Down
22 changes: 21 additions & 1 deletion example/stovepipe/orchestrator/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/uber/submitqueue/stovepipe/core/topickey"
"github.com/uber/submitqueue/stovepipe/orchestrator/controller"
"github.com/uber/submitqueue/stovepipe/orchestrator/controller/start"
"github.com/uber/submitqueue/stovepipe/orchestrator/controller/validate"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
Expand Down Expand Up @@ -161,7 +162,18 @@ func run() error {
if err := primaryConsumer.Register(startController); err != nil {
return fmt.Errorf("failed to register start controller: %w", err)
}
logger.Info("controllers registered", zap.Int("primary", 1))

validateController := validate.NewController(validate.Params{
Logger: logger.Sugar(),
Scope: scope,
Registry: registry,
TopicKey: topickey.TopicKeyValidate,
ConsumerGroup: "orchestrator-validate",
})
if err := primaryConsumer.Register(validateController); err != nil {
return fmt.Errorf("failed to register validate controller: %w", err)
}
logger.Info("controllers registered", zap.Int("primary", 2))

if err := primaryConsumer.Start(ctx); err != nil {
return fmt.Errorf("failed to start primary consumer: %w", err)
Expand Down Expand Up @@ -243,5 +255,13 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
subscriberName, "orchestrator-validate",
),
},
{
Key: topickey.TopicKeyBatch,
Name: "batch",
Queue: q,
Subscription: extqueue.DefaultSubscriptionConfig(
subscriberName, "orchestrator-batch",
),
},
})
}
3 changes: 3 additions & 0 deletions stovepipe/core/topickey/topickey.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ const (
TopicKeyStart TopicKey = "start"
// TopicKeyValidate is the pipeline stage where commits are published for metadata resolution.
TopicKeyValidate TopicKey = "validate"
// TopicKeyBatch is the pipeline stage where validated commits are aggregated, since the last
// known green, into a contiguous validation batch.
TopicKeyBatch TopicKey = "batch"
)
36 changes: 36 additions & 0 deletions stovepipe/orchestrator/controller/validate/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "validate",
srcs = ["validate.go"],
importpath = "github.com/uber/submitqueue/stovepipe/orchestrator/controller/validate",
visibility = ["//visibility:public"],
deps = [
"//platform/base/messagequeue",
"//platform/consumer",
"//platform/metrics",
"//stovepipe/core/topickey",
"//stovepipe/entity",
"@com_github_uber_go_tally//:tally",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "validate_test",
srcs = ["validate_test.go"],
embed = [":validate"],
deps = [
"//platform/base/change",
"//platform/base/messagequeue",
"//platform/consumer",
"//platform/extension/messagequeue/mock",
"//stovepipe/core/topickey",
"//stovepipe/entity",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_uber_go_tally//:tally",
"@org_uber_go_mock//gomock",
"@org_uber_go_zap//zaptest",
],
)
159 changes: 159 additions & 0 deletions stovepipe/orchestrator/controller/validate/validate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package validate

import (
"context"
"fmt"

"github.com/uber-go/tally"
entityqueue "github.com/uber/submitqueue/platform/base/messagequeue"
"github.com/uber/submitqueue/platform/consumer"
"github.com/uber/submitqueue/platform/metrics"
"github.com/uber/submitqueue/stovepipe/core/topickey"
entity "github.com/uber/submitqueue/stovepipe/entity"
"go.uber.org/zap"
)

// Controller handles validate queue messages. It consumes the validate topic and
// forwards the ingest request to the batch stage, propagating the envelope partition
// key for ordering.
//
// This step will include any validation activities prior to adding the commit to a batch.
//
// The ordering key is decided once at ingestion and carried through the pipeline.
//
// Currently a forwarding stub.

var _ consumer.Controller = (*Controller)(nil)

type Controller struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
registry consumer.TopicRegistry
topicKey consumer.TopicKey
consumerGroup string
}

// Params are the parameters for creating a new validate controller.
type Params struct {
Registry consumer.TopicRegistry
TopicKey consumer.TopicKey
ConsumerGroup string

Scope tally.Scope
Logger *zap.SugaredLogger
}

// NewController creates a new validate controller for the orchestrator.
func NewController(p Params) *Controller {
return &Controller{
logger: p.Logger.Named("validate_controller"),
metricsScope: p.Scope.SubScope("validate_controller"),
registry: p.Registry,
topicKey: p.TopicKey,
consumerGroup: p.ConsumerGroup,
}
}

// Process validates the ingest request and forwards it to the batch stage.
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) {
const opName = "process"

op := metrics.Begin(c.metricsScope, opName)
defer func() { op.Complete(retErr) }()

msg := delivery.Message()

request, err := entity.IngestRequestFromBytes(msg.Payload)
if err != nil {
metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1)
// Non-retryable: malformed messages will never succeed regardless of retry count.
return fmt.Errorf("failed to deserialize ingest request: %w", err)
}

// The ordering key lives on the message envelope, stamped by the gateway at
// ingestion; the controller propagates it verbatim to the next stage.
partitionKey := msg.PartitionKey
if partitionKey == "" {
metrics.NamedCounter(c.metricsScope, opName, "missing_partition_key", 1)
return fmt.Errorf("ingest request %s is missing a partition key (must be stamped by the producer)", request.ID)
}

c.logger.Infow("received ingest request",
"spid", request.ID,
"queue", request.Queue,
"change_uris", request.Change.URIs,
"change_count", len(request.Change.URIs),
"attempt", delivery.Attempt(),
"partition_key", partitionKey,
)

// Core logic to be added here:
// - Validation before publishing to batch
// - Emit status + log events

if err := c.publish(ctx, topickey.TopicKeyBatch, request, partitionKey); err != nil {
metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1)
return fmt.Errorf("failed to publish to batch: %w", err)
}

c.logger.Infow("published ingest request to batch",
"spid", request.ID,
"topic_key", topickey.TopicKeyBatch,
)

return nil
}

func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request entity.IngestRequest, partitionKey string) error {
payload, err := request.ToBytes()
if err != nil {
return fmt.Errorf("failed to serialize ingest request: %w", err)
}

msg := entityqueue.NewMessage(request.ID, payload, partitionKey, nil)

q, ok := c.registry.Queue(key)
if !ok {
return fmt.Errorf("no queue registered for topic key %s", key)
}

topicName, ok := c.registry.TopicName(key)
if !ok {
return fmt.Errorf("no topic name registered for topic key %s", key)
}

if err := q.Publisher().Publish(ctx, topicName, msg); err != nil {
return fmt.Errorf("failed to publish message: %w", err)
}

return nil
}

// Name returns the controller name for logging and metrics.
func (c *Controller) Name() string {
return "validate"
}

// TopicKey returns the topic key this controller subscribes to.
func (c *Controller) TopicKey() consumer.TopicKey {
return c.topicKey
}

// ConsumerGroup returns the consumer group for offset tracking.
func (c *Controller) ConsumerGroup() string {
return c.consumerGroup
}
Loading