# dv/backend/shared/http_client.py
#
# Listing metadata: 34 lines, 1.4 KiB, Python.

from __future__ import annotations
from typing import Any
import httpx
class ServiceHttpClient:
    """Shared async HTTP client for inter-service communication.

    Each call opens a short-lived ``httpx.AsyncClient`` configured with
    ``timeout``, sends one request to ``base_url`` + ``path`` and calls
    ``raise_for_status()`` on the response before returning.

    NOTE: because a new ``AsyncClient`` is created per request, connections
    are not reused between calls.
    """

    def __init__(self, base_url: str, timeout: float = 30.0):
        """Store the service root URL and the per-request timeout.

        Args:
            base_url: Root URL of the target service; trailing slashes are
                stripped.
            timeout: Value passed as ``timeout`` to every ``AsyncClient``.
        """
        # Strip trailing slashes so joining with a "/"-prefixed path never
        # yields a double slash.
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout

    def _url(self, path: str) -> str:
        """Join *path* onto ``base_url``, adding the separating slash if missing.

        An empty path, or one that starts with ``/``, ``?`` or ``#``, is
        appended unchanged.
        """
        if path and path[0] not in "/?#":
            # "users" would otherwise produce "http://hostusers".
            path = f"/{path}"
        return f"{self.base_url}{path}"

    @staticmethod
    def _decode(resp: Any) -> Any:
        """Return the JSON body of *resp*, or ``None`` when the body is empty.

        Empty bodies (e.g. a 204 No Content reply) are not valid JSON, so
        calling ``resp.json()`` on them would raise instead of returning.
        """
        if not resp.content:
            return None
        return resp.json()

    async def _request(self, method: str, path: str, **kwargs: Any) -> "httpx.Response":
        """Send one request and return the response after the status check.

        Args:
            method: HTTP verb, e.g. ``"GET"``.
            path: Path appended to ``base_url``.
            **kwargs: Forwarded to ``AsyncClient.request`` (``params``, ``json``).

        Raises:
            Whatever ``raise_for_status()`` raises for an unsuccessful
            response, and any transport error raised by httpx.
        """
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            resp = await client.request(method, self._url(path), **kwargs)
            resp.raise_for_status()
            return resp

    async def get(self, path: str, params: dict[str, Any] | None = None) -> Any:
        """GET *path* with optional query *params*; return the decoded JSON body.

        Returns ``None`` when the response has no body.
        """
        resp = await self._request("GET", path, params=params)
        return self._decode(resp)

    async def post(self, path: str, json: dict[str, Any] | None = None) -> Any:
        """POST *json* to *path*; return the decoded JSON body.

        Returns ``None`` when the response has no body.
        """
        resp = await self._request("POST", path, json=json)
        return self._decode(resp)

    async def put(self, path: str, json: dict[str, Any] | None = None) -> Any:
        """PUT *json* to *path*; return the decoded JSON body.

        Returns ``None`` when the response has no body.
        """
        resp = await self._request("PUT", path, json=json)
        return self._decode(resp)

    async def delete(self, path: str) -> None:
        """DELETE *path*; the response body is ignored."""
        await self._request("DELETE", path)