435 lines
14 KiB
Python
435 lines
14 KiB
Python
import json
|
|
import os
|
|
import time
|
|
import unittest
|
|
from types import SimpleNamespace
|
|
from urllib.parse import urlparse
|
|
|
|
import requests
|
|
|
|
from sglang.test.few_shot_gsm8k import run_eval as run_eval_few_shot_gsm8k
|
|
from sglang.test.test_disaggregation_utils import TestDisaggregationBase
|
|
from sglang.test.test_utils import (
|
|
DEFAULT_EAGLE_DRAFT_MODEL_FOR_TEST,
|
|
DEFAULT_EAGLE_TARGET_MODEL_FOR_TEST,
|
|
DEFAULT_MODEL_NAME_FOR_TEST,
|
|
DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
|
|
DEFAULT_URL_FOR_TEST,
|
|
popen_launch_pd_server,
|
|
)
|
|
|
|
|
|
class TestDisaggregationAccuracy(TestDisaggregationBase):
    """Accuracy and API checks against a prefill/decode disaggregated deployment.

    ``setUpClass`` launches one prefill server and one decode server with
    ``popen_launch_pd_server``, waits on both ``/health`` endpoints and then
    calls ``launch_lb()``. The tests send their requests to the load-balancer
    address (``lb_url`` / ``lb_port``).

    ``wait_server_ready`` and ``launch_lb`` are not defined in this class;
    presumably they are inherited from ``TestDisaggregationBase``.
    """

    @classmethod
    def setUpClass(cls):
        """Derive all endpoints from ``DEFAULT_URL_FOR_TEST`` and start the servers."""
        cls.model = DEFAULT_MODEL_NAME_FOR_TEST
        parsed_url = urlparse(DEFAULT_URL_FOR_TEST)
        cls.base_host = parsed_url.hostname
        base_port = str(parsed_url.port)
        # The load balancer takes the base port; prefill and decode use
        # the base port offset by +100 and +200 respectively.
        cls.lb_port = base_port
        cls.prefill_port = f"{int(base_port) + 100}"
        cls.decode_port = f"{int(base_port) + 200}"
        cls.prefill_url = f"http://{cls.base_host}:{cls.prefill_port}"
        cls.decode_url = f"http://{cls.base_host}:{cls.decode_port}"
        cls.lb_url = f"http://{cls.base_host}:{cls.lb_port}"
        print(f"{cls.base_host=} {cls.lb_port=} {cls.prefill_port=} {cls.decode_port=}")

        # Non blocking start servers
        cls.start_prefill()
        cls.start_decode()

        # Block until both
        cls.wait_server_ready(cls.prefill_url + "/health")
        cls.wait_server_ready(cls.decode_url + "/health")

        cls.launch_lb()

    @classmethod
    def start_prefill(cls):
        """Launch the prefill-mode server and keep its handle in ``cls.process_prefill``."""
        prefill_args = [
            "--trust-remote-code",
            "--disaggregation-mode",
            "prefill",
            "--tp",
            "1",
            "--disaggregation-ib-device",
            "mlx5_roce0",
        ]
        cls.process_prefill = popen_launch_pd_server(
            cls.model,
            cls.prefill_url,
            timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
            other_args=prefill_args,
        )

    @classmethod
    def start_decode(cls):
        """Launch the decode-mode server and keep its handle in ``cls.process_decode``."""
        decode_args = [
            "--trust-remote-code",
            "--disaggregation-mode",
            "decode",
            "--tp",
            "1",
            # Decode is started with --base-gpu-id 1 (presumably so it does
            # not share a GPU with the prefill server -- confirm).
            "--base-gpu-id",
            "1",
            "--disaggregation-ib-device",
            "mlx5_roce1",
        ]
        cls.process_decode = popen_launch_pd_server(
            cls.model,
            cls.decode_url,
            timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
            other_args=decode_args,
        )

    def test_gsm8k(self):
        """Run the few-shot GSM8K eval through the load balancer; require accuracy > 0.62."""
        args = SimpleNamespace(
            num_shots=5,
            data_path=None,
            num_questions=200,
            max_new_tokens=512,
            parallel=128,
            host=f"http://{self.base_host}",
            port=int(self.lb_port),
        )
        metrics = run_eval_few_shot_gsm8k(args)
        print(f"Evaluation metrics: {metrics}")

        self.assertGreater(metrics["accuracy"], 0.62)

    def test_logprob(self):
        """Request input and output logprobs via ``/generate`` and check their lengths."""
        prompt = "The capital of france is "
        response = requests.post(
            self.lb_url + "/generate",
            json={
                "text": prompt,
                "sampling_params": {"temperature": 0},
                "return_logprob": True,
                "return_input_logprob": True,
                "logprob_start_len": 0,
            },
        )

        j = response.json()
        completion_tokens = j["meta_info"]["completion_tokens"]
        input_logprobs = j["meta_info"]["input_token_logprobs"]
        output_logprobs = j["meta_info"]["output_token_logprobs"]

        # unittest assertions (not bare ``assert``) so the checks still run
        # when Python is started with -O, matching test_gsm8k's style.
        self.assertEqual(
            len(output_logprobs),
            completion_tokens,
            f"output_logprobs and completion_tokens should have the same length, but got {len(output_logprobs)} and {completion_tokens}",
        )
        self.assertGreater(
            len(input_logprobs),
            0,
            f"input_logprobs should have at least one token, but got {len(input_logprobs)}",
        )

    def test_structured_output(self):
        """Generate with a ``json_schema`` constraint and check the text parses as JSON."""
        json_schema = json.dumps(
            {
                "type": "object",
                "properties": {
                    "name": {"type": "string", "pattern": "^[\\w]+$"},
                    "population": {"type": "integer"},
                },
                "required": ["name", "population"],
            }
        )

        # JSON
        response = requests.post(
            f"{self.lb_url}/generate",
            json={
                "text": "Here is the information of the capital of France in the JSON format.\n",
                "sampling_params": {
                    "temperature": 0,
                    "max_new_tokens": 64,
                    "json_schema": json_schema,
                },
            },
        )
        output = response.json()["text"]
        # ensure the output is a valid JSON
        json.loads(output)
