Compare commits

9 Commits
v3.0.0 ... main

Author SHA1 Message Date
hailin 04b6a5c3a8 . 2025-09-26 15:25:16 +08:00
hailin a53178dacb . 2025-09-26 15:19:40 +08:00
hailin 2d06f0ab90 . 2025-09-25 13:47:11 +08:00
hailin 43111064cc . 2025-09-24 20:25:25 +08:00
hailin 5a633d4b1c . 2025-09-24 19:57:13 +08:00
hailin 12ec3aa1f4 . 2025-09-24 18:17:18 +08:00
hailin c89eda419d . 2025-09-24 17:39:50 +08:00
hailin 4ff11c2ef7 . 2025-09-24 17:35:16 +08:00
hailin fd9971f1c8 . 2025-09-24 17:30:31 +08:00
6 changed files with 172 additions and 41 deletions

View File

@@ -2,8 +2,8 @@ WANDB_BASE_URL=https://wandb.szaiai.com
 WANDB_API_KEY=local-701636f51b4741d3862007df5cf7f12cca53d8d1
 WANDB_PROJECT=ds-qwen3
 WANDB_ENTITY=hailin
-WANDB_GROUP=q3-32b-ds4-2025-09-05
-WANDB_NAME=q3-32b-lr2e-5-train3
+WANDB_GROUP=q3-32b-ds4-2025-09-24
+WANDB_NAME=q3-32b-lr2e-5-train1
 WANDB_RESUME=allow
 WANDB_INIT_TIMEOUT=300
 WANDB_DIR=/tmp/$USER/wandb

View File

@@ -2,8 +2,8 @@
 set -euo pipefail
 # ===== Tunable parameters =====
-CKPT_ROOT="/home/test/checkpoints/q3-32b-lora"              # If the real layout is .../checkpoint-62/global_step62, set CKPT_ROOT to .../checkpoint-62
-TAG="global_step30"
+CKPT_ROOT="/home/test/checkpoints/q3-32b-ds4/checkpoint-60" # If the real layout is .../checkpoint-62/global_step62, set CKPT_ROOT to .../checkpoint-62
+TAG="global_step60"
 HOSTS=(tn01 tn02 tn03 tn04 tn05 tn06)
 AGGREGATOR_HOST="tn06"       # machine this script runs on / where shards are aggregated
 EXPECTED_SHARDS_PER_HOST=4   # number of shards each host is expected to write (per your parallel layout)

View File

@@ -1,6 +1,13 @@
 #!/usr/bin/env bash
 set -euo pipefail
+# ==== 0) Log file ====
+LOG_DIR="/home/test/logs"
+mkdir -p "$LOG_DIR"
+LOG_FILE="${LOG_DIR}/train_$(date +%F_%H%M%S)_$$.log"
+export PYTHONUNBUFFERED=1
+echo ">> logging to ${LOG_FILE}"
 # 1) Optional: protect permissions if the file contains an API key
 [ -f .deepspeed_env ] && chmod 600 .deepspeed_env
@@ -9,12 +16,6 @@ set -a
 . ./.deepspeed_env
 set +a
-# unset PYTHONNOUSERSITE
-# USER_SITE=$(python3 -c 'import site;print(site.getusersitepackages())')
-# export PATH="$HOME/.local/bin:$PATH"
-# export PYTHONPATH="$USER_SITE:/home/test/jd_train:${PYTHONPATH:-}"
 # Shared environment (inherited by every node through deepspeed's ssh launcher)
 unset DS_BUILD_OPS DS_SKIP_CUDA_BUILD PYTHONNOUSERSITE
 export TORCH_EXTENSIONS_DIR=/tmp/$USER/torch_ext
@@ -25,6 +26,8 @@ export OMP_NUM_THREADS=8
 export MKL_NUM_THREADS=8
 export OPENBLAS_NUM_THREADS=8
+# ==== 3) Run, mirroring output to log file + console (preserving the exit code) ====
+set +e
 FORCE_COLOR=1 deepspeed --hostfile hostfile \
   --num_nodes 6 --num_gpus 4 \
   /home/test/jd_train/train_sft_ds.py \
@@ -35,7 +38,7 @@ FORCE_COLOR=1 deepspeed --hostfile hostfile \
   --per_device_train_batch_size 1 \
   --gradient_accumulation_steps 1 \
   --learning_rate 2e-5 --weight_decay 0.1 --warmup_ratio 0.02 \
-  --max_steps 20 \
+  --max_steps 300 \
   --log_interval 1 \
   --gradient_checkpointing \
   --bf16 \
@@ -43,5 +46,13 @@ FORCE_COLOR=1 deepspeed --hostfile hostfile \
   --report_to wandb \
   --wandb_project ds-qwen3 \
   --eval_steps 10 \
-  --eval_data_glob "/home/test/datasets/my_corpus/test.jsonl"
+  --save_steps 10 \
+  --load_best_model_at_end \
+  --early_stopping_patience 5 \
+  --early_stopping_threshold 0.0 \
+  --metric_for_best_model eval_loss \
+  --eval_data_glob "/home/test/datasets/my_corpus/test.jsonl" \
+  2>&1 | tee -a "$LOG_FILE"
+DS_RC=${PIPESTATUS[0]}
+set -e
+exit "$DS_RC"

View File

@@ -0,0 +1 @@
+pdsh -R ssh -w tn[01-05] 'git -C /home/test/jd_train pull --ff-only'

View File

@@ -27,7 +27,8 @@ from transformers import (
 )
 from transformers.trainer_callback import TrainerCallback
 from transformers.trainer_utils import get_last_checkpoint
-from torch.optim import AdamW as TorchAdamW
+# from torch.optim import AdamW as TorchAdamW
+from transformers import EarlyStoppingCallback
 # ==== make sure CLI ninja/nvcc are reachable even in non-interactive ssh ====
 import site, shutil
@@ -72,23 +73,23 @@ if shutil.which("ninja") is None:
     os.environ["USE_NINJA"] = "0"
     print("[env] no CLI ninja on PATH -> USE_NINJA=0 fallback", flush=True)
-# 4) Verify the ninja / CPUAdam JIT right away (if it fails here, the log immediately tells you which host / which rank has a broken environment)
-try:
-    from deepspeed.ops.op_builder import CPUAdamBuilder
-    CPUAdamBuilder().load()
-    print("[env] CPUAdamBuilder JIT OK", flush=True)
-except Exception as e:
-    # Fallback when the ninja executable cannot be found: disable ninja and build with setuptools (slower the first time, but always succeeds)
-    if "Ninja is required to load C++ extensions" in str(e):
-        os.environ["USE_NINJA"] = "0"
-        print("[env] no CLI ninja, retry with USE_NINJA=0 (fallback build)", flush=True)
-        from deepspeed.ops.op_builder import CPUAdamBuilder
-        CPUAdamBuilder().load()
-        print("[env] CPUAdamBuilder JIT OK (fallback)", flush=True)
-    else:
-        import socket
-        print(f"[env][host={socket.gethostname()} RANK={os.environ.get('RANK','?')}] PRE-JIT FAILED: {e}", flush=True)
-        raise
+# # 4) Verify the ninja / CPUAdam JIT right away (if it fails here, the log immediately tells you which host / which rank has a broken environment)
+# try:
+#     from deepspeed.ops.op_builder import CPUAdamBuilder
+#     CPUAdamBuilder().load()
+#     print("[env] CPUAdamBuilder JIT OK", flush=True)
+# except Exception as e:
+#     # Fallback when the ninja executable cannot be found: disable ninja and build with setuptools (slower the first time, but always succeeds)
+#     if "Ninja is required to load C++ extensions" in str(e):
+#         os.environ["USE_NINJA"] = "0"
+#         print("[env] no CLI ninja, retry with USE_NINJA=0 (fallback build)", flush=True)
+#         from deepspeed.ops.op_builder import CPUAdamBuilder
+#         CPUAdamBuilder().load()
+#         print("[env] CPUAdamBuilder JIT OK (fallback)", flush=True)
+#     else:
+#         import socket
+#         print(f"[env][host={socket.gethostname()} RANK={os.environ.get('RANK','?')}] PRE-JIT FAILED: {e}", flush=True)
+#         raise
 # ----------------- Process utilities -----------------
@@ -126,7 +127,31 @@ class DebugTrainer(Trainer):
         return super().training_step(model, inputs)
         # return super().training_step(model, inputs, num_items_in_batch)
+    def get_train_dataloader(self):
+        dl = super().get_train_dataloader()
+        if getattr(self.args, "max_steps", 0) and self.args.max_steps > 0:
+            RepeatingLoader = None
+            try:
+                from deepspeed.utils import RepeatingLoader
+            except Exception:
+                try:
+                    from deepspeed.runtime.dataloader import RepeatingLoader
+                except Exception:
+                    RepeatingLoader = None
+            if RepeatingLoader is not None:
+                return RepeatingLoader(dl)
+            else:
+                # Pure-Python fallback
+                def _infinite(loader):
+                    while True:
+                        for batch in loader:
+                            yield batch
+                return _infinite(dl)
+        return dl
 # ----------------- Logging callback -----------------
 class CsvLossLogger(TrainerCallback):
     def __init__(self, csv_path: str):
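The get_train_dataloader override added above wraps the loader so that training can run past a single pass of the data when --max_steps is large. A tiny self-contained sketch of the same pure-Python fallback idea (illustrative only; the toy DataLoader below is not from the project):

from torch.utils.data import DataLoader

def infinite(loader):
    """Restart the loader whenever it is exhausted, so iteration never stops."""
    while True:
        for batch in loader:
            yield batch

loader = DataLoader(list(range(4)), batch_size=2)        # only 2 batches per pass
it = infinite(loader)
print([next(it).tolist() for _ in range(5)])              # 5 > 2: wraps around instead of raising StopIteration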
@@ -177,7 +202,7 @@ class CsvLossLogger(TrainerCallback):
                 f"{cur},{logs.get('loss','')},{logs.get('learning_rate','')},{logs.get('total_flos','')}\n"
             )
-from typing import List, Tuple, Iterable, Iterator, Dict
+# from typing import List, Tuple, Iterable, Iterator, Dict
 # ----------------- Supervise only the assistant content (token-id level, no offsets) -----------------
 class QwenChatSFTDataset(IterableDataset):
@@ -444,6 +469,17 @@ def parse_args():
     ap.add_argument("--eval_steps", type=int, default=10,
                     help="Evaluate every N optimizer steps when eval_dataset is provided")
+    ap.add_argument("--load_best_model_at_end", action="store_true",
+                    help="Automatically load the best checkpoint at the end of training")
+    ap.add_argument("--metric_for_best_model", type=str, default="eval_loss",
+                    help="Metric used to pick the best model; defaults to eval_loss")
+    ap.add_argument("--greater_is_better", action="store_true",
+                    help="Whether larger metric values are better (keep False for eval_loss, i.e. simply omit the flag)")
+    ap.add_argument("--early_stopping_patience", type=int, default=0,
+                    help=">0 enables early stopping; counted in evaluation rounds, not steps")
+    ap.add_argument("--early_stopping_threshold", type=float, default=0.0,
+                    help="Improvement threshold; 0 means only a strict improvement counts")
     return ap.parse_args()
@@ -560,6 +596,26 @@ def main():
         dbg(f"HfDeepSpeedConfig loaded from {src}")
+    # ---- DeepSpeed JIT pre-check: only probe CPUAdam when DeepSpeed is enabled (more robust)
+    if use_ds:
+        try:
+            from deepspeed.ops.op_builder import CPUAdamBuilder
+            try:
+                CPUAdamBuilder().load()
+                print("[env] CPUAdamBuilder JIT OK", flush=True)
+            except Exception as e:
+                if "Ninja is required to load C++ extensions" in str(e):
+                    os.environ["USE_NINJA"] = "0"
+                    print("[env] no CLI ninja, retry with USE_NINJA=0 (fallback build)", flush=True)
+                    CPUAdamBuilder().load()
+                    print("[env] CPUAdamBuilder JIT OK (fallback)", flush=True)
+                else:
+                    print(f"[env] CPUAdamBuilder pre-JIT failed: {e}", flush=True)
+                    raise
+        except ImportError:
+            print("[env] DeepSpeed not installed; skip CPUAdam pre-JIT", flush=True)
     if args.report_to == "wandb":
         os.environ.setdefault("WANDB_PROJECT", args.wandb_project)
@@ -567,8 +623,8 @@ def main():
     # Pre-initialize W&B on rank 0 only
     is_rank0 = os.environ.get("RANK", "0") == "0" and os.environ.get("LOCAL_RANK", "-1") in ("0", "-1")
     if is_rank0:
-        import wandb
         try:
+            import wandb
             # Avoid a leftover WANDB_RUN_ID in the environment forcing a resume and hanging the run
             os.environ.pop("WANDB_RUN_ID", None)
@@ -645,7 +701,15 @@ def main():
         dbg(f"dist query error: {e}")
     # 1) First make sure the tokenizer has a pad token
-    tokenizer = AutoTokenizer.from_pretrained(args.model_name_or_path, use_fast=True, trust_remote_code=True)
+    # tokenizer = AutoTokenizer.from_pretrained(args.model_name_or_path, use_fast=True, trust_remote_code=True)
+    try:
+        tokenizer = AutoTokenizer.from_pretrained(args.model_name_or_path, use_fast=True, trust_remote_code=True)
+    except Exception as e:
+        print(f"[warn] fast tokenizer unavailable ({e}); falling back to slow tokenizer.", flush=True)
+        tokenizer = AutoTokenizer.from_pretrained(
+            args.model_name_or_path, use_fast=False, trust_remote_code=True
+        )
     if tokenizer.pad_token is None:
         tokenizer.pad_token = tokenizer.eos_token
@@ -683,6 +747,10 @@ def main():
         return (major, minor) >= (8, 0)  # Ampere or newer
     use_bf16 = bool(args.bf16 and _bf16_supported())
+    if args.bf16 and not use_bf16:
+        print("[warn] bf16 not supported on this GPU; falling back to fp16/fp32.", flush=True)
     dtype = (torch.bfloat16 if use_bf16 else
              (torch.float16 if torch.cuda.is_available() else torch.float32))
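For reference, a standalone sketch of the capability gate behind the new warning: bf16 needs a GPU with compute capability 8.0 or higher (Ampere or newer). The function name and print below are illustrative, not from the project.

import torch

def bf16_supported() -> bool:
    """True only when a CUDA device with compute capability >= 8.0 (Ampere) is present."""
    if not torch.cuda.is_available():
        return False
    major, minor = torch.cuda.get_device_capability()
    return (major, minor) >= (8, 0)

dtype = (torch.bfloat16 if bf16_supported()
         else (torch.float16 if torch.cuda.is_available() else torch.float32))
print(f"selected dtype: {dtype}")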
@@ -771,9 +839,38 @@ def main():
     assert bool((labs[attn == 0] == -100).all()), "[fatal] padded tokens must have label -100"
+    def endless_examples(files, base_seed: int, buf: int = 50000):
+        """Repeatedly stream and shuffle the local JSONL files to form an endless data stream."""
+        epoch = 0
+        while True:
+            s = load_dataset("json", data_files={"train": files}, split="train", streaming=True)
+            s = s.shuffle(buffer_size=buf, seed=base_seed + epoch)
+            if hasattr(s, "set_epoch"):
+                s.set_epoch(epoch)
+            for ex in s:
+                yield ex
+            epoch += 1
     # ====== Actual training stream (no manual sharding; leave it to Accelerate/Trainer) ======
-    ds_stream2 = load_dataset("json", data_files={"train": files}, split="train", streaming=True).shuffle(buffer_size=50000, seed=args.seed)
-    train_stream = QwenChatSFTDataset((ex for ex in ds_stream2), tokenizer, seq_len=args.seq_len)
+    # ds_stream2 = load_dataset("json", data_files={"train": files}, split="train", streaming=True)\
+    #     .shuffle(buffer_size=50000, seed=args.seed)
+    ex_iter = endless_examples(files, args.seed, buf=50000)
+    # First try datasets' own endless stream; if that is unavailable, use our own endless generator
+    # try:
+    #     ds_stream2 = ds_stream2.repeat()              # ★ if available: the official endless stream
+    #     ex_iter = (ex for ex in ds_stream2)           # ★ always use ex_iter as the upstream
+    # except AttributeError:
+    #     ex_iter = endless_examples(files, args.seed, buf=50000)  # ★ fallback: home-made endless stream
+    # Key point: use ex_iter here, not a fresh pull from ds_stream2
+    train_stream = QwenChatSFTDataset(ex_iter, tokenizer, seq_len=args.seq_len)
+    # Safety: an IterableDataset requires an explicit max_steps
+    if (args.max_steps is None) or (args.max_steps <= 0):
+        raise ValueError("Detected streaming/IterableDataset. Please set --max_steps > 0.")
     # ====== Consistency probe (no sharding) ======
     ds_stream_probe2 = load_dataset("json", data_files={"train": files}, split="train", streaming=True)
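A runnable toy version of the endless-stream idea above, assuming the `datasets` package is installed; the throwaway JSONL file and its `text` field are made up for illustration:

import json, os, tempfile
from datasets import load_dataset

# Write a tiny throwaway JSONL corpus
tmp = tempfile.NamedTemporaryFile("w", suffix=".jsonl", delete=False)
for i in range(3):
    tmp.write(json.dumps({"text": f"example {i}"}) + "\n")
tmp.close()

def endless(files, seed, buf=8):
    """Re-open and re-shuffle the streaming dataset each pass, yielding forever."""
    epoch = 0
    while True:
        s = load_dataset("json", data_files={"train": files}, split="train", streaming=True)
        s = s.shuffle(buffer_size=buf, seed=seed + epoch)
        for ex in s:
            yield ex
        epoch += 1

it = endless([tmp.name], seed=0)
print([next(it)["text"] for _ in range(7)])   # 7 > 3 examples: the stream wraps instead of ending
os.unlink(tmp.name)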
@@ -921,9 +1018,9 @@ def main():
         lr_scheduler_type="cosine",
         logging_steps=args.log_interval,
         save_steps=args.save_steps,
-        save_total_limit=2,
+        save_total_limit=None,
         deepspeed=(args.deepspeed if use_ds else None),
-        dataloader_drop_last=False,
+        dataloader_drop_last=True,
         dataloader_num_workers=0,
         label_smoothing_factor=0.0,
         per_device_eval_batch_size=args.per_device_eval_batch_size,
@@ -948,6 +1045,21 @@ def main():
         "fp16": (torch.cuda.is_available() and not use_bf16),
     })
+    ta_sig = inspect.signature(TrainingArguments.__init__).parameters
+    if "save_strategy" in ta_sig:
+        ta_kwargs2["save_strategy"] = "steps"
+    ta_kwargs2.update({
+        "load_best_model_at_end": args.load_best_model_at_end,
+        "metric_for_best_model": args.metric_for_best_model,
+        "greater_is_better": args.greater_is_better,  # keep False for eval_loss (the default)
+        # "save_strategy": "steps",  # aligned with eval_steps
+    })
+    if args.early_stopping_patience > 0 and eval_dataset is None:
+        print("[warn] early_stopping_patience > 0 but no eval dataset was provided; early stopping will never trigger.", flush=True)
     training_args = TrainingArguments(**ta_kwargs2)
     trainer_kwargs = {}
@@ -966,6 +1078,13 @@ def main():
         **trainer_kwargs,
     )
+    if args.early_stopping_patience and args.early_stopping_patience > 0:
+        trainer.add_callback(EarlyStoppingCallback(
+            early_stopping_patience=args.early_stopping_patience,
+            early_stopping_threshold=args.early_stopping_threshold
+        ))
     trainer.add_callback(CsvLossLogger(csv_path=os.path.join(args.output_dir, "loss.csv")))
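Taken together, these additions follow the standard Hugging Face Trainer early-stopping recipe: evaluate and checkpoint every N steps, track eval_loss, keep the best checkpoint, and stop after a number of evaluations without improvement. A minimal sketch, assuming a recent transformers release (older versions spell `eval_strategy` as `evaluation_strategy`); the helper name and placeholder hyperparameters are illustrative:

from transformers import EarlyStoppingCallback, Trainer, TrainingArguments

def build_trainer(model, train_ds, eval_ds, output_dir="out"):
    args = TrainingArguments(
        output_dir=output_dir,
        max_steps=300,
        eval_strategy="steps",            # evaluate every eval_steps
        eval_steps=10,
        save_strategy="steps",            # checkpoint cadence must line up with evaluation for best-model tracking
        save_steps=10,
        load_best_model_at_end=True,
        metric_for_best_model="eval_loss",
        greater_is_better=False,          # lower eval_loss is better
        per_device_train_batch_size=1,
    )
    trainer = Trainer(model=model, args=args, train_dataset=train_ds, eval_dataset=eval_ds)
    # Patience is counted in evaluation rounds (every 10 steps here), not optimizer steps.
    trainer.add_callback(EarlyStoppingCallback(early_stopping_patience=5,
                                               early_stopping_threshold=0.0))
    return trainer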

View File

@@ -1,8 +1,8 @@
 export WB_ENTITY=hailin
 export WANDB_BASE_URL=https://wandb.szaiai.com
 export WANDB_API_KEY=local-701636f51b4741d3862007df5cf7f12cca53d8d1
-export WANDB_PROJECT=ds-qwen3-lora
-export WANDB_GROUP=q3-32b-ds4-2025-09-05   # If training did not set WANDB_RUN_GROUP, this is only the "expected" value
+export WANDB_PROJECT=ds-qwen3
+export WANDB_GROUP=q3-32b-ds4-2025-09-25   # If training did not set WANDB_RUN_GROUP, this is only the "expected" value
 export MATCH_NAME_REGEX='^q3-32b'          # Fallback: match runs by name
 python3 - <<'PY'