sglang_v0.5.2/sglang/test/srt/test_reasoning_parser.py

604 lines
25 KiB
Python

import unittest
from sglang.srt.parser.reasoning_parser import (
BaseReasoningFormatDetector,
DeepSeekR1Detector,
KimiDetector,
Qwen3Detector,
ReasoningParser,
StreamingParseResult,
)
from sglang.test.test_utils import CustomTestCase
class TestStreamingParseResult(CustomTestCase):
def test_init_default(self):
"""Test default initialization of StreamingParseResult."""
result = StreamingParseResult()
self.assertEqual(result.normal_text, "")
self.assertEqual(result.reasoning_text, "")
def test_init_with_values(self):
"""Test initialization with specific values."""
result = StreamingParseResult("normal", "reasoning")
self.assertEqual(result.normal_text, "normal")
self.assertEqual(result.reasoning_text, "reasoning")
class TestBaseReasoningFormatDetector(CustomTestCase):
def setUp(self):
self.detector = BaseReasoningFormatDetector(
think_start_token="<think>",
think_end_token="</think>",
force_reasoning=False,
stream_reasoning=True,
)
def test_init(self):
"""Test initialization of BaseReasoningFormatDetector."""
self.assertEqual(self.detector.think_start_token, "<think>")
self.assertEqual(self.detector.think_end_token, "</think>")
self.assertFalse(self.detector._in_reasoning)
self.assertTrue(self.detector.stream_reasoning)
self.assertEqual(self.detector._buffer, "")
self.assertFalse(self.detector.stripped_think_start)
def test_detect_and_parse_normal_text(self):
"""Test parsing normal text without reasoning."""
text = "This is normal text"
result = self.detector.detect_and_parse(text)
self.assertEqual(result.normal_text, text)
self.assertEqual(result.reasoning_text, "")
def test_detect_and_parse_with_start_token(self):
"""Test parsing text starting with think token."""
text = "<think>This is reasoning"
result = self.detector.detect_and_parse(text)
self.assertEqual(result.reasoning_text, "This is reasoning")
self.assertEqual(result.normal_text, "")
def test_detect_and_parse_complete_reasoning(self):
"""Test parsing complete reasoning block."""
text = "<think>This is reasoning</think>This is normal"
result = self.detector.detect_and_parse(text)
self.assertEqual(result.reasoning_text, "This is reasoning")
self.assertEqual(result.normal_text, "This is normal")
def test_detect_and_parse_force_reasoning(self):
"""Test forced reasoning mode."""
detector = BaseReasoningFormatDetector(
"<think>", "</think>", force_reasoning=True
)
text = "This should be reasoning"
result = detector.detect_and_parse(text)
self.assertEqual(result.reasoning_text, "This should be reasoning")
self.assertEqual(result.normal_text, "")
def test_parse_streaming_increment_normal(self):
"""Test streaming parse of normal text."""
result = self.detector.parse_streaming_increment("Hello world")
self.assertEqual(result.normal_text, "Hello world")
self.assertEqual(result.reasoning_text, "")
def test_parse_streaming_increment_partial_token(self):
"""Test streaming parse with partial token."""
# Test partial start token
result = self.detector.parse_streaming_increment("<thi")
self.assertEqual(result.normal_text, "")
self.assertEqual(result.reasoning_text, "")
# Reset detector and test partial end token when in reasoning mode
detector = BaseReasoningFormatDetector("<think>", "</think>")
detector._in_reasoning = True
result = detector.parse_streaming_increment("</thi")
self.assertEqual(result.normal_text, "")
self.assertEqual(result.reasoning_text, "")
def test_parse_streaming_increment_complete_start(self):
"""Test streaming parse with complete start token."""
result = self.detector.parse_streaming_increment("<think>")
self.assertEqual(result.normal_text, "")
self.assertEqual(result.reasoning_text, "")
self.assertTrue(self.detector._in_reasoning)
self.assertTrue(self.detector.stripped_think_start)
def test_parse_streaming_increment_reasoning_content(self):
"""Test streaming parse of reasoning content."""
# First add start token
self.detector.parse_streaming_increment("<think>")
# Then add reasoning content
result = self.detector.parse_streaming_increment("reasoning content")
self.assertEqual(result.reasoning_text, "reasoning content")
self.assertEqual(result.normal_text, "")
def test_parse_streaming_increment_end_token(self):
"""Test streaming parse with end token."""
# Start reasoning mode
self.detector.parse_streaming_increment("<think>")
self.detector.parse_streaming_increment("reasoning")
# End reasoning - the reasoning content accumulated in previous calls is cleared when end token is found
result = self.detector.parse_streaming_increment("</think>normal text")
self.assertEqual(result.reasoning_text, "") # Buffer cleared, returns empty
self.assertEqual(result.normal_text, "normal text")
self.assertFalse(self.detector._in_reasoning)
def test_parse_streaming_increment_no_stream_reasoning(self):
"""Test streaming parse without streaming reasoning."""
detector = BaseReasoningFormatDetector(
"<think>", "</think>", stream_reasoning=False
)
# Start reasoning mode
detector.parse_streaming_increment("<think>")
# Add reasoning content - should not return content
result = detector.parse_streaming_increment("reasoning content")
self.assertEqual(result.reasoning_text, "")
self.assertEqual(result.normal_text, "")
def test_parse_streaming_increment_mixed_content(self):
"""Test streaming parse with mixed content in one chunk."""
result = self.detector.parse_streaming_increment(
"<think>reasoning</think>normal"
)
self.assertEqual(result.reasoning_text, "reasoning")
self.assertEqual(result.normal_text, "normal")
class TestDeepSeekR1Detector(CustomTestCase):
def setUp(self):
self.detector = DeepSeekR1Detector()
def test_init(self):
"""Test DeepSeekR1Detector initialization."""
self.assertEqual(self.detector.think_start_token, "<think>")
self.assertEqual(self.detector.think_end_token, "</think>")
self.assertTrue(self.detector._in_reasoning) # force_reasoning=True
self.assertTrue(self.detector.stream_reasoning)
def test_init_no_stream_reasoning(self):
"""Test DeepSeekR1Detector with stream_reasoning=False."""
detector = DeepSeekR1Detector(stream_reasoning=False)
self.assertFalse(detector.stream_reasoning)
def test_detect_and_parse_r1_format(self):
"""Test parsing DeepSeek-R1 format."""
text = "I need to think about this. The answer is 42."
result = self.detector.detect_and_parse(text)
# Should be treated as reasoning because force_reasoning=True
self.assertEqual(
result.reasoning_text, "I need to think about this. The answer is 42."
)
self.assertEqual(result.normal_text, "")
def test_detect_and_parse_with_end_token(self):
"""Test parsing with end token."""
text = "I think this is the answer</think>The final answer is 42."
result = self.detector.detect_and_parse(text)
self.assertEqual(result.reasoning_text, "I think this is the answer")
self.assertEqual(result.normal_text, "The final answer is 42.")
def test_detect_and_parse_with_start_token(self):
"""Test parsing deepseek-ai/DeepSeek-R1-0528 format, which generates the <think> token."""
text = "<think>I need to think about this.</think>The answer is 42."
result = self.detector.detect_and_parse(text)
# Should be treated as reasoning because force_reasoning=True
self.assertEqual(result.reasoning_text, "I need to think about this.")
self.assertEqual(result.normal_text, "The answer is 42.")
class TestQwen3Detector(CustomTestCase):
def setUp(self):
self.detector = Qwen3Detector()
def test_init(self):
"""Test Qwen3Detector initialization."""
self.assertEqual(self.detector.think_start_token, "<think>")
self.assertEqual(self.detector.think_end_token, "</think>")
self.assertFalse(self.detector._in_reasoning) # force_reasoning=False
self.assertTrue(self.detector.stream_reasoning)
def test_detect_and_parse_qwen3_format(self):
"""Test parsing Qwen3 format."""
text = "<think>Let me think about this problem</think>The answer is 42."
result = self.detector.detect_and_parse(text)
self.assertEqual(result.reasoning_text, "Let me think about this problem")
self.assertEqual(result.normal_text, "The answer is 42.")
def test_detect_and_parse_without_thinking(self):
"""Test parsing without thinking (enable_thinking=False case)."""
text = "Direct answer without thinking."
result = self.detector.detect_and_parse(text)
self.assertEqual(result.normal_text, text)
self.assertEqual(result.reasoning_text, "")
class TestQwen3ForcedReasoningDetector(CustomTestCase):
def setUp(self):
self.detector = Qwen3Detector(force_reasoning=True)
def test_init(self):
"""Test Qwen3ForcedReasoningDetector initialization."""
self.assertEqual(self.detector.think_start_token, "<think>")
self.assertEqual(self.detector.think_end_token, "</think>")
self.assertTrue(self.detector._in_reasoning) # force_reasoning=True
self.assertTrue(self.detector.stream_reasoning)
def test_detect_and_parse_qwen3_forced_reasoning_format(self):
"""Test parsing Qwen3-ForcedReasoning format (no <think> start tag)."""
text = "I need to think about this step by step.</think>The answer is 42."
result = self.detector.detect_and_parse(text)
self.assertEqual(
result.reasoning_text, "I need to think about this step by step."
)
self.assertEqual(result.normal_text, "The answer is 42.")
def test_detect_and_parse_with_start_token(self):
"""Test parsing Qwen3-ForcedReasoning with optional <think> start tag."""
text = "<think>I need to think about this.</think>The answer is 42."
result = self.detector.detect_and_parse(text)
# Should work because base class logic handles both force_reasoning=True OR start token
self.assertEqual(result.reasoning_text, "I need to think about this.")
self.assertEqual(result.normal_text, "The answer is 42.")
def test_streaming_qwen3_forced_reasoning_format(self):
"""Test streaming parse of Qwen3-ForcedReasoning format."""
# First chunk without <think> start
result = self.detector.parse_streaming_increment("I need to")
self.assertEqual(result.reasoning_text, "I need to")
self.assertEqual(result.normal_text, "")
# More reasoning content
result = self.detector.parse_streaming_increment(" think about this.")
self.assertEqual(result.reasoning_text, " think about this.")
self.assertEqual(result.normal_text, "")
# End token with normal text
result = self.detector.parse_streaming_increment("</think>The answer is 42.")
self.assertEqual(result.reasoning_text, "") # Buffer cleared
self.assertEqual(result.normal_text, "The answer is 42.")
class TestKimiDetector(CustomTestCase):
def setUp(self):
self.detector = KimiDetector()
def test_init(self):
"""Test KimiDetector initialization."""
self.assertEqual(self.detector.think_start_token, "◁think▷")
self.assertEqual(self.detector.think_end_token, "◁/think▷")
self.assertFalse(self.detector._in_reasoning)
self.assertTrue(self.detector.stream_reasoning)
def test_detect_and_parse_kimi_format(self):
"""Test parsing Kimi format."""
text = "◁think▷Let me consider this carefully◁/think▷The answer is 42."
result = self.detector.detect_and_parse(text)
self.assertEqual(result.reasoning_text, "Let me consider this carefully")
self.assertEqual(result.normal_text, "The answer is 42.")
def test_detect_and_parse_kimi_no_thinking(self):
"""Test parsing Kimi format without thinking."""
text = "Direct answer without thinking tokens."
result = self.detector.detect_and_parse(text)
self.assertEqual(result.normal_text, text)
self.assertEqual(result.reasoning_text, "")
def test_streaming_kimi_format(self):
"""Test streaming parse of Kimi format."""
# Test partial token
result = self.detector.parse_streaming_increment("◁thi")
self.assertEqual(result.normal_text, "")
self.assertEqual(result.reasoning_text, "")
# Complete start token
result = self.detector.parse_streaming_increment("nk▷Start")
self.assertEqual(result.normal_text, "")
self.assertEqual(result.reasoning_text, "Start")
self.assertTrue(self.detector._in_reasoning)
# Add reasoning content
result = self.detector.parse_streaming_increment("thinking...")
self.assertEqual(result.reasoning_text, "thinking...")
self.assertEqual(result.normal_text, "")
# End token - reasoning content is cleared when end token is processed
result = self.detector.parse_streaming_increment("◁/think▷answer")
self.assertEqual(result.reasoning_text, "") # Buffer cleared
self.assertEqual(result.normal_text, "answer")
class TestReasoningParser(CustomTestCase):
def test_init_valid_model(self):
"""Test initialization with valid model types."""
parser = ReasoningParser("deepseek-r1")
self.assertIsInstance(parser.detector, DeepSeekR1Detector)
parser = ReasoningParser("qwen3")
self.assertIsInstance(parser.detector, Qwen3Detector)
parser = ReasoningParser("kimi")
self.assertIsInstance(parser.detector, KimiDetector)
def test_init_invalid_model(self):
"""Test initialization with invalid model type."""
with self.assertRaises(ValueError) as context:
ReasoningParser("invalid-model")
self.assertIn("Unsupported model type", str(context.exception))
def test_init_no_model(self):
"""Test initialization without model type."""
with self.assertRaises(ValueError) as context:
ReasoningParser(None)
self.assertEqual(str(context.exception), "Model type must be specified")
def test_parse_non_stream(self):
"""Test non-streaming parsing."""
parser = ReasoningParser("qwen3")
reasoning, normal = parser.parse_non_stream(
"<think>Let me think</think>The answer is 42."
)
self.assertEqual(reasoning, "Let me think")
self.assertEqual(normal, "The answer is 42.")
def test_parse_stream_chunk(self):
"""Test streaming chunk parsing."""
parser = ReasoningParser("qwen3")
# First chunk with start token
reasoning, normal = parser.parse_stream_chunk("<think>")
self.assertEqual(reasoning, "")
self.assertEqual(normal, "")
# Second chunk with reasoning content
reasoning, normal = parser.parse_stream_chunk("thinking...")
self.assertEqual(reasoning, "thinking...")
self.assertEqual(normal, "")
# Third chunk with end token and normal text
reasoning, normal = parser.parse_stream_chunk("</think>answer")
self.assertEqual(reasoning, "") # Buffer cleared when end token processed
self.assertEqual(normal, "answer")
def test_case_insensitive_model_type(self):
"""Test case insensitive model type matching."""
parser1 = ReasoningParser("DeepSeek-R1")
parser2 = ReasoningParser("QWEN3")
parser3 = ReasoningParser("Kimi")
self.assertIsInstance(parser1.detector, DeepSeekR1Detector)
self.assertIsInstance(parser2.detector, Qwen3Detector)
self.assertIsInstance(parser3.detector, KimiDetector)
def test_stream_reasoning_parameter(self):
"""Test stream_reasoning parameter is passed correctly."""
parser = ReasoningParser("qwen3", stream_reasoning=False)
self.assertFalse(parser.detector.stream_reasoning)
parser = ReasoningParser("qwen3", stream_reasoning=True)
self.assertTrue(parser.detector.stream_reasoning)
class TestIntegrationScenarios(CustomTestCase):
"""Integration tests for realistic usage scenarios."""
def test_deepseek_r1_complete_response(self):
"""Test complete DeepSeek-R1 response parsing."""
parser = ReasoningParser("deepseek-r1")
text = "I need to solve this step by step. First, I'll analyze the problem. The given equation is x + 2 = 5. To solve for x, I subtract 2 from both sides: x = 5 - 2 = 3.</think>The answer is x = 3."
reasoning, normal = parser.parse_non_stream(text)
self.assertIn("step by step", reasoning)
self.assertIn(
"= 3", reasoning
) # The reasoning contains "x = 5 - 2 = 3" which has "= 3"
self.assertEqual(normal, "The answer is x = 3.")
def test_qwen3_streaming_scenario(self):
"""Test Qwen3 streaming scenario."""
parser = ReasoningParser("qwen3")
chunks = [
"<think>",
"Let me analyze this problem.",
" I need to consider multiple factors.",
"</think>",
"Based on my analysis, the solution is to use a different approach.",
]
all_reasoning = ""
all_normal = ""
for chunk in chunks:
reasoning, normal = parser.parse_stream_chunk(chunk)
all_reasoning += reasoning
all_normal += normal
self.assertIn("analyze", all_reasoning)
self.assertIn("multiple factors", all_reasoning)
self.assertIn("different approach", all_normal)
def test_kimi_streaming_scenario(self):
"""Test Kimi streaming scenario."""
parser = ReasoningParser("kimi")
chunks = [
"◁thi",
"nk▷",
"Let me analyze this problem.",
" I need to consider multiple factors.",
"◁/th",
"ink▷",
"The answer is 42.",
]
all_reasoning = ""
all_normal = ""
for chunk in chunks:
reasoning, normal = parser.parse_stream_chunk(chunk)
all_reasoning += reasoning
all_normal += normal
self.assertIn("analyze", all_reasoning)
self.assertIn("multiple factors", all_reasoning)
self.assertIn("42", all_normal)
def test_empty_reasoning_blocks(self):
"""Test handling of empty reasoning blocks."""
parser = ReasoningParser("qwen3")
text = "<think></think>Just the answer."
reasoning, normal = parser.parse_non_stream(text)
self.assertEqual(reasoning, "")
self.assertEqual(normal, "Just the answer.")
def test_qwen3_forced_reasoning_complete_response(self):
"""Test complete Qwen3-ForcedReasoning response parsing."""
parser = ReasoningParser("qwen3", force_reasoning=True)
text = "Let me solve this step by step. The equation is x + 2 = 5. Subtracting 2 from both sides gives x = 3.</think>The solution is x = 3."
reasoning, normal = parser.parse_non_stream(text)
self.assertIn("step by step", reasoning)
self.assertIn("x = 3", reasoning)
self.assertEqual(normal, "The solution is x = 3.")
def test_qwen3_forced_reasoning_streaming_scenario(self):
"""Test Qwen3-ForcedReasoning streaming scenario."""
parser = ReasoningParser("qwen3", force_reasoning=True)
chunks = [
"I need to analyze",
" this problem carefully.",
" Let me break it down.",
"</think>",
"The final answer is 42.",
]
all_reasoning = ""
all_normal = ""
for chunk in chunks:
reasoning, normal = parser.parse_stream_chunk(chunk)
all_reasoning += reasoning
all_normal += normal
self.assertIn("analyze", all_reasoning)
self.assertIn("break it down", all_reasoning)
self.assertIn("final answer", all_normal)
class TestBufferLossBugFix(CustomTestCase):
"""Test cases for the buffer loss bug fix in parse_streaming_increment."""
def test_partial_end_tag_buffer_loss_bug(self):
"""
Test the bug where partial end tag fragments are lost when followed by normal text.
Bug scenario:
1. _in_reasoning is False
2. new_text is "</" (part of closing thinking tag)
3. Fragment is stored in buffer and empty string is returned
4. Next step: new_text is "answer", _in_reasoning still False
5. Buffer is cleared and "answer" is returned directly
6. The "</" from previous step is lost
This test verifies the fix where line 108 was changed from:
return StreamingParseResult(normal_text=new_text)
to:
return StreamingParseResult(normal_text=current_text)
"""
detector = BaseReasoningFormatDetector("<think>", "</think>")
# Step 1: Send partial end tag when not in reasoning mode
# This should be buffered since it could be start of "</think>"
result1 = detector.parse_streaming_increment("</")
self.assertEqual(result1.normal_text, "")
self.assertEqual(result1.reasoning_text, "")
# Step 2: Send normal text that doesn't complete the end tag
# Before fix: would return only "answer", losing the "</"
# After fix: should return the complete buffered content "</answer"
result2 = detector.parse_streaming_increment("answer")
self.assertEqual(result2.normal_text, "</answer")
self.assertEqual(result2.reasoning_text, "")
def test_partial_start_tag_buffer_preservation(self):
"""
Test that partial start tag fragments are properly preserved.
"""
detector = BaseReasoningFormatDetector("<think>", "</think>")
# Send partial start tag
result1 = detector.parse_streaming_increment("<th")
self.assertEqual(result1.normal_text, "")
self.assertEqual(result1.reasoning_text, "")
# Complete with non-matching text
result2 = detector.parse_streaming_increment("is is text")
self.assertEqual(result2.normal_text, "<this is text")
self.assertEqual(result2.reasoning_text, "")
def test_partial_end_tag_in_reasoning_mode(self):
"""
Test partial end tag handling when already in reasoning mode.
"""
detector = BaseReasoningFormatDetector("<think>", "</think>")
# Enter reasoning mode
detector.parse_streaming_increment("<think>")
detector.parse_streaming_increment("some reasoning")
# Send partial end tag
result1 = detector.parse_streaming_increment("</")
self.assertEqual(result1.normal_text, "")
self.assertEqual(result1.reasoning_text, "")
# Complete the end tag with normal text
result2 = detector.parse_streaming_increment("think>normal text")
self.assertEqual(result2.normal_text, "normal text")
# The reasoning text should be empty since buffer was cleared when end tag was processed
self.assertEqual(result2.reasoning_text, "")
def test_multiple_partial_fragments(self):
"""
Test handling of multiple partial fragments that don't match any tokens.
"""
detector = BaseReasoningFormatDetector("<think>", "</think>")
# Send multiple partial fragments
result1 = detector.parse_streaming_increment("<")
self.assertEqual(result1.normal_text, "")
self.assertEqual(result1.reasoning_text, "")
result2 = detector.parse_streaming_increment("/")
self.assertEqual(result2.normal_text, "")
self.assertEqual(result2.reasoning_text, "")
result3 = detector.parse_streaming_increment("random>")
self.assertEqual(result3.normal_text, "</random>")
self.assertEqual(result3.reasoning_text, "")
def test_edge_case_exact_token_match(self):
"""
Test edge case where buffer content exactly matches a token.
"""
detector = BaseReasoningFormatDetector("<think>", "</think>")
# Build up the exact start token character by character
detector.parse_streaming_increment("<")
detector.parse_streaming_increment("t")
detector.parse_streaming_increment("h")
detector.parse_streaming_increment("i")
detector.parse_streaming_increment("n")
result = detector.parse_streaming_increment("k>")
# Should enter reasoning mode
self.assertEqual(result.normal_text, "")
self.assertEqual(result.reasoning_text, "")
self.assertTrue(detector._in_reasoning)
self.assertTrue(detector.stripped_think_start)
if __name__ == "__main__":
unittest.main()