|
|
|
|
|
|
class TestDisaggregationMooncakeFailure(TestDisaggregationBase):
    """Check that the servers stay healthy while failures are being injected.

    ``DISAGGREGATION_TEST_FAILURE_PROB`` is set to ``"0.05"`` in the process
    environment before the prefill and decode servers are launched, and is
    removed again in ``tearDownClass`` (or immediately, if set-up fails).

    ``wait_server_ready`` and ``launch_lb`` are not defined in this class;
    presumably they are inherited from ``TestDisaggregationBase``.
    """

    @classmethod
    def setUpClass(cls):
        """Enable failure injection, derive the endpoints and start the servers."""
        # set DISAGGREGATION_TEST_FAILURE_PROB to simulate failure
        os.environ["DISAGGREGATION_TEST_FAILURE_PROB"] = "0.05"

        try:
            cls.model = DEFAULT_MODEL_NAME_FOR_TEST
            parsed_url = urlparse(DEFAULT_URL_FOR_TEST)
            cls.base_host = parsed_url.hostname
            base_port = str(parsed_url.port)
            # The load balancer takes the base port; prefill and decode use
            # the base port offset by +100 and +200 respectively.
            cls.lb_port = base_port
            cls.prefill_port = f"{int(base_port) + 100}"
            cls.decode_port = f"{int(base_port) + 200}"
            cls.prefill_url = f"http://{cls.base_host}:{cls.prefill_port}"
            cls.decode_url = f"http://{cls.base_host}:{cls.decode_port}"
            cls.lb_url = f"http://{cls.base_host}:{cls.lb_port}"
            print(f"{cls.base_host=} {cls.lb_port=} {cls.prefill_port=} {cls.decode_port=}")

            # Non blocking start servers
            cls.start_prefill()
            cls.start_decode()

            # Block until both
            cls.wait_server_ready(cls.prefill_url + "/health")
            cls.wait_server_ready(cls.decode_url + "/health")

            cls.launch_lb()
        except BaseException:
            # unittest does not call tearDownClass when setUpClass raises, so
            # undo the environment change here; otherwise the failure
            # probability would leak into test classes that run afterwards.
            os.environ.pop("DISAGGREGATION_TEST_FAILURE_PROB", None)
            raise

    @classmethod
    def tearDownClass(cls):
        """Remove the failure-injection variable, then run the base-class teardown."""
        # A default is supplied so a missing key can never raise KeyError and
        # thereby skip super().tearDownClass().
        os.environ.pop("DISAGGREGATION_TEST_FAILURE_PROB", None)
        super().tearDownClass()

    @classmethod
    def start_prefill(cls):
        """Launch the prefill-mode server and keep its handle in ``cls.process_prefill``."""
        prefill_args = [
            "--trust-remote-code",
            "--disaggregation-mode",
            "prefill",
            "--tp",
            "1",
            "--disaggregation-ib-device",
            "mlx5_roce0",
        ]
        cls.process_prefill = popen_launch_pd_server(
            cls.model,
            cls.prefill_url,
            timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
            other_args=prefill_args,
        )

    @classmethod
    def start_decode(cls):
        """Launch the decode-mode server and keep its handle in ``cls.process_decode``."""
        decode_args = [
            "--trust-remote-code",
            "--disaggregation-mode",
            "decode",
            "--tp",
            "1",
            "--base-gpu-id",
            "1",
            "--disaggregation-ib-device",
            "mlx5_roce1",
        ]
        cls.process_decode = popen_launch_pd_server(
            cls.model,
            cls.decode_url,
            timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
            other_args=decode_args,
        )

    def test_gsm8k(self):
        """Run the GSM8K eval under failure injection without asserting on accuracy.

        If the eval raises, both servers' ``/health_generate`` endpoints are
        queried. When both return 200 the eval error is tolerated; otherwise
        the original eval exception is re-raised, chained to the health-check
        failure. When the eval does not raise, no health check is performed.
        """
        args = SimpleNamespace(
            num_shots=5,
            data_path=None,
            num_questions=200,
            max_new_tokens=512,
            parallel=128,
            host=f"http://{self.base_host}",
            port=int(self.lb_port),
        )

        # Expect lots of failure but the server cannot crash
        try:
            metrics = run_eval_few_shot_gsm8k(args)
            print(f"Evaluation metrics: {metrics}")
        except Exception as e:
            print(f"Test encountered expected errors: {e}")
            # Check if servers are still healthy
            try:
                response = requests.get(self.prefill_url + "/health_generate")
                assert response.status_code == 200
                response = requests.get(self.decode_url + "/health_generate")
                assert response.status_code == 200
            except Exception as health_check_error:
                # If health check fails, re-raise the original exception
                raise e from health_check_error
