"""Test that requests abandoned by a killed client are handled by the server.

The workload starts a child process that sends a batch of long generation
requests, terminates that process while the requests are in flight, and then
hands control back to ``run_and_check_memory_leak``.
"""

import multiprocessing
import time
import unittest
from concurrent.futures import ThreadPoolExecutor

import requests

from sglang.test.test_utils import CustomTestCase, run_and_check_memory_leak


class TestAbort(CustomTestCase):
    """Abort scenario: the client process is terminated mid-generation."""

    def workload_func(self, base_url, model):
        """Send concurrent ``/generate`` requests, then kill the sender.

        Args:
            base_url: Base URL of the server; ``/generate`` is appended to it.
            model: Unused here; accepted because the caller passes it.
        """
        num_requests = 16

        def process_func():
            """Child-process entry point: issue all requests from a thread pool."""

            def run_one(_):
                """POST one generation request; the argument is ignored."""
                # NOTE(review): literal kept exactly as it appears in the
                # source; confirm whether it was meant to span several lines.
                prompt = """ System: You are a helpful assistant. User: What is the capital of France? Assistant: The capital of France is """

                response = requests.post(
                    f"{base_url}/generate",
                    json={
                        "text": prompt,
                        "sampling_params": {
                            "temperature": 0,
                            # Large budget so requests are still running when
                            # the parent terminates this process.
                            "max_new_tokens": 2048,
                        },
                    },
                )
                # The parsed value is not needed; the call is kept so that a
                # non-JSON body still raises inside the worker as before.
                response.json()

            with ThreadPoolExecutor(num_requests) as executor:
                # list() drains the lazy map iterator so worker exceptions
                # are re-raised here.
                list(executor.map(run_one, range(num_requests)))

        p = multiprocessing.Process(target=process_func)
        p.start()
        # Give the child a moment to get its requests in flight.
        time.sleep(0.5)
        p.terminate()
        # Reap the terminated child so it does not linger as a zombie for the
        # remainder of the test run.
        p.join()
        # Fixed pause before returning to the caller.
        # NOTE(review): presumably lets the server finish processing the
        # dropped connections before memory is checked - confirm.
        time.sleep(10)

    def test_memory_leak(self):
        """Run the abort workload through ``run_and_check_memory_leak``."""
        # NOTE(review): assert_has_abort=True presumably makes the helper
        # require that aborts were observed - confirm in sglang.test.test_utils.
        run_and_check_memory_leak(
            self.workload_func,
            disable_radix_cache=False,
            enable_mixed_chunk=False,
            disable_overlap=False,
            chunked_prefill_size=8192,
            assert_has_abort=True,
        )


if __name__ == "__main__":
    unittest.main()