diff --git a/merge_zero3_safetensors.sh b/merge_zero3_safetensors.sh
index 0b5b74e..7aa88f5 100755
--- a/merge_zero3_safetensors.sh
+++ b/merge_zero3_safetensors.sh
@@ -19,7 +19,8 @@ EXPECTED_TOTAL_SHARDS=$(( EXPECTED_SHARDS_PER_HOST * ${#HOSTS[@]} ))
 STAGING_BASE="${CKPT_ROOT}/_staging"
 STAGING_TAG_DIR="${STAGING_BASE}/${TAG}"
 OUT_DIR="${CKPT_ROOT}/merged-${TAG}"
-export OUT_DIR
+TMP_PT_DIR="${CKPT_ROOT}/_tmp-fp32-pt-${TAG}"   # temporary FP32 (pytorch_model.bin) directory
+export OUT_DIR TMP_PT_DIR MAX_SHARD_SIZE
 # =================================

 echo "== Pre-check SSH =="
@@ -27,7 +28,7 @@ for h in "${HOSTS[@]}"; do
   ssh ${SSH_OPTS} "$h" "true" >/dev/null || { echo "!! Passwordless SSH to $h failed"; exit 1; }
 done

-echo "== 0/5 Per-node shard pre-check (counting *model_states.pt under ${CKPT_ROOT}/${TAG} on each host) =="
+echo "== 0/7 Per-node shard pre-check (counting *model_states.pt under ${CKPT_ROOT}/${TAG} on each host) =="
 remote_total=0
 agg_cnt=0
 for h in "${HOSTS[@]}"; do
@@ -63,11 +64,11 @@ if [[ "${STRICT_PRECHECK}" == "true" && "${precheck_ok}" == "false" ]]; then
 fi
 [[ "${precheck_ok}" == "true" ]] && echo "OK: pre-check passed (remote=${remote_total}, local=${agg_cnt}, expected total=${EXPECTED_TOTAL_SHARDS})" || echo "WARN: pre-check failed (shard count does not match expectation); lenient mode enabled, continuing..."

-echo "== 1/5 Prepare staging directory (clean slate) =="
+echo "== 1/7 Prepare staging directory (clean slate) =="
 rm -rf "${STAGING_TAG_DIR}"
 mkdir -p "${STAGING_TAG_DIR}"

-echo "== 2/5 Collect shards into staging =="
+echo "== 2/7 Collect shards into staging =="
 for h in "${HOSTS[@]}"; do
   if ssh ${SSH_OPTS} "$h" "test -d '${CKPT_ROOT}/${TAG}'"; then
     echo " - Collecting ${h}:${CKPT_ROOT}/${TAG}/ -> ${STAGING_TAG_DIR}/"
@@ -78,7 +79,7 @@ for h in "${HOSTS[@]}"; do
   fi
 done

-echo "== 3/5 Verify total shard count in staging (expected: ${EXPECTED_TOTAL_SHARDS}) =="
+echo "== 3/7 Verify total shard count in staging (expected: ${EXPECTED_TOTAL_SHARDS}) =="
 mapfile -t SHARDS < <(find "${STAGING_TAG_DIR}" -maxdepth 1 -type f -name "*model_states.pt" | sort -u)
 CNT=${#SHARDS[@]}
 echo " - Shards found in staging: ${CNT}"
@@ -87,44 +88,79 @@ if (( CNT != EXPECTED_TOTAL_SHARDS )); then
   exit 3
 fi

-echo "== 4/5 Merge to safetensors at: ${OUT_DIR} =="
-mkdir -p "${OUT_DIR}"
+echo "== 4/7 Merge shards -> temporary FP32 (PyTorch .bin); avoids safetensors errors caused by shared weights =="
+rm -rf "${TMP_PT_DIR}"
+mkdir -p "${TMP_PT_DIR}"

-# Probe whether zero_to_fp32.py supports the newer flags; if not, fall back to the API
-USE_Z2FP32_SCRIPT=false
-if [[ -f "${CKPT_ROOT}/zero_to_fp32.py" ]]; then
-  if python "${CKPT_ROOT}/zero_to_fp32.py" --help 2>&1 | grep -q -- "--safe_serialization"; then
-    USE_Z2FP32_SCRIPT=true
-  fi
-fi
-
-if $USE_Z2FP32_SCRIPT; then
-  python "${CKPT_ROOT}/zero_to_fp32.py" \
-    "${STAGING_BASE}" \
-    "${OUT_DIR}" \
-    --tag "${TAG}" \
-    --safe_serialization \
-    --max_shard_size "${MAX_SHARD_SIZE}"
-else
-  python - <<'PY'
-# …
-PY
-fi
+# Merge the staged ZeRO-3 shards -> generate pytorch_model.bin (FP32)
+python - <<'PY'
+# …
+PY
+
+echo "== 5/7 FP32 -> BF16, untie the lm_head <-> embed_tokens shared storage, save as sharded safetensors (${MAX_SHARD_SIZE}) =="
+python - <<'PY'
+import os, sys, torch
+from transformers import AutoConfig, AutoModelForCausalLM
+
+TMP_PT_DIR = os.environ["TMP_PT_DIR"]
+OUT_DIR = os.environ["OUT_DIR"]
+MAX_SHARD_SIZE = os.environ.get("MAX_SHARD_SIZE", "5GB")
+
+print("[load] from:", TMP_PT_DIR)
+cfg = AutoConfig.from_pretrained(TMP_PT_DIR, trust_remote_code=True)
+model = AutoModelForCausalLM.from_pretrained(
+    TMP_PT_DIR,
+    config=cfg,
+    trust_remote_code=True,
+    torch_dtype=torch.bfloat16,   # target dtype: BF16
+    low_cpu_mem_usage=True,
+    device_map={"": "cpu"},       # load entirely on CPU; do not touch GPU memory
+)
+
+# If lm_head and embed_tokens share weights, untie them manually so the later safetensors save does not reject shared storage
+try:
+    emb = model.get_input_embeddings().weight if hasattr(model, "get_input_embeddings") else None
+    head = model.lm_head.weight if hasattr(model, "lm_head") else None
+    if emb is not None and head is not None and emb.data_ptr() == head.data_ptr():
+        with torch.no_grad():
+            model.lm_head.weight = torch.nn.Parameter(head.detach().clone())
+        print("[fix] Untied shared weights: lm_head.weight cloned from embed_tokens.weight")
+    else:
+        print("[fix] No shared storage detected between lm_head and embed_tokens")
+except Exception as e:
+    print("[fix] Skip untie check:", e, file=sys.stderr)
+
+# Make sure the whole model is BF16
+model.to(dtype=torch.bfloat16)
+
+# Sharded safetensors (works for large models)
+os.makedirs(OUT_DIR, exist_ok=True)
+model.save_pretrained(
+    OUT_DIR,
+    safe_serialization=True,        # write safetensors
+    max_shard_size=MAX_SHARD_SIZE,  # per-shard size limit
+)
+print("[save] BF16 safetensors saved to:", OUT_DIR)
+PY
+
+echo "== 5.1/7 Copy (or backfill) tokenizer artifacts into the final directory (if present) =="
+for f in tokenizer_config.json tokenizer.json merges.txt vocab.json special_tokens_map.json added_tokens.json; do
+  [[ -f "${CKPT_ROOT}/${f}" ]] && cp -n "${CKPT_ROOT}/${f}" "${OUT_DIR}/" || true
+done
+
+echo "== 6/7 Self-check (index and config) =="
 python - <<'PY'
 import os, json, sys
 out_dir = os.environ.get("OUT_DIR")
@@ -133,13 +169,22 @@ if os.path.exists(idx):
     with open(idx) as f: j = json.load(f)
     print(f"OK: found safetensors index: {idx} ({len(j.get('weight_map', {}))} weight entries)")
 else:
-    print("NOTE: model.safetensors.index.json not found (may be a single shard)")
+    # A single shard may legitimately have no index.json
+    sfts = [x for x in os.listdir(out_dir) if x.endswith(".safetensors")]
+    if len(sfts) == 1:
+        print(f"NOTE: single safetensors shard: {sfts[0]}")
+    else:
+        print("WARN: model.safetensors.index.json not found and shard count != 1", file=sys.stderr)
+
 try:
     from transformers import AutoConfig
-    cfg = AutoConfig.from_pretrained(out_dir)
+    cfg = AutoConfig.from_pretrained(out_dir, trust_remote_code=True)
     print("OK: config loaded:", cfg.model_type, "hidden:", getattr(cfg,'hidden_size',None), "layers:", getattr(cfg,'num_hidden_layers',None))
 except Exception as e:
     print("WARN: failed to read config (ignore if there is no config.json):", e, file=sys.stderr)
 PY

-echo "== Done: ${OUT_DIR} =="
+echo "== 7/7 Cleanup notes =="
+echo "Temporary FP32 directory: ${TMP_PT_DIR}"
+echo "BF16 safetensors output: ${OUT_DIR}"
+echo "Done."
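
Optional follow-up, not part of the patch: reloading the merged directory catches problems the index/config self-check cannot, such as missing tensors or dtype drift. This is a minimal sketch under the same environment assumptions as the script (transformers and accelerate installed, enough CPU RAM for the BF16 weights); OUT_DIR is assumed to point at the merged-${TAG} directory.

python - <<'PY'
import os, torch
from transformers import AutoModelForCausalLM

# Assumed path: export OUT_DIR to the merged-<TAG> directory, or edit the fallback here
out_dir = os.environ.get("OUT_DIR", "/path/to/merged-TAG")

model = AutoModelForCausalLM.from_pretrained(
    out_dir,
    trust_remote_code=True,
    torch_dtype=torch.bfloat16,
    low_cpu_mem_usage=True,
    device_map={"": "cpu"},   # keep the smoke test on CPU
)
print("dtype:", next(model.parameters()).dtype)

# Report whether lm_head and embed_tokens share storage after reload; transformers
# re-ties them at load time when config.tie_word_embeddings is true, even though the
# checkpoint itself was saved with separate copies.
emb = model.get_input_embeddings().weight
head = model.lm_head.weight if hasattr(model, "lm_head") else None
print("lm_head shares storage with embeddings:",
      head is not None and emb.data_ptr() == head.data_ptr())
PY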