|
|
|
|
|
|
class TestDisaggregationMooncakeSpec(TestDisaggregationBase):
    """GSM8K check for disaggregated serving with EAGLE speculative-decoding flags.

    Both the prefill and the decode server are launched with ``--tp 2`` plus
    the shared flag list stored in ``cls.spec_args``.
    """

    @classmethod
    def setUpClass(cls):
        """Work out the endpoints, build the shared flags, and bring the servers up."""
        cls.model = DEFAULT_EAGLE_TARGET_MODEL_FOR_TEST
        cls.draft_model = DEFAULT_EAGLE_DRAFT_MODEL_FOR_TEST

        endpoint = urlparse(DEFAULT_URL_FOR_TEST)
        cls.base_host = endpoint.hostname
        port_text = str(endpoint.port)

        # Load balancer on the base port, prefill at +100, decode at +200.
        cls.lb_port = port_text
        cls.prefill_port = str(int(port_text) + 100)
        cls.decode_port = str(int(port_text) + 200)

        origin = f"http://{cls.base_host}"
        cls.prefill_url = f"{origin}:{cls.prefill_port}"
        cls.decode_url = f"{origin}:{cls.decode_port}"
        cls.lb_url = f"{origin}:{cls.lb_port}"

        # Flag/value pairs appended to both server command lines; the mapping
        # keeps insertion order, so flattening yields flag, value, flag, value...
        spec_options = {
            "--speculative-algorithm": "EAGLE",
            "--speculative-draft-model-path": cls.draft_model,
            "--speculative-num-steps": "3",
            "--speculative-eagle-topk": "4",
            "--speculative-num-draft-tokens": "16",
            "--cuda-graph-max-bs": "8",
        }
        cls.spec_args = [token for pair in spec_options.items() for token in pair]
        print(f"{cls.base_host=} {cls.lb_port=} {cls.prefill_port=} {cls.decode_port=}")

        # Kick off both servers without waiting on either one.
        cls.start_prefill()
        cls.start_decode()

        # Then wait for prefill first, decode second.
        for server_url in (cls.prefill_url, cls.decode_url):
            cls.wait_server_ready(server_url + "/health")

        cls.launch_lb()

    @classmethod
    def start_prefill(cls):
        """Spawn the prefill server; the handle is stored on ``cls.process_prefill``."""
        launch_flags = [
            "--trust-remote-code",
            "--disaggregation-mode",
            "prefill",
            "--tp",
            "2",
            "--disaggregation-ib-device",
            "mlx5_roce0,mlx5_roce1",
            *cls.spec_args,
        ]
        cls.process_prefill = popen_launch_pd_server(
            cls.model,
            cls.prefill_url,
            timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
            other_args=launch_flags,
        )

    @classmethod
    def start_decode(cls):
        """Spawn the decode server; the handle is stored on ``cls.process_decode``."""
        launch_flags = [
            "--trust-remote-code",
            "--disaggregation-mode",
            "decode",
            "--tp",
            "2",
            "--base-gpu-id",
            "2",
            "--disaggregation-ib-device",
            "mlx5_roce2,mlx5_roce3",
            *cls.spec_args,
        ]
        cls.process_decode = popen_launch_pd_server(
            cls.model,
            cls.decode_url,
            timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
            other_args=launch_flags,
        )

    def test_gsm8k(self):
        """Few-shot GSM8K through the load balancer; accuracy must exceed 0.20."""
        eval_settings = dict(
            num_shots=5,
            data_path=None,
            num_questions=200,
            max_new_tokens=512,
            parallel=2,
            host=f"http://{self.base_host}",
            port=int(self.lb_port),
        )
        results = run_eval_few_shot_gsm8k(SimpleNamespace(**eval_settings))
        print(f"Evaluation metrics: {results}")

        self.assertGreater(results["accuracy"], 0.20)
