Compare commits
9 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
04b6a5c3a8 | |
|
|
a53178dacb | |
|
|
2d06f0ab90 | |
|
|
43111064cc | |
|
|
5a633d4b1c | |
|
|
12ec3aa1f4 | |
|
|
c89eda419d | |
|
|
4ff11c2ef7 | |
|
|
fd9971f1c8 |
|
|
@ -2,8 +2,8 @@ WANDB_BASE_URL=https://wandb.szaiai.com
|
|||
WANDB_API_KEY=local-701636f51b4741d3862007df5cf7f12cca53d8d1
|
||||
WANDB_PROJECT=ds-qwen3
|
||||
WANDB_ENTITY=hailin
|
||||
WANDB_GROUP=q3-32b-ds4-2025-09-05
|
||||
WANDB_NAME=q3-32b-lr2e-5-train3
|
||||
WANDB_GROUP=q3-32b-ds4-2025-09-24
|
||||
WANDB_NAME=q3-32b-lr2e-5-train1
|
||||
WANDB_RESUME=allow
|
||||
WANDB_INIT_TIMEOUT=300
|
||||
WANDB_DIR=/tmp/$USER/wandb
|
||||
|
|
|
|||
|
|
@ -2,8 +2,8 @@
|
|||
set -euo pipefail
|
||||
|
||||
# ===== 可调参数 =====
|
||||
CKPT_ROOT="/home/test/checkpoints/q3-32b-lora" # 若实际是 .../checkpoint-62/global_step62,请把 CKPT_ROOT 改成 .../checkpoint-62
|
||||
TAG="global_step30"
|
||||
CKPT_ROOT="/home/test/checkpoints/q3-32b-ds4/checkpoint-60" # 若实际是 .../checkpoint-62/global_step62,请把 CKPT_ROOT 改成 .../checkpoint-62
|
||||
TAG="global_step60"
|
||||
HOSTS=(tn01 tn02 tn03 tn04 tn05 tn06)
|
||||
AGGREGATOR_HOST="tn06" # 本脚本运行/汇总所在机器
|
||||
EXPECTED_SHARDS_PER_HOST=4 # 每机应写出分片数(按你的并行布局)
|
||||
|
|
|
|||
29
mm-zero3.sh
29
mm-zero3.sh
|
|
@ -1,6 +1,13 @@
|
|||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
|
||||
# ==== 0) 日志文件 ====
|
||||
LOG_DIR="/home/test/logs"
|
||||
mkdir -p "$LOG_DIR"
|
||||
LOG_FILE="${LOG_DIR}/train_$(date +%F_%H%M%S)_$$.log"
|
||||
export PYTHONUNBUFFERED=1
|
||||
echo ">> logging to ${LOG_FILE}"
|
||||
|
||||
# 1) 可选:若含 API Key,保护权限
|
||||
[ -f .deepspeed_env ] && chmod 600 .deepspeed_env
|
||||
|
||||
|
|
@ -9,12 +16,6 @@ set -a
|
|||
. ./.deepspeed_env
|
||||
set +a
|
||||
|
||||
# unset PYTHONNOUSERSITE
|
||||
# USER_SITE=$(python3 -c 'import site;print(site.getusersitepackages())')
|
||||
# export PATH="$HOME/.local/bin:$PATH"
|
||||
# export PYTHONPATH="$USER_SITE:/home/test/jd_train:${PYTHONPATH:-}"
|
||||
|
||||
|
||||
# 统一环境(会被 deepspeed 的 ssh 继承到各节点)
|
||||
unset DS_BUILD_OPS DS_SKIP_CUDA_BUILD PYTHONNOUSERSITE
|
||||
export TORCH_EXTENSIONS_DIR=/tmp/$USER/torch_ext
|
||||
|
|
@ -25,6 +26,8 @@ export OMP_NUM_THREADS=8
|
|||
export MKL_NUM_THREADS=8
|
||||
export OPENBLAS_NUM_THREADS=8
|
||||
|
||||
# ==== 3) 运行并同步到文件 + 控制台(保留退出码)====
|
||||
set +e
|
||||
FORCE_COLOR=1 deepspeed --hostfile hostfile \
|
||||
--num_nodes 6 --num_gpus 4 \
|
||||
/home/test/jd_train/train_sft_ds.py \
|
||||
|
|
@ -35,7 +38,7 @@ FORCE_COLOR=1 deepspeed --hostfile hostfile \
|
|||
--per_device_train_batch_size 1 \
|
||||
--gradient_accumulation_steps 1 \
|
||||
--learning_rate 2e-5 --weight_decay 0.1 --warmup_ratio 0.02 \
|
||||
--max_steps 20 \
|
||||
--max_steps 300 \
|
||||
--log_interval 1 \
|
||||
--gradient_checkpointing \
|
||||
--bf16 \
|
||||
|
|
@ -43,5 +46,13 @@ FORCE_COLOR=1 deepspeed --hostfile hostfile \
|
|||
--report_to wandb \
|
||||
--wandb_project ds-qwen3 \
|
||||
--eval_steps 10 \
|
||||
--eval_data_glob "/home/test/datasets/my_corpus/test.jsonl"
|
||||
|
||||
--save_steps 10 \
|
||||
--load_best_model_at_end \
|
||||
--early_stopping_patience 5 \
|
||||
--early_stopping_threshold 0.0 \
|
||||
--metric_for_best_model eval_loss \
|
||||
--eval_data_glob "/home/test/datasets/my_corpus/test.jsonl" \
|
||||
2>&1 | tee -a "$LOG_FILE"
|
||||
DS_RC=${PIPESTATUS[0]}
|
||||
set -e
|
||||
exit "$DS_RC"
|
||||
|
|
|
|||
|
|
@ -0,0 +1 @@
|
|||
pdsh -R ssh -w tn[01-05] 'git -C /home/test/jd_train pull --ff-only'
|
||||
171
train_sft_ds.py
171
train_sft_ds.py
|
|
@ -27,7 +27,8 @@ from transformers import (
|
|||
)
|
||||
from transformers.trainer_callback import TrainerCallback
|
||||
from transformers.trainer_utils import get_last_checkpoint
|
||||
from torch.optim import AdamW as TorchAdamW
|
||||
# from torch.optim import AdamW as TorchAdamW
|
||||
from transformers import EarlyStoppingCallback
|
||||
|
||||
# ==== make sure CLI ninja/nvcc are reachable even in non-interactive ssh ====
|
||||
import site, shutil
|
||||
|
|
@ -72,23 +73,23 @@ if shutil.which("ninja") is None:
|
|||
os.environ["USE_NINJA"] = "0"
|
||||
print("[env] no CLI ninja on PATH -> USE_NINJA=0 fallback", flush=True)
|
||||
|
||||
# 4) 立即验证 ninja 与 CPUAdam 的 JIT(若这里失败,日志会第一时间告诉你是哪台/哪 rank 环境不对)
|
||||
try:
|
||||
from deepspeed.ops.op_builder import CPUAdamBuilder
|
||||
CPUAdamBuilder().load()
|
||||
print("[env] CPUAdamBuilder JIT OK", flush=True)
|
||||
except Exception as e:
|
||||
# ninja 可执行找不到时走兜底:禁用 ninja,用 setuptools 构建(首次会慢一点,但必过)
|
||||
if "Ninja is required to load C++ extensions" in str(e):
|
||||
os.environ["USE_NINJA"] = "0"
|
||||
print("[env] no CLI ninja, retry with USE_NINJA=0 (fallback build)", flush=True)
|
||||
from deepspeed.ops.op_builder import CPUAdamBuilder
|
||||
CPUAdamBuilder().load()
|
||||
print("[env] CPUAdamBuilder JIT OK (fallback)", flush=True)
|
||||
else:
|
||||
import socket
|
||||
print(f"[env][host={socket.gethostname()} RANK={os.environ.get('RANK','?')}] PRE-JIT FAILED: {e}", flush=True)
|
||||
raise
|
||||
# # 4) 立即验证 ninja 与 CPUAdam 的 JIT(若这里失败,日志会第一时间告诉你是哪台/哪 rank 环境不对)
|
||||
# try:
|
||||
# from deepspeed.ops.op_builder import CPUAdamBuilder
|
||||
# CPUAdamBuilder().load()
|
||||
# print("[env] CPUAdamBuilder JIT OK", flush=True)
|
||||
# except Exception as e:
|
||||
# # ninja 可执行找不到时走兜底:禁用 ninja,用 setuptools 构建(首次会慢一点,但必过)
|
||||
# if "Ninja is required to load C++ extensions" in str(e):
|
||||
# os.environ["USE_NINJA"] = "0"
|
||||
# print("[env] no CLI ninja, retry with USE_NINJA=0 (fallback build)", flush=True)
|
||||
# from deepspeed.ops.op_builder import CPUAdamBuilder
|
||||
# CPUAdamBuilder().load()
|
||||
# print("[env] CPUAdamBuilder JIT OK (fallback)", flush=True)
|
||||
# else:
|
||||
# import socket
|
||||
# print(f"[env][host={socket.gethostname()} RANK={os.environ.get('RANK','?')}] PRE-JIT FAILED: {e}", flush=True)
|
||||
# raise
|
||||
|
||||
|
||||
# ----------------- 进程工具 -----------------
|
||||
|
|
@ -126,7 +127,31 @@ class DebugTrainer(Trainer):
|
|||
return super().training_step(model, inputs)
|
||||
|
||||
# return super().training_step(model, inputs, num_items_in_batch)
|
||||
|
||||
|
||||
def get_train_dataloader(self):
|
||||
dl = super().get_train_dataloader()
|
||||
if getattr(self.args, "max_steps", 0) and self.args.max_steps > 0:
|
||||
RepeatingLoader = None
|
||||
try:
|
||||
from deepspeed.utils import RepeatingLoader
|
||||
except Exception:
|
||||
try:
|
||||
from deepspeed.runtime.dataloader import RepeatingLoader
|
||||
except Exception:
|
||||
RepeatingLoader = None
|
||||
|
||||
if RepeatingLoader is not None:
|
||||
return RepeatingLoader(dl)
|
||||
else:
|
||||
# 纯 Python 兜底
|
||||
def _infinite(loader):
|
||||
while True:
|
||||
for batch in loader:
|
||||
yield batch
|
||||
return _infinite(dl)
|
||||
return dl
|
||||
|
||||
|
||||
# ----------------- 日志回调 -----------------
|
||||
class CsvLossLogger(TrainerCallback):
|
||||
def __init__(self, csv_path: str):
|
||||
|
|
@ -177,7 +202,7 @@ class CsvLossLogger(TrainerCallback):
|
|||
f"{cur},{logs.get('loss','')},{logs.get('learning_rate','')},{logs.get('total_flos','')}\n"
|
||||
)
|
||||
|
||||
from typing import List, Tuple, Iterable, Iterator, Dict
|
||||
# from typing import List, Tuple, Iterable, Iterator, Dict
|
||||
|
||||
# ----------------- 仅监督 assistant 内容(token-id 级,不用 offsets) -----------------
|
||||
class QwenChatSFTDataset(IterableDataset):
|
||||
|
|
@ -444,6 +469,17 @@ def parse_args():
|
|||
ap.add_argument("--eval_steps", type=int, default=10,
|
||||
help="Evaluate every N optimizer steps when eval_dataset is provided")
|
||||
|
||||
ap.add_argument("--load_best_model_at_end", action="store_true",
|
||||
help="训练结束时自动加载最优 checkpoint")
|
||||
ap.add_argument("--metric_for_best_model", type=str, default="eval_loss",
|
||||
help="用哪个指标选最优,默认 eval_loss")
|
||||
ap.add_argument("--greater_is_better", action="store_true",
|
||||
help="是否指标越大越好;eval_loss 用 False(默认不传即可)")
|
||||
ap.add_argument("--early_stopping_patience", type=int, default=0,
|
||||
help=">0 启用早停;单位是 eval 轮次数(非 step 数)")
|
||||
ap.add_argument("--early_stopping_threshold", type=float, default=0.0,
|
||||
help="改进阈值,0 表示严格变好才算改进")
|
||||
|
||||
return ap.parse_args()
|
||||
|
||||
|
||||
|
|
@ -560,6 +596,26 @@ def main():
|
|||
dbg(f"HfDeepSpeedConfig loaded from {src}")
|
||||
|
||||
|
||||
# ---- DeepSpeed JIT 预检测:仅在启用 DS 时检查 CPUAdam(更稳)
|
||||
if use_ds:
|
||||
try:
|
||||
from deepspeed.ops.op_builder import CPUAdamBuilder
|
||||
try:
|
||||
CPUAdamBuilder().load()
|
||||
print("[env] CPUAdamBuilder JIT OK", flush=True)
|
||||
except Exception as e:
|
||||
if "Ninja is required to load C++ extensions" in str(e):
|
||||
os.environ["USE_NINJA"] = "0"
|
||||
print("[env] no CLI ninja, retry with USE_NINJA=0 (fallback build)", flush=True)
|
||||
CPUAdamBuilder().load()
|
||||
print("[env] CPUAdamBuilder JIT OK (fallback)", flush=True)
|
||||
else:
|
||||
print(f"[env] CPUAdamBuilder pre-JIT failed: {e}", flush=True)
|
||||
raise
|
||||
except ImportError:
|
||||
print("[env] DeepSpeed not installed; skip CPUAdam pre-JIT", flush=True)
|
||||
|
||||
|
||||
if args.report_to == "wandb":
|
||||
os.environ.setdefault("WANDB_PROJECT", args.wandb_project)
|
||||
|
||||
|
|
@ -567,8 +623,8 @@ def main():
|
|||
# 仅在 rank0 预初始化 W&B
|
||||
is_rank0 = os.environ.get("RANK", "0") == "0" and os.environ.get("LOCAL_RANK", "-1") in ("0", "-1")
|
||||
if is_rank0:
|
||||
import wandb
|
||||
try:
|
||||
import wandb
|
||||
# 避免外部遗留的 RUN_ID 强制续跑导致卡住
|
||||
os.environ.pop("WANDB_RUN_ID", None)
|
||||
|
||||
|
|
@ -645,7 +701,15 @@ def main():
|
|||
dbg(f"dist query error: {e}")
|
||||
|
||||
# 1) 先补 tokenizer 的 pad
|
||||
tokenizer = AutoTokenizer.from_pretrained(args.model_name_or_path, use_fast=True, trust_remote_code=True)
|
||||
# tokenizer = AutoTokenizer.from_pretrained(args.model_name_or_path, use_fast=True, trust_remote_code=True)
|
||||
try:
|
||||
tokenizer = AutoTokenizer.from_pretrained(args.model_name_or_path, use_fast=True, trust_remote_code=True)
|
||||
except Exception as e:
|
||||
print(f"[warn] fast tokenizer unavailable ({e}); falling back to slow tokenizer.", flush=True)
|
||||
tokenizer = AutoTokenizer.from_pretrained(
|
||||
args.model_name_or_path, use_fast=False, trust_remote_code=True
|
||||
)
|
||||
|
||||
if tokenizer.pad_token is None:
|
||||
tokenizer.pad_token = tokenizer.eos_token
|
||||
|
||||
|
|
@ -683,6 +747,10 @@ def main():
|
|||
return (major, minor) >= (8, 0) # Ampere 及以上
|
||||
|
||||
use_bf16 = bool(args.bf16 and _bf16_supported())
|
||||
|
||||
if args.bf16 and not use_bf16:
|
||||
print("[warn] bf16 not supported on this GPU; falling back to fp16/fp32.", flush=True)
|
||||
|
||||
dtype = (torch.bfloat16 if use_bf16 else
|
||||
(torch.float16 if torch.cuda.is_available() else torch.float32))
|
||||
|
||||
|
|
@ -771,9 +839,38 @@ def main():
|
|||
assert bool((labs[attn == 0] == -100).all()), "[fatal] padded tokens must have label -100"
|
||||
|
||||
|
||||
def endless_examples(files, base_seed: int, buf: int = 50000):
|
||||
"""从本地 JSONL 反复流式读取并打乱,形成无限数据流。"""
|
||||
epoch = 0
|
||||
while True:
|
||||
s = load_dataset("json", data_files={"train": files}, split="train", streaming=True)
|
||||
s = s.shuffle(buffer_size=buf, seed=base_seed + epoch)
|
||||
if hasattr(s, "set_epoch"):
|
||||
s.set_epoch(epoch)
|
||||
for ex in s:
|
||||
yield ex
|
||||
epoch += 1
|
||||
|
||||
|
||||
# ====== 正式训练流(不做任何手动分片,交给 Accelerate/Trainer)======
|
||||
ds_stream2 = load_dataset("json", data_files={"train": files}, split="train", streaming=True).shuffle(buffer_size=50000, seed=args.seed)
|
||||
train_stream = QwenChatSFTDataset((ex for ex in ds_stream2), tokenizer, seq_len=args.seq_len)
|
||||
# ds_stream2 = load_dataset("json", data_files={"train": files}, split="train", streaming=True)\
|
||||
# .shuffle(buffer_size=50000, seed=args.seed)
|
||||
|
||||
ex_iter = endless_examples(files, args.seed, buf=50000)
|
||||
|
||||
# 先尝试 datasets 的无限流;没有就用我们自己的无限生成器
|
||||
# try:
|
||||
# ds_stream2 = ds_stream2.repeat() # ★ 若可用:官方无限流
|
||||
# ex_iter = (ex for ex in ds_stream2) # ★ 统一用 ex_iter 作为上游
|
||||
# except AttributeError:
|
||||
# ex_iter = endless_examples(files, args.seed, buf=50000) # ★ 兜底:自制无限流
|
||||
|
||||
# 关键:这里一定要用 ex_iter,而不是重新从 ds_stream2 取一次
|
||||
train_stream = QwenChatSFTDataset(ex_iter, tokenizer, seq_len=args.seq_len)
|
||||
|
||||
# Safety: IterableDataset 需要明确的 max_steps
|
||||
if (args.max_steps is None) or (args.max_steps <= 0):
|
||||
raise ValueError("Detected streaming/IterableDataset. Please set --max_steps > 0.")
|
||||
|
||||
# ====== 一致性探针(不分片)======
|
||||
ds_stream_probe2 = load_dataset("json", data_files={"train": files}, split="train", streaming=True)
|
||||
|
|
@ -921,9 +1018,9 @@ def main():
|
|||
lr_scheduler_type="cosine",
|
||||
logging_steps=args.log_interval,
|
||||
save_steps=args.save_steps,
|
||||
save_total_limit=2,
|
||||
save_total_limit=None,
|
||||
deepspeed=(args.deepspeed if use_ds else None),
|
||||
dataloader_drop_last=False,
|
||||
dataloader_drop_last=True,
|
||||
dataloader_num_workers=0,
|
||||
label_smoothing_factor=0.0,
|
||||
per_device_eval_batch_size=args.per_device_eval_batch_size,
|
||||
|
|
@ -948,6 +1045,21 @@ def main():
|
|||
"fp16": (torch.cuda.is_available() and not use_bf16),
|
||||
})
|
||||
|
||||
ta_sig = inspect.signature(TrainingArguments.__init__).parameters
|
||||
if "save_strategy" in ta_sig:
|
||||
ta_kwargs2["save_strategy"] = "steps"
|
||||
|
||||
ta_kwargs2.update({
|
||||
"load_best_model_at_end": args.load_best_model_at_end,
|
||||
"metric_for_best_model": args.metric_for_best_model,
|
||||
"greater_is_better": args.greater_is_better, # 对 eval_loss 保持 False(默认)
|
||||
# "save_strategy": "steps", # 与 eval_steps 对齐
|
||||
})
|
||||
|
||||
|
||||
if args.early_stopping_patience > 0 and eval_dataset is None:
|
||||
print("[warn] early_stopping_patience>0 但未提供 eval 数据集;早停将不会触发。", flush=True)
|
||||
|
||||
training_args = TrainingArguments(**ta_kwargs2)
|
||||
|
||||
trainer_kwargs = {}
|
||||
|
|
@ -966,6 +1078,13 @@ def main():
|
|||
**trainer_kwargs,
|
||||
)
|
||||
|
||||
|
||||
if args.early_stopping_patience and args.early_stopping_patience > 0:
|
||||
trainer.add_callback(EarlyStoppingCallback(
|
||||
early_stopping_patience=args.early_stopping_patience,
|
||||
early_stopping_threshold=args.early_stopping_threshold
|
||||
))
|
||||
|
||||
trainer.add_callback(CsvLossLogger(csv_path=os.path.join(args.output_dir, "loss.csv")))
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1,8 +1,8 @@
|
|||
export WB_ENTITY=hailin
|
||||
export WANDB_BASE_URL=https://wandb.szaiai.com
|
||||
export WANDB_API_KEY=local-701636f51b4741d3862007df5cf7f12cca53d8d1
|
||||
export WANDB_PROJECT=ds-qwen3-lora
|
||||
export WANDB_GROUP=q3-32b-ds4-2025-09-05 # 如果训练时没用 WANDB_RUN_GROUP,这里只是“期望值”
|
||||
export WANDB_PROJECT=ds-qwen3
|
||||
export WANDB_GROUP=q3-32b-ds4-2025-09-25 # 如果训练时没用 WANDB_RUN_GROUP,这里只是“期望值”
|
||||
export MATCH_NAME_REGEX='^q3-32b' # 回退方案:按名字匹配
|
||||
|
||||
python3 - <<'PY'
|
||||
|
|
|
|||
Loading…
Reference in New Issue