import json, datetime, textwrap, requests, gradio as gr from pathlib import Path from collections import deque import queue, threading, time # ────────────────── 基础配置 ────────────────── API_KEY = "token-abc123" MODEL_PATH = Path("/root/.cradle/Alibaba/Qwen3-30B-A3B-Base") def model_name(path: Path): cfg = path / "config.json" if cfg.exists(): data = json.load(cfg.open()) return data.get("architectures", [None])[0] or data.get("model_type") or path.name return path.name MODEL_NAME = model_name(MODEL_PATH) now = lambda: datetime.datetime.now().strftime("%H:%M:%S") # ────────────────── 日志队列 ────────────────── LOG_Q: "queue.Queue[str]" = queue.Queue() LOG_TXT = "" def log(msg): print(msg, flush=True) LOG_Q.put(msg) prev_log_value = "" def consume_logs(dummy=None): global LOG_TXT, prev_log_value buf = deque(LOG_TXT.splitlines(), maxlen=400) while not LOG_Q.empty(): buf.append(LOG_Q.get()) LOG_TXT = "\n".join(buf) if LOG_TXT != prev_log_value: prev_log_value = LOG_TXT return gr.update(value=LOG_TXT) return gr.update() # ────────────────── 后端调用 ────────────────── def backend(text, sampling, api_suffix): url = f"http://localhost:30000{api_suffix}" if api_suffix == "/generate": payload = {"model": MODEL_NAME, "text": text, "sampling_params": sampling} elif api_suffix == "/v1/completions": payload = { "model": MODEL_NAME, "prompt": text, **sampling } log(f"\n🟡 [{now()}] POST {url}\n{json.dumps(payload, ensure_ascii=False, indent=2)}") try: r = requests.post(url, headers={"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}, json=payload, timeout=180) try: data = r.json() except Exception: data = {} if api_suffix == "/generate": txt = data.get("text", "").strip() meta = data.get("meta_info", {}) fr = meta.get("finish_reason") ctok = meta.get("completion_tokens") elif api_suffix == "/v1/completions": choice = data.get("choices", [{}])[0] txt = choice.get("text", "").strip() fr = choice.get("finish_reason") ctok = data.get("usage", {}).get("completion_tokens") log(f"🟢 [{now()}] HTTP {r.status_code} tokens={ctok} finish={fr}\n" f"🟢 resp800={r.text[:800]!r}") if r.status_code != 200: return f"[HTTP {r.status_code}] {r.text[:300]}" return txt or "[⚠ 空]" except Exception as e: log(f"[❌ 请求异常] {e}") return f"[❌ 请求异常] {e}" # ────────────────── Chat 回调 ────────────────── def chat( user_msg, history, max_new, temp, top_p, top_k, rep_pen, pres_pen, stop_raw, api_suffix, log_state ): from queue import Queue, Empty # 解析传入的 ChatInput 格式 user = user_msg["text"] if isinstance(user_msg, dict) and "text" in user_msg else user_msg stop = [s.strip() for s in stop_raw.split(",") if s.strip()] or None samp = { ("max_tokens" if api_suffix == "/v1/completions" else "max_new_tokens"): int(max_new), "temperature": temp, "top_p": top_p, "top_k": int(top_k), "repetition_penalty": rep_pen, "presence_penalty": pres_pen, **({"stop": stop} if stop else {}) } result_q = Queue() def worker(): out = backend(user, samp, api_suffix) result_q.put(out) # threading.Thread(target=worker).start() thread = threading.Thread(target=worker, daemon=True) thread.start() yield "⏳ 正在生成中...", log_state while True: # ⚠️ 线程已结束且队列已空 → 直接 return 让生成器终止 if not thread.is_alive() and result_q.empty(): break # return # ← 新增这一行 try: result = result_q.get(timeout=0.1) except Empty: continue # 统一格式 if isinstance(result, str): result = {"text": result} elif not isinstance(result, dict) or "text" not in result: result = {"text": str(result)} yield result, log_state # 第一次真正把模型回复丢给前端 return # ← 把旧的 break 换成 return # while True: # try: # result = result_q.get(timeout=0.1) # # ★ 不论 /generate(纯 str)还是 /v1/completions(dict),都转成 {"text": ...} # if isinstance(result, str): # result = {"text": result} # elif not isinstance(result, dict) or "text" not in result: # result = {"text": str(result)} # yield result, log_state # break # except Empty: # continue # ────────────────── Gradio UI ────────────────── with gr.Blocks(title="调试界面") as demo: gr.Markdown(f"## 💬 调试界面 \n权重 **{MODEL_PATH.name}**") with gr.Row(): api_choice = gr.Dropdown(choices=["/generate", "/v1/completions"], value="/generate", label="选择推理接口") with gr.Row(): max_new = gr.Slider(32, 32768, 128, label="max_new_tokens") temp = gr.Slider(0, 1.5, 0.8, step=0.05, label="temperature") with gr.Row(): top_p = gr.Slider(0, 1, 0.95, step=0.01, label="top_p") top_k = gr.Slider(0, 200, 50, step=1, label="top_k") with gr.Row(): rep_pen = gr.Slider(0.8, 2, 1.05, step=0.01, label="repetition_penalty") pres_pen= gr.Slider(0, 2, 0.0, step=0.05, label="presence_penalty") stop_txt = gr.Textbox("", label="stop 序列(逗号分隔)") log_state = gr.State("") dbg_chk = gr.Checkbox(label="📜 显示 Debug 面板", value=False) log_box = gr.Textbox(label="实时日志", lines=20, interactive=False, visible=False) chatbot = gr.ChatInterface( fn=chat, additional_inputs=[max_new, temp, top_p, top_k, rep_pen, pres_pen, stop_txt, api_choice, log_state], additional_outputs=[log_state], type="messages" ) timer = gr.Timer(1.0, render=True) timer.tick( fn=consume_logs, inputs=[], outputs=[log_box], ) log_state.change(lambda txt: gr.update(value=txt), log_state, log_box) dbg_chk.change(lambda v: gr.update(visible=v), dbg_chk, log_box) demo.launch(server_name="0.0.0.0", server_port=30001)