import json
import re

import requests

from config.logger import setup_logging
from core.providers.llm.base import LLMProviderBase

TAG = __name__
logger = setup_logging()

# Substrings that mark a streamed chunk as the assistant's internal
# reasoning rather than text meant for the user.
_THINKING_PATTERNS = (
    "用户问", "用户说", "用户的", "用户可能", "用户真正",
    "我得", "我会", "我在想", "我决定", "我要",
    "语气比较", "感觉他", "让他知道", "让他觉得",
    "先安抚", "得先", "不想表现", "整体语气",
    "这样能", "这样他", "所以我", "还带了个",
)

# Internal status phrases the app mixes into its answers; removed verbatim.
_JUNK_PHRASES = (
    "完成资料引用", "内容生成", "正在思考", "正在搜索",
    "开始获取资料", "找到资料",
)

# Substrings that identify a "user" message as framework-injected.
# NOTE(review): the original list also contained an empty string "".
# Since "" is a substring of every string, it made every message look
# injected. It was presumably an angle-bracket tag lost somewhere upstream;
# restore the intended marker here if it is known.
_SYSTEM_MARKERS = (
    "[系统提示]", "tool_call", "TOOL USE",
    "系统提示", "工具调用", "function_call", "handle_exit_intent",
    "你有以下工具", "You have access",
)

# "user" messages longer than this are treated as injected prompts.
_MAX_USER_MESSAGE_LEN = 200

# Ordered (pattern, replacement) pairs applied by _clean_text. The order
# matters: bold must be handled before italic, and both before the
# catch-all asterisk removal.
_CLEANUP_SUBSTITUTIONS = (
    # Markdown link [text](url) -> keep only the text
    (re.compile(r'\[([^\]]+)\]\([^)]+\)'), r'\1'),
    # Bare URLs
    (re.compile(r'https?://\S+'), ''),
    # Markdown bold **text** -> text (also collapses an empty ****)
    (re.compile(r'\*{2,}([^*]*)\*{2,}'), r'\1'),
    # Markdown italic *text* -> text
    (re.compile(r'\*([^*]+)\*'), r'\1'),
    # Any asterisks still left over
    (re.compile(r'\*'), ''),
    # Markdown table rows: | xxx | xxx |
    (re.compile(r'\|[^|]*\|[^|]*\|[^|\n]*\|?'), ''),
    # Table alignment rows: | :--- | :--- |
    (re.compile(r'\|\s*:?-+:?\s*(\|\s*:?-+:?\s*)+\|?'), ''),
    # Markdown heading markers: ### text
    (re.compile(r'#{1,6}\s*'), ''),
    # Runs of spaces -> single space
    (re.compile(r' +'), ' '),
    # Consecutive newlines -> single newline
    (re.compile(r'\n{2,}'), '\n'),
)


class LLMProvider(LLMProviderBase):
    """LLM provider for the Ant Afu (蚂蚁阿福) app.

    Talks to the app's text chat API through a Frida HTTP bridge
    (port 18900 by default). Per the original author's notes, the bridge
    runs on the phone and exposes an SSE streaming endpoint via
    ``adb forward`` or over the network.
    """

    def __init__(self, config):
        """Read the bridge settings from ``config``.

        Args:
            config: Mapping with optional keys ``bridge_url`` (default
                ``http://127.0.0.1:18900``) and ``timeout`` in seconds
                (default 60).
        """
        self.bridge_url = config.get("bridge_url", "http://127.0.0.1:18900")
        self.timeout = config.get("timeout", 60)
        # Signal to send system idle after TTS; reset at the start of each
        # request and set to True when a request fails.
        self.should_idle = False
        logger.bind(tag=TAG).info(
            f"AntafLLM 初始化: bridge={self.bridge_url}, timeout={self.timeout}s"
        )

    @staticmethod
    def _is_thinking(text):
        """Return True if ``text`` looks like internal reasoning.

        Such chunks must not be forwarded to the user. Detection is a plain
        substring match against ``_THINKING_PATTERNS``.
        """
        return any(pattern in text for pattern in _THINKING_PATTERNS)

    @staticmethod
    def _clean_text(text):
        """Strip status phrases, links, Markdown and tables from ``text``.

        Returns:
            The cleaned text with surrounding whitespace removed; may be
            an empty string.
        """
        for phrase in _JUNK_PHRASES:
            text = text.replace(phrase, "")
        for pattern, replacement in _CLEANUP_SUBSTITUTIONS:
            text = pattern.sub(replacement, text)
        return text.strip()

    @staticmethod
    def _is_system_injected(content):
        """Return True if ``content`` is not genuine user input.

        A message counts as injected when it is empty, contains one of
        ``_SYSTEM_MARKERS``, or is longer than ``_MAX_USER_MESSAGE_LEN``
        characters.
        """
        if not content:
            return True
        # Skip empty markers: "" is contained in every string and would
        # classify every message as injected.
        if any(marker and marker in content for marker in _SYSTEM_MARKERS):
            return True
        # Very long "user" messages are most likely injected prompts.
        return len(content) > _MAX_USER_MESSAGE_LEN

    def _extract_query(self, dialogue):
        """Return the most recent genuine user message, or "" if none.

        Walks ``dialogue`` from newest to oldest and skips "user" messages
        that ``_is_system_injected`` flags.
        """
        for msg in reversed(dialogue):
            if msg.get("role") != "user":
                continue
            content = msg.get("content", "")
            if self._is_system_injected(content):
                continue
            # The ASR result may be JSON:
            # {"content": "...", "language": "zh", "emotion": "..."}
            try:
                parsed = json.loads(content)
            except (json.JSONDecodeError, TypeError):
                return content
            if isinstance(parsed, dict) and "content" in parsed:
                return parsed["content"]
            return content
        return ""

    def response(self, session_id, dialogue, **kwargs):
        """Stream the bridge's answer to the latest user message.

        Args:
            session_id: Not used by this provider.
            dialogue: List of message dicts with ``role`` and ``content``.
            **kwargs: Ignored.

        Yields:
            Cleaned text chunks from the bridge, or a single apology
            message when there is no user query or the request fails.
            On failure ``self.should_idle`` is set to True.
        """
        query = self._extract_query(dialogue)
        if not query:
            logger.bind(tag=TAG).warning("对话中没有用户消息")
            yield "抱歉,我没有收到您的问题。"
            return

        self.should_idle = False
        logger.bind(tag=TAG).info(f"AntafLLM 请求: {query[:50]}...")

        try:
            url = f"{self.bridge_url}/chat"
            # The context manager releases the streamed connection even if
            # we stop at [DONE] or the consumer abandons this generator.
            with requests.get(
                url,
                params={"q": query},
                stream=True,
                timeout=self.timeout,
            ) as resp:
                # Surface HTTP errors instead of silently yielding nothing;
                # they are handled by the generic handler below.
                resp.raise_for_status()
                resp.encoding = "utf-8"

                seen_texts = set()
                for line in resp.iter_lines(decode_unicode=True):
                    # Only SSE "data: " lines carry payload.
                    if not line or not line.startswith("data: "):
                        continue
                    data = line[6:]
                    if data == "[DONE]":
                        break
                    if not data.strip():
                        continue
                    # De-duplicate: skip chunks identical to an earlier one.
                    # NOTE(review): if the bridge streams token-sized
                    # chunks, legitimately repeated chunks are dropped too;
                    # confirm the chunk granularity.
                    if data in seen_texts:
                        continue
                    seen_texts.add(data)
                    # Drop internal reasoning.
                    if self._is_thinking(data):
                        logger.bind(tag=TAG).debug(f"过滤思考内容: {data[:50]}...")
                        continue
                    # Remove status phrases and Markdown.
                    data = self._clean_text(data)
                    if not data:
                        continue
                    yield data

        except requests.exceptions.ConnectionError:
            logger.bind(tag=TAG).error("无法连接蚂蚁阿福 Bridge,请检查手机和 Frida 状态")
            self.should_idle = True
            yield "抱歉,蚂蚁阿福服务暂时不可用。"
        except requests.exceptions.Timeout:
            logger.bind(tag=TAG).error(f"蚂蚁阿福 Bridge 超时 ({self.timeout}s)")
            self.should_idle = True
            yield "抱歉,回答超时了。"
        except Exception as e:
            logger.bind(tag=TAG).error(f"AntafLLM 异常: {e}")
            self.should_idle = True
            yield "抱歉,发生了错误。"