hailin 2025-09-05 19:29:29 +08:00
parent 18e929207a
commit 3309c45a7b
1 changed file with 80 additions and 35 deletions

@@ -19,7 +19,8 @@ EXPECTED_TOTAL_SHARDS=$(( EXPECTED_SHARDS_PER_HOST * ${#HOSTS[@]} ))
STAGING_BASE="${CKPT_ROOT}/_staging"
STAGING_TAG_DIR="${STAGING_BASE}/${TAG}"
OUT_DIR="${CKPT_ROOT}/merged-${TAG}"
-export OUT_DIR
+TMP_PT_DIR="${CKPT_ROOT}/_tmp-fp32-pt-${TAG}"   # temporary FP32 pytorch_model.bin directory
+export OUT_DIR TMP_PT_DIR MAX_SHARD_SIZE
# =================================
echo "== Pre-check: SSH =="
@@ -27,7 +28,7 @@ for h in "${HOSTS[@]}"; do
  ssh ${SSH_OPTS} "$h" "true" >/dev/null || { echo "!! Passwordless SSH to $h failed"; exit 1; }
done
-echo "== 0/5 Per-node shard pre-check (count *model_states.pt under ${CKPT_ROOT}/${TAG} on each host) =="
+echo "== 0/7 Per-node shard pre-check (count *model_states.pt under ${CKPT_ROOT}/${TAG} on each host) =="
remote_total=0
agg_cnt=0
for h in "${HOSTS[@]}"; do
@@ -63,11 +64,11 @@ if [[ "${STRICT_PRECHECK}" == "true" && "${precheck_ok}" == "false" ]]; then
fi
[[ "${precheck_ok}" == "true" ]] && echo "OK: pre-check passed (remote=${remote_total}, local=${agg_cnt}, expected total=${EXPECTED_TOTAL_SHARDS})" || echo "WARN: pre-check failed (shard count does not match expectation); lenient mode enabled, continuing..."
-echo "== 1/5 Prepare staging directory (clean environment) =="
+echo "== 1/7 Prepare staging directory (clean environment) =="
rm -rf "${STAGING_TAG_DIR}"
mkdir -p "${STAGING_TAG_DIR}"
-echo "== 2/5 Collect shards into staging =="
+echo "== 2/7 Collect shards into staging =="
for h in "${HOSTS[@]}"; do
  if ssh ${SSH_OPTS} "$h" "test -d '${CKPT_ROOT}/${TAG}'"; then
    echo " - collecting ${h}:${CKPT_ROOT}/${TAG}/ -> ${STAGING_TAG_DIR}/"
@@ -78,7 +79,7 @@ for h in "${HOSTS[@]}"; do
  fi
done
-echo "== 3/5 Verify total shard count in staging (expected ${EXPECTED_TOTAL_SHARDS}) =="
+echo "== 3/7 Verify total shard count in staging (expected ${EXPECTED_TOTAL_SHARDS}) =="
mapfile -t SHARDS < <(find "${STAGING_TAG_DIR}" -maxdepth 1 -type f -name "*model_states.pt" | sort -u)
CNT=${#SHARDS[@]}
echo " - shards found in staging: ${CNT}"
@@ -87,44 +88,79 @@ if (( CNT != EXPECTED_TOTAL_SHARDS )); then
  exit 3
fi
-echo "== 4/5 Merge into safetensors at: ${OUT_DIR} =="
-mkdir -p "${OUT_DIR}"
+echo "== 4/7 Merge shards -> temporary FP32 PyTorch .bin (avoids the safetensors shared-weight error) =="
+rm -rf "${TMP_PT_DIR}"
+mkdir -p "${TMP_PT_DIR}"
-# Probe whether zero_to_fp32.py supports the newer flags; fall back to the API if it does not
-USE_Z2FP32_SCRIPT=false
-if [[ -f "${CKPT_ROOT}/zero_to_fp32.py" ]]; then
-  if python "${CKPT_ROOT}/zero_to_fp32.py" --help 2>&1 | grep -q -- "--safe_serialization"; then
-    USE_Z2FP32_SCRIPT=true
-  fi
-fi
-if $USE_Z2FP32_SCRIPT; then
-  python "${CKPT_ROOT}/zero_to_fp32.py" \
-    "${STAGING_BASE}" \
-    "${OUT_DIR}" \
-    --tag "${TAG}" \
-    --safe_serialization \
-    --max_shard_size "${MAX_SHARD_SIZE}"
-else
-  python - <<PY
+# Use the API directly: safe_serialization=False -> produce pytorch_model.bin (FP32)
+python - <<PY
from deepspeed.utils.zero_to_fp32 import convert_zero_checkpoint_to_fp32_state_dict
convert_zero_checkpoint_to_fp32_state_dict(
    checkpoint_dir=r"${STAGING_BASE}",
-    output_dir=r"${OUT_DIR}",
+    output_dir=r"${TMP_PT_DIR}",
    tag=r"${TAG}",
-    safe_serialization=True,
+    safe_serialization=False,  # key step: write .bin (FP32) first to bypass the safetensors shared-weight restriction
-    max_shard_size=r"${MAX_SHARD_SIZE}",
)
-print("Merge complete:", r"${OUT_DIR}")
+print("Merge complete (FP32 .bin):", r"${TMP_PT_DIR}")
PY
-fi
-echo "== 4.1 Copy config/tokenizer artifacts (if present) =="
+echo "== 4.1/7 Copy config/tokenizer artifacts into the temporary FP32 directory (needed for loading) =="
for f in config.json generation_config.json tokenizer_config.json tokenizer.json merges.txt vocab.json special_tokens_map.json added_tokens.json; do
-  [[ -f "${CKPT_ROOT}/${f}" ]] && cp -n "${CKPT_ROOT}/${f}" "${OUT_DIR}/"
+  [[ -f "${CKPT_ROOT}/${f}" ]] && cp -n "${CKPT_ROOT}/${f}" "${TMP_PT_DIR}/" || true
done
-echo "== 5/5 Self-check (index and config) =="
+echo "== 5/7 FP32 -> BF16, untie the lm_head <-> embed_tokens shared storage, and save as sharded safetensors (${MAX_SHARD_SIZE}) =="
+python - <<'PY'
+import os, sys, torch
+from transformers import AutoConfig, AutoModelForCausalLM
+TMP_PT_DIR = os.environ["TMP_PT_DIR"]
+OUT_DIR = os.environ["OUT_DIR"]
+MAX_SHARD_SIZE = os.environ.get("MAX_SHARD_SIZE", "5GB")
+print("[load] from:", TMP_PT_DIR)
+cfg = AutoConfig.from_pretrained(TMP_PT_DIR, trust_remote_code=True)
+model = AutoModelForCausalLM.from_pretrained(
+    TMP_PT_DIR,
+    config=cfg,
+    trust_remote_code=True,
+    torch_dtype=torch.bfloat16,   # target dtype: BF16
+    low_cpu_mem_usage=True,
+    device_map={"": "cpu"},       # load entirely on CPU to avoid using GPU memory
+)
+# -- If lm_head and embed_tokens share weights, untie them manually so safetensors does not complain about shared storage -- #
+try:
+    emb = model.get_input_embeddings().weight if hasattr(model, "get_input_embeddings") else None
+    head = model.lm_head.weight if hasattr(model, "lm_head") else None
+    if emb is not None and head is not None and emb.data_ptr() == head.data_ptr():
+        with torch.no_grad():
+            model.lm_head.weight = torch.nn.Parameter(head.detach().clone())
+        print("[fix] Untied shared weights: lm_head.weight cloned from embed_tokens.weight")
+    else:
+        print("[fix] No shared storage detected between lm_head and embed_tokens")
+except Exception as e:
+    print("[fix] Skip untie check:", e, file=sys.stderr)
+# Make sure the whole model is BF16
+model.to(dtype=torch.bfloat16)
+# Sharded safetensors (works for large models)
+os.makedirs(OUT_DIR, exist_ok=True)
+model.save_pretrained(
+    OUT_DIR,
+    safe_serialization=True,        # write safetensors
+    max_shard_size=MAX_SHARD_SIZE,  # shard size limit
+)
+print("[save] BF16 safetensors saved to:", OUT_DIR)
+PY
+echo "== 5.1/7 Copy (or backfill) tokenizer artifacts into the final directory (if present) =="
+for f in tokenizer_config.json tokenizer.json merges.txt vocab.json special_tokens_map.json added_tokens.json; do
+  [[ -f "${CKPT_ROOT}/${f}" ]] && cp -n "${CKPT_ROOT}/${f}" "${OUT_DIR}/" || true
+done
echo "== 6/7 自检(索引与 config=="
python - <<'PY' python - <<'PY'
import os, json, sys import os, json, sys
out_dir = os.environ.get("OUT_DIR") out_dir = os.environ.get("OUT_DIR")
@@ -133,13 +169,22 @@ if os.path.exists(idx):
    with open(idx) as f: j = json.load(f)
    print(f"OK: found safetensors index: {idx} (weight entries: {len(j.get('weight_map', {}))})")
else:
-    print("NOTE: model.safetensors.index.json not found (may be a single shard)")
+    # A single shard may also lack index.json
+    sfts = [x for x in os.listdir(out_dir) if x.endswith(".safetensors")]
+    if len(sfts) == 1:
+        print(f"NOTE: single-shard safetensors: {sfts[0]}")
+    else:
+        print("WARN: model.safetensors.index.json not found and shard count != 1", file=sys.stderr)
try:
    from transformers import AutoConfig
-    cfg = AutoConfig.from_pretrained(out_dir)
+    cfg = AutoConfig.from_pretrained(out_dir, trust_remote_code=True)
    print("OK: read config:", cfg.model_type, "hidden:", getattr(cfg,'hidden_size',None), "layers:", getattr(cfg,'num_hidden_layers',None))
except Exception as e:
    print("WARN: failed to read config (can be ignored if there is no config.json):", e, file=sys.stderr)
PY
-echo "== Done: ${OUT_DIR} =="
+echo "== 7/7 Cleanup notes =="
+echo "Temporary FP32 directory: ${TMP_PT_DIR}"
+echo "BF16 safetensors output: ${OUT_DIR}"
+echo "Done."