diff --git a/Makefile b/Makefile index a8467fda..b85707a6 100644 --- a/Makefile +++ b/Makefile @@ -20,6 +20,12 @@ STOVEPIPE_STACK_COMPOSE_FILE = example/stovepipe/docker-compose.yml # Fixed project name for local manual testing (tests use unique random names) STOVEPIPE_LOCAL_PROJECT = stovepipe +# Runway compose files +RUNWAY_ORCHESTRATOR_COMPOSE_FILE = example/runway/orchestrator/server/docker-compose.yml + +# Fixed project name for local manual testing (tests use unique random names) +RUNWAY_LOCAL_PROJECT = runway + # yamlfmt version for YAML formatting (override with: make fmt YAMLFMT_VERSION=v0.16.0) YAMLFMT_VERSION ?= v0.16.0 @@ -45,7 +51,7 @@ define assert_clean fi endef -.PHONY: build build-all-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux check-gazelle check-mocks check-tidy clean clean-proto deps e2e-test fmt gazelle integration-test integration-test-submitqueue-consumer integration-test-extensions integration-test-submitqueue-gateway integration-test-submitqueue-orchestrator license-fix lint lint-fmt lint-license local-submitqueue-clean local-submitqueue-gateway-start local-submitqueue-gateway-stop local-init-submitqueue-schemas local-init-stovepipe-queue-schema local-submitqueue-logs local-submitqueue-orchestrator-start local-submitqueue-orchestrator-stop local-submitqueue-ps local-submitqueue-restart local-submitqueue-start local-stop local-stovepipe-gateway-start local-stovepipe-orchestrator-start local-stovepipe-start mocks proto query-deps query-targets run-client-submitqueue-gateway run-client-submitqueue-orchestrator run-client-stovepipe-gateway run-client-stovepipe-orchestrator run-queue-admin test test-no-cache tidy tidy-bazel tidy-go help +.PHONY: build build-all-linux build-runway-orchestrator-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux check-gazelle check-mocks check-tidy clean clean-proto deps e2e-test fmt gazelle integration-test integration-test-submitqueue-consumer integration-test-extensions integration-test-submitqueue-gateway integration-test-submitqueue-orchestrator license-fix lint lint-fmt lint-license local-init-runway-queue-schema local-submitqueue-clean local-submitqueue-gateway-start local-submitqueue-gateway-stop local-init-submitqueue-schemas local-init-stovepipe-queue-schema local-submitqueue-logs local-submitqueue-orchestrator-start local-submitqueue-orchestrator-stop local-submitqueue-ps local-submitqueue-restart local-submitqueue-start local-stop local-runway-orchestrator-start local-stovepipe-gateway-start local-stovepipe-orchestrator-start local-stovepipe-start mocks proto query-deps query-targets run-client-submitqueue-gateway run-client-submitqueue-orchestrator run-client-stovepipe-gateway run-client-stovepipe-orchestrator run-queue-admin test test-no-cache tidy tidy-bazel tidy-go help build: ## Build all services and examples @@ -54,9 +60,17 @@ build: ## Build all services and examples @echo "Build complete!" # Build Linux binaries required for Docker containers -build-all-linux: build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux ## Build all Linux binaries for Docker +build-all-linux: build-runway-orchestrator-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux ## Build all Linux binaries for Docker @echo "All Linux binaries ready for Docker" +build-runway-orchestrator-linux: ## Build Runway orchestrator Linux binary for Docker + @echo "Building Runway orchestrator Linux binary for Docker..." + @$(BAZEL) build --platforms=@rules_go//go/toolchain:linux_amd64 //example/runway/orchestrator/server:orchestrator + @mkdir -p .docker-bin + @cp -f bazel-bin/example/runway/orchestrator/server/orchestrator_/orchestrator .docker-bin/runway-orchestrator 2>/dev/null || \ + cp -f bazel-bin/example/runway/orchestrator/server/orchestrator .docker-bin/runway-orchestrator + @echo "Runway orchestrator Linux binary ready at .docker-bin/runway-orchestrator" + build-submitqueue-gateway-linux: ## Build Gateway Linux binary for Docker @echo "Building Gateway Linux binary for Docker..." @$(BAZEL) build --platforms=@rules_go//go/toolchain:linux_amd64 //example/submitqueue/gateway/server:gateway @@ -215,6 +229,14 @@ local-init-stovepipe-queue-schema: ## Apply queue schema only (mysql-queue) for done @echo "✅ Stovepipe queue schema applied successfully" +local-init-runway-queue-schema: ## Apply queue schema for Runway compose stacks + @echo "Applying queue schema to mysql-queue (Runway; no app database)..." + @for file in extension/messagequeue/mysql/schema/*.sql; do \ + echo " - Applying $$(basename $$file)..."; \ + docker exec -i $(RUNWAY_LOCAL_PROJECT)-mysql-queue-1 mysql -uroot -proot submitqueue < $$file 2>&1 | grep -v "Using a password" || true; \ + done + @echo "✅ Runway queue schema applied successfully" + local-submitqueue-logs: ## View logs from all running services @$(COMPOSE) -f $(COMPOSE_FILE) -p $(SUBMITQUEUE_LOCAL_PROJECT) logs -f @@ -280,8 +302,21 @@ local-stop: ## Stop all services (keep data) @echo "Stopping all services..." @$(COMPOSE) -f $(COMPOSE_FILE) -p $(SUBMITQUEUE_LOCAL_PROJECT) down @$(COMPOSE) -f $(STOVEPIPE_STACK_COMPOSE_FILE) -p $(STOVEPIPE_LOCAL_PROJECT) down + @$(COMPOSE) -f $(RUNWAY_ORCHESTRATOR_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) down @echo "Services stopped. Data volumes preserved." +local-runway-orchestrator-start: build-runway-orchestrator-linux ## Start Runway orchestrator locally (orchestrator + MySQL queue) + @echo "Starting Runway orchestrator with compose..." + @$(COMPOSE) -f $(RUNWAY_ORCHESTRATOR_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) up -d --build --wait + @echo "Applying queue schema..." + @$(MAKE) -s local-init-runway-queue-schema + @echo "" + @echo "✅ Runway orchestrator is running!" + @echo "" + @$(COMPOSE) -f $(RUNWAY_ORCHESTRATOR_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) ps + @echo "" + @echo "MySQL Queue port: $$(docker port $(RUNWAY_LOCAL_PROJECT)-mysql-queue-1 3306 2>/dev/null | cut -d: -f2 || echo 'unknown')" + local-stovepipe-logs: ## View logs from all running Stovepipe services @$(COMPOSE) -f $(STOVEPIPE_STACK_COMPOSE_FILE) -p $(STOVEPIPE_LOCAL_PROJECT) logs -f diff --git a/example/runway/orchestrator/server/BUILD.bazel b/example/runway/orchestrator/server/BUILD.bazel new file mode 100644 index 00000000..67c2e59b --- /dev/null +++ b/example/runway/orchestrator/server/BUILD.bazel @@ -0,0 +1,30 @@ +load("@rules_go//go:def.bzl", "go_binary", "go_library") + +go_library( + name = "server_lib", + srcs = ["main.go"], + importpath = "github.com/uber/submitqueue/example/runway/orchestrator/server", + visibility = ["//visibility:private"], + deps = [ + "//core/consumer", + "//core/errs", + "//core/errs/generic", + "//core/errs/mysql", + "//extension/messagequeue", + "//extension/messagequeue/mysql", + "//runway/core/topickey", + "//runway/extension/vcs", + "//runway/extension/vcs/noop", + "//runway/orchestrator/controller/check", + "//runway/orchestrator/controller/land", + "@com_github_go_sql_driver_mysql//:mysql", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_binary( + name = "server", + embed = [":server_lib"], + visibility = ["//visibility:public"], +) diff --git a/example/runway/orchestrator/server/Dockerfile b/example/runway/orchestrator/server/Dockerfile new file mode 100644 index 00000000..dd982f9e --- /dev/null +++ b/example/runway/orchestrator/server/Dockerfile @@ -0,0 +1,11 @@ +FROM debian:bookworm-slim + +RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* +WORKDIR /root/ + +# Built via: make build-runway-orchestrator-linux +COPY .docker-bin/runway-orchestrator ./orchestrator + +EXPOSE 8080 + +CMD ["./orchestrator"] diff --git a/example/runway/orchestrator/server/docker-compose.yml b/example/runway/orchestrator/server/docker-compose.yml new file mode 100644 index 00000000..f9bb7c6d --- /dev/null +++ b/example/runway/orchestrator/server/docker-compose.yml @@ -0,0 +1,43 @@ +# Docker Compose for Runway orchestrator manual testing +# +# Runway is stateless — no application database, only the queue infrastructure. +# +# IMPORTANT: Before running compose, build the Linux binary: +# make build-runway-orchestrator-linux +# +# Quick start: +# make local-runway-orchestrator-start + +services: + # Queue Database - Messaging infrastructure (messages, offsets, partition leases) + # Runway has no application database — it is stateless. + mysql-queue: + image: mysql:8.0 + environment: + MYSQL_ROOT_PASSWORD: root + MYSQL_DATABASE: submitqueue + ports: + - "3306" # Random ephemeral port to avoid conflicts + healthcheck: + # Use 127.0.0.1 (TCP) instead of localhost (Unix socket). MySQL treats + # "localhost" as a socket connection, which can be ready before the TCP + # listener — causing dependent services that connect over TCP to fail. + test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-proot"] + interval: 5s + timeout: 5s + retries: 10 + + orchestrator-service: + build: + context: ${REPO_ROOT} + dockerfile: example/runway/orchestrator/server/Dockerfile + ports: + - "8080" # Random ephemeral port to avoid conflicts + environment: + - PORT=:8080 + # Queue infrastructure connection (separate database) + - QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true + - HOSTNAME=runway-orchestrator-dev + depends_on: + mysql-queue: + condition: service_healthy diff --git a/example/runway/orchestrator/server/main.go b/example/runway/orchestrator/server/main.go new file mode 100644 index 00000000..73479980 --- /dev/null +++ b/example/runway/orchestrator/server/main.go @@ -0,0 +1,245 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "database/sql" + "errors" + "fmt" + "os" + "os/signal" + "sync" + "syscall" + "time" + + _ "github.com/go-sql-driver/mysql" + "github.com/uber-go/tally" + "go.uber.org/zap" + + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/errs" + genericerrs "github.com/uber/submitqueue/core/errs/generic" + mysqlerrs "github.com/uber/submitqueue/core/errs/mysql" + extqueue "github.com/uber/submitqueue/extension/messagequeue" + queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql" + "github.com/uber/submitqueue/runway/core/topickey" + "github.com/uber/submitqueue/runway/extension/vcs" + "github.com/uber/submitqueue/runway/extension/vcs/noop" + "github.com/uber/submitqueue/runway/orchestrator/controller/check" + "github.com/uber/submitqueue/runway/orchestrator/controller/land" +) + +func main() { + code := 0 + if err := run(); err != nil { + if errors.Is(err, context.Canceled) { + fmt.Println("Runway orchestrator stopped by signal") + code = 128 + int(syscall.SIGTERM) + } else { + fmt.Fprintf(os.Stderr, "Runway orchestrator failure: %v\n", err) + code = 1 + } + } + os.Exit(code) +} + +// noopVCSFactory returns the noop VCS for any configuration. +type noopVCSFactory struct{} + +func (noopVCSFactory) For(_ vcs.Config) (vcs.VCS, error) { + return noop.New(), nil +} + +func run() error { + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + logger, err := zap.NewDevelopment() + if err != nil { + return fmt.Errorf("failed to create logger: %w", err) + } + defer logger.Sync() + + scope := tally.NewTestScope("runway_orchestrator", nil) + metricsStopCh := make(chan any, 1) + metricsWgDone := sync.WaitGroup{} + metricsWgDone.Add(1) + go func() { + defer metricsWgDone.Done() + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-metricsStopCh: + return + case <-ticker.C: + snapshot := scope.Snapshot() + logger.Info("metrics snapshot", + zap.Any("counters", snapshot.Counters()), + zap.Any("gauges", snapshot.Gauges()), + zap.Any("timers", snapshot.Timers()), + ) + } + } + }() + + defer func() { + close(metricsStopCh) + metricsWgDone.Wait() + }() + + // Open queue database connection (runway is stateless — no app database). + queueDSN := os.Getenv("QUEUE_MYSQL_DSN") + if queueDSN == "" { + return fmt.Errorf("QUEUE_MYSQL_DSN environment variable is required") + } + queueDB, err := sql.Open("mysql", queueDSN) + if err != nil { + return fmt.Errorf("failed to open queue database: %w", err) + } + defer queueDB.Close() + + mysqlQueue, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: queueDB, + Logger: logger, + MetricsScope: scope.SubScope("queue"), + }) + if err != nil { + return fmt.Errorf("failed to create queue: %w", err) + } + defer mysqlQueue.Close() + + logger.Info("initialized queue", zap.String("dsn", queueDSN)) + + subscriberName := os.Getenv("HOSTNAME") + if subscriberName == "" { + subscriberName = fmt.Sprintf("runway-orchestrator-%d", time.Now().Unix()) + } + + registry, err := newTopicRegistry(mysqlQueue, subscriberName) + if err != nil { + return fmt.Errorf("failed to create topic registry: %w", err) + } + + primaryConsumer := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry, + errs.NewClassifierProcessor( + genericerrs.Classifier, + mysqlerrs.Classifier, + ), + ) + + count, err := registerControllers(primaryConsumer, logger.Sugar(), scope, registry) + if err != nil { + return fmt.Errorf("failed to register controllers: %w", err) + } + logger.Info("registered controllers", zap.Int("count", count)) + + if err := primaryConsumer.Start(ctx); err != nil { + return fmt.Errorf("failed to start consumer: %w", err) + } + + fmt.Println("Runway orchestrator is running. Press Ctrl+C to stop, or send a SIGTERM.") + + <-ctx.Done() + fmt.Println("Shutting down runway orchestrator...") + + err = ctx.Err() + + stopErr := primaryConsumer.Stop(30000) + if stopErr != nil { + err = fmt.Errorf("failed to stop consumer: %w", stopErr) + } + + return err +} + +func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRegistry, error) { + type topicSpec struct { + key consumer.TopicKey + name string + groupSuffix string + } + + // Inbound topics: runway subscribes to these. + inboundTopics := []topicSpec{ + {topickey.TopicKeyCheck, "runway-check", "orchestrator-check"}, + {topickey.TopicKeyLand, "runway-land", "orchestrator-land"}, + } + + configs := make([]consumer.TopicConfig, 0, len(inboundTopics)+2) + for _, t := range inboundTopics { + configs = append(configs, consumer.TopicConfig{ + Key: t.key, + Name: t.name, + Queue: q, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, t.groupSuffix, + ), + }) + } + + // Outbound topics: runway publishes to these, submitqueue consumes them. + // No subscription — publish-only. + configs = append(configs, + consumer.TopicConfig{ + Key: topickey.TopicKeyCheckResult, + Name: "sq-check-result", + Queue: q, + }, + consumer.TopicConfig{ + Key: topickey.TopicKeyLandResult, + Name: "sq-land-result", + Queue: q, + }, + ) + + return consumer.NewTopicRegistry(configs) +} + +func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry) (int, error) { + var count int + vcsFactory := noopVCSFactory{} + + checkController := check.NewController( + logger, + scope, + registry, + vcsFactory, + topickey.TopicKeyCheck, + "orchestrator-check", + ) + if err := c.Register(checkController); err != nil { + return count, fmt.Errorf("failed to register check controller: %w", err) + } + count++ + + landController := land.NewController( + logger, + scope, + registry, + vcsFactory, + topickey.TopicKeyLand, + "orchestrator-land", + ) + if err := c.Register(landController); err != nil { + return count, fmt.Errorf("failed to register land controller: %w", err) + } + count++ + + return count, nil +}