Make the async processing service (psv2) the default and update all existing projects#1353
Conversation
Turn on the `async_pipeline_workers` feature flag for every project that exists at deploy time, rolling out async ML processing (workers that pull tasks from the NATS queue instead of the synchronous push API) across the whole platform at once. The flag lives in the `feature_flags` JSONB column. The data migration reads each project's flags, sets the one boolean, and writes it back, leaving the other feature flags untouched. The reverse flips the flag back off for every project. New projects keep the model default of False until opted in separately. Co-Authored-By: Claude <noreply@anthropic.com>
✅ Deploy Preview for antenna-ssec canceled.
|
👷 Deploy Preview for antenna-preview processing.
|
✅ Deploy Preview for antenna-preview canceled.
|
|
Caution Review failedPull request was closed or merged during review No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (3)
📝 WalkthroughWalkthroughUpdates the ChangesProject flag rollout
Estimated code review effort🎯 2 (Simple) | ⏱️ ~10 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (1)
ami/main/migrations/0094_enable_async_pipeline_workers.py (1)
20-37: 🗄️ Data Integrity & Integration | 🔵 Trivial | ⚡ Quick winPrefer an atomic JSONB update for this rollout.
The read/modify/save loop writes the whole
feature_flagsvalue back per project, so a concurrent update to another flag between Line 23/33 and Line 27/37 can be lost. A DB-sidejsonb_setupdate also avoids caching everyProjectinstance and issuing one save per row.♻️ Proposed direction
+def set_async_pipeline_workers(apps, schema_editor, enabled): + Project = apps.get_model("main", "Project") + table = schema_editor.connection.ops.quote_name(Project._meta.db_table) + with schema_editor.connection.cursor() as cursor: + cursor.execute( + f""" + UPDATE {table} + SET feature_flags = jsonb_set( + COALESCE(feature_flags, '{{}}'::jsonb), + '{{async_pipeline_workers}}', + to_jsonb(%s::boolean), + true + ) + WHERE COALESCE((feature_flags->>'async_pipeline_workers')::boolean, false) + IS DISTINCT FROM %s + """, + [enabled, enabled], + ) + + def enable_async_pipeline_workers(apps, schema_editor): - Project = apps.get_model("main", "Project") - for project in Project.objects.all(): - flags = project.feature_flags - if not flags.async_pipeline_workers: - flags.async_pipeline_workers = True - project.feature_flags = flags - project.save(update_fields=["feature_flags"]) + set_async_pipeline_workers(apps, schema_editor, True) def disable_async_pipeline_workers(apps, schema_editor): - Project = apps.get_model("main", "Project") - for project in Project.objects.all(): - flags = project.feature_flags - if flags.async_pipeline_workers: - flags.async_pipeline_workers = False - project.feature_flags = flags - project.save(update_fields=["feature_flags"]) + set_async_pipeline_workers(apps, schema_editor, False)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@ami/main/migrations/0094_enable_async_pipeline_workers.py` around lines 20 - 37, The `enable_async_pipeline_workers` and `disable_async_pipeline_workers` migration helpers currently read/modify/save `Project.feature_flags` in Python, which can overwrite concurrent flag changes and is inefficient per row. Update these rollout functions to use an atomic DB-side JSONB update on `feature_flags` (for example via an `update()` with `jsonb_set`-style logic) so only `async_pipeline_workers` is toggled in place without loading each `Project` instance or rewriting the full JSON object.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@ami/main/migrations/0094_enable_async_pipeline_workers.py`:
- Around line 20-37: The `enable_async_pipeline_workers` and
`disable_async_pipeline_workers` migration helpers currently read/modify/save
`Project.feature_flags` in Python, which can overwrite concurrent flag changes
and is inefficient per row. Update these rollout functions to use an atomic
DB-side JSONB update on `feature_flags` (for example via an `update()` with
`jsonb_set`-style logic) so only `async_pipeline_workers` is toggled in place
without loading each `Project` instance or rewriting the full JSON object.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 9c5db873-d1d4-45fa-a0a1-b151588515b1
📒 Files selected for processing (1)
ami/main/migrations/0094_enable_async_pipeline_workers.py
There was a problem hiding this comment.
Pull request overview
This PR introduces a Django data migration that globally enables async ML processing for all existing projects by setting the feature_flags.async_pipeline_workers flag to True, with a reversible migration that disables it again.
Changes:
- Add migration
0094_enable_async_pipeline_workersto enableasync_pipeline_workersfor every existingProject. - Add reverse migration logic to disable
async_pipeline_workersfor everyProject.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Flip the `async_pipeline_workers` default in `ProjectFeatureFlags` to True so projects created from now on use the async processing service (psv2) — workers that pull tasks from the NATS queue — without an operator opting them in. No migration is required: the `feature_flags` field deconstructs by its schema class and default-factory references, neither of which changes when a default inside the pydantic model changes (`makemigrations --check` reports no changes). The companion data migration handles existing projects. Co-Authored-By: Claude <noreply@anthropic.com>
Address CodeRabbit review on the data migration: replace the read/modify/save loop with a single DB-side `jsonb_set` UPDATE that toggles only the `async_pipeline_workers` key. Updating the one key server-side leaves the other feature flags untouched even if another process changes one of them during the deploy (the previous loop rewrote the whole JSONB value and could clobber a concurrent sibling change), and it runs as one statement instead of one save per row. A `WHERE ... IS DISTINCT FROM` guard skips rows already at the target value. Co-Authored-By: Claude <noreply@anthropic.com>
|
Claude says: Addressed the data-migration nitpick in |
`test_ml_job_dispatch_mode_set_on_creation` asserted that an ML job on a default project dispatches via sync_api, which relied on `async_pipeline_workers` defaulting to False. Now that the default is True, the sync branch must set the flag off explicitly to exercise that path — the async branch already sets it on. Pins both transitions instead of leaning on the default. Co-Authored-By: Claude <noreply@anthropic.com>
The reverse is a blanket disable; some projects may have had the flag enabled individually before this rollout, so the docstring no longer claims no project was True beforehand — it states the reverse returns every project to the off state rather than to its prior value. Co-Authored-By: Claude <noreply@anthropic.com>
Summary
This flips the switch on the new async processing service (psv2) platform-wide. It does two things together: every project that already exists is switched over, and every project created from now on uses psv2 by default. Concretely, the
async_pipeline_workersfeature flag drives whether a project's ML jobs run on workers that pull tasks from the NATS queue (psv2) instead of the synchronous push API; this PR makes that the default and rolls it out to the back-catalogue so operators don't have to opt each project in by hand.We are flipping the default now because psv2 has proven substantially more reliable and much faster than the synchronous service in operator testing — on the order of two orders of magnitude higher throughput on large capture sets (an operational observation on production data, not a controlled benchmark). The remaining psv2 work is tracked but none of it blocks turning psv2 on by default; the open items are refinements, hardening, and follow-ups rather than correctness gates for the common path.
List of Changes
ProjectFeatureFlags.async_pipeline_workersdefault changed fromFalsetoTrueinami/main/models.py. No migration is needed: thefeature_flagsfield deconstructs by itsschema=class reference and default-factory reference, neither of which changes when a default inside the pydantic model changes —makemigrations --checkconfirms no model migration.0094_enable_async_pipeline_workersruns a single DB-sideUPDATE ... jsonb_set(...)that toggles only theasync_pipeline_workerskey in place for every existingProject, with aWHERE ... IS DISTINCT FROMguard so rows already at the target are skipped. Updating the one key server-side (rather than reading the whole JSON into Python and writing it back) leaves the other feature flags untouched even under a concurrent change, and runs as one statement instead of one save per row.Falsefor every project (a blanket disable). It does not restore per-project values from before the rollout — some projects may have been opted in individually beforehand, so the reverse returns every project to the off state. The reverse does not change the new model default; that is a code change, reverted by reverting the commit.Notes
async_pipeline_workers = Truewith the other defaults unchanged.Known follow-ups (PSv2) — none blocking this change
psv2 is the new async/distributed ML backend tracked under the umbrella issue #515 and the PSv2 label. The items below are known and open at the time of this change. None of them block making psv2 the default — they are hardening, performance, and feature-completeness follow-ups. Full lists: Antenna PSv2 issues · Antenna PSv2 PRs · ADC (ami-data-companion) PRs.
Reliability / job-state correctness (Antenna)
/nextfilter and starve newer jobs.max_deliverwithout posting results (mitigated by the stale-job reaper).Job.DoesNotExistproperly inprocess_source_images_async.Performance / scaling (Antenna)
process_nats_pipeline_result(expected 5–10× on the result path).Auth & permissions (Antenna + ADC)
Pipeline config & result contract (in flight)
PipelineRequestConfigParametersthrough pull-mode (NATS) tasks so workers honor per-request config.Docs, templates, and infra (in flight)
Summary by CodeRabbit
New Features
Bug Fixes