Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions src/app/endpoints/streaming_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
from configuration import configuration
from constants import (
ENDPOINT_PATH_STREAMING_QUERY,
INTERRUPTED_RESPONSE_MESSAGE,
LLM_TOKEN_EVENT,
LLM_TOOL_CALL_EVENT,
LLM_TOOL_RESULT_EVENT,
Expand Down Expand Up @@ -122,6 +121,7 @@
validate_shield_ids_override,
)
from utils.stream_interrupts import (
build_interrupted_response,
deregister_stream,
persist_interrupted_turn,
register_interrupt_callback,
Expand Down Expand Up @@ -634,16 +634,22 @@ async def generate_response( # pylint: disable=too-many-arguments,too-many-posi
current_task = asyncio.current_task()
if current_task is not None:
current_task.uncancel()
full_text, suffix = build_interrupted_response(turn_summary.partial_tokens)
if not persist_guard[0]:
persist_guard[0] = True
turn_summary.llm_response = INTERRUPTED_RESPONSE_MESSAGE
turn_summary.llm_response = full_text
Comment thread
coderabbitai[bot] marked this conversation as resolved.
await persist_interrupted_turn(
context,
responses_params,
turn_summary,
_background_topic_summary_tasks,
original_input,
)
yield stream_event(
{"id": turn_summary.next_chunk_id, "token": suffix},
LLM_TOKEN_EVENT,
context.query_request.media_type or MEDIA_TYPE_JSON,
)
yield stream_interrupted_event(context.request_id)
finally:
deregister_stream(context.request_id)
Expand Down Expand Up @@ -774,6 +780,7 @@ async def response_generator( # pylint: disable=too-many-branches,too-many-stat
media_type,
)
chunk_id += 1
turn_summary.next_chunk_id = chunk_id

# Store MCP call item info for later lookup when arguments.done event occurs
elif event_type == "response.output_item.added":
Expand All @@ -789,6 +796,7 @@ async def response_generator( # pylint: disable=too-many-branches,too-many-stat
elif event_type == "response.output_text.delta":
delta_chunk = cast(TextDeltaChunk, chunk)
text_parts.append(delta_chunk.delta)
turn_summary.partial_tokens.append(delta_chunk.delta)
yield stream_event(
{
"id": chunk_id,
Expand All @@ -798,6 +806,7 @@ async def response_generator( # pylint: disable=too-many-branches,too-many-stat
media_type,
)
chunk_id += 1
turn_summary.next_chunk_id = chunk_id

# Final text of the output (capture, but emit at response.completed)
elif event_type == "response.output_text.done":
Expand Down Expand Up @@ -886,6 +895,7 @@ async def response_generator( # pylint: disable=too-many-branches,too-many-stat
media_type,
)
chunk_id += 1
turn_summary.next_chunk_id = chunk_id

# Incomplete or failed response - emit error
elif event_type in ("response.incomplete", "response.failed"):
Expand Down
2 changes: 1 addition & 1 deletion src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
UNABLE_TO_PROCESS_RESPONSE: Final[str] = "Unable to process this request"

# Indicator text appended (in italics) to the partial response that is stored
# in the conversation when the user interrupts a streaming request
INTERRUPTED_RESPONSE_MESSAGE: Final[str] = "You interrupted this request."
INTERRUPTED_RESPONSE_MESSAGE: Final[str] = "Response stopped by the user."

# Max seconds to wait for topic summary in background task after interrupt persist.
TOPIC_SUMMARY_INTERRUPT_TIMEOUT_SECONDS: Final[float] = 30.0
Expand Down
10 changes: 10 additions & 0 deletions src/models/common/turn_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,16 @@ class TurnSummary(BaseModel):
description="Structured response output items, captured for compacted-mode "
"turn persistence (LCORE-1572). Empty on the non-compacted path.",
)
partial_tokens: list[str] = Field(
default_factory=list,
description="Accumulated text deltas during streaming, used to reconstruct "
"partial content on interruption.",
)
next_chunk_id: int = Field(
default=0,
description="Next monotonic SSE chunk index, kept in sync with the inner "
"generator so the interrupt handler can emit a sequentially valid id.",
)


class ToolInfoSummary(BaseModel):
Expand Down
15 changes: 13 additions & 2 deletions src/utils/agents/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
)

from configuration import configuration
from constants import INTERRUPTED_RESPONSE_MESSAGE, MEDIA_TYPE_JSON
from constants import MEDIA_TYPE_JSON
from log import get_logger
from models.common.agents import (
AgentTurnAccumulator,
Expand Down Expand Up @@ -65,6 +65,7 @@
maybe_get_topic_summary,
)
from utils.stream_interrupts import (
build_interrupted_response,
deregister_stream,
persist_interrupted_turn,
register_interrupt_callback,
Expand Down Expand Up @@ -197,16 +198,23 @@ async def generate_agent_response(
current_task = asyncio.current_task()
if current_task is not None:
current_task.uncancel()
full_text, suffix = build_interrupted_response(turn_summary.partial_tokens)
if not persist_guard[0]:
persist_guard[0] = True
turn_summary.llm_response = INTERRUPTED_RESPONSE_MESSAGE
turn_summary.llm_response = full_text
await persist_interrupted_turn(
context,
responses_params,
turn_summary,
background_topic_summary_tasks,
original_input,
)
yield serialize_event(
TokenStreamPayload.create(
chunk_id=turn_summary.next_chunk_id, token=suffix
),
media_type,
)
yield serialize_event(
InterruptedStreamPayload.create(request_id=context.request_id),
media_type,
Expand Down Expand Up @@ -347,11 +355,13 @@ def _process_token(
Token stream payload containing the emitted token chunk.
"""
state.text_parts.append(text)
state.turn_summary.partial_tokens.append(text)
payload = TokenStreamPayload.create(
chunk_id=state.chunk_id,
token=text,
)
state.chunk_id += 1
state.turn_summary.next_chunk_id = state.chunk_id
return payload


Expand Down Expand Up @@ -402,6 +412,7 @@ def _(
token=final_text,
)
state.chunk_id += 1
state.turn_summary.next_chunk_id = state.chunk_id
return payload


Expand Down
105 changes: 105 additions & 0 deletions src/utils/markdown_repair.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""Utilities for repairing truncated markdown content.

Used when a streaming response is interrupted mid-content to close
any open markdown constructs (code fences, HTML block tags) that
would otherwise break rendering.
"""

import re
from typing import Final

# Names of the block-level HTML tags that the repair pass tracks. Tags whose
# (lower-cased) name is not in this set are ignored by _process_html_tags.
BLOCK_HTML_TAGS: Final[frozenset[str]] = frozenset(
{
"div",
"table",
"tr",
"td",
"th",
"thead",
"tbody",
"details",
"summary",
"pre",
}
)

# Candidate code-fence line: up to three leading whitespace characters
# (group 1), then a run of three or more backticks (group 3) or three or more
# tildes (group 4). Anchored with "^" and compiled without re.MULTILINE, so it
# is meant to be matched against one line at a time.
_FENCE_RE: Final[re.Pattern[str]] = re.compile(r"^(\s{0,3})((`{3,})|(~{3,}))")
# HTML tag: group 1 is "/" for a closing tag, group 2 is the tag name,
# group 3 is the attribute text (if any), group 4 is "/" for a self-closing tag.
_TAG_RE: Final[re.Pattern[str]] = re.compile(r"<(/?)(\w+)([^>]*?)(/?)>")


def _process_html_tags(line: str, html_stack: list[str]) -> None:
"""Update *html_stack* with block-level HTML open/close tags found in *line*.

Parameters:
line: A single line of text to scan for HTML tags.
html_stack: Mutable stack tracking open block-level tags.
"""
Comment on lines +31 to +36

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📐 Maintainability & Code Quality | 🟠 Major | ⚡ Quick win

Align new function docstrings with required Google docstring sections.

The new docstrings should use the repository’s required Google format (Args, Returns, Raises) for consistency.

As per coding guidelines, "Follow Google Python docstring conventions with required sections: Parameters, Returns, Raises, and Attributes for classes."

Also applies to: 53-64

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/utils/markdown_repair.py` around lines 31 - 36, The docstring for the
function starting at line 31 uses "Parameters:" section header, but Google
Python docstring conventions require "Args:" instead. Update the docstring
header from "Parameters:" to "Args:" in the function at line 31-36, and apply
the same fix to the other docstring mentioned at lines 53-64. Additionally,
review both docstrings to ensure they include all required Google format
sections (Args, Returns, Raises) where applicable based on what each function
actually does.

Source: Coding guidelines

for tag_match in _TAG_RE.finditer(line):
is_closing = tag_match.group(1) == "/"
tag_name = tag_match.group(2).lower()
is_self_closing = tag_match.group(4) == "/"

if tag_name not in BLOCK_HTML_TAGS or is_self_closing:
continue

if is_closing:
if html_stack and html_stack[-1] == tag_name:
html_stack.pop()
else:
html_stack.append(tag_name)

Comment on lines +30 to +50

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📐 Maintainability & Code Quality | 🟠 Major | ⚡ Quick win

Avoid in-place mutation of function parameters in _process_html_tags.

_process_html_tags currently mutates html_stack directly. Return an updated stack instead so the helper remains side-effect explicit and easier to reason about.

As per coding guidelines, "Avoid in-place parameter modification anti-patterns; return new data structures instead of modifying function parameters."

Proposed refactor
-def _process_html_tags(line: str, html_stack: list[str]) -> None:
+def _process_html_tags(line: str, html_stack: list[str]) -> list[str]:
@@
-    for tag_match in _TAG_RE.finditer(line):
+    updated_stack = [*html_stack]
+    for tag_match in _TAG_RE.finditer(line):
@@
-        if is_closing:
-            if html_stack and html_stack[-1] == tag_name:
-                html_stack.pop()
+        if is_closing:
+            if updated_stack and updated_stack[-1] == tag_name:
+                updated_stack.pop()
         else:
-            html_stack.append(tag_name)
+            updated_stack.append(tag_name)
+    return updated_stack
@@
-                _process_html_tags(line, html_stack)
+                html_stack = _process_html_tags(line, html_stack)

Also applies to: 77-79

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/utils/markdown_repair.py` around lines 30 - 50, The function
`_process_html_tags` currently mutates its `html_stack` parameter in-place using
`.pop()` and `.append()` operations. Refactor this function to return the
updated stack instead of modifying the parameter directly. Change the function
signature to return a list (the updated html_stack) and update all callers to
capture the returned value rather than relying on side effects. This applies to
both the main function definition and any other similar patterns mentioned at
lines 77-79.

Source: Coding guidelines


def close_open_markdown(text: str) -> str:
    """Return a suffix that closes any open markdown constructs in *text*.

    Scans *text* line by line for an unclosed fenced code block and for
    unclosed block-level HTML tags. Only the closing characters are returned;
    callers append the result to *text* themselves.

    Parameters:
        text: Partial markdown content that may contain open constructs.

    Returns:
        A suffix string that closes the open constructs: first the closing
        code fence (if a fence is still open), then one closing tag per open
        HTML block tag, innermost first. Each part starts with a newline.
        Empty string when *text* is empty, whitespace-only, or has nothing
        left open.
    """
    if not text or not text.strip():
        return ""

    in_code_fence = False
    fence_char = ""
    fence_len = 0
    html_stack: list[str] = []

    for line in text.split("\n"):
        fence_match = _FENCE_RE.match(line)
        if fence_match is None:
            # HTML inside a fenced code block is literal text, not markup.
            if not in_code_fence:
                _process_html_tags(line, html_stack)
            continue

        backtick_run = fence_match.group(3)
        fence_run = backtick_run or fence_match.group(4)
        char = "`" if backtick_run else "~"
        remainder = line[fence_match.end() :]

        if in_code_fence:
            # A closing fence uses the same character, is at least as long as
            # the opener, and is followed only by spaces or tabs. Any other
            # fence-looking line is content of the open block.
            if (
                char == fence_char
                and len(fence_run) >= fence_len
                and remainder.strip(" \t") == ""
            ):
                in_code_fence = False
                fence_char = ""
                fence_len = 0
            continue

        if char == "`" and "`" in remainder:
            # The info string of a backtick fence may not contain a backtick,
            # so a line such as "```code```" is an inline code span rather
            # than a fence opener. Handle it like any other non-fence line.
            _process_html_tags(line, html_stack)
            continue

        in_code_fence = True
        fence_char = char
        fence_len = len(fence_run)

    suffix_parts: list[str] = []
    if in_code_fence:
        # Close with a fence of the same character and length as the opener.
        suffix_parts.append(f"\n{fence_char * fence_len}")

    # Close HTML tags innermost-first so the nesting stays well formed.
    suffix_parts.extend(f"\n</{tag}>" for tag in reversed(html_stack))

    return "".join(suffix_parts)
30 changes: 27 additions & 3 deletions src/utils/stream_interrupts.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from models.common.responses.types import ResponseInput
from models.common.turn_summary import TurnSummary
from utils.conversations import append_turn_items_to_conversation
from utils.markdown_repair import close_open_markdown
from utils.query import store_query_results, update_conversation_topic_summary
from utils.responses import get_topic_summary
from utils.shields import append_turn_to_conversation
Expand Down Expand Up @@ -215,6 +216,28 @@ async def background_update_topic_summary(
)


def build_interrupted_response(partial_tokens: list[str]) -> tuple[str, str]:
"""Build the final interrupted response text from accumulated tokens.

Joins partial tokens, repairs any open markdown constructs, and appends
an italicized interruption indicator.

Parameters:
partial_tokens: List of text deltas accumulated during streaming.

Returns:
A tuple of (full_response_text, suffix_to_emit) where full_response_text
is the complete message for persistence and suffix_to_emit is the new
content to send as a final SSE token event.
"""
Comment on lines +219 to +232

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📐 Maintainability & Code Quality | 🟠 Major | ⚡ Quick win

Use required Google-style docstring sections in build_interrupted_response.

Please update the new helper docstring to the required convention (Args, Returns, Raises) for consistency with repository standards.

As per coding guidelines, "Follow Google Python docstring conventions with required sections: Parameters, Returns, Raises, and Attributes for classes."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/utils/stream_interrupts.py` around lines 219 - 232, The docstring for the
build_interrupted_response function uses "Parameters:" and "Returns:" sections,
but the repository standard requires Google-style docstring format with "Args",
"Returns", and "Raises" sections. Update the docstring by renaming the
"Parameters:" section to "Args:" to match the required convention. Additionally,
add a "Raises:" section if the function can raise any exceptions during
execution, following the repository's Google Python docstring conventions.

Source: Coding guidelines

partial_text = "".join(partial_tokens)
repaired_text = close_open_markdown(partial_text)
interrupted_indicator = f"\n\n*{INTERRUPTED_RESPONSE_MESSAGE}*"
suffix = repaired_text + interrupted_indicator
final_text = partial_text + suffix
return final_text, suffix


async def persist_interrupted_turn(
context: ResponseGeneratorContext,
responses_params: ResponsesApiParams,
Expand Down Expand Up @@ -251,7 +274,7 @@ async def persist_interrupted_turn(
original_input,
[
OpenAIResponseMessage(
role="assistant", content=INTERRUPTED_RESPONSE_MESSAGE
role="assistant", content=turn_summary.llm_response
)
],
)
Expand All @@ -260,7 +283,7 @@ async def persist_interrupted_turn(
context.client,
responses_params.conversation,
cast(str, responses_params.input),
INTERRUPTED_RESPONSE_MESSAGE,
turn_summary.llm_response,
)
except Exception: # pylint: disable=broad-except
logger.exception(
Expand Down Expand Up @@ -342,7 +365,8 @@ async def _on_interrupt() -> None:
if guard[0]:
return
guard[0] = True
turn_summary.llm_response = INTERRUPTED_RESPONSE_MESSAGE
full_text, _ = build_interrupted_response(turn_summary.partial_tokens)
turn_summary.llm_response = full_text
await persist_interrupted_turn(
context,
responses_params,
Expand Down
8 changes: 6 additions & 2 deletions tests/unit/app/endpoints/test_streaming_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
)
from configuration import AppConfig
from constants import (
INTERRUPTED_RESPONSE_MESSAGE,
MEDIA_TYPE_JSON,
MEDIA_TYPE_TEXT,
)
Expand All @@ -70,6 +71,8 @@
from utils.stream_interrupts import StreamInterruptRegistry
from utils.token_counter import TokenCounter

INTERRUPTED_INDICATOR = f"\n\n*{INTERRUPTED_RESPONSE_MESSAGE}*"

MOCK_AUTH_STREAMING = (
"00000001-0001-0001-0001-000000000001",
"mock_username",
Expand Down Expand Up @@ -1379,6 +1382,7 @@ async def mock_generator() -> AsyncIterator[str]:
result.append(item)

assert any("start" in item for item in result)
assert any('"event": "token"' in item for item in result)
assert any('"event": "interrupted"' in item for item in result)
assert not any('"event": "end"' in item for item in result)
consume_query_tokens_mock.assert_not_called()
Expand All @@ -1387,13 +1391,13 @@ async def mock_generator() -> AsyncIterator[str]:
mock_context.client,
existing_conv_id,
"test",
"You interrupted this request.",
INTERRUPTED_INDICATOR,
)
store_query_results_mock.assert_called_once()
call_kwargs = store_query_results_mock.call_args[1]
assert call_kwargs["user_id"] == "user_123"
assert call_kwargs["conversation_id"] == existing_conv_id
assert call_kwargs["summary"].llm_response == "You interrupted this request."
assert call_kwargs["summary"].llm_response == INTERRUPTED_INDICATOR
assert call_kwargs["topic_summary"] is None

isolate_stream_interrupt_registry.deregister_stream.assert_called_once_with(
Expand Down
Loading
Loading