Skip to content

feat(stovepipe): add start controller for the change ingress flow#229

Open
mnoah1 wants to merge 1 commit into
mainfrom
mnoah1/stovepipe-start-controller
Open

feat(stovepipe): add start controller for the change ingress flow#229
mnoah1 wants to merge 1 commit into
mainfrom
mnoah1/stovepipe-start-controller

Conversation

@mnoah1

@mnoah1 mnoah1 commented Jun 9, 2026

Copy link
Copy Markdown
Contributor

Setting up start controller. Per our discussions, the gateway will be responsible for polling and finding new commits, then enqueuing them in order for the orchestrator to pick up.

Start will kick off the rest of the workflow for a commit.

Following SubmitQueue pattern of having Gateway set the partition key when creating the event, instead of deriving it internally within the orchestrator.

Why?

Initial step for commit validation workflow

What?

Setting up "start" step of this workflow: https://github.com/uber/submitqueue/blob/main/doc/rfc/stovepipe/workflow.md

Test Plan

  • Wire into example app
  • Manually inject an event via sql, confirm that that the event triggers, and inserts a validate message
rchestrator-service-1  | 2026-06-16T14:31:29.811Z	DEBUG	queue_mysql.message_store	mysql/message_store.go:217	fetched messages	{"topic": "start", "partition_key": "git.example.com/uber/monorepo", "count": 1}
orchestrator-service-1  | 2026-06-16T14:31:29.815Z	DEBUG	consumer/consumer.go:351	processing delivery	{"controller": "start", "topic_key": "start", "message_id": "test-msg-1", "partition_key": "git.example.com/uber/monorepo", "attempt": 1}
orchestrator-service-1  | 2026-06-16T14:31:29.815Z	INFO	start_controller	start/start.go:85	received change event	{"uri": "git://git.example.com/uber/monorepo/refs%2Fheads%2Fmain/abcdef0123456789abcdef0123456789abcdef01", "attempt": 1, "partition_key": "git.example.com/uber/monorepo"}
orchestrator-service-1  | 2026-06-16T14:31:29.816Z	DEBUG	queue_mysql.message_store	mysql/message_store.go:72	inserting messages	{"topic": "validate", "count": 1, "visible_after": 0}
orchestrator-service-1  | 2026-06-16T14:31:29.821Z	DEBUG	queue_mysql.message_store	mysql/message_store.go:125	inserted messages	{"topic": "validate", "count": 1}
orchestrator-service-1  | 2026-06-16T14:31:29.821Z	DEBUG	queue_mysql.publisher	mysql/publisher.go:65	published message	{"topic": "validate", "message_id": "git://git.example.com/uber/monorepo/refs%2Fheads%2Fmain/abcdef0123456789abcdef0123456789abcdef01"}
orchestrator-service-1  | 2026-06-16T14:31:29.821Z	INFO	start_controller	start/start.go:96	published commit to validate	{"uri": "git://git.example.com/uber/monorepo/refs%2Fheads%2Fmain/abcdef0123456789abcdef0123456789abcdef01", "topic_key": "validate"}
orchestrator-service-1  | 2026-06-16T14:31:29.825Z	DEBUG	consumer/consumer.go:479	message processed successfully	{"controller": "start", "topic_key": "start", "message_id": "test-msg-1", "partition_key": "git.example.com/uber/monorepo", "attempt": 1, "elapsed_ms": 5}
orchestrator-service-1  | 2026-06-16T14:31:29.900Z	DEBUG	queue_mysql.partition_lease_store	mysql/partition_lease_store.go:191	retrieved leased partitions	{"topic": "start", "count": 1}
orchestrator-service-1  | 2026-06-16T14:31:29.900Z	DEBUG	queue_mysql.subscriber_heartbeat_store	mysql/subscriber_heartbeat_store.go:95	found active subscribers	{"topic": "start", "count": 1, "subscribers": ["orchestrator-dev"]}
orchestrator-service-1  | 2026-06-16T14:31:29.901Z	DEBUG	queue_mysql.partition_lease_store	mysql/partition_lease_store.go:232	discovered partitions	{"topic": "start", "count": 1}
orchestrator-service-1  | 2026-06-16T14:31:29.901Z	DEBUG	queue_mysql.partition_lease_store	mysql/partition_lease_store.go:83	acquired lease	{"topic": "start", "partition_key": "git.example.com/uber/monorepo"}

@mnoah1 mnoah1 force-pushed the mnoah1/stovepipe-start-controller branch from 424e438 to 5ca558a Compare June 9, 2026 21:58
@mnoah1 mnoah1 force-pushed the mnoah1/lift-core-consumer branch from 2a53a64 to 425f060 Compare June 12, 2026 15:40
Base automatically changed from mnoah1/lift-core-consumer to main June 12, 2026 15:45
@mnoah1 mnoah1 force-pushed the mnoah1/stovepipe-start-controller branch 4 times, most recently from 2788fc5 to bf2716b Compare June 16, 2026 21:32
@mnoah1 mnoah1 marked this pull request as ready for review June 16, 2026 21:44
@mnoah1 mnoah1 requested review from a team, behinddwalls and sbalabanov as code owners June 16, 2026 21:44
@mnoah1 mnoah1 enabled auto-merge June 16, 2026 21:45
…ingress flow

Adds the ChangeEvent entity (commit URI + carried, gateway-stamped
PartitionKey) and the orchestrator start controller, the pipeline entry
point. start is the reading side of the partitioning model: it consumes
the partition key stamped by the producer and forwards it to validate
verbatim, never re-parsing the URI (mirrors SubmitQueue's request.Queue).
@mnoah1 mnoah1 force-pushed the mnoah1/stovepipe-start-controller branch from 57a2397 to 03a6c15 Compare June 17, 2026 03:03
// The repository-scoped ordering key is carried on the queue message envelope
// (Message.PartitionKey), stamped once at ingestion and propagated unchanged
// across stages, so it is deliberately not duplicated here.
type ChangeEvent struct {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should be able to `change.Change entity as is for this one

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main limitation of reusing change.Change is it supports multiple URIs, and in this flow, we had discussed having the gateway break them down and publish them to the queue as individual commit events. So that is the main difference, though we could also just reuse and always require a length of 1?


msg := delivery.Message()

event, err := entity.ChangeEventFromBytes(msg.Payload)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should ideally be the whole Request that is sent to gateway

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If have the poller passing ChangeEvent (or Change - depending on what we decide above), isn't this already doing that? Or do you mean something else?

 poller -> Individual ordered ChangeEvents -> Ingest [gateway]
 queue
 ChangeEvent -> start [orchestrator]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants