fix(provider): 修复 DeepSeek V4 工具调用兼容问题#9014
Conversation
There was a problem hiding this comment.
Hey - I've found 2 issues, and left some high level feedback:
- The DeepSeek-specific logic for stripping or normalizing
tool_choicecurrently keys offmodelprefixes or theapi.deepseek.comhost; if only V4/reasoning models should omittool_choice, consider tightening this condition so non-reasoning DeepSeek models can still use explicittool_choicewhen supported. - Both
_queryand_query_streamduplicate the same steps to attach tools, splitpayloadsintoextra_body, mergecustom_extra_body, and sanitize messages; extracting this into a shared helper would reduce the chance of these paths diverging or missing future DeepSeek-specific tweaks. - The
_handle_api_errorbranch for DeepSeek thinking mode relies on matching the exact error message substring; if the API wording changes slightly this recovery will stop working, so consider making the check more robust (e.g., looser matching onthinking/reasoningandtool_choiceor using a structured error field if available).
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The DeepSeek-specific logic for stripping or normalizing `tool_choice` currently keys off `model` prefixes *or* the `api.deepseek.com` host; if only V4/reasoning models should omit `tool_choice`, consider tightening this condition so non-reasoning DeepSeek models can still use explicit `tool_choice` when supported.
- Both `_query` and `_query_stream` duplicate the same steps to attach tools, split `payloads` into `extra_body`, merge `custom_extra_body`, and sanitize messages; extracting this into a shared helper would reduce the chance of these paths diverging or missing future DeepSeek-specific tweaks.
- The `_handle_api_error` branch for DeepSeek thinking mode relies on matching the exact error message substring; if the API wording changes slightly this recovery will stop working, so consider making the check more robust (e.g., looser matching on `thinking`/`reasoning` and `tool_choice` or using a structured error field if available).
## Individual Comments
### Comment 1
<location path="astrbot/core/provider/sources/deepseek_source.py" line_range="103-96" />
<code_context>
+
+ extra_body = {}
+
+ custom_extra_body = self.provider_config.get("custom_extra_body", {})
+ if isinstance(custom_extra_body, dict):
+ extra_body.update(custom_extra_body)
+
+ to_del = []
+ for key in payloads:
+ if key not in self.default_params:
+ extra_body[key] = payloads[key]
</code_context>
<issue_to_address>
**suggestion (bug_risk):** The precedence of `custom_extra_body` vs payload-derived `extra_body` differs between `_query` and `_query_stream`, which may lead to inconsistent behavior.
In `_query`, `custom_extra_body` is applied after deriving `extra_body` from `payloads`, so it overrides payload-derived values. In `_query_stream`, `custom_extra_body` is applied first and then overwritten by non-default `payloads` keys. This discrepancy can lead to different behavior between streaming and non-streaming calls with the same config. If `custom_extra_body` is meant to take precedence, please align `_query_stream`’s update order with `_query`.
Suggested implementation:
```python
extra_body = {}
to_del = []
for key in payloads:
if key not in self.default_params:
extra_body[key] = payloads[key]
custom_extra_body = self.provider_config.get("custom_extra_body", {})
if isinstance(custom_extra_body, dict):
extra_body.update(custom_extra_body)
```
If `_query_stream` should also remove non-default keys from `payloads` (as `_query` does), ensure that in the same method you also:
1. Collect such keys into `to_del` (as shown in your other snippet), and
2. Delete them from `payloads` afterward with `for key in to_del: del payloads[key]`.
This deletion loop may already exist elsewhere in the method; if not, add it immediately after populating `to_del`.
</issue_to_address>
### Comment 2
<location path="astrbot/core/provider/sources/deepseek_source.py" line_range="84" />
<code_context>
+ image_fallback_used=image_fallback_used,
+ )
+
+ async def _query(self, payloads: dict, tools: ToolSet | None) -> LLMResponse:
+ if tools:
+ tool_list = tools.get_func_desc_openai_style(
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting shared request-building and chunk-handling helpers and making streaming responses stateless to reduce duplication and local complexity without changing behavior.
You can reduce the new complexity without changing behavior by extracting a few small helpers and avoiding the shared mutable `llm_response` in streaming.
### 1. Factor out shared request-building logic
Both `_query` and `_query_stream` do the same steps (tools, tool_choice, extra_body extraction, sanitization). Centralizing this will make future changes safer.
```python
def _build_deepseek_request(
self,
payloads: dict,
tools: ToolSet | None,
*,
stream: bool,
) -> tuple[dict, dict]:
# 1) tools
if tools:
tool_list = tools.get_func_desc_openai_style(
omit_empty_parameter_field=False,
)
if tool_list:
payloads["tools"] = tool_list
# 2) tool_choice normalization
self._normalize_deepseek_tool_choice(payloads)
# 3) extra_body construction
extra_body = {}
# keep stream/non-stream behavior differences if needed
custom_extra_body = self.provider_config.get("custom_extra_body", {})
if isinstance(custom_extra_body, dict):
extra_body.update(custom_extra_body)
# move non-default params into extra_body
base_payloads, payload_extra_body = self._extract_extra_body(payloads)
extra_body.update(payload_extra_body)
# provider-specific overrides
self._apply_provider_specific_extra_body_overrides(extra_body)
# sanitize assistant messages on payloads we actually send
self._sanitize_assistant_messages(base_payloads)
return base_payloads, extra_body
```
Usage in `_query` and `_query_stream` stays very local:
```python
async def _query(self, payloads: dict, tools: ToolSet | None) -> LLMResponse:
payloads, extra_body = self._build_deepseek_request(
payloads,
tools,
stream=False,
)
completion = await self.client.chat.completions.create(
**payloads,
stream=False,
extra_body=extra_body,
)
...
```
```python
async def _query_stream(
self,
payloads: dict,
tools: ToolSet | None,
) -> AsyncGenerator[LLMResponse, None]:
payloads, extra_body = self._build_deepseek_request(
payloads,
tools,
stream=True,
)
stream = await self.client.chat.completions.create(
**payloads,
stream=True,
extra_body=extra_body,
stream_options={"include_usage": True},
)
...
```
### 2. Simplify extra_body extraction
The `to_del` pattern can be replaced with a small helper that does the pruning once and clearly:
```python
def _extract_extra_body(self, payloads: dict) -> tuple[dict, dict]:
base_payloads = {}
extra_body = {}
for key, value in payloads.items():
if key in self.default_params:
base_payloads[key] = value
else:
extra_body[key] = value
return base_payloads, extra_body
```
This keeps your behavior (non-default params moved to `extra_body`) but removes the two-step delete loop.
### 3. Extract tool-call normalization
Move the DeepSeek-specific normalization out of the streaming loop to reduce noise and make intent clear:
```python
def _normalize_tool_calls(self, tool_calls) -> None:
if not tool_calls:
return
for idx, tc in enumerate(tool_calls):
if getattr(tc, "function", None) and getattr(tc.function, "arguments", None):
tc.type = "function"
if not hasattr(tc, "index") or tc.index is None:
tc.index = idx
```
Then in `_query_stream`:
```python
async for chunk in stream:
choice = chunk.choices[0] if chunk.choices else None
delta = choice.delta if choice else None
if delta and delta.tool_calls:
self._normalize_tool_calls(delta.tool_calls)
...
```
### 4. Make per-chunk streaming responses stateless
Instead of mutating one `LLMResponse` instance for every chunk, construct a fresh one per chunk and put the merging logic in a helper. This keeps the stream state (`ChatCompletionStreamState`) separate from chunk delivery.
```python
async def _chunk_to_llm_response(
self,
chunk,
choice,
delta,
tools: ToolSet | None,
) -> LLMResponse | None:
reasoning = self._extract_reasoning_content(chunk)
has_delta = bool(reasoning or (delta and delta.content))
if not has_delta and not chunk.usage and not getattr(choice, "usage", None):
return None
resp = LLMResponse("assistant", is_chunk=True)
resp.id = chunk.id
if reasoning is not None:
resp.reasoning_content = reasoning
if delta and delta.content:
completion_text = self._normalize_content(delta.content, strip=False)
resp.result_chain = MessageChain(
chain=[Comp.Plain(completion_text)],
)
if chunk.usage:
resp.usage = self._extract_usage(chunk.usage)
elif choice and (choice_usage := getattr(choice, "usage", None)):
resp.usage = self._extract_usage(choice_usage)
return resp
```
And `_query_stream` becomes easier to follow:
```python
async def _query_stream(...):
...
state = ChatCompletionStreamState()
async for chunk in stream:
choice = chunk.choices[0] if chunk.choices else None
delta = choice.delta if choice else None
if delta and delta.tool_calls:
self._normalize_tool_calls(delta.tool_calls)
if delta is not None or chunk.usage:
try:
state.handle_chunk(chunk)
except Exception as e:
logger.error("Saving chunk state error: " + str(e))
resp = await self._chunk_to_llm_response(chunk, choice, delta, tools)
if resp:
# if you still need to keep usage snapshot in state:
if choice and getattr(choice, "usage", None):
state.current_completion_snapshot.usage = choice.usage
yield resp
try:
final_completion = state.get_final_completion()
final_response = await self._parse_openai_completion(final_completion, tools)
yield final_response
except Exception as e:
logger.error("get_final_completion error: " + str(e))
return
```
This keeps all existing functionality (reasoning extraction, usage handling, `ChatCompletionStreamState` reconstruction) but removes the shared mutable `llm_response` and localizes the per-chunk logic.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| image_fallback_used=image_fallback_used, | ||
| ) | ||
|
|
||
| async def _query(self, payloads: dict, tools: ToolSet | None) -> LLMResponse: |
There was a problem hiding this comment.
issue (complexity): Consider extracting shared request-building and chunk-handling helpers and making streaming responses stateless to reduce duplication and local complexity without changing behavior.
You can reduce the new complexity without changing behavior by extracting a few small helpers and avoiding the shared mutable llm_response in streaming.
1. Factor out shared request-building logic
Both _query and _query_stream do the same steps (tools, tool_choice, extra_body extraction, sanitization). Centralizing this will make future changes safer.
def _build_deepseek_request(
self,
payloads: dict,
tools: ToolSet | None,
*,
stream: bool,
) -> tuple[dict, dict]:
# 1) tools
if tools:
tool_list = tools.get_func_desc_openai_style(
omit_empty_parameter_field=False,
)
if tool_list:
payloads["tools"] = tool_list
# 2) tool_choice normalization
self._normalize_deepseek_tool_choice(payloads)
# 3) extra_body construction
extra_body = {}
# keep stream/non-stream behavior differences if needed
custom_extra_body = self.provider_config.get("custom_extra_body", {})
if isinstance(custom_extra_body, dict):
extra_body.update(custom_extra_body)
# move non-default params into extra_body
base_payloads, payload_extra_body = self._extract_extra_body(payloads)
extra_body.update(payload_extra_body)
# provider-specific overrides
self._apply_provider_specific_extra_body_overrides(extra_body)
# sanitize assistant messages on payloads we actually send
self._sanitize_assistant_messages(base_payloads)
return base_payloads, extra_bodyUsage in _query and _query_stream stays very local:
async def _query(self, payloads: dict, tools: ToolSet | None) -> LLMResponse:
payloads, extra_body = self._build_deepseek_request(
payloads,
tools,
stream=False,
)
completion = await self.client.chat.completions.create(
**payloads,
stream=False,
extra_body=extra_body,
)
...async def _query_stream(
self,
payloads: dict,
tools: ToolSet | None,
) -> AsyncGenerator[LLMResponse, None]:
payloads, extra_body = self._build_deepseek_request(
payloads,
tools,
stream=True,
)
stream = await self.client.chat.completions.create(
**payloads,
stream=True,
extra_body=extra_body,
stream_options={"include_usage": True},
)
...2. Simplify extra_body extraction
The to_del pattern can be replaced with a small helper that does the pruning once and clearly:
def _extract_extra_body(self, payloads: dict) -> tuple[dict, dict]:
base_payloads = {}
extra_body = {}
for key, value in payloads.items():
if key in self.default_params:
base_payloads[key] = value
else:
extra_body[key] = value
return base_payloads, extra_bodyThis keeps your behavior (non-default params moved to extra_body) but removes the two-step delete loop.
3. Extract tool-call normalization
Move the DeepSeek-specific normalization out of the streaming loop to reduce noise and make intent clear:
def _normalize_tool_calls(self, tool_calls) -> None:
if not tool_calls:
return
for idx, tc in enumerate(tool_calls):
if getattr(tc, "function", None) and getattr(tc.function, "arguments", None):
tc.type = "function"
if not hasattr(tc, "index") or tc.index is None:
tc.index = idxThen in _query_stream:
async for chunk in stream:
choice = chunk.choices[0] if chunk.choices else None
delta = choice.delta if choice else None
if delta and delta.tool_calls:
self._normalize_tool_calls(delta.tool_calls)
...4. Make per-chunk streaming responses stateless
Instead of mutating one LLMResponse instance for every chunk, construct a fresh one per chunk and put the merging logic in a helper. This keeps the stream state (ChatCompletionStreamState) separate from chunk delivery.
async def _chunk_to_llm_response(
self,
chunk,
choice,
delta,
tools: ToolSet | None,
) -> LLMResponse | None:
reasoning = self._extract_reasoning_content(chunk)
has_delta = bool(reasoning or (delta and delta.content))
if not has_delta and not chunk.usage and not getattr(choice, "usage", None):
return None
resp = LLMResponse("assistant", is_chunk=True)
resp.id = chunk.id
if reasoning is not None:
resp.reasoning_content = reasoning
if delta and delta.content:
completion_text = self._normalize_content(delta.content, strip=False)
resp.result_chain = MessageChain(
chain=[Comp.Plain(completion_text)],
)
if chunk.usage:
resp.usage = self._extract_usage(chunk.usage)
elif choice and (choice_usage := getattr(choice, "usage", None)):
resp.usage = self._extract_usage(choice_usage)
return respAnd _query_stream becomes easier to follow:
async def _query_stream(...):
...
state = ChatCompletionStreamState()
async for chunk in stream:
choice = chunk.choices[0] if chunk.choices else None
delta = choice.delta if choice else None
if delta and delta.tool_calls:
self._normalize_tool_calls(delta.tool_calls)
if delta is not None or chunk.usage:
try:
state.handle_chunk(chunk)
except Exception as e:
logger.error("Saving chunk state error: " + str(e))
resp = await self._chunk_to_llm_response(chunk, choice, delta, tools)
if resp:
# if you still need to keep usage snapshot in state:
if choice and getattr(choice, "usage", None):
state.current_completion_snapshot.usage = choice.usage
yield resp
try:
final_completion = state.get_final_completion()
final_response = await self._parse_openai_completion(final_completion, tools)
yield final_response
except Exception as e:
logger.error("get_final_completion error: " + str(e))
returnThis keeps all existing functionality (reasoning extraction, usage handling, ChatCompletionStreamState reconstruction) but removes the shared mutable llm_response and localizes the per-chunk logic.
There was a problem hiding this comment.
Code Review
This pull request introduces a dedicated deepseek_chat_completion provider adapter (ProviderDeepSeek) to handle DeepSeek-specific behaviors, such as omitting or normalizing tool_choice in thinking modes, and registers it in the provider manager and default configuration. The review feedback identifies a critical TypeError vulnerability in _query and _query_stream due to a missing request_max_retries parameter, which also bypasses the retry mechanism. Additionally, there is a potential issue where tool_choice could be re-added during retries in _handle_api_error. To address these issues and reduce code redundancy, the reviewer suggests refactoring the logic using a shared helper class DeepSeekPayloadDict to dynamically intercept and prevent tool_choice modifications.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| async def _query(self, payloads: dict, tools: ToolSet | None) -> LLMResponse: | ||
| if tools: | ||
| tool_list = tools.get_func_desc_openai_style( | ||
| omit_empty_parameter_field=False, | ||
| ) | ||
| if tool_list: | ||
| payloads["tools"] = tool_list | ||
|
|
||
| self._normalize_deepseek_tool_choice(payloads) | ||
|
|
||
| extra_body = {} | ||
| to_del = [] | ||
| for key in payloads: | ||
| if key not in self.default_params: | ||
| extra_body[key] = payloads[key] | ||
| to_del.append(key) | ||
| for key in to_del: | ||
| del payloads[key] | ||
|
|
||
| custom_extra_body = self.provider_config.get("custom_extra_body", {}) | ||
| if isinstance(custom_extra_body, dict): | ||
| extra_body.update(custom_extra_body) | ||
| self._apply_provider_specific_extra_body_overrides(extra_body) | ||
|
|
||
| self._sanitize_assistant_messages(payloads) | ||
|
|
||
| completion = await self.client.chat.completions.create( | ||
| **payloads, | ||
| stream=False, | ||
| extra_body=extra_body, | ||
| ) | ||
|
|
||
| if not isinstance(completion, ChatCompletion): | ||
| raise Exception( | ||
| f"API 返回的 completion 类型错误:{type(completion)}: {completion}。", | ||
| ) | ||
|
|
||
| logger.debug(f"completion: {completion}") | ||
|
|
||
| return await self._parse_openai_completion(completion, tools) | ||
|
|
||
| async def _query_stream( | ||
| self, | ||
| payloads: dict, | ||
| tools: ToolSet | None, | ||
| ) -> AsyncGenerator[LLMResponse, None]: | ||
| if tools: | ||
| tool_list = tools.get_func_desc_openai_style( | ||
| omit_empty_parameter_field=False, | ||
| ) | ||
| if tool_list: | ||
| payloads["tools"] = tool_list | ||
|
|
||
| self._normalize_deepseek_tool_choice(payloads) | ||
|
|
||
| extra_body = {} | ||
|
|
||
| custom_extra_body = self.provider_config.get("custom_extra_body", {}) | ||
| if isinstance(custom_extra_body, dict): | ||
| extra_body.update(custom_extra_body) | ||
|
|
||
| to_del = [] | ||
| for key in payloads: | ||
| if key not in self.default_params: | ||
| extra_body[key] = payloads[key] | ||
| to_del.append(key) | ||
| for key in to_del: | ||
| del payloads[key] | ||
| self._apply_provider_specific_extra_body_overrides(extra_body) | ||
| self._sanitize_assistant_messages(payloads) | ||
|
|
||
| stream = await self.client.chat.completions.create( | ||
| **payloads, | ||
| stream=True, | ||
| extra_body=extra_body, | ||
| stream_options={"include_usage": True}, | ||
| ) | ||
|
|
||
| llm_response = LLMResponse("assistant", is_chunk=True) | ||
| state = ChatCompletionStreamState() | ||
|
|
||
| async for chunk in stream: | ||
| choice = chunk.choices[0] if chunk.choices else None | ||
| delta = choice.delta if choice else None | ||
|
|
||
| if delta and (dtcs := delta.tool_calls): | ||
| for idx, tc in enumerate(dtcs): | ||
| if tc.function and tc.function.arguments: | ||
| tc.type = "function" | ||
| if not hasattr(tc, "index") or tc.index is None: | ||
| tc.index = idx | ||
|
|
||
| if delta is not None or chunk.usage: | ||
| try: | ||
| state.handle_chunk(chunk) | ||
| except Exception as e: | ||
| logger.error("Saving chunk state error: " + str(e)) | ||
|
|
||
| reasoning = self._extract_reasoning_content(chunk) | ||
| has_delta = False | ||
| llm_response.id = chunk.id | ||
| llm_response.reasoning_content = None | ||
| llm_response.completion_text = "" | ||
| if reasoning is not None: | ||
| llm_response.reasoning_content = reasoning | ||
| has_delta = True | ||
| if delta and delta.content: | ||
| completion_text = self._normalize_content(delta.content, strip=False) | ||
| llm_response.result_chain = MessageChain( | ||
| chain=[Comp.Plain(completion_text)], | ||
| ) | ||
| has_delta = True | ||
| if chunk.usage: | ||
| llm_response.usage = self._extract_usage(chunk.usage) | ||
| elif choice and (choice_usage := getattr(choice, "usage", None)): | ||
| llm_response.usage = self._extract_usage(choice_usage) | ||
| state.current_completion_snapshot.usage = choice_usage | ||
| if has_delta: | ||
| yield llm_response | ||
|
|
||
| try: | ||
| final_completion = state.get_final_completion() | ||
| llm_response = await self._parse_openai_completion(final_completion, tools) | ||
| yield llm_response | ||
| except Exception as e: | ||
| logger.error("get_final_completion error: " + str(e)) | ||
| return |
There was a problem hiding this comment.
🔴 严重缺陷:缺少 request_max_retries 参数导致 TypeError 崩溃 & 丢失重试机制
当前重写的 _query 和 _query_stream 方法存在以下严重问题:
- 运行时崩溃 (TypeError):基类
ProviderOpenAIOfficial中的text_chat和text_chat_stream在调用_query/_query_stream时会传入request_max_retries关键字参数。由于当前重写的方法签名中缺少该参数,运行时会直接抛出TypeError: _query() got an unexpected keyword argument 'request_max_retries'导致请求崩溃。 - 丢失重试机制:当前实现中直接调用了
self.client.chat.completions.create,而没有使用retry_provider_request包装,导致 DeepSeek 适配器完全失去了配置中指定的请求重试能力。 - 代码冗余:复制了基类中大量的流式处理和错误处理逻辑,不利于后续维护。
💡 优雅的解决方案
我们可以通过继承基类的 _query 和 _query_stream,并使用一个自定义的 dict 子类(DeepSeekPayloadDict)来动态拦截并阻止 tool_choice 的写入。为了避免代码重复,应将 DeepSeekPayloadDict 提取为共享的辅助类,而不是在每个方法中重复定义。
class DeepSeekPayloadDict(dict):
def __setitem__(self, key, value):
if key == "tool_choice":
return
super().__setitem__(key, value)
async def _query(
self,
payloads: dict,
tools: ToolSet | None,
*,
request_max_retries: int | None = None,
) -> LLMResponse:
if self._deepseek_omits_tool_choice(payloads):
self._normalize_deepseek_tool_choice(payloads)
payloads = DeepSeekPayloadDict(payloads)
return await super()._query(payloads, tools, request_max_retries=request_max_retries)
async def _query_stream(
self,
payloads: dict,
tools: ToolSet | None,
*,
request_max_retries: int | None = None,
) -> AsyncGenerator[LLMResponse, None]:
if self._deepseek_omits_tool_choice(payloads):
self._normalize_deepseek_tool_choice(payloads)
payloads = DeepSeekPayloadDict(payloads)
async for response in super()._query_stream(payloads, tools, request_max_retries=request_max_retries):
yield responseReferences
- When implementing similar functionality for different cases, refactor the logic into a shared helper function to avoid code duplication.
说明
修复 DeepSeek V4 思考模式下的工具调用兼容性问题。
当前 DeepSeek V4 在思考模式下会拒绝带有
tool_choice的请求,请求会返回类似下面的错误:Thinking mode does not support this tool_choice这会导致 AstrBot 在开启工具调用时,DeepSeek V4 模型请求直接失败。
改动
deepseek_chat_completion提供商适配器。tools,移除tool_choice,交给 DeepSeek 使用默认工具选择策略。deepseek_chat_completion。deepseek_chat_completion。设计考虑
这次没有继续往通用
openai_source.py里叠加 DeepSeek V4 的特殊分支,而是单独增加 DeepSeek source,尽量把供应商特定兼容逻辑收敛在自己的 adapter 里,减少对通用 OpenAI 兼容层的侵入。验证
python -m py_compile astrbot/core/provider/sources/deepseek_source.py astrbot/core/provider/manager.py astrbot/core/config/default.pyuv run ruff check astrbot/core/provider/sources/deepseek_source.py astrbot/core/provider/manager.py astrbot/core/config/default.pytools,并移除tool_choice。Summary by Sourcery
Introduce a dedicated DeepSeek chat completion provider to fix tool-calling compatibility issues in DeepSeek V4 reasoning mode and make it the default DeepSeek adapter.
New Features:
Bug Fixes:
Enhancements: