diff --git a/run_ds.sh b/run_ds.sh
index 800d6b3..44aff89 100644
--- a/run_ds.sh
+++ b/run_ds.sh
@@ -1,6 +1,9 @@
 #!/usr/bin/env bash
 set -euo pipefail
 
+# Optional: debug tracing
+# [ "${DEBUG:-0}" = "1" ] && set -x
+
 export NCCL_DEBUG=INFO
 # If using IB/RoCE, enable according to your actual NICs:
 # export NCCL_IB_HCA="mlx5_0,mlx5_1"
@@ -8,10 +11,14 @@ export NCCL_DEBUG=INFO
 # Plain Ethernet:
 # export NCCL_SOCKET_IFNAME="eth0"
 
+# Resolve the script directory to avoid relative-path issues
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+DS_CONFIG="${DS_CONFIG:-$SCRIPT_DIR/ds_config_zero3.json}"
+
 # ==== Hyperparameters (local paths; override with VAR=xxx ./launch_ds.sh) ====
 MODEL_NAME_OR_PATH="${MODEL_NAME_OR_PATH:-/home/test/Qwen3-8B}"
-DATA_GLOB="${DATA_GLOB:-/data/datasets/my_corpus/*.jsonl}"   # same path on every machine
-OUTDIR="${OUTDIR:-/data/checkpoints/run-qwen3-8b}"           # each machine writes its own local output
+DATA_GLOB="${DATA_GLOB:-$HOME/datasets/my_corpus/*.jsonl}"   # same path on every machine
+OUTDIR="${OUTDIR:-$HOME/checkpoints/run-qwen3-8b}"           # each machine writes its own local output
 SEQ_LEN="${SEQ_LEN:-4096}"
 LR="${LR:-2e-5}"
 GAS="${GAS:-64}"
@@ -19,10 +26,21 @@ LOG_STEPS="${LOG_STEPS:-10}"
 SAVE_STEPS="${SAVE_STEPS:-500}"
 MAX_STEPS="${MAX_STEPS:-10000}"
 
+# Lightweight validation (launch node only; each remote rank also runs mkdir in its own script)
+[ -d "$MODEL_NAME_OR_PATH" ] || { echo "ERR: model not found at $MODEL_NAME_OR_PATH"; exit 1; }
+[ -f "$DS_CONFIG" ] || { echo "ERR: deepspeed config not found at $DS_CONFIG"; exit 1; }
+# DATA_GLOB is a wildcard; only check once here (launch node) that at least one file matches; each node must itself ensure the same path exists
+shopt -s nullglob
+matches=( $DATA_GLOB )
+if [ ${#matches[@]} -eq 0 ]; then
+  echo "WARN: no files matched by DATA_GLOB=$DATA_GLOB on this node (make sure data exists at this path on every machine)"
+fi
+shopt -u nullglob
+
 mkdir -p "${OUTDIR}"
 
 # ==== Multi-node DeepSpeed ====
-deepspeed --hostfile hostfile train_sft_ds.py \
+deepspeed --hostfile "$SCRIPT_DIR/hostfile" "$SCRIPT_DIR/train_sft_ds.py" \
   --model_name_or_path "${MODEL_NAME_OR_PATH}" \
   --data_glob "${DATA_GLOB}" \
   --output_dir "${OUTDIR}" \
@@ -35,6 +53,6 @@ deepspeed --hostfile hostfile train_sft_ds.py \
   --max_steps "${MAX_STEPS}" \
   --log_interval "${LOG_STEPS}" \
   --save_steps "${SAVE_STEPS}" \
-  --deepspeed ds_config_zero3.json \
+  --deepspeed "${DS_CONFIG}" \
   --gradient_checkpointing \
   --bf16
diff --git a/train_sft_ds.py b/train_sft_ds.py
index dc65a4e..79700b2 100644
--- a/train_sft_ds.py
+++ b/train_sft_ds.py
@@ -1,6 +1,7 @@
 #!/usr/bin/env python3
 import os
 import glob
+import socket
 import argparse
 from typing import Dict, List, Iterable, Iterator
 
@@ -131,11 +132,21 @@ def main():
     if args.gradient_checkpointing:
         model.gradient_checkpointing_enable()
 
-    # Local data (each machine reads its own copy from the same path)
+    # ===== Data robustness checks (run independently on every node) =====
+    host = socket.gethostname()
+    rank = int(os.environ.get("RANK", "0"))
+
     files = sorted(glob.glob(args.data_glob))
     if len(files) == 0:
-        raise FileNotFoundError(f"No files match: {args.data_glob}")
+        raise FileNotFoundError(
+            f"[host={host} rank={rank}] No files matched DATA_GLOB={args.data_glob}\n"
+            "Every machine must have the data at the same local path; "
+            "it can be overridden via DATA_GLOB= ./launch_ds.sh."
+        )
+    if is_main_process():
+        print(f"[data] matched {len(files)} files on host={host}, example[0]={files[0]}", flush=True)
 
+    # streaming: read line by line, expecting a 'text' field
     dataset_iter = load_dataset(
         "json",
         data_files={"train": files},
@@ -149,38 +160,59 @@ def main():
             if isinstance(txt, str) and len(txt.strip()) > 0:
                 yield txt
 
-    train_stream = ConstantLengthDataset(texts_iter=text_iter(), tokenizer=tokenizer, seq_len=args.seq_len)
+    # Build the stream once as a "non-empty probe"
+    train_stream_probe = ConstantLengthDataset(texts_iter=text_iter(), tokenizer=tokenizer, seq_len=args.seq_len)
+    _probe = iter(train_stream_probe)
+    try:
+        _ = next(_probe)  # pull one chunk to make sure training samples are actually produced
+    except StopIteration:
+        raise RuntimeError(
+            f"[host={host} rank={rank}] Data files matched, but no trainable samples were produced.\n"
+            "Common causes: the jsonl lacks a 'text' field, the contents are all empty/blank lines, or --seq_len is too large.\n"
+            "Inspect a few sample lines, or reduce --seq_len and try again."
+        )
+    # The probe consumed the stream; rebuild a clean training stream
+    dataset_iter2 = load_dataset("json", data_files={"train": files}, split="train", streaming=True)
+    def text_iter2():
+        for ex in dataset_iter2:
+            txt = ex.get("text", None)
+            if isinstance(txt, str) and len(txt.strip()) > 0:
+                yield txt
+    train_stream = ConstantLengthDataset(texts_iter=text_iter2(), tokenizer=tokenizer, seq_len=args.seq_len)
+
+    # Optional eval (sampled from the head of the stream)
     eval_dataset = None
     if args.eval_ratio and args.eval_ratio > 0:
-        # Simple eval: take a few batches from the head of the training stream (disable if not needed)
         desired_eval_batches = 200
         gen = iter(train_stream)
         eval_samples = []
         for _ in range(desired_eval_batches):
-            try: eval_samples.append(next(gen))
-            except StopIteration: break
+            try:
+                eval_samples.append(next(gen))
+            except StopIteration:
+                break
 
         class ListDataset(torch.utils.data.Dataset):
             def __init__(self, items): self.items = items
             def __len__(self): return len(self.items)
             def __getitem__(self, idx): return self.items[idx]
         eval_dataset = ListDataset(eval_samples)
-        # Rebuild the training stream
-        dataset_iter2 = load_dataset("json", data_files={"train": files}, split="train", streaming=True)
-        def text_iter2():
-            for ex in dataset_iter2:
+
+        # Rebuild the training stream after sampling so the head is not consumed
+        dataset_iter3 = load_dataset("json", data_files={"train": files}, split="train", streaming=True)
+        def text_iter3():
+            for ex in dataset_iter3:
                 txt = ex.get("text", None)
                 if isinstance(txt, str) and len(txt.strip()) > 0:
                     yield txt
-        train_stream = ConstantLengthDataset(texts_iter=text_iter2(), tokenizer=tokenizer, seq_len=args.seq_len)
+        train_stream = ConstantLengthDataset(texts_iter=text_iter3(), tokenizer=tokenizer, seq_len=args.seq_len)
 
     data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)
 
     os.makedirs(args.output_dir, exist_ok=True)
     logging_dir = os.path.join(args.output_dir, "logs")
 
-    # Key point: without a shared disk, HF/DS sharded checkpoints are written under each rank's local output_dir
-    # On resume, each rank reads its own shard from the same local path (just keep the same world_size)
+    # No shared disk: each rank writes its own shard under its local output_dir
     training_args = TrainingArguments(
         output_dir=args.output_dir,
         logging_dir=logging_dir,
@@ -208,7 +240,7 @@ def main():
         gradient_checkpointing=args.gradient_checkpointing,
         remove_unused_columns=False,
         torch_compile=False,
-        save_on_each_node=False  # keep the default; DeepSpeed sharded checkpoints are written locally per rank anyway
+        save_on_each_node=False
     )
 
     trainer = Trainer(
@@ -226,10 +258,10 @@ def main():
                    and any(n.startswith("checkpoint-") for n in os.listdir(args.output_dir)))
     resume_flag = True if ckpt_exists else None
 
-    print_once(f"Resume = {resume_flag is True}")
+    print_once(f"[host={host}] Resume = {resume_flag is True}")
     print_once("***** Starting training *****")
     train_result = trainer.train(resume_from_checkpoint=resume_flag)
 
-    trainer.save_model()  # with the gather option in the DS config, the full 16-bit model is gathered and saved on global rank0
+    trainer.save_model()  # with stage3_gather_16bit_weights_on_model_save=true in the DS config, the full model is gathered and saved only on global rank0
     metrics = train_result.metrics
     trainer.log_metrics("train", metrics)