# sglang.0.4.8.post1/meta_ui.py

import json, datetime, requests, gradio as gr
from pathlib import Path
from collections import deque
from queue import Queue, Empty
import queue, threading
# ────────────────── Basic configuration ──────────────────
API_KEY = "token-abc123"
MODEL_PATH = Path("/root/.cradle/Alibaba/Qwen3-30B-A3B-Base")
def model_name(path: Path):
    """Resolve a display name from config.json, falling back to the directory name."""
    cfg = path / "config.json"
    if cfg.exists():
        data = json.load(cfg.open())
        return data.get("architectures", [None])[0] or data.get("model_type") or path.name
    return path.name
MODEL_NAME = model_name(MODEL_PATH)
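# MODEL_NAME is sent as the "model" field of every request payload built below.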
now = lambda: datetime.datetime.now().strftime("%H:%M:%S")
# ────────────────── Log queue ──────────────────
LOG_Q: "queue.Queue[str]" = queue.Queue()
LOG_TXT = ""
def log(msg):
    # Echo to stdout and queue the line for the UI log panel.
    print(msg, flush=True)
    LOG_Q.put(msg)
prev_log_value = ""
def consume_logs(dummy=None):
    global LOG_TXT, prev_log_value
    # Merge any queued messages into a rolling buffer capped at 400 lines.
    buf = deque(LOG_TXT.splitlines(), maxlen=400)
    while not LOG_Q.empty():
        buf.append(LOG_Q.get())
    LOG_TXT = "\n".join(buf)
    # Only push an update when the text actually changed, to avoid redundant re-renders.
    if LOG_TXT != prev_log_value:
        prev_log_value = LOG_TXT
        return gr.update(value=LOG_TXT)
    return gr.update()
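# Pattern: worker threads push lines into LOG_Q (producer) while a gr.Timer
# polls consume_logs once per second (consumer). The queue decouples logging
# from Gradio's event loop, so a slow render never blocks generation.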
# ────────────────── Backend call ──────────────────
def backend(text, sampling, api_suffix):
    url = f"http://localhost:30000{api_suffix}"
    if api_suffix == "/generate":
        # SGLang native endpoint: sampling options nested under "sampling_params".
        payload = {"model": MODEL_NAME, "text": text, "sampling_params": sampling}
    elif api_suffix == "/v1/completions":
        # OpenAI-compatible endpoint: sampling options are top-level fields.
        payload = {"model": MODEL_NAME, "prompt": text, **sampling}
    else:
        raise ValueError(f"unsupported api_suffix: {api_suffix}")
    log(f"\n🟡 [{now()}] POST {url}\n{json.dumps(payload, ensure_ascii=False, indent=2)}")
    try:
        r = requests.post(url,
                          headers={"Authorization": f"Bearer {API_KEY}",
                                   "Content-Type": "application/json"},
                          json=payload, timeout=180)
        try:
            data = r.json()
        except Exception:
            data = {}
        if api_suffix == "/generate":
            txt = data.get("text", "").strip()
            meta = data.get("meta_info", {})
            fr = meta.get("finish_reason")
            ctok = meta.get("completion_tokens")
        else:  # /v1/completions
            choice = data.get("choices", [{}])[0]
            txt = choice.get("text", "").strip()
            fr = choice.get("finish_reason")
            ctok = data.get("usage", {}).get("completion_tokens")
        log(f"🟢 [{now()}] HTTP {r.status_code} tokens={ctok} finish={fr}\n"
            f"🟢 resp800={r.text[:800]!r}")
        if r.status_code != 200:
            return f"[HTTP {r.status_code}] {r.text[:300]}"
        return txt or "[⚠ empty response]"
    except Exception as e:
        log(f"[❌ request failed] {e}")
        return f"[❌ request failed] {e}"
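# Shapes of the two payloads built above (field names from the code; values illustrative):
#   /generate       -> {"model": ..., "text": prompt, "sampling_params": {"max_new_tokens": 128, ...}}
#   /v1/completions -> {"model": ..., "prompt": prompt, "max_tokens": 128, "temperature": 0.8, ...}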
# ────────────────── Chat callback ──────────────────
def chat(
    user_msg, history,
    max_new, temp, top_p, top_k,
    rep_pen, pres_pen, stop_raw,
    api_suffix, log_state
):
    # Unpack Gradio's multimodal ChatInput format ({"text": ..., "files": [...]}).
    user = user_msg["text"] if isinstance(user_msg, dict) and "text" in user_msg else user_msg
    stop = [s.strip() for s in stop_raw.split(",") if s.strip()] or None
    samp = {
        # The native API names this "max_new_tokens"; the OpenAI-compatible one "max_tokens".
        ("max_tokens" if api_suffix == "/v1/completions" else "max_new_tokens"): int(max_new),
        "temperature": temp,
        "top_p": top_p,
        "top_k": int(top_k),
        "repetition_penalty": rep_pen,
        "presence_penalty": pres_pen,
        **({"stop": stop} if stop else {})
    }
    result_q = Queue()
    def worker():
        out = backend(user, samp, api_suffix)
        result_q.put(out)
    # Run the blocking HTTP call in a background thread so the UI stays responsive.
    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    yield "⏳ Generating...", log_state
    while True:
        # Worker finished and the queue is drained: end the generator.
        if not thread.is_alive() and result_q.empty():
            return
        try:
            result = result_q.get(timeout=0.1)
        except Empty:
            continue
        # Normalize every result into the {"text": ...} message shape.
        if isinstance(result, str):
            result = {"text": result}
        elif not isinstance(result, dict) or "text" not in result:
            result = {"text": str(result)}
        yield result, log_state  # hand the model's reply to the frontend
        return  # end the generator after delivering the single reply
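# Generator protocol, exercised outside Gradio (argument values are hypothetical;
# assumes the SGLang server is already up):
#   for reply, _ in chat("Hello", [], 64, 0.8, 0.95, 50, 1.05, 0.0, "", "/generate", ""):
#       print(reply)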
# ────────────────── Gradio UI ──────────────────
with gr.Blocks(title="Debug UI") as demo:
    gr.Markdown(f"## 💬 Debug UI \nWeights: **{MODEL_PATH.name}**")
    with gr.Row():
        api_choice = gr.Dropdown(choices=["/generate", "/v1/completions"],
                                 value="/generate", label="Inference endpoint")
    with gr.Row():
        max_new = gr.Slider(32, 32768, 128, label="max_new_tokens")
        temp = gr.Slider(0, 1.5, 0.8, step=0.05, label="temperature")
    with gr.Row():
        top_p = gr.Slider(0, 1, 0.95, step=0.01, label="top_p")
        top_k = gr.Slider(0, 200, 50, step=1, label="top_k")
    with gr.Row():
        rep_pen = gr.Slider(0.8, 2, 1.05, step=0.01, label="repetition_penalty")
        pres_pen = gr.Slider(0, 2, 0.0, step=0.05, label="presence_penalty")
    stop_txt = gr.Textbox("", label="stop sequences (comma-separated)")
    log_state = gr.State("")
    dbg_chk = gr.Checkbox(label="📜 Show debug panel", value=False)
    log_box = gr.Textbox(label="Live log", lines=20, interactive=False, visible=False)
    chatbot = gr.ChatInterface(
        fn=chat,
        additional_inputs=[max_new, temp, top_p, top_k,
                           rep_pen, pres_pen, stop_txt,
                           api_choice, log_state],
        additional_outputs=[log_state],
        type="messages"
    )
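    # ChatInterface passes additional_inputs positionally to chat() after
    # (message, history); the second value of each yield is written back into
    # log_state via additional_outputs.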
    timer = gr.Timer(1.0, render=True)
    timer.tick(
        fn=consume_logs,
        inputs=[],
        outputs=[log_box],
    )
    log_state.change(lambda txt: gr.update(value=txt), log_state, log_box)
    dbg_chk.change(lambda v: gr.update(visible=v), dbg_chk, log_box)
demo.launch(server_name="0.0.0.0", server_port=30001)
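# Assumed usage (adjust to your setup): start an SGLang server on port 30000
# first, e.g.:
#   python -m sglang.launch_server --model-path /root/.cradle/Alibaba/Qwen3-30B-A3B-Base \
#       --port 30000 --api-key token-abc123
# then run `python meta_ui.py` and open http://<host>:30001.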