107 lines
3.4 KiB
Python
107 lines
3.4 KiB
Python
# Copyright 2023-2024 SGLang Team
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
# ==============================================================================
|
|
"""
|
|
Records the execution latency of decorated sync and async functions in a Prometheus histogram.
|
|
"""
|
|
|
|
import asyncio
|
|
import time
|
|
from functools import wraps
|
|
from typing import Any, Callable, List, Optional
|
|
|
|
# Global switch checked by the timing wrappers on every call; it stays False
# (wrappers just call through) until enable_func_timer() sets it to True.
enable_metrics = False
|
|
|
|
|
|
def enable_func_timer():
    """Enable latency recording and create the shared latency histogram.

    Creates the module-level ``FUNC_LATENCY`` histogram (labelled by function
    ``name``) and then sets ``enable_metrics`` to True so that functions
    wrapped with ``time_func_latency`` start reporting their durations.

    Calling this more than once is safe: an already-created histogram is
    reused instead of being registered a second time.
    """
    # We need to import prometheus_client after setting the env variable `PROMETHEUS_MULTIPROC_DIR`
    from prometheus_client import Histogram

    global enable_metrics, FUNC_LATENCY

    # prometheus_client refuses to register two collectors with the same
    # metric name, so only build the histogram the first time through.
    if FUNC_LATENCY is None:
        FUNC_LATENCY = Histogram(
            "sglang:func_latency_seconds",
            "Function latency in seconds",
            # captures latency in range [50ms - ~50s]
            buckets=exponential_buckets(start=0.05, width=1.5, length=18),
            labelnames=["name"],
        )

    # Flip the switch only after the histogram exists; otherwise a failure
    # above would leave the wrappers calling `.labels` on None.
    enable_metrics = True
|
|
|
|
|
|
# Shared latency histogram used by the timing wrappers; None until
# enable_func_timer() creates it.
FUNC_LATENCY = None
|
|
|
|
|
|
def exponential_buckets(start: float, width: float, length: int) -> List[float]:
    """Return ``length`` geometrically spaced bucket boundaries.

    The i-th boundary (0-based) is ``start * width**i``, so the first value is
    ``start`` and each following one is ``width`` times larger. A ``length``
    of zero or less yields an empty list.
    """
    return [start * (width**exponent) for exponent in range(length)]
|
|
|
|
|
|
def time_func_latency(
    func: Optional[Callable[..., Any]] = None, name: Optional[str] = None
) -> Callable[..., Any]:
    """
    A decorator to observe the latency of a function's execution. Supports both sync and async functions.

    Can be applied bare (``@time_func_latency``) or with arguments
    (``@time_func_latency(name="label")``). When ``name`` is omitted, each
    decorated function is labelled with its own ``__name__``.

    While ``enable_metrics`` is False the wrappers simply call through to the
    wrapped function; once it is True, every call's duration (measured with
    ``time.monotonic``) is observed on ``FUNC_LATENCY`` under the ``name`` label,
    including calls that raise.

    NOTE: We use our own implementation of a timer decorator since prometheus_client does not support async
    context manager yet.

    Overhead: The overhead introduced here in case of an async function could likely be because of the extra
    `await`, which results in another coroutine object creation and under heavy load could see longer wall time
    (scheduling delays due to introduction of another awaitable).

    Args:
        func: The function to wrap when used as a bare decorator; None when
            the decorator is called with arguments.
        name: Optional label value to report instead of the function's name.

    Returns:
        The wrapped function when ``func`` is given, otherwise a decorator
        that wraps the function it is applied to.
    """

    def measure(func: Callable[..., Any]) -> Callable[..., Any]:
        # Resolve the label per decorated function. Writing it back into the
        # enclosing `name` would make a reused decorator instance label every
        # later function with the first function's name.
        label = name or func.__name__

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            if not enable_metrics:
                return await func(*args, **kwargs)

            # Read the histogram at call time: it is created lazily by
            # enable_func_timer(), after decoration has already happened.
            metric = FUNC_LATENCY
            start = time.monotonic()
            ret = func(*args, **kwargs)
            if isinstance(ret, asyncio.Future) or asyncio.iscoroutine(ret):
                try:
                    ret = await ret
                finally:
                    # Record the duration even when the awaited call raises.
                    metric.labels(name=label).observe(time.monotonic() - start)
            return ret

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            if not enable_metrics:
                return func(*args, **kwargs)

            metric = FUNC_LATENCY
            start = time.monotonic()
            try:
                return func(*args, **kwargs)
            finally:
                # Record the duration even when the call raises.
                metric.labels(name=label).observe(time.monotonic() - start)

        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper

    # Bare usage passes the function directly; parameterized usage gets the
    # decorator back. Compare against None so a falsy callable is still wrapped.
    if func is not None:
        return measure(func)
    return measure
|