taixf/modules/antaf/antaf_llm.py

142 lines
5.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import requests
from config.logger import setup_logging
from core.providers.llm.base import LLMProviderBase
TAG = __name__
logger = setup_logging()
class LLMProvider(LLMProviderBase):
"""
蚂蚁阿福 LLM Provider
通过 Frida HTTP Bridge (port 18900) 对接蚂蚁阿福 App 的文字对话 API。
Bridge 运行在手机上,通过 adb forward 或网络暴露 SSE 流式接口。
"""
def __init__(self, config):
self.bridge_url = config.get("bridge_url", "http://127.0.0.1:18900")
self.timeout = config.get("timeout", 60)
self.should_idle = False # signal to send system idle after TTS
logger.bind(tag=TAG).info(
f"AntafLLM 初始化: bridge={self.bridge_url}, timeout={self.timeout}s"
)
@staticmethod
def _is_thinking(text):
"""检测蚂蚁阿福的内心思考/推理过程,这些不应该发给用户"""
thinking_patterns = [
"用户问", "用户说", "用户的", "用户可能", "用户真正",
"我得", "我会", "我在想", "我决定", "我要",
"语气比较", "感觉他", "让他知道", "让他觉得",
"先安抚", "得先", "不想表现",
"整体语气", "这样能", "这样他",
"所以我", "还带了个",
]
for p in thinking_patterns:
if p in text:
return True
return False
@staticmethod
def _clean_text(text):
"""清理阿福返回文本中的脏数据"""
# 去掉阿福内部状态文本
junk = [
"完成资料引用", "内容生成", "正在思考", "正在搜索",
]
for j in junk:
text = text.replace(j, "")
return text.strip()
@staticmethod
def _is_system_injected(content):
"""检测是否为系统注入的消息(非用户真实输入)"""
if not content:
return True
markers = [
"[系统提示]", "tool_call", "<tool_call>", "TOOL USE",
"系统提示", "工具调用", "function_call",
"handle_exit_intent", "你有以下工具", "You have access",
]
for m in markers:
if m in content:
return True
# 超过200字的 user 消息大概率是系统注入的
if len(content) > 200:
return True
return False
def response(self, session_id, dialogue, **kwargs):
# 从 dialogue 中提取真正的用户消息(跳过系统注入的 user 消息)
query = ""
for msg in reversed(dialogue):
if msg.get("role") == "user":
content = msg.get("content", "")
if not self._is_system_injected(content):
# ASR 结果可能是 JSON: {"content":"...", "language":"zh", "emotion":"..."}
try:
parsed = json.loads(content)
if isinstance(parsed, dict) and "content" in parsed:
query = parsed["content"]
else:
query = content
except (json.JSONDecodeError, TypeError):
query = content
break
if not query:
logger.bind(tag=TAG).warning("对话中没有用户消息")
yield "抱歉,我没有收到您的问题。"
return
self.should_idle = False
logger.bind(tag=TAG).info(f"AntafLLM 请求: {query[:50]}...")
try:
url = f"{self.bridge_url}/chat"
resp = requests.get(
url,
params={"q": query},
stream=True,
timeout=self.timeout,
)
resp.encoding = "utf-8"
seen_texts = set()
for line in resp.iter_lines(decode_unicode=True):
if not line:
continue
if line.startswith("data: "):
data = line[6:]
if data == "[DONE]":
break
if not data or len(data.strip()) == 0:
continue
# 去重:跳过完全相同的文本块
if data in seen_texts:
continue
seen_texts.add(data)
# 过滤思考过程
if self._is_thinking(data):
logger.bind(tag=TAG).debug(f"过滤思考内容: {data[:50]}...")
continue
# 清理脏数据
data = self._clean_text(data)
if not data:
continue
yield data
except requests.exceptions.ConnectionError:
logger.bind(tag=TAG).error("无法连接蚂蚁阿福 Bridge请检查手机和 Frida 状态")
self.should_idle = True
yield "抱歉,蚂蚁阿福服务暂时不可用。"
except requests.exceptions.Timeout:
logger.bind(tag=TAG).error(f"蚂蚁阿福 Bridge 超时 ({self.timeout}s)")
self.should_idle = True
yield "抱歉,回答超时了。"
except Exception as e:
logger.bind(tag=TAG).error(f"AntafLLM 异常: {e}")
self.should_idle = True
yield "抱歉,发生了错误。"