# Copyright 2023-2024 SGLang Team # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== """Inference-only DeepSeek NextN Speculative Decoding.""" from typing import Iterable, Optional, Tuple import torch from torch import nn from transformers import PretrainedConfig from sglang.srt.layers.layernorm import RMSNorm from sglang.srt.layers.linear import ReplicatedLinear from sglang.srt.layers.logits_processor import LogitsProcessor from sglang.srt.layers.moe.ep_moe.layer import EPMoE from sglang.srt.layers.moe.fused_moe_triton import FusedMoE from sglang.srt.layers.quantization.base_config import QuantizationConfig from sglang.srt.layers.quantization.fp8_utils import ( block_quant_to_tensor_quant, normalize_e4m3fn_to_e4m3fnuz, ) from sglang.srt.layers.quantization.int8_utils import ( block_dequant as int8_block_dequant, ) from sglang.srt.layers.vocab_parallel_embedding import ( ParallelLMHead, VocabParallelEmbedding, ) from sglang.srt.managers.schedule_batch import global_server_args_dict from sglang.srt.model_executor.forward_batch_info import ForwardBatch from sglang.srt.model_loader.weight_utils import default_weight_loader from sglang.srt.models.deepseek_v2 import DeepseekV2DecoderLayer, DeepseekV3ForCausalLM from sglang.srt.utils import add_prefix, is_cuda, is_hip _is_hip = is_hip() _is_cuda = is_cuda() if _is_cuda: from sgl_kernel import awq_dequantize else: from vllm import _custom_ops as ops class DeepseekModelNextN(nn.Module): def __init__( self, config: PretrainedConfig, quant_config: Optional[QuantizationConfig] = None, prefix: str = "", ) -> None: super().__init__() self.vocab_size = config.vocab_size self.embed_tokens = VocabParallelEmbedding( config.vocab_size, config.hidden_size, enable_tp=not global_server_args_dict["enable_dp_attention"], prefix=add_prefix("embed_tokens", prefix), ) self.enorm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps) self.hnorm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps) self.eh_proj = nn.Linear(2 * config.hidden_size, config.hidden_size, bias=False) self.decoder = DeepseekV2DecoderLayer( config, 0, quant_config=quant_config, is_nextn=True, prefix=add_prefix("decoder", prefix), ) self.shared_head = nn.Module() self.shared_head.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps) def forward( self, input_ids: torch.Tensor, positions: torch.Tensor, forward_batch: ForwardBatch, input_embeds: torch.Tensor = None, ) -> torch.Tensor: if input_embeds is None: hidden_states = self.embed_tokens(input_ids) else: hidden_states = input_embeds hidden_states = self.eh_proj( torch.cat( ( self.enorm(hidden_states), self.hnorm(forward_batch.spec_info.hidden_states), ), dim=-1, ) ) residual = None hidden_states, residual = self.decoder( positions, hidden_states, forward_batch, residual ) if not forward_batch.forward_mode.is_idle(): hidden_states, _ = self.shared_head.norm(hidden_states, residual) return hidden_states class DeepseekV3ForCausalLMNextN(DeepseekV3ForCausalLM): def __init__( self, config: PretrainedConfig, quant_config: Optional[QuantizationConfig] = None, prefix: str = "", ) -> None: nn.Module.__init__(self) self.config = config self.quant_config = quant_config self.model = DeepseekModelNextN( config, quant_config, prefix=add_prefix("model", prefix) ) if global_server_args_dict["enable_dp_attention"]: self.lm_head = ReplicatedLinear( config.hidden_size, config.vocab_size, bias=False, prefix=add_prefix("model.shared_head.head", prefix), ) self.logits_processor = LogitsProcessor(config, skip_all_gather=True) else: self.lm_head = ParallelLMHead( config.vocab_size, config.hidden_size, quant_config=quant_config, prefix=add_prefix("model.shared_head.head", prefix), ) self.logits_processor = LogitsProcessor(config) @torch.no_grad() def forward( self, input_ids: torch.Tensor, positions: torch.Tensor, forward_batch: ForwardBatch, ) -> torch.Tensor: hidden_states = self.model(input_ids, positions, forward_batch) return self.logits_processor( input_ids, hidden_states, self.lm_head, forward_batch ) def load_weights(self, weights: Iterable[Tuple[str, torch.Tensor]]): if hasattr(self.config, "num_nextn_predict_layers"): num_nextn_layers = self.config.num_nextn_predict_layers assert num_nextn_layers == 1, "Only 1 nextn layer is supportted" assert num_nextn_layers == self.config.num_hidden_layers else: raise ValueError("num_nextn_predict_layers is not in the config") stacked_params_mapping = [ # (param_name, shard_name, shard_id) ("gate_up_proj", "gate_proj", 0), ("gate_up_proj", "up_proj", 1), ] # Params for weights, fp8 weight scales, fp8 activation scales # (param_name, weight_name, expert_id, shard_id) MoEImpl = EPMoE if global_server_args_dict["enable_ep_moe"] else FusedMoE expert_params_mapping = MoEImpl.make_expert_params_mapping( ckpt_gate_proj_name="gate_proj", ckpt_down_proj_name="down_proj", ckpt_up_proj_name="up_proj", num_experts=self.config.n_routed_experts, ) nextn_layer_prefix = "model.layers.0" nextn_spec_weight_names = [ "shared_head.norm", "eh_proj", "enorm", "hnorm", ] params_dict = dict(self.named_parameters()) for name, loaded_weight in weights: if not name.startswith(nextn_layer_prefix): continue # Use shared head and embed weights from target model if "shared_head.head" in name or "embed_tokens" in name: continue is_decoder = True # For nextn specific weights for weight_name in nextn_spec_weight_names: if weight_name in name: name = name.replace(nextn_layer_prefix, "model") is_decoder = False break # For decoder layer weights if is_decoder: name = name.replace(nextn_layer_prefix, "model.decoder") if "rotary_emb.inv_freq" in name: continue for param_name, weight_name, shard_id in stacked_params_mapping: # Skip non-stacked layers and experts (experts handled below). if weight_name not in name: continue # We have mlp.experts[0].gate_proj in the checkpoint. # Since we handle the experts below in expert_params_mapping, # we need to skip here BEFORE we update the name, otherwise # name will be updated to mlp.experts[0].gate_up_proj, which # will then be updated below in expert_params_mapping # for mlp.experts[0].gate_gate_up_proj, which breaks load. if ("mlp.experts." in name) and name not in params_dict: continue name = name.replace(weight_name, param_name) # Skip loading extra bias for GPTQ models. if name.endswith(".bias") and name not in params_dict: continue param = params_dict[name] weight_loader = param.weight_loader weight_loader(param, loaded_weight, shard_id) break else: for mapping in expert_params_mapping: param_name, weight_name, expert_id, shard_id = mapping if weight_name not in name: continue name = name.replace(weight_name, param_name) param = params_dict[name] weight_loader = param.weight_loader weight_loader( param, loaded_weight, name, shard_id=shard_id, expert_id=expert_id, ) break else: # Skip loading extra bias for GPTQ models. if name.endswith(".bias") and name not in params_dict: continue param = params_dict[name] weight_loader = getattr( param, "weight_loader", default_weight_loader ) weight_loader(param, loaded_weight) if not global_server_args_dict["disable_mla"]: self_attn = self.model.decoder.self_attn if hasattr(self_attn.kv_b_proj, "qweight"): # AWQ compatible if _is_cuda: w = awq_dequantize( self_attn.kv_b_proj.qweight, self_attn.kv_b_proj.scales, self_attn.kv_b_proj.qzeros, ).T else: w = ops.awq_dequantize( self_attn.kv_b_proj.qweight, self_attn.kv_b_proj.scales, self_attn.kv_b_proj.qzeros, 0, 0, 0, ).T else: w = self_attn.kv_b_proj.weight # NOTE(HandH1998): Since `bmm_fp8` only supports per-tensor scale, we have to requantize `self_attn.kv_b_proj`. # This may affect the accuracy of fp8 model. if hasattr(self.quant_config, "weight_block_size") and w.dtype in ( torch.float8_e4m3fn, torch.float8_e4m3fnuz, ): weight_block_size = self.quant_config.weight_block_size if weight_block_size is not None: assert hasattr(self_attn.kv_b_proj, "weight_scale_inv") if _is_hip: weight, weight_scale, _ = normalize_e4m3fn_to_e4m3fnuz( weight=w, weight_scale=self_attn.kv_b_proj.weight_scale_inv, input_scale=None, ) else: weight = w weight_scale = self_attn.kv_b_proj.weight_scale_inv w, scale = block_quant_to_tensor_quant( weight, weight_scale, weight_block_size ) self_attn.w_scale = scale if w.dtype == torch.int8: if hasattr(self.quant_config, "weight_block_size"): # block-wise int8 need it weight_block_size = self.quant_config.weight_block_size if weight_block_size is not None: assert hasattr(self_attn.kv_b_proj, "weight_scale_inv") weight = w weight_scale = self_attn.kv_b_proj.weight_scale_inv w = int8_block_dequant( weight, weight_scale, weight_block_size ).to(torch.bfloat16) else: # channel-wise int8 need it assert hasattr(self_attn.kv_b_proj, "weight_scale") w = w.to(torch.bfloat16) * self_attn.kv_b_proj.weight_scale.to( torch.bfloat16 ) w_kc, w_vc = w.unflatten( 0, (-1, self_attn.qk_nope_head_dim + self_attn.v_head_dim) ).split([self_attn.qk_nope_head_dim, self_attn.v_head_dim], dim=1) self_attn.w_kc = w_kc.transpose(1, 2).contiguous().transpose(1, 2) self_attn.w_vc = w_vc.contiguous().transpose(1, 2) if ( hasattr(self_attn.kv_b_proj, "weight_scale") and self_attn.w_scale is None ): self_attn.w_scale = self_attn.kv_b_proj.weight_scale if _is_hip: self_attn.w_scale *= 2.0 EntryClass = [DeepseekV3ForCausalLMNextN]