taixf/modules/antaf/antaf_llm.py

175 lines
7.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import requests
from config.logger import setup_logging
from core.providers.llm.base import LLMProviderBase
# Module name, passed as the ``tag`` field to ``logger.bind(...)`` for every
# log record emitted from this file.
TAG = __name__
# Logger obtained from the project's ``setup_logging()`` helper.
logger = setup_logging()
class LLMProvider(LLMProviderBase):
    """LLM provider backed by the Ant Afu (蚂蚁阿福) app.

    Talks to the app's text-chat API through a Frida HTTP bridge
    (default port 18900). The bridge runs on the phone and is reached via
    ``adb forward`` or the network; it exposes an SSE streaming endpoint.
    """

    # Substrings that mark a chunk as the upstream model's inner
    # reasoning/monologue, which must not be forwarded to the user.
    _THINKING_PATTERNS = (
        "用户问", "用户说", "用户的", "用户可能", "用户真正", "用户让我",
        "我得", "我会", "我在想", "我决定", "我要", "我需要确保", "我需要",
        "语气比较", "感觉他", "让他知道", "让他觉得",
        "先安抚", "得先", "不想表现",
        "整体语气", "这样能", "这样他",
        "所以我", "还带了个",
        "首先记录", "是否有其他", "在这种情况下",
        "必须遵循", "角色设定", "确保回复",
        "避免任何生硬", "技术性的表达",
        "提供的日期", "作为测试", "模拟使用",
        "需要确认或澄清", "普遍适用性",
    )

    # Internal status phrases emitted by the upstream app; stripped verbatim.
    _JUNK_PHRASES = (
        "完成资料引用", "内容生成", "正在思考", "正在搜索",
        "开始获取资料", "找到资料",
    )

    # Persona rewrites, applied in order. Longer names come before their
    # substrings so that e.g. "蚂蚁阿福" is not split by the "蚂蚁" rule.
    _PERSONA_REPLACEMENTS = (
        ("蚂蚁阿福", "泰小虎"),
        ("阿福", "泰小虎"),
        ("蚂蚁集团", "星之鑫"),
        ("蚂蚁", "星之鑫"),
        ("支付宝", "星之鑫"),
        ("健康是福", "健康是虎"),
    )

    # Substrings that identify a "user" message as system-injected.
    _SYSTEM_MARKERS = (
        "[系统提示]", "tool_call", "<tool_call>", "TOOL USE",
        "系统提示", "工具调用", "function_call",
        "handle_exit_intent", "你有以下工具", "You have access",
    )

    def __init__(self, config):
        """Read bridge settings from ``config``.

        Args:
            config: Mapping with optional keys ``bridge_url`` (default
                ``http://127.0.0.1:18900``) and ``timeout`` (default 60).
        """
        self.bridge_url = config.get("bridge_url", "http://127.0.0.1:18900")
        self.timeout = config.get("timeout", 60)
        self.should_idle = False  # signal to send system idle after TTS
        logger.bind(tag=TAG).info(
            f"AntafLLM 初始化: bridge={self.bridge_url}, timeout={self.timeout}s"
        )

    @staticmethod
    def _is_thinking(text) -> bool:
        """Return True if ``text`` contains any inner-reasoning marker."""
        return any(p in text for p in LLMProvider._THINKING_PATTERNS)

    @staticmethod
    def _clean_text(text) -> str:
        """Strip status junk, links, markdown and tables; apply persona names.

        The steps are order-dependent (e.g. bold is removed before italic,
        and leftover asterisks are dropped afterwards), so they are kept in
        a fixed sequence.
        """
        import re

        # Upstream internal status text.
        for phrase in LLMProvider._JUNK_PHRASES:
            text = text.replace(phrase, "")
        # Markdown link [label](url) -> keep only the label.
        text = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', text)
        # Bare URLs.
        text = re.sub(r'https?://\S+', '', text)
        # Markdown bold **text** -> text (also collapses an empty ****).
        text = re.sub(r'\*{2,}([^*]*)\*{2,}', r'\1', text)
        # Markdown italic *text* -> text.
        text = re.sub(r'\*([^*]+)\*', r'\1', text)
        # Any asterisks still left over.
        text = text.replace('*', '')
        # Markdown table rows: | xxx | xxx |
        text = re.sub(r'\|[^|]*\|[^|]*\|[^|\n]*\|?', '', text)
        # Table alignment rows: | :--- | :--- |
        text = re.sub(r'\|\s*:?-+:?\s*(\|\s*:?-+:?\s*)+\|?', '', text)
        # Markdown heading markers: ### text
        text = re.sub(r'#{1,6}\s*', '', text)
        # Persona substitution.
        for old, new in LLMProvider._PERSONA_REPLACEMENTS:
            text = text.replace(old, new)
        # Collapse repeated spaces and blank lines.
        text = re.sub(r' +', ' ', text)
        text = re.sub(r'\n{2,}', '\n', text)
        return text.strip()

    @staticmethod
    def _is_system_injected(content) -> bool:
        """Return True if a ``user`` message looks system-injected.

        Empty content, content containing any known marker, and content
        longer than 200 characters are all treated as injected.
        """
        if not content:
            return True
        if any(marker in content for marker in LLMProvider._SYSTEM_MARKERS):
            return True
        # A user message over 200 characters is most likely system-injected.
        return len(content) > 200

    def _extract_query(self, dialogue):
        """Return the latest genuine user message from ``dialogue``, or ""."""
        for msg in reversed(dialogue):
            if msg.get("role") != "user":
                continue
            content = msg.get("content", "")
            if self._is_system_injected(content):
                continue
            # ASR output may be JSON:
            # {"content":"...", "language":"zh", "emotion":"..."}
            try:
                parsed = json.loads(content)
            except (json.JSONDecodeError, TypeError):
                return content
            if isinstance(parsed, dict) and "content" in parsed:
                return parsed["content"]
            return content
        return ""

    def response(self, session_id, dialogue, **kwargs):
        """Stream the bridge's answer to the latest user message.

        Args:
            session_id: Unused by this provider.
            dialogue: Sequence of message mappings read via
                ``msg.get("role")`` / ``msg.get("content")``.
            **kwargs: Ignored.

        Yields:
            Cleaned text chunks from the bridge. If no usable user message
            is found, or the request fails, a single apology string is
            yielded instead; on request failure ``should_idle`` is set to
            True.
        """
        query = self._extract_query(dialogue)
        if not query:
            logger.bind(tag=TAG).warning("对话中没有用户消息")
            yield "抱歉,我没有收到您的问题。"
            return

        self.should_idle = False
        logger.bind(tag=TAG).info(f"AntafLLM 请求: {query[:50]}...")
        try:
            url = f"{self.bridge_url}/chat"
            # The context manager guarantees the streamed connection is
            # released on [DONE], on errors, and when the consumer stops
            # iterating this generator early.
            with requests.get(
                url,
                params={"q": query},
                stream=True,
                timeout=self.timeout,
            ) as resp:
                # Surface HTTP errors instead of silently yielding nothing.
                resp.raise_for_status()
                resp.encoding = "utf-8"
                seen_texts = set()
                for line in resp.iter_lines(decode_unicode=True):
                    if not line or not line.startswith("data:"):
                        continue
                    # SSE allows one optional space after the colon.
                    data = line[5:]
                    if data.startswith(" "):
                        data = data[1:]
                    if data == "[DONE]":
                        break
                    if not data.strip():
                        continue
                    # De-duplicate: skip chunks identical to one already seen.
                    if data in seen_texts:
                        continue
                    seen_texts.add(data)
                    # Drop inner-reasoning chunks.
                    if self._is_thinking(data):
                        logger.bind(tag=TAG).debug(f"过滤思考内容: {data[:50]}...")
                        continue
                    data = self._clean_text(data)
                    if not data:
                        continue
                    yield data
        except requests.exceptions.ConnectionError:
            logger.bind(tag=TAG).error("无法连接蚂蚁阿福 Bridge请检查手机和 Frida 状态")
            self.should_idle = True
            yield "抱歉,蚂蚁阿福服务暂时不可用。"
        except requests.exceptions.Timeout:
            logger.bind(tag=TAG).error(f"蚂蚁阿福 Bridge 超时 ({self.timeout}s)")
            self.should_idle = True
            yield "抱歉,回答超时了。"
        except Exception as e:
            logger.bind(tag=TAG).error(f"AntafLLM 异常: {e}")
            self.should_idle = True
            yield "抱歉,发生了错误。"