|
|
|
|
|
|
class TestDisaggregationSimulatedRetract(TestDisaggregationBase):
    """GSM8K accuracy check with ``SGLANG_TEST_RETRACT`` set in the environment.

    ``SGLANG_TEST_RETRACT`` is set to ``"true"`` before the prefill and decode
    servers are launched, and is removed again in ``tearDownClass`` (or
    immediately, if set-up fails).

    ``wait_server_ready`` and ``launch_lb`` are not defined in this class;
    presumably they are inherited from ``TestDisaggregationBase``.
    """

    @classmethod
    def setUpClass(cls):
        """Set ``SGLANG_TEST_RETRACT``, derive the endpoints and start the servers."""
        os.environ["SGLANG_TEST_RETRACT"] = "true"
        try:
            cls.model = DEFAULT_MODEL_NAME_FOR_TEST
            parsed_url = urlparse(DEFAULT_URL_FOR_TEST)
            cls.base_host = parsed_url.hostname
            base_port = str(parsed_url.port)
            # The load balancer takes the base port; prefill and decode use
            # the base port offset by +100 and +200 respectively.
            cls.lb_port = base_port
            cls.prefill_port = f"{int(base_port) + 100}"
            cls.decode_port = f"{int(base_port) + 200}"
            cls.prefill_url = f"http://{cls.base_host}:{cls.prefill_port}"
            cls.decode_url = f"http://{cls.base_host}:{cls.decode_port}"
            cls.lb_url = f"http://{cls.base_host}:{cls.lb_port}"
            print(f"{cls.base_host=} {cls.lb_port=} {cls.prefill_port=} {cls.decode_port=}")

            # Non blocking start servers
            cls.start_prefill()
            cls.start_decode()

            # Block until both
            cls.wait_server_ready(cls.prefill_url + "/health")
            cls.wait_server_ready(cls.decode_url + "/health")

            cls.launch_lb()
        except BaseException:
            # unittest does not call tearDownClass when setUpClass raises, so
            # undo the environment change here; otherwise the variable would
            # leak into test classes that run afterwards.
            os.environ.pop("SGLANG_TEST_RETRACT", None)
            raise

    @classmethod
    def tearDownClass(cls):
        """Remove ``SGLANG_TEST_RETRACT``, then run the base-class teardown."""
        # A default is supplied so a missing key can never raise KeyError and
        # thereby skip super().tearDownClass().
        os.environ.pop("SGLANG_TEST_RETRACT", None)
        super().tearDownClass()

    @classmethod
    def start_prefill(cls):
        """Launch the prefill-mode server and keep its handle in ``cls.process_prefill``."""
        prefill_args = [
            "--trust-remote-code",
            "--disaggregation-mode",
            "prefill",
            "--tp",
            "1",
            "--disaggregation-ib-device",
            "mlx5_roce0",
        ]
        cls.process_prefill = popen_launch_pd_server(
            cls.model,
            cls.prefill_url,
            timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
            other_args=prefill_args,
        )

    @classmethod
    def start_decode(cls):
        """Launch the decode-mode server and keep its handle in ``cls.process_decode``."""
        decode_args = [
            "--trust-remote-code",
            "--disaggregation-mode",
            "decode",
            "--tp",
            "1",
            "--base-gpu-id",
            "1",
            "--disaggregation-ib-device",
            "mlx5_roce1",
        ]
        cls.process_decode = popen_launch_pd_server(
            cls.model,
            cls.decode_url,
            timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
            other_args=decode_args,
        )

    def test_gsm8k(self):
        """Run the few-shot GSM8K eval through the load balancer; require accuracy > 0.62."""
        args = SimpleNamespace(
            num_shots=5,
            data_path=None,
            num_questions=200,
            max_new_tokens=512,
            parallel=128,
            host=f"http://{self.base_host}",
            port=int(self.lb_port),
        )
        metrics = run_eval_few_shot_gsm8k(args)
        print(f"Evaluation metrics: {metrics}")

        self.assertGreater(metrics["accuracy"], 0.62)
|
|
|
|
|
|
if __name__ == "__main__":
    # Executed as a script: hand control to unittest's command-line runner.
    unittest.main()
|