feat: DataViz Pro full-stack data visualization platform

- Frontend: Next.js + React + TypeScript + ECharts + Ant Design + Redux/Zustand (Clean Architecture)
- Backend: FastAPI + PostgreSQL + SQLAlchemy (DDD + Clean Architecture + Microservices)
- 4 microservices: data-service, chart-service, template-service, export-service
- 15+ chart types, drag-drop layout, multi-format export
- Docker Compose orchestration

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hailin 2026-04-04 23:20:12 -07:00
commit 6e3127e7d6
265 changed files with 8450 additions and 0 deletions

58
.gitignore vendored Normal file
View File

@ -0,0 +1,58 @@
# Dependencies
node_modules/
__pycache__/
*.pyc
*.pyo
.venv/
venv/
env/
# Build outputs
frontend/out/
frontend/.next/
*.egg-info/
dist/
build/
# IDE
.idea/
.vscode/
*.swp
*.swo
*~
# OS
.DS_Store
Thumbs.db
desktop.ini
# Environment
.env
.env.local
.env.*.local
!.env.example
# Logs
*.log
npm-debug.log*
# Docker volumes
backend/pgdata/
# Exports
backend/services/export-service/exports/
# Python
*.egg
.mypy_cache/
.ruff_cache/
.pytest_cache/
htmlcov/
.coverage
# Alembic
backend/services/*/alembic/versions/*.py
!backend/services/*/alembic/versions/__init__.py
# Package lock (keep package-lock.json for frontend)
poetry.lock

3
README.md Normal file
View File

@ -0,0 +1,3 @@
# DataViz Pro
Data visualization platform for statistical analysis and chart rendering.

View File

@ -0,0 +1,77 @@
version: "3.9"
services:
postgres:
image: postgres:16-alpine
environment:
POSTGRES_USER: dataviz
POSTGRES_PASSWORD: dataviz_local
ports:
- "5432:5432"
volumes:
- pgdata:/var/lib/postgresql/data
- ./init-databases.sql:/docker-entrypoint-initdb.d/init.sql
healthcheck:
test: ["CMD-SHELL", "pg_isready -U dataviz"]
interval: 5s
retries: 5
data-service:
build:
context: .
dockerfile: services/data-service/Dockerfile
ports:
- "8001:8000"
environment:
DATABASE_URL: postgresql+asyncpg://dataviz:dataviz_local@postgres:5432/data_db
depends_on:
postgres:
condition: service_healthy
chart-service:
build:
context: .
dockerfile: services/chart-service/Dockerfile
ports:
- "8002:8000"
environment:
DATABASE_URL: postgresql+asyncpg://dataviz:dataviz_local@postgres:5432/chart_db
DATA_SERVICE_URL: http://data-service:8000
depends_on:
postgres:
condition: service_healthy
data-service:
condition: service_started
template-service:
build:
context: .
dockerfile: services/template-service/Dockerfile
ports:
- "8003:8000"
environment:
DATABASE_URL: postgresql+asyncpg://dataviz:dataviz_local@postgres:5432/template_db
depends_on:
postgres:
condition: service_healthy
export-service:
build:
context: .
dockerfile: services/export-service/Dockerfile
ports:
- "8004:8000"
environment:
DATABASE_URL: postgresql+asyncpg://dataviz:dataviz_local@postgres:5432/export_db
DATA_SERVICE_URL: http://data-service:8000
CHART_SERVICE_URL: http://chart-service:8000
depends_on:
postgres:
condition: service_healthy
data-service:
condition: service_started
chart-service:
condition: service_started
volumes:
pgdata:

View File

@ -0,0 +1,4 @@
CREATE DATABASE data_db;
CREATE DATABASE chart_db;
CREATE DATABASE template_db;
CREATE DATABASE export_db;

19
backend/pyproject.toml Normal file
View File

@ -0,0 +1,19 @@
[tool.poetry]
name = "dataviz-pro-backend"
version = "0.1.0"
description = "DataViz Pro Backend Monorepo"
packages = [{include = "shared"}]
[tool.poetry.dependencies]
python = "^3.12"
[tool.ruff]
target-version = "py312"
line-length = 100
[tool.ruff.lint]
select = ["E", "F", "I", "N", "W", "UP"]
[tool.mypy]
python_version = "3.12"
strict = true

View File

@ -0,0 +1,29 @@
FROM python:3.12-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc libpq-dev && \
rm -rf /var/lib/apt/lists/*
# Copy shared library
COPY shared/ /app/shared/
# Copy service code
COPY services/chart-service/pyproject.toml /app/
COPY services/chart-service/src/ /app/src/
COPY services/chart-service/alembic/ /app/alembic/
COPY services/chart-service/alembic.ini /app/alembic.ini
# Install dependencies
RUN pip install --no-cache-dir poetry && \
poetry config virtualenvs.create false && \
poetry install --no-interaction --no-ansi --no-root
# Set PYTHONPATH to include shared
ENV PYTHONPATH=/app:/app/src
EXPOSE 8000
CMD ["python", "-m", "src.infrastructure.main"]

View File

@ -0,0 +1,37 @@
[alembic]
script_location = .
sqlalchemy.url = driver://user:pass@localhost/dbname
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View File

@ -0,0 +1,69 @@
from __future__ import annotations
import asyncio
import os
from logging.config import fileConfig
from alembic import context
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import create_async_engine
from src.adapters.persistence.models import Base
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def get_database_url() -> str:
return os.getenv(
"DATABASE_URL",
"postgresql+asyncpg://postgres:postgres@localhost:5432/chart_db",
)
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode."""
url = get_database_url()
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection): # noqa: ANN001
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
"""Run migrations in 'online' mode using an async engine."""
connectable = create_async_engine(
get_database_url(),
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode."""
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@ -0,0 +1 @@
from __future__ import annotations

View File

@ -0,0 +1,15 @@
[tool.poetry]
name = "chart-service"
version = "0.1.0"
description = "DataViz Pro Chart Service"
[tool.poetry.dependencies]
python = "^3.12"
fastapi = "^0.115"
uvicorn = {extras = ["standard"], version = "^0.34"}
sqlalchemy = {extras = ["asyncio"], version = "^2.0"}
asyncpg = "^0.30"
alembic = "^1.14"
pydantic = "^2.10"
pydantic-settings = "^2.7"
httpx = "^0.27"

View File

@ -0,0 +1 @@
from __future__ import annotations

View File

@ -0,0 +1 @@
from __future__ import annotations

View File

@ -0,0 +1 @@
from __future__ import annotations

View File

@ -0,0 +1,29 @@
from __future__ import annotations
import uuid
from shared.http_client import ServiceHttpClient
from src.application.ports.output.data_service_client import IDataServiceClient
class DataServiceClientImpl(IDataServiceClient):
"""Calls data-service over HTTP using the shared ServiceHttpClient."""
def __init__(self, http_client: ServiceHttpClient) -> None:
self._http = http_client
async def get_dataset(self, dataset_id: uuid.UUID) -> dict:
"""Fetch dataset metadata + structure from data-service."""
structure = await self._http.get(f"/api/v1/datasets/{dataset_id}/structure")
return structure
async def get_rows(
self, dataset_id: uuid.UUID, limit: int = 10000, offset: int = 0
) -> list[dict]:
"""Fetch dataset rows from data-service."""
result = await self._http.get(
f"/api/v1/datasets/{dataset_id}/rows",
params={"limit": limit, "offset": offset},
)
return result.get("rows", [])

View File

@ -0,0 +1 @@
from __future__ import annotations

View File

@ -0,0 +1,198 @@
from __future__ import annotations
from typing import Any
from collections import defaultdict
from shared.types import ChartType
from src.domain.entities.chart_instance import ChartInstance
from src.domain.entities.field_binding import FieldBinding
def _get_binding(bindings: list[FieldBinding], axis: str) -> FieldBinding | None:
for b in bindings:
if b.axis == axis:
return b
return None
def _aggregate(values: list[Any], aggregation: str | None) -> Any:
nums = [v for v in values if isinstance(v, (int, float))]
if not nums:
return 0
if aggregation == "sum" or aggregation is None:
return sum(nums)
if aggregation == "avg":
return sum(nums) / len(nums)
if aggregation == "count":
return len(values)
if aggregation == "max":
return max(nums)
if aggregation == "min":
return min(nums)
return sum(nums)
def build_bar_option(chart: ChartInstance, data: list[dict]) -> dict[str, Any]:
"""Standard vertical bar chart."""
x_bind = _get_binding(chart.bindings, "x")
y_bind = _get_binding(chart.bindings, "y")
if not x_bind or not y_bind:
return {"series": []}
# Group by x values
groups: dict[str, list[Any]] = defaultdict(list)
for row in data:
key = str(row.get(x_bind.column_name, ""))
groups[key].append(row.get(y_bind.column_name, 0))
categories = list(groups.keys())
values = [_aggregate(groups[c], y_bind.aggregation) for c in categories]
return {
"tooltip": {"trigger": "axis"},
"xAxis": {"type": "category", "data": categories},
"yAxis": {"type": "value"},
"series": [
{
"type": "bar",
"name": y_bind.column_name,
"data": values,
}
],
**_style_overrides(chart.style),
}
def build_grouped_bar_option(chart: ChartInstance, data: list[dict]) -> dict[str, Any]:
"""Grouped bar chart with a group binding."""
x_bind = _get_binding(chart.bindings, "x")
y_bind = _get_binding(chart.bindings, "y")
g_bind = _get_binding(chart.bindings, "group")
if not x_bind or not y_bind or not g_bind:
return {"series": []}
groups: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list))
all_categories: list[str] = []
all_groups: list[str] = []
for row in data:
cat = str(row.get(x_bind.column_name, ""))
grp = str(row.get(g_bind.column_name, ""))
if cat not in all_categories:
all_categories.append(cat)
if grp not in all_groups:
all_groups.append(grp)
groups[grp][cat].append(row.get(y_bind.column_name, 0))
series = []
for grp in all_groups:
series.append(
{
"type": "bar",
"name": grp,
"data": [
_aggregate(groups[grp].get(c, []), y_bind.aggregation)
for c in all_categories
],
}
)
return {
"tooltip": {"trigger": "axis"},
"legend": {"data": all_groups},
"xAxis": {"type": "category", "data": all_categories},
"yAxis": {"type": "value"},
"series": series,
**_style_overrides(chart.style),
}
def build_stacked_bar_option(chart: ChartInstance, data: list[dict]) -> dict[str, Any]:
"""Stacked bar chart."""
x_bind = _get_binding(chart.bindings, "x")
y_bind = _get_binding(chart.bindings, "y")
s_bind = _get_binding(chart.bindings, "stack")
if not x_bind or not y_bind or not s_bind:
return {"series": []}
groups: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list))
all_categories: list[str] = []
all_stacks: list[str] = []
for row in data:
cat = str(row.get(x_bind.column_name, ""))
stk = str(row.get(s_bind.column_name, ""))
if cat not in all_categories:
all_categories.append(cat)
if stk not in all_stacks:
all_stacks.append(stk)
groups[stk][cat].append(row.get(y_bind.column_name, 0))
series = []
for stk in all_stacks:
series.append(
{
"type": "bar",
"name": stk,
"stack": "total",
"data": [
_aggregate(groups[stk].get(c, []), y_bind.aggregation)
for c in all_categories
],
}
)
return {
"tooltip": {"trigger": "axis"},
"legend": {"data": all_stacks},
"xAxis": {"type": "category", "data": all_categories},
"yAxis": {"type": "value"},
"series": series,
**_style_overrides(chart.style),
}
def build_horizontal_bar_option(chart: ChartInstance, data: list[dict]) -> dict[str, Any]:
"""Horizontal bar chart (axes swapped)."""
x_bind = _get_binding(chart.bindings, "x")
y_bind = _get_binding(chart.bindings, "y")
if not x_bind or not y_bind:
return {"series": []}
groups: dict[str, list[Any]] = defaultdict(list)
for row in data:
key = str(row.get(x_bind.column_name, ""))
groups[key].append(row.get(y_bind.column_name, 0))
categories = list(groups.keys())
values = [_aggregate(groups[c], y_bind.aggregation) for c in categories]
return {
"tooltip": {"trigger": "axis"},
"xAxis": {"type": "value"},
"yAxis": {"type": "category", "data": categories},
"series": [
{
"type": "bar",
"name": y_bind.column_name,
"data": values,
}
],
**_style_overrides(chart.style),
}
def _style_overrides(style: dict[str, Any]) -> dict[str, Any]:
"""Extract top-level ECharts keys from style config."""
out: dict[str, Any] = {}
if "title" in style:
out["title"] = style["title"]
if "backgroundColor" in style:
out["backgroundColor"] = style["backgroundColor"]
return out

View File

@ -0,0 +1,127 @@
from __future__ import annotations
from typing import Any, Callable
from shared.types import ChartType
from src.domain.entities.chart_instance import ChartInstance
from src.domain.entities.field_binding import FieldBinding
from src.domain.services.option_builder import IOptionBuilder
from src.adapters.option_builders.bar_builder import (
build_bar_option,
build_grouped_bar_option,
build_horizontal_bar_option,
build_stacked_bar_option,
)
from src.adapters.option_builders.combo_builder import build_combo_option
from src.adapters.option_builders.heatmap_builder import build_heatmap_option
from src.adapters.option_builders.line_builder import build_area_option, build_line_option
from src.adapters.option_builders.map_builder import build_map_option
from src.adapters.option_builders.pie_builder import build_donut_option, build_pie_option
from src.adapters.option_builders.radar_builder import build_radar_option
from src.adapters.option_builders.scatter_builder import (
build_boston_matrix_option,
build_scatter_option,
)
from src.adapters.option_builders.wordcloud_builder import build_wordcloud_option
BuilderFn = Callable[[ChartInstance, list[dict]], dict[str, Any]]
def _build_kpi_option(chart: ChartInstance, data: list[dict]) -> dict[str, Any]:
"""KPI card: return a simple value dict, not a full ECharts option."""
val_bind: FieldBinding | None = None
label_bind: FieldBinding | None = None
comparison_bind: FieldBinding | None = None
for b in chart.bindings:
if b.axis == "value":
val_bind = b
elif b.axis == "label":
label_bind = b
elif b.axis == "comparison":
comparison_bind = b
if not val_bind or not data:
return {"value": 0, "label": ""}
# Aggregate all rows into a single value
nums = [
row.get(val_bind.column_name, 0)
for row in data
if isinstance(row.get(val_bind.column_name), (int, float))
]
agg = val_bind.aggregation or "sum"
if agg == "sum":
result = sum(nums) if nums else 0
elif agg == "avg":
result = sum(nums) / len(nums) if nums else 0
elif agg == "count":
result = len(data)
elif agg == "max":
result = max(nums) if nums else 0
elif agg == "min":
result = min(nums) if nums else 0
else:
result = sum(nums) if nums else 0
label = ""
if label_bind and data:
label = str(data[0].get(label_bind.column_name, ""))
comparison = None
if comparison_bind and data:
comp_nums = [
row.get(comparison_bind.column_name, 0)
for row in data
if isinstance(row.get(comparison_bind.column_name), (int, float))
]
comparison = sum(comp_nums) if comp_nums else None
out: dict[str, Any] = {"value": result, "label": label}
if comparison is not None:
out["comparison"] = comparison
return out
def _build_data_table_option(chart: ChartInstance, data: list[dict]) -> dict[str, Any]:
"""Data table: return raw rows."""
return {"rows": data}
_BUILDERS: dict[ChartType, BuilderFn] = {
ChartType.KPI: _build_kpi_option,
ChartType.BAR: build_bar_option,
ChartType.GROUPED_BAR: build_grouped_bar_option,
ChartType.STACKED_BAR: build_stacked_bar_option,
ChartType.HORIZONTAL_BAR: build_horizontal_bar_option,
ChartType.LINE: build_line_option,
ChartType.AREA: build_area_option,
ChartType.PIE: build_pie_option,
ChartType.DONUT: build_donut_option,
ChartType.SCATTER: build_scatter_option,
ChartType.BOSTON_MATRIX: build_boston_matrix_option,
ChartType.RADAR: build_radar_option,
ChartType.WORDCLOUD: build_wordcloud_option,
ChartType.HEATMAP: build_heatmap_option,
ChartType.MAP: build_map_option,
ChartType.COMBO: build_combo_option,
ChartType.DATA_TABLE: _build_data_table_option,
}
def get_builder(chart_type: ChartType) -> BuilderFn:
"""Return the option builder function for the given chart type."""
builder = _BUILDERS.get(chart_type)
if builder is None:
raise ValueError(f"No option builder registered for chart type: {chart_type}")
return builder
class EChartsOptionBuilderAdapter(IOptionBuilder):
"""Adapter implementing IOptionBuilder port using the registered builders."""
def build(self, chart: ChartInstance, data: list[dict]) -> dict[str, Any]:
builder = get_builder(chart.chart_type)
return builder(chart, data)

View File

@ -0,0 +1,102 @@
from __future__ import annotations
from collections import defaultdict
from typing import Any
from src.domain.entities.chart_instance import ChartInstance
from src.domain.entities.field_binding import FieldBinding
def _get_binding(bindings: list[FieldBinding], axis: str) -> FieldBinding | None:
for b in bindings:
if b.axis == axis:
return b
return None
def _get_bindings(bindings: list[FieldBinding], axis: str) -> list[FieldBinding]:
return [b for b in bindings if b.axis == axis]
def _aggregate(values: list[Any], aggregation: str | None) -> Any:
nums = [v for v in values if isinstance(v, (int, float))]
if not nums:
return 0
if aggregation == "sum" or aggregation is None:
return sum(nums)
if aggregation == "avg":
return sum(nums) / len(nums)
if aggregation == "count":
return len(values)
if aggregation == "max":
return max(nums)
if aggregation == "min":
return min(nums)
return sum(nums)
def build_combo_option(chart: ChartInstance, data: list[dict]) -> dict[str, Any]:
"""Combo chart (bar + line on the same axes)."""
x_bind = _get_binding(chart.bindings, "x")
bar_bindings = _get_bindings(chart.bindings, "bar_y")
line_bindings = _get_bindings(chart.bindings, "line_y")
if not x_bind:
return {"series": []}
# Build categories
categories: list[str] = []
for row in data:
cat = str(row.get(x_bind.column_name, ""))
if cat not in categories:
categories.append(cat)
series: list[dict[str, Any]] = []
legend_data: list[str] = []
# Bar series
for b_bind in bar_bindings:
groups: dict[str, list[Any]] = defaultdict(list)
for row in data:
cat = str(row.get(x_bind.column_name, ""))
groups[cat].append(row.get(b_bind.column_name, 0))
legend_data.append(b_bind.column_name)
series.append(
{
"type": "bar",
"name": b_bind.column_name,
"data": [
_aggregate(groups.get(c, []), b_bind.aggregation) for c in categories
],
}
)
# Line series on secondary y-axis
for l_bind in line_bindings:
groups2: dict[str, list[Any]] = defaultdict(list)
for row in data:
cat = str(row.get(x_bind.column_name, ""))
groups2[cat].append(row.get(l_bind.column_name, 0))
legend_data.append(l_bind.column_name)
series.append(
{
"type": "line",
"name": l_bind.column_name,
"yAxisIndex": 1,
"data": [
_aggregate(groups2.get(c, []), l_bind.aggregation) for c in categories
],
}
)
y_axes: list[dict[str, Any]] = [{"type": "value", "name": "Bar"}]
if line_bindings:
y_axes.append({"type": "value", "name": "Line"})
return {
"tooltip": {"trigger": "axis"},
"legend": {"data": legend_data},
"xAxis": {"type": "category", "data": categories},
"yAxis": y_axes,
"series": series,
}

View File

@ -0,0 +1,75 @@
from __future__ import annotations
from collections import defaultdict
from typing import Any
from src.domain.entities.chart_instance import ChartInstance
from src.domain.entities.field_binding import FieldBinding
def _get_binding(bindings: list[FieldBinding], axis: str) -> FieldBinding | None:
for b in bindings:
if b.axis == axis:
return b
return None
def build_heatmap_option(chart: ChartInstance, data: list[dict]) -> dict[str, Any]:
"""Heatmap chart builder."""
x_bind = _get_binding(chart.bindings, "x")
y_bind = _get_binding(chart.bindings, "y")
val_bind = _get_binding(chart.bindings, "value")
if not x_bind or not y_bind or not val_bind:
return {"series": []}
x_categories: list[str] = []
y_categories: list[str] = []
cell_values: dict[tuple[str, str], list[float]] = defaultdict(list)
for row in data:
x_val = str(row.get(x_bind.column_name, ""))
y_val = str(row.get(y_bind.column_name, ""))
v = row.get(val_bind.column_name, 0)
if x_val not in x_categories:
x_categories.append(x_val)
if y_val not in y_categories:
y_categories.append(y_val)
if isinstance(v, (int, float)):
cell_values[(x_val, y_val)].append(v)
heatmap_data: list[list[Any]] = []
all_values: list[float] = []
for xi, x_cat in enumerate(x_categories):
for yi, y_cat in enumerate(y_categories):
vals = cell_values.get((x_cat, y_cat), [0])
avg = sum(vals) / len(vals) if vals else 0
heatmap_data.append([xi, yi, round(avg, 2)])
all_values.append(avg)
min_val = min(all_values) if all_values else 0
max_val = max(all_values) if all_values else 1
return {
"tooltip": {"position": "top"},
"xAxis": {"type": "category", "data": x_categories, "splitArea": {"show": True}},
"yAxis": {"type": "category", "data": y_categories, "splitArea": {"show": True}},
"visualMap": {
"min": min_val,
"max": max_val,
"calculable": True,
"orient": "horizontal",
"left": "center",
"bottom": "0%",
},
"series": [
{
"type": "heatmap",
"data": heatmap_data,
"label": {"show": True},
"emphasis": {
"itemStyle": {"shadowBlur": 10, "shadowColor": "rgba(0,0,0,0.5)"}
},
}
],
}

View File

@ -0,0 +1,116 @@
from __future__ import annotations
from collections import defaultdict
from typing import Any
from src.domain.entities.chart_instance import ChartInstance
from src.domain.entities.field_binding import FieldBinding
def _get_binding(bindings: list[FieldBinding], axis: str) -> FieldBinding | None:
for b in bindings:
if b.axis == axis:
return b
return None
def _aggregate(values: list[Any], aggregation: str | None) -> Any:
nums = [v for v in values if isinstance(v, (int, float))]
if not nums:
return 0
if aggregation == "sum" or aggregation is None:
return sum(nums)
if aggregation == "avg":
return sum(nums) / len(nums)
if aggregation == "count":
return len(values)
if aggregation == "max":
return max(nums)
if aggregation == "min":
return min(nums)
return sum(nums)
def build_line_option(chart: ChartInstance, data: list[dict]) -> dict[str, Any]:
"""Line chart builder."""
x_bind = _get_binding(chart.bindings, "x")
y_bind = _get_binding(chart.bindings, "y")
g_bind = _get_binding(chart.bindings, "group")
if not x_bind or not y_bind:
return {"series": []}
if g_bind:
return _build_multi_line(chart, data, x_bind, y_bind, g_bind)
groups: dict[str, list[Any]] = defaultdict(list)
categories: list[str] = []
for row in data:
key = str(row.get(x_bind.column_name, ""))
if key not in categories:
categories.append(key)
groups[key].append(row.get(y_bind.column_name, 0))
values = [_aggregate(groups[c], y_bind.aggregation) for c in categories]
return {
"tooltip": {"trigger": "axis"},
"xAxis": {"type": "category", "data": categories},
"yAxis": {"type": "value"},
"series": [
{
"type": "line",
"name": y_bind.column_name,
"data": values,
}
],
}
def _build_multi_line(
chart: ChartInstance,
data: list[dict],
x_bind: FieldBinding,
y_bind: FieldBinding,
g_bind: FieldBinding,
) -> dict[str, Any]:
groups: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list))
all_categories: list[str] = []
all_groups: list[str] = []
for row in data:
cat = str(row.get(x_bind.column_name, ""))
grp = str(row.get(g_bind.column_name, ""))
if cat not in all_categories:
all_categories.append(cat)
if grp not in all_groups:
all_groups.append(grp)
groups[grp][cat].append(row.get(y_bind.column_name, 0))
series = [
{
"type": "line",
"name": grp,
"data": [
_aggregate(groups[grp].get(c, []), y_bind.aggregation)
for c in all_categories
],
}
for grp in all_groups
]
return {
"tooltip": {"trigger": "axis"},
"legend": {"data": all_groups},
"xAxis": {"type": "category", "data": all_categories},
"yAxis": {"type": "value"},
"series": series,
}
def build_area_option(chart: ChartInstance, data: list[dict]) -> dict[str, Any]:
"""Area chart (line with areaStyle)."""
option = build_line_option(chart, data)
for s in option.get("series", []):
s["areaStyle"] = {}
return option

View File

@ -0,0 +1,63 @@
from __future__ import annotations
from collections import defaultdict
from typing import Any
from src.domain.entities.chart_instance import ChartInstance
from src.domain.entities.field_binding import FieldBinding
def _get_binding(bindings: list[FieldBinding], axis: str) -> FieldBinding | None:
for b in bindings:
if b.axis == axis:
return b
return None
def build_map_option(chart: ChartInstance, data: list[dict]) -> dict[str, Any]:
"""Geo map chart builder."""
region_bind = _get_binding(chart.bindings, "region")
val_bind = _get_binding(chart.bindings, "value")
if not region_bind or not val_bind:
return {"series": []}
groups: dict[str, list[float]] = defaultdict(list)
ordered: list[str] = []
for row in data:
region = str(row.get(region_bind.column_name, ""))
val = row.get(val_bind.column_name, 0)
if region not in ordered:
ordered.append(region)
if isinstance(val, (int, float)):
groups[region].append(val)
map_data = [
{"name": r, "value": sum(groups[r]) if groups[r] else 0}
for r in ordered
]
all_values = [d["value"] for d in map_data]
min_val = min(all_values) if all_values else 0
max_val = max(all_values) if all_values else 1
return {
"tooltip": {"trigger": "item"},
"visualMap": {
"min": min_val,
"max": max_val,
"left": "left",
"top": "bottom",
"text": ["High", "Low"],
"calculable": True,
},
"series": [
{
"type": "map",
"map": "china",
"roam": True,
"data": map_data,
"emphasis": {"label": {"show": True}},
}
],
}

View File

@ -0,0 +1,80 @@
from __future__ import annotations
from collections import defaultdict
from typing import Any
from src.domain.entities.chart_instance import ChartInstance
from src.domain.entities.field_binding import FieldBinding
def _get_binding(bindings: list[FieldBinding], axis: str) -> FieldBinding | None:
for b in bindings:
if b.axis == axis:
return b
return None
def _aggregate(values: list[Any], aggregation: str | None) -> Any:
nums = [v for v in values if isinstance(v, (int, float))]
if not nums:
return 0
if aggregation == "sum" or aggregation is None:
return sum(nums)
if aggregation == "avg":
return sum(nums) / len(nums)
if aggregation == "count":
return len(values)
if aggregation == "max":
return max(nums)
if aggregation == "min":
return min(nums)
return sum(nums)
def build_pie_option(chart: ChartInstance, data: list[dict]) -> dict[str, Any]:
"""Standard pie chart."""
name_bind = _get_binding(chart.bindings, "name")
val_bind = _get_binding(chart.bindings, "value")
if not name_bind or not val_bind:
return {"series": []}
groups: dict[str, list[Any]] = defaultdict(list)
ordered: list[str] = []
for row in data:
key = str(row.get(name_bind.column_name, ""))
if key not in ordered:
ordered.append(key)
groups[key].append(row.get(val_bind.column_name, 0))
pie_data = [
{"name": k, "value": _aggregate(groups[k], val_bind.aggregation)}
for k in ordered
]
return {
"tooltip": {"trigger": "item"},
"legend": {"orient": "vertical", "left": "left"},
"series": [
{
"type": "pie",
"radius": "55%",
"data": pie_data,
"emphasis": {
"itemStyle": {
"shadowBlur": 10,
"shadowOffsetX": 0,
"shadowColor": "rgba(0, 0, 0, 0.5)",
}
},
}
],
}
def build_donut_option(chart: ChartInstance, data: list[dict]) -> dict[str, Any]:
"""Donut chart (pie with inner radius)."""
option = build_pie_option(chart, data)
for s in option.get("series", []):
s["radius"] = ["40%", "70%"]
return option

View File

@ -0,0 +1,89 @@
from __future__ import annotations
from collections import defaultdict
from typing import Any
from src.domain.entities.chart_instance import ChartInstance
from src.domain.entities.field_binding import FieldBinding
def _get_binding(bindings: list[FieldBinding], axis: str) -> FieldBinding | None:
for b in bindings:
if b.axis == axis:
return b
return None
def build_radar_option(chart: ChartInstance, data: list[dict]) -> dict[str, Any]:
"""Radar chart builder."""
ind_bind = _get_binding(chart.bindings, "indicator")
val_bind = _get_binding(chart.bindings, "value")
grp_bind = _get_binding(chart.bindings, "group")
if not ind_bind or not val_bind:
return {"series": []}
# Collect indicator names and their max values
indicator_values: dict[str, list[float]] = defaultdict(list)
ordered_indicators: list[str] = []
for row in data:
ind = str(row.get(ind_bind.column_name, ""))
val = row.get(val_bind.column_name, 0)
if ind not in ordered_indicators:
ordered_indicators.append(ind)
if isinstance(val, (int, float)):
indicator_values[ind].append(val)
indicators = [
{
"name": ind,
"max": max(indicator_values[ind]) * 1.2 if indicator_values[ind] else 100,
}
for ind in ordered_indicators
]
if grp_bind:
# Multi-group radar
groups: dict[str, dict[str, float]] = defaultdict(dict)
group_order: list[str] = []
for row in data:
ind = str(row.get(ind_bind.column_name, ""))
grp = str(row.get(grp_bind.column_name, ""))
val = row.get(val_bind.column_name, 0)
if grp not in group_order:
group_order.append(grp)
groups[grp][ind] = val if isinstance(val, (int, float)) else 0
series_data = [
{
"name": grp,
"value": [groups[grp].get(ind, 0) for ind in ordered_indicators],
}
for grp in group_order
]
legend = {"data": group_order}
else:
# Single radar
series_data = [
{
"name": val_bind.column_name,
"value": [
indicator_values[ind][0] if indicator_values[ind] else 0
for ind in ordered_indicators
],
}
]
legend = {}
return {
"tooltip": {"trigger": "item"},
**( {"legend": legend} if legend else {}),
"radar": {"indicator": indicators},
"series": [
{
"type": "radar",
"data": series_data,
}
],
}

View File

@ -0,0 +1,89 @@
from __future__ import annotations
from typing import Any
from src.domain.entities.chart_instance import ChartInstance
from src.domain.entities.field_binding import FieldBinding
def _get_binding(bindings: list[FieldBinding], axis: str) -> FieldBinding | None:
for b in bindings:
if b.axis == axis:
return b
return None
def build_scatter_option(chart: ChartInstance, data: list[dict]) -> dict[str, Any]:
"""Scatter plot builder."""
x_bind = _get_binding(chart.bindings, "x")
y_bind = _get_binding(chart.bindings, "y")
size_bind = _get_binding(chart.bindings, "size")
if not x_bind or not y_bind:
return {"series": []}
scatter_data: list[list[Any]] = []
for row in data:
x_val = row.get(x_bind.column_name, 0)
y_val = row.get(y_bind.column_name, 0)
if size_bind:
s_val = row.get(size_bind.column_name, 10)
scatter_data.append([x_val, y_val, s_val])
else:
scatter_data.append([x_val, y_val])
series_item: dict[str, Any] = {
"type": "scatter",
"data": scatter_data,
"symbolSize": 10,
}
if size_bind:
series_item["symbolSize"] = None # will use encode
series_item["encode"] = {"x": 0, "y": 1, "size": 2}
return {
"tooltip": {"trigger": "item"},
"xAxis": {"type": "value", "name": x_bind.column_name},
"yAxis": {"type": "value", "name": y_bind.column_name},
"series": [series_item],
}
def build_boston_matrix_option(chart: ChartInstance, data: list[dict]) -> dict[str, Any]:
"""Boston matrix (scatter with quadrant lines)."""
option = build_scatter_option(chart, data)
label_bind = _get_binding(chart.bindings, "label")
# Add labels if binding exists
if label_bind and option.get("series"):
for i, row in enumerate(data):
label_text = str(row.get(label_bind.column_name, ""))
if i < len(option["series"][0]["data"]):
point = option["series"][0]["data"][i]
if isinstance(point, list):
point.append(label_text)
# Add quadrant markLines (median-based)
all_x = []
all_y = []
for point in option.get("series", [{}])[0].get("data", []):
if isinstance(point, list) and len(point) >= 2:
if isinstance(point[0], (int, float)):
all_x.append(point[0])
if isinstance(point[1], (int, float)):
all_y.append(point[1])
if all_x and all_y:
mid_x = sum(all_x) / len(all_x)
mid_y = sum(all_y) / len(all_y)
option["series"][0]["markLine"] = {
"silent": True,
"lineStyle": {"type": "dashed", "color": "#999"},
"data": [
{"xAxis": mid_x},
{"yAxis": mid_y},
],
}
return option

View File

@ -0,0 +1,60 @@
from __future__ import annotations
from collections import defaultdict
from typing import Any
from src.domain.entities.chart_instance import ChartInstance
from src.domain.entities.field_binding import FieldBinding
def _get_binding(bindings: list[FieldBinding], axis: str) -> FieldBinding | None:
for b in bindings:
if b.axis == axis:
return b
return None
def build_wordcloud_option(chart: ChartInstance, data: list[dict]) -> dict[str, Any]:
"""Word cloud chart builder.
Note: echarts-wordcloud is a separate extension. We produce the
standard option shape that the extension expects.
"""
name_bind = _get_binding(chart.bindings, "name")
val_bind = _get_binding(chart.bindings, "value")
if not name_bind or not val_bind:
return {"series": []}
groups: dict[str, list[float]] = defaultdict(list)
ordered: list[str] = []
for row in data:
name = str(row.get(name_bind.column_name, ""))
val = row.get(val_bind.column_name, 0)
if name not in ordered:
ordered.append(name)
if isinstance(val, (int, float)):
groups[name].append(val)
cloud_data = [
{"name": n, "value": sum(groups[n]) if groups[n] else 0}
for n in ordered
]
return {
"tooltip": {"show": True},
"series": [
{
"type": "wordCloud",
"shape": "circle",
"sizeRange": [14, 60],
"rotationRange": [-45, 45],
"gridSize": 8,
"data": cloud_data,
"textStyle": {
"fontFamily": "sans-serif",
"fontWeight": "bold",
},
}
],
}

View File

@ -0,0 +1 @@
from __future__ import annotations

View File

@ -0,0 +1,85 @@
from __future__ import annotations
import uuid
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from shared.types import ChartType
from src.adapters.persistence.models import ChartInstanceModel
from src.domain.entities.chart_instance import ChartInstance
from src.domain.entities.field_binding import FieldBinding
from src.domain.repositories.chart_repository import ChartRepository
class ChartRepositoryImpl(ChartRepository):
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def find_by_id(self, entity_id: uuid.UUID) -> Optional[ChartInstance]:
model = await self._session.get(ChartInstanceModel, entity_id)
if model is None:
return None
return _to_entity(model)
async def find_all(self) -> list[ChartInstance]:
result = await self._session.execute(
select(ChartInstanceModel).order_by(ChartInstanceModel.created_at.desc())
)
return [_to_entity(m) for m in result.scalars().all()]
async def save(self, entity: ChartInstance) -> ChartInstance:
existing = await self._session.get(ChartInstanceModel, entity.id)
if existing is None:
model = _to_model(entity)
self._session.add(model)
else:
existing.dataset_id = entity.dataset_id
existing.chart_type = entity.chart_type.value
existing.bindings = [b.to_dict() for b in entity.bindings]
existing.style = entity.style
existing.filters = entity.filters
existing.sort_config = entity.sort_config
existing.top_n = entity.top_n
existing.updated_at = entity.updated_at
await self._session.flush()
return entity
async def delete(self, entity_id: uuid.UUID) -> None:
model = await self._session.get(ChartInstanceModel, entity_id)
if model is not None:
await self._session.delete(model)
await self._session.flush()
def _to_entity(model: ChartInstanceModel) -> ChartInstance:
return ChartInstance(
id=model.id,
created_at=model.created_at,
updated_at=model.updated_at,
dataset_id=model.dataset_id,
chart_type=ChartType(model.chart_type),
bindings=[FieldBinding.from_dict(b) for b in (model.bindings or [])],
style=model.style or {},
filters=model.filters or [],
sort_config=model.sort_config,
top_n=model.top_n,
)
def _to_model(entity: ChartInstance) -> ChartInstanceModel:
return ChartInstanceModel(
id=entity.id,
dataset_id=entity.dataset_id,
chart_type=entity.chart_type.value,
bindings=[b.to_dict() for b in entity.bindings],
style=entity.style,
filters=entity.filters,
sort_config=entity.sort_config,
top_n=entity.top_n,
created_at=entity.created_at,
updated_at=entity.updated_at,
)

View File

@ -0,0 +1,38 @@
from __future__ import annotations
import os
from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
DATABASE_URL = os.getenv(
"DATABASE_URL",
"postgresql+asyncpg://postgres:postgres@localhost:5432/chart_db",
)
engine = create_async_engine(
DATABASE_URL,
echo=False,
pool_size=10,
max_overflow=20,
)
async_session_factory = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
async def get_session() -> AsyncGenerator[AsyncSession, None]:
async with async_session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise

View File

@ -0,0 +1,36 @@
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from sqlalchemy import DateTime, Integer, String, Uuid
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class ChartInstanceModel(Base):
__tablename__ = "chart_instances"
id: Mapped[uuid.UUID] = mapped_column(Uuid, primary_key=True, default=uuid.uuid4)
dataset_id: Mapped[uuid.UUID] = mapped_column(Uuid, nullable=False, index=True)
chart_type: Mapped[str] = mapped_column(String(64), nullable=False)
bindings: Mapped[list] = mapped_column(JSONB, nullable=False, default=list)
style: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict)
filters: Mapped[list] = mapped_column(JSONB, nullable=False, default=list)
sort_config: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
top_n: Mapped[int | None] = mapped_column(Integer, nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
nullable=False,
default=lambda: datetime.now(timezone.utc),
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
nullable=False,
default=lambda: datetime.now(timezone.utc),
onupdate=lambda: datetime.now(timezone.utc),
)

View File

@ -0,0 +1 @@
from __future__ import annotations

View File

@ -0,0 +1,34 @@
from __future__ import annotations
from typing import Any
from src.application.dto.chart_response import ChartResponse, FieldBindingDTO
from src.domain.entities.chart_instance import ChartInstance
def present_chart(chart: ChartInstance) -> ChartResponse:
"""Convert domain entity to API response DTO."""
return ChartResponse(
id=chart.id,
dataset_id=chart.dataset_id,
chart_type=chart.chart_type.value,
bindings=[
FieldBindingDTO(
axis=b.axis,
column_name=b.column_name,
aggregation=b.aggregation,
)
for b in chart.bindings
],
style=chart.style,
filters=chart.filters,
sort_config=chart.sort_config,
top_n=chart.top_n,
created_at=chart.created_at,
updated_at=chart.updated_at,
)
def present_option(option: dict[str, Any]) -> dict[str, Any]:
"""Wrap an ECharts option for the API response."""
return {"option": option}

View File

@ -0,0 +1 @@
from __future__ import annotations

View File

@ -0,0 +1 @@
from __future__ import annotations

View File

@ -0,0 +1,47 @@
from __future__ import annotations
import uuid
from datetime import datetime
from typing import Any
from pydantic import BaseModel, Field
class FieldBindingDTO(BaseModel):
axis: str
column_name: str
aggregation: str | None = None
class CreateChartRequest(BaseModel):
dataset_id: uuid.UUID
chart_type: str
bindings: list[FieldBindingDTO] = Field(default_factory=list)
style: dict[str, Any] = Field(default_factory=dict)
filters: list[dict[str, Any]] = Field(default_factory=list)
sort_config: dict[str, Any] | None = None
top_n: int | None = None
class UpdateChartRequest(BaseModel):
chart_type: str | None = None
bindings: list[FieldBindingDTO] | None = None
style: dict[str, Any] | None = None
filters: list[dict[str, Any]] | None = None
sort_config: dict[str, Any] | None = None
top_n: int | None = None
class ChartResponse(BaseModel):
id: uuid.UUID
dataset_id: uuid.UUID
chart_type: str
bindings: list[FieldBindingDTO]
style: dict[str, Any]
filters: list[dict[str, Any]]
sort_config: dict[str, Any] | None
top_n: int | None
created_at: datetime
updated_at: datetime
model_config = {"from_attributes": True}

View File

@ -0,0 +1,13 @@
from __future__ import annotations
from typing import Any
from pydantic import BaseModel, Field
class EChartsOptionResponse(BaseModel):
"""Wraps the generated ECharts option dict."""
option: dict[str, Any] = Field(
..., description="Complete ECharts option object ready for setOption()"
)

View File

@ -0,0 +1 @@
from __future__ import annotations

View File

@ -0,0 +1 @@
from __future__ import annotations

View File

@ -0,0 +1,10 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from src.application.dto.chart_response import ChartResponse, CreateChartRequest
class ICreateChartUseCase(ABC):
@abstractmethod
async def execute(self, request: CreateChartRequest) -> ChartResponse: ...

View File

@ -0,0 +1,10 @@
from __future__ import annotations
import uuid
from abc import ABC, abstractmethod
from typing import Any
class IGetChartOptionUseCase(ABC):
@abstractmethod
async def execute(self, chart_id: uuid.UUID) -> dict[str, Any]: ...

View File

@ -0,0 +1,9 @@
from __future__ import annotations
import uuid
from abc import ABC, abstractmethod
class IRecommendChartsUseCase(ABC):
@abstractmethod
async def execute(self, dataset_id: uuid.UUID) -> list[dict]: ...

View File

@ -0,0 +1,13 @@
from __future__ import annotations
import uuid
from abc import ABC, abstractmethod
from src.application.dto.chart_response import ChartResponse, UpdateChartRequest
class IUpdateChartUseCase(ABC):
@abstractmethod
async def execute(
self, chart_id: uuid.UUID, request: UpdateChartRequest
) -> ChartResponse: ...

View File

@ -0,0 +1 @@
from __future__ import annotations

View File

@ -0,0 +1,14 @@
from __future__ import annotations
import uuid
from abc import ABC, abstractmethod
class IDataServiceClient(ABC):
@abstractmethod
async def get_dataset(self, dataset_id: uuid.UUID) -> dict: ...
@abstractmethod
async def get_rows(
self, dataset_id: uuid.UUID, limit: int = 10000, offset: int = 0
) -> list[dict]: ...

View File

@ -0,0 +1 @@
from __future__ import annotations

View File

@ -0,0 +1,63 @@
from __future__ import annotations
from shared.types import ChartType
from src.application.dto.chart_response import (
ChartResponse,
CreateChartRequest,
FieldBindingDTO,
)
from src.application.ports.input.create_chart import ICreateChartUseCase
from src.domain.entities.chart_instance import ChartInstance
from src.domain.entities.field_binding import FieldBinding
from src.domain.repositories.chart_repository import ChartRepository
_DEFAULT_STYLE: dict = {
"backgroundColor": "#ffffff",
"textStyle": {"fontFamily": "sans-serif"},
}
class CreateChartUseCase(ICreateChartUseCase):
def __init__(self, repository: ChartRepository) -> None:
self._repository = repository
async def execute(self, request: CreateChartRequest) -> ChartResponse:
bindings = [
FieldBinding(axis=b.axis, column_name=b.column_name, aggregation=b.aggregation)
for b in request.bindings
]
style = {**_DEFAULT_STYLE, **request.style}
chart = ChartInstance(
dataset_id=request.dataset_id,
chart_type=ChartType(request.chart_type),
bindings=bindings,
style=style,
filters=request.filters,
sort_config=request.sort_config,
top_n=request.top_n,
)
saved = await self._repository.save(chart)
return _to_response(saved)
def _to_response(chart: ChartInstance) -> ChartResponse:
return ChartResponse(
id=chart.id,
dataset_id=chart.dataset_id,
chart_type=chart.chart_type.value,
bindings=[
FieldBindingDTO(axis=b.axis, column_name=b.column_name, aggregation=b.aggregation)
for b in chart.bindings
],
style=chart.style,
filters=chart.filters,
sort_config=chart.sort_config,
top_n=chart.top_n,
created_at=chart.created_at,
updated_at=chart.updated_at,
)

View File

@ -0,0 +1,89 @@
from __future__ import annotations
import uuid
from typing import Any
from shared.exceptions import EntityNotFoundError
from src.application.ports.input.get_chart_option import IGetChartOptionUseCase
from src.application.ports.output.data_service_client import IDataServiceClient
from src.domain.repositories.chart_repository import ChartRepository
from src.domain.services.option_builder import IOptionBuilder
class GetChartOptionUseCase(IGetChartOptionUseCase):
def __init__(
self,
repository: ChartRepository,
data_client: IDataServiceClient,
option_builder: IOptionBuilder,
) -> None:
self._repository = repository
self._data_client = data_client
self._option_builder = option_builder
async def execute(self, chart_id: uuid.UUID) -> dict[str, Any]:
chart = await self._repository.find_by_id(chart_id)
if chart is None:
raise EntityNotFoundError("ChartInstance", str(chart_id))
# Fetch data from data-service
limit = chart.top_n if chart.top_n else 10000
rows = await self._data_client.get_rows(chart.dataset_id, limit=limit)
# Apply client-side filters
rows = _apply_filters(rows, chart.filters)
# Apply sort
if chart.sort_config:
field = chart.sort_config.get("field")
ascending = chart.sort_config.get("ascending", True)
if field:
rows = sorted(
rows,
key=lambda r: r.get(field, ""),
reverse=not ascending,
)
# Apply top_n after sort
if chart.top_n:
rows = rows[: chart.top_n]
option = self._option_builder.build(chart, rows)
return option
def _apply_filters(rows: list[dict], filters: list[dict]) -> list[dict]:
"""Apply simple filters to rows."""
if not filters:
return rows
result = rows
for f in filters:
field = f.get("field")
op = f.get("operator", "eq")
value = f.get("value")
if not field:
continue
result = [r for r in result if _matches(r.get(field), op, value)]
return result
def _matches(cell_value: Any, op: str, filter_value: Any) -> bool:
if op == "eq":
return cell_value == filter_value
if op == "ne":
return cell_value != filter_value
if op == "gt":
return cell_value is not None and cell_value > filter_value
if op == "gte":
return cell_value is not None and cell_value >= filter_value
if op == "lt":
return cell_value is not None and cell_value < filter_value
if op == "lte":
return cell_value is not None and cell_value <= filter_value
if op == "in":
return cell_value in (filter_value or [])
if op == "contains":
return filter_value is not None and str(filter_value) in str(cell_value or "")
return True

View File

@ -0,0 +1,18 @@
from __future__ import annotations
import uuid
from src.application.ports.input.recommend_charts import IRecommendChartsUseCase
from src.application.ports.output.data_service_client import IDataServiceClient
from src.domain.services.chart_recommendation import recommend_charts
class RecommendChartsUseCase(IRecommendChartsUseCase):
def __init__(self, data_client: IDataServiceClient) -> None:
self._data_client = data_client
async def execute(self, dataset_id: uuid.UUID) -> list[dict]:
dataset = await self._data_client.get_dataset(dataset_id)
columns = dataset.get("columns", [])
data_structure = dataset.get("data_structure", "")
return recommend_charts(columns, data_structure)

View File

@ -0,0 +1,76 @@
from __future__ import annotations
import uuid
from shared.exceptions import EntityNotFoundError
from shared.types import ChartType
from src.application.dto.chart_response import (
ChartResponse,
FieldBindingDTO,
UpdateChartRequest,
)
from src.application.ports.input.update_chart import IUpdateChartUseCase
from src.domain.entities.field_binding import FieldBinding
from src.domain.repositories.chart_repository import ChartRepository
class UpdateChartUseCase(IUpdateChartUseCase):
def __init__(self, repository: ChartRepository) -> None:
self._repository = repository
async def execute(
self, chart_id: uuid.UUID, request: UpdateChartRequest
) -> ChartResponse:
chart = await self._repository.find_by_id(chart_id)
if chart is None:
raise EntityNotFoundError("ChartInstance", str(chart_id))
if request.chart_type is not None:
chart.chart_type = ChartType(request.chart_type)
chart.touch()
if request.bindings is not None:
new_bindings = [
FieldBinding(
axis=b.axis,
column_name=b.column_name,
aggregation=b.aggregation,
)
for b in request.bindings
]
chart.update_bindings(new_bindings)
if request.style is not None:
chart.update_style(request.style)
if request.filters is not None:
chart.filters = request.filters
chart.touch()
if request.sort_config is not None:
chart.sort_config = request.sort_config
chart.touch()
if request.top_n is not None:
chart.top_n = request.top_n
chart.touch()
saved = await self._repository.save(chart)
return ChartResponse(
id=saved.id,
dataset_id=saved.dataset_id,
chart_type=saved.chart_type.value,
bindings=[
FieldBindingDTO(
axis=b.axis, column_name=b.column_name, aggregation=b.aggregation
)
for b in saved.bindings
],
style=saved.style,
filters=saved.filters,
sort_config=saved.sort_config,
top_n=saved.top_n,
created_at=saved.created_at,
updated_at=saved.updated_at,
)

View File

@ -0,0 +1 @@
from __future__ import annotations

View File

@ -0,0 +1,7 @@
from __future__ import annotations
from src.domain.entities.chart_instance import ChartInstance
from src.domain.entities.field_binding import FieldBinding
from src.domain.entities.style_config import StyleConfig
__all__ = ["ChartInstance", "FieldBinding", "StyleConfig"]

View File

@ -0,0 +1,48 @@
from __future__ import annotations
import uuid
from dataclasses import dataclass, field
from typing import Any
from shared.base_entity import BaseEntity
from shared.types import ChartType
from src.domain.entities.field_binding import FieldBinding
from src.domain.entities.style_config import StyleConfig
@dataclass
class ChartInstance(BaseEntity):
"""Aggregate root representing a configured chart."""
dataset_id: uuid.UUID = field(default_factory=uuid.uuid4)
chart_type: ChartType = ChartType.BAR
bindings: list[FieldBinding] = field(default_factory=list)
style: StyleConfig = field(default_factory=dict)
filters: list[dict[str, Any]] = field(default_factory=list)
sort_config: dict[str, Any] | None = None
top_n: int | None = None
# ------------------------------------------------------------------
# Domain behaviour
# ------------------------------------------------------------------
def update_bindings(self, bindings: list[FieldBinding]) -> None:
"""Replace current bindings after validation."""
errors = self.validate_bindings(bindings)
if errors:
raise ValueError("; ".join(errors))
self.bindings = bindings
self.touch()
def update_style(self, style: StyleConfig) -> None:
"""Merge new style properties into current style."""
self.style.update(style)
self.touch()
def validate_bindings(self, bindings: list[FieldBinding] | None = None) -> list[str]:
"""Return a list of validation error messages (empty means valid)."""
from src.domain.services.binding_validation import validate_bindings as _validate
target = bindings if bindings is not None else self.bindings
return _validate(self.chart_type, target)

View File

@ -0,0 +1,27 @@
from __future__ import annotations
from dataclasses import dataclass
@dataclass
class FieldBinding:
"""Maps a chart axis to a dataset column with optional aggregation."""
axis: str
column_name: str
aggregation: str | None = None
def to_dict(self) -> dict[str, str | None]:
return {
"axis": self.axis,
"column_name": self.column_name,
"aggregation": self.aggregation,
}
@classmethod
def from_dict(cls, data: dict[str, str | None]) -> FieldBinding:
return cls(
axis=str(data["axis"]),
column_name=str(data["column_name"]),
aggregation=data.get("aggregation"),
)

View File

@ -0,0 +1,6 @@
from __future__ import annotations
from typing import Any
StyleConfig = dict[str, Any]
"""Style configuration stored as JSONB. Keys are ECharts style properties."""

View File

@ -0,0 +1 @@
from __future__ import annotations

View File

@ -0,0 +1,9 @@
from __future__ import annotations
from shared.base_repository import BaseRepository
from src.domain.entities.chart_instance import ChartInstance
class ChartRepository(BaseRepository[ChartInstance]):
"""Abstract repository for ChartInstance aggregate."""

View File

@ -0,0 +1 @@
from __future__ import annotations

View File

@ -0,0 +1,105 @@
from __future__ import annotations
from shared.types import ChartType
from src.domain.entities.field_binding import FieldBinding
# Maps chart type -> { "required": [axis_names], "optional": [axis_names] }
CHART_BINDING_RULES: dict[ChartType, dict[str, list[str]]] = {
ChartType.KPI: {
"required": ["value"],
"optional": ["label", "comparison"],
},
ChartType.BAR: {
"required": ["x", "y"],
"optional": ["color"],
},
ChartType.GROUPED_BAR: {
"required": ["x", "y", "group"],
"optional": [],
},
ChartType.STACKED_BAR: {
"required": ["x", "y", "stack"],
"optional": [],
},
ChartType.HORIZONTAL_BAR: {
"required": ["x", "y"],
"optional": ["color"],
},
ChartType.LINE: {
"required": ["x", "y"],
"optional": ["group", "color"],
},
ChartType.AREA: {
"required": ["x", "y"],
"optional": ["group"],
},
ChartType.PIE: {
"required": ["name", "value"],
"optional": [],
},
ChartType.DONUT: {
"required": ["name", "value"],
"optional": [],
},
ChartType.SCATTER: {
"required": ["x", "y"],
"optional": ["size", "color"],
},
ChartType.BOSTON_MATRIX: {
"required": ["x", "y"],
"optional": ["size", "label"],
},
ChartType.RADAR: {
"required": ["indicator", "value"],
"optional": ["group"],
},
ChartType.WORDCLOUD: {
"required": ["name", "value"],
"optional": [],
},
ChartType.HEATMAP: {
"required": ["x", "y", "value"],
"optional": [],
},
ChartType.MAP: {
"required": ["region", "value"],
"optional": ["label"],
},
ChartType.COMBO: {
"required": ["x"],
"optional": ["bar_y", "line_y"],
},
ChartType.DATA_TABLE: {
"required": [],
"optional": [],
},
}
def validate_bindings(
chart_type: ChartType, bindings: list[FieldBinding]
) -> list[str]:
"""Return list of validation error messages (empty if valid)."""
rules = CHART_BINDING_RULES.get(chart_type)
if rules is None:
return [f"Unknown chart type: {chart_type}"]
errors: list[str] = []
bound_axes = {b.axis for b in bindings}
required = set(rules["required"])
allowed = required | set(rules["optional"])
missing = required - bound_axes
if missing:
errors.append(
f"Missing required binding(s) for {chart_type}: {', '.join(sorted(missing))}"
)
unexpected = bound_axes - allowed
if unexpected:
errors.append(
f"Unexpected binding axis(es) for {chart_type}: {', '.join(sorted(unexpected))}"
)
return errors

View File

@ -0,0 +1,96 @@
from __future__ import annotations
from shared.types import ChartType, DataStructureType
# Mapping from data structure -> ordered list of (chart_type, label, primary?)
_RECOMMENDATION_MAP: dict[str, list[tuple[ChartType, str, bool]]] = {
DataStructureType.TOTAL: [
(ChartType.KPI, "KPI Card", True),
],
DataStructureType.YOY_MOM: [
(ChartType.KPI, "KPI Card", True),
(ChartType.BAR, "Bar Chart", False),
(ChartType.LINE, "Line Chart", False),
],
DataStructureType.SINGLE_DIM_SINGLE_METRIC: [
(ChartType.BAR, "Bar Chart", True),
(ChartType.HORIZONTAL_BAR, "Horizontal Bar", False),
(ChartType.PIE, "Pie Chart", False),
(ChartType.DONUT, "Donut Chart", False),
(ChartType.LINE, "Line Chart", False),
],
DataStructureType.SINGLE_DIM_MULTI_METRIC: [
(ChartType.GROUPED_BAR, "Grouped Bar", True),
(ChartType.STACKED_BAR, "Stacked Bar", False),
(ChartType.RADAR, "Radar Chart", False),
(ChartType.LINE, "Line Chart", False),
(ChartType.COMBO, "Combo Chart", False),
],
DataStructureType.DUAL_DIM_SINGLE_METRIC: [
(ChartType.GROUPED_BAR, "Grouped Bar", True),
(ChartType.STACKED_BAR, "Stacked Bar", False),
(ChartType.HEATMAP, "Heatmap", False),
(ChartType.LINE, "Line Chart", False),
],
DataStructureType.DUAL_DIM_MULTI_METRIC: [
(ChartType.GROUPED_BAR, "Grouped Bar", True),
(ChartType.STACKED_BAR, "Stacked Bar", False),
(ChartType.COMBO, "Combo Chart", False),
],
DataStructureType.TIME_SERIES: [
(ChartType.LINE, "Line Chart", True),
(ChartType.AREA, "Area Chart", False),
(ChartType.BAR, "Bar Chart", False),
],
DataStructureType.GEO: [
(ChartType.MAP, "Map", True),
(ChartType.BAR, "Bar Chart", False),
],
DataStructureType.TEXT_FREQUENCY: [
(ChartType.WORDCLOUD, "Word Cloud", True),
(ChartType.BAR, "Bar Chart", False),
(ChartType.PIE, "Pie Chart", False),
],
DataStructureType.TWO_DIM_EVALUATION: [
(ChartType.SCATTER, "Scatter Plot", True),
(ChartType.BOSTON_MATRIX, "Boston Matrix", False),
],
}
def recommend_charts(
columns: list[dict], data_structure: str
) -> list[dict]:
"""Return recommended chart types with labels and primary flag.
Parameters
----------
columns:
Column metadata dicts (name, field_type, ...).
data_structure:
The detected DataStructureType value string.
Returns
-------
list of dicts with keys: chart_type, label, primary.
"""
recommendations = _RECOMMENDATION_MAP.get(data_structure, [])
# Fallback: if unknown structure, offer a generic set
if not recommendations:
recommendations = [
(ChartType.BAR, "Bar Chart", True),
(ChartType.LINE, "Line Chart", False),
(ChartType.PIE, "Pie Chart", False),
(ChartType.DATA_TABLE, "Data Table", False),
]
# Always append data-table as last option if not already there
chart_types_present = {r[0] for r in recommendations}
if ChartType.DATA_TABLE not in chart_types_present:
recommendations = [*recommendations, (ChartType.DATA_TABLE, "Data Table", False)]
return [
{"chart_type": ct.value, "label": label, "primary": primary}
for ct, label, primary in recommendations
]

View File

@ -0,0 +1,17 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any
from src.domain.entities.chart_instance import ChartInstance
class IOptionBuilder(ABC):
"""Port interface for building ECharts option JSON.
Defined in domain layer as an abstract interface.
Concrete implementation lives in adapters layer (builder_factory).
"""
@abstractmethod
def build(self, chart: ChartInstance, data: list[dict]) -> dict[str, Any]: ...

View File

@ -0,0 +1,5 @@
from __future__ import annotations
from src.domain.value_objects.chart_type import ChartType
__all__ = ["ChartType"]

View File

@ -0,0 +1,5 @@
from __future__ import annotations
from shared.types import ChartType
__all__ = ["ChartType"]

View File

@ -0,0 +1 @@
from __future__ import annotations

View File

@ -0,0 +1 @@
from __future__ import annotations

View File

@ -0,0 +1,44 @@
from __future__ import annotations
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from src.infrastructure.api.routes import router
from src.infrastructure.config import settings
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
# Startup
yield
# Shutdown
from src.adapters.persistence.database import engine
await engine.dispose()
def create_app() -> FastAPI:
app = FastAPI(
title="DataViz Pro - Chart Service",
version="0.1.0",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(router)
@app.get("/health")
async def health() -> dict[str, str]:
return {"status": "ok", "service": "chart-service"}
return app

View File

@ -0,0 +1,73 @@
from __future__ import annotations
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from shared.http_client import ServiceHttpClient
from src.adapters.clients.data_service_client_impl import DataServiceClientImpl
from src.adapters.option_builders.builder_factory import EChartsOptionBuilderAdapter
from src.adapters.persistence.chart_repository_impl import ChartRepositoryImpl
from src.adapters.persistence.database import get_session
from src.application.ports.output.data_service_client import IDataServiceClient
from src.application.usecases.create_chart_usecase import CreateChartUseCase
from src.application.usecases.get_chart_option_usecase import GetChartOptionUseCase
from src.application.usecases.recommend_charts_usecase import RecommendChartsUseCase
from src.application.usecases.update_chart_usecase import UpdateChartUseCase
from src.domain.repositories.chart_repository import ChartRepository
from src.domain.services.option_builder import IOptionBuilder
from src.infrastructure.config import settings
# ---- output ports ----
def get_data_service_client() -> IDataServiceClient:
http_client = ServiceHttpClient(base_url=settings.DATA_SERVICE_URL)
return DataServiceClientImpl(http_client)
def get_option_builder() -> IOptionBuilder:
return EChartsOptionBuilderAdapter()
# ---- repositories ----
async def get_chart_repository(
session: AsyncSession = Depends(get_session),
) -> ChartRepository:
return ChartRepositoryImpl(session)
# ---- use cases ----
async def get_create_chart_usecase(
repository: ChartRepository = Depends(get_chart_repository),
) -> CreateChartUseCase:
return CreateChartUseCase(repository=repository)
async def get_update_chart_usecase(
repository: ChartRepository = Depends(get_chart_repository),
) -> UpdateChartUseCase:
return UpdateChartUseCase(repository=repository)
async def get_chart_option_usecase(
repository: ChartRepository = Depends(get_chart_repository),
data_client: IDataServiceClient = Depends(get_data_service_client),
option_builder: IOptionBuilder = Depends(get_option_builder),
) -> GetChartOptionUseCase:
return GetChartOptionUseCase(
repository=repository,
data_client=data_client,
option_builder=option_builder,
)
async def get_recommend_charts_usecase(
data_client: IDataServiceClient = Depends(get_data_service_client),
) -> RecommendChartsUseCase:
return RecommendChartsUseCase(data_client=data_client)

View File

@ -0,0 +1,112 @@
from __future__ import annotations
import uuid
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from shared.exceptions import EntityNotFoundError
from src.application.dto.chart_response import (
ChartResponse,
CreateChartRequest,
UpdateChartRequest,
)
from src.application.dto.echarts_option import EChartsOptionResponse
from src.application.usecases.create_chart_usecase import CreateChartUseCase
from src.application.usecases.get_chart_option_usecase import GetChartOptionUseCase
from src.application.usecases.recommend_charts_usecase import RecommendChartsUseCase
from src.application.usecases.update_chart_usecase import UpdateChartUseCase
from src.adapters.presenters.chart_presenter import present_chart
from src.domain.repositories.chart_repository import ChartRepository
from src.infrastructure.api.dependencies import (
get_chart_option_usecase,
get_chart_repository,
get_create_chart_usecase,
get_recommend_charts_usecase,
get_update_chart_usecase,
)
router = APIRouter(prefix="/api/v1/charts", tags=["charts"])
@router.post("", response_model=ChartResponse, status_code=201)
async def create_chart(
request: CreateChartRequest,
use_case: CreateChartUseCase = Depends(get_create_chart_usecase),
) -> ChartResponse:
try:
return await use_case.execute(request)
except ValueError as exc:
raise HTTPException(status_code=422, detail=str(exc))
@router.get("", response_model=list[ChartResponse])
async def list_charts(
repository: ChartRepository = Depends(get_chart_repository),
) -> list[ChartResponse]:
charts = await repository.find_all()
return [present_chart(c) for c in charts]
@router.get("/{chart_id}", response_model=ChartResponse)
async def get_chart(
chart_id: uuid.UUID,
repository: ChartRepository = Depends(get_chart_repository),
) -> ChartResponse:
chart = await repository.find_by_id(chart_id)
if chart is None:
raise HTTPException(status_code=404, detail="Chart not found")
return present_chart(chart)
@router.put("/{chart_id}", response_model=ChartResponse)
async def update_chart(
chart_id: uuid.UUID,
request: UpdateChartRequest,
use_case: UpdateChartUseCase = Depends(get_update_chart_usecase),
) -> ChartResponse:
try:
return await use_case.execute(chart_id, request)
except EntityNotFoundError:
raise HTTPException(status_code=404, detail="Chart not found")
except ValueError as exc:
raise HTTPException(status_code=422, detail=str(exc))
@router.delete("/{chart_id}", status_code=204)
async def delete_chart(
chart_id: uuid.UUID,
repository: ChartRepository = Depends(get_chart_repository),
) -> None:
chart = await repository.find_by_id(chart_id)
if chart is None:
raise HTTPException(status_code=404, detail="Chart not found")
await repository.delete(chart_id)
@router.get("/{chart_id}/option", response_model=EChartsOptionResponse)
async def get_chart_option(
chart_id: uuid.UUID,
use_case: GetChartOptionUseCase = Depends(get_chart_option_usecase),
) -> EChartsOptionResponse:
try:
option = await use_case.execute(chart_id)
return EChartsOptionResponse(option=option)
except EntityNotFoundError:
raise HTTPException(status_code=404, detail="Chart not found")
from pydantic import BaseModel
class RecommendChartsRequest(BaseModel):
dataset_id: uuid.UUID
@router.post("/recommend")
async def recommend_charts(
request: RecommendChartsRequest,
use_case: RecommendChartsUseCase = Depends(get_recommend_charts_usecase),
) -> list[dict[str, Any]]:
return await use_case.execute(request.dataset_id)

View File

@ -0,0 +1,16 @@
from __future__ import annotations
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
DATABASE_URL: str = "postgresql+asyncpg://postgres:postgres@localhost:5432/chart_db"
DATA_SERVICE_URL: str = "http://localhost:8000"
APP_HOST: str = "0.0.0.0"
APP_PORT: int = 8000
CORS_ORIGINS: list[str] = ["*"]
model_config = {"env_prefix": "", "case_sensitive": True}
settings = Settings()

View File

@ -0,0 +1,16 @@
from __future__ import annotations
import uvicorn
from src.infrastructure.api.app import create_app
from src.infrastructure.config import settings
app = create_app()
if __name__ == "__main__":
uvicorn.run(
"src.infrastructure.main:app",
host=settings.APP_HOST,
port=settings.APP_PORT,
reload=True,
)

View File

@ -0,0 +1,29 @@
FROM python:3.12-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc libpq-dev && \
rm -rf /var/lib/apt/lists/*
# Copy shared library
COPY shared/ /app/shared/
# Copy service code
COPY services/data-service/pyproject.toml /app/
COPY services/data-service/src/ /app/src/
COPY services/data-service/alembic/ /app/alembic/
COPY services/data-service/alembic.ini /app/alembic.ini
# Install dependencies
RUN pip install --no-cache-dir poetry && \
poetry config virtualenvs.create false && \
poetry install --no-interaction --no-ansi --no-root
# Set PYTHONPATH to include shared
ENV PYTHONPATH=/app:/app/src
EXPOSE 8000
CMD ["python", "-m", "src.infrastructure.main"]

View File

@ -0,0 +1,37 @@
[alembic]
script_location = .
sqlalchemy.url = driver://user:pass@localhost/dbname
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View File

@ -0,0 +1,69 @@
from __future__ import annotations
import asyncio
import os
from logging.config import fileConfig
from alembic import context
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import create_async_engine
from src.adapters.persistence.models import Base
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def get_database_url() -> str:
return os.getenv(
"DATABASE_URL",
"postgresql+asyncpg://postgres:postgres@localhost:5432/data_db",
)
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode."""
url = get_database_url()
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection): # noqa: ANN001
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
"""Run migrations in 'online' mode using an async engine."""
connectable = create_async_engine(
get_database_url(),
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode."""
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@ -0,0 +1,18 @@
[tool.poetry]
name = "data-service"
version = "0.1.0"
description = "DataViz Pro Data Service"
[tool.poetry.dependencies]
python = "^3.12"
fastapi = "^0.115"
uvicorn = {extras = ["standard"], version = "^0.34"}
sqlalchemy = {extras = ["asyncio"], version = "^2.0"}
asyncpg = "^0.30"
alembic = "^1.14"
openpyxl = "^3.1"
xlrd = "^2.0"
pandas = "^2.2"
python-multipart = "^0.0.18"
pydantic = "^2.10"
pydantic-settings = "^2.7"

View File

@ -0,0 +1,29 @@
from __future__ import annotations
import io
from typing import Any
import pandas as pd
from src.application.ports.output.file_parser import ParsedSheet
class CsvParser:
"""Parse .csv files using pandas."""
async def parse(self, file_content: bytes) -> list[ParsedSheet]:
# Try common encodings
for encoding in ("utf-8", "utf-8-sig", "gbk", "gb2312", "latin-1"):
try:
text = file_content.decode(encoding)
break
except (UnicodeDecodeError, LookupError):
continue
else:
text = file_content.decode("utf-8", errors="replace")
df = pd.read_csv(io.StringIO(text), dtype=str, keep_default_na=False)
columns = [str(c).strip() for c in df.columns.tolist()]
rows: list[dict[str, Any]] = df.to_dict(orient="records")
return [ParsedSheet(sheet_name="Sheet1", columns=columns, rows=rows)]

View File

@ -0,0 +1,36 @@
from __future__ import annotations
import json
from typing import Any
from src.application.ports.output.file_parser import ParsedSheet
class JsonParser:
"""Parse .json files (expects an array of objects or a single object)."""
async def parse(self, file_content: bytes) -> list[ParsedSheet]:
data = json.loads(file_content.decode("utf-8"))
if isinstance(data, dict):
data = [data]
if not isinstance(data, list) or not data:
raise ValueError("JSON must be an array of objects or a single object")
# Collect all unique keys in order of appearance
columns: list[str] = []
seen: set[str] = set()
for record in data:
if isinstance(record, dict):
for key in record:
if key not in seen:
columns.append(str(key))
seen.add(key)
rows: list[dict[str, Any]] = []
for record in data:
if isinstance(record, dict):
rows.append({col: record.get(col) for col in columns})
return [ParsedSheet(sheet_name="Sheet1", columns=columns, rows=rows)]

View File

@ -0,0 +1,40 @@
from __future__ import annotations
from shared.exceptions import FileParsingError
from src.application.ports.output.file_parser import IFileParser, ParsedSheet
from .csv_parser import CsvParser
from .json_parser import JsonParser
from .xlsx_parser import XlsParser, XlsxParser
class ParserFactory(IFileParser):
"""Dispatch file parsing to the correct adapter based on file extension."""
def __init__(self) -> None:
self._xlsx_parser = XlsxParser()
self._xls_parser = XlsParser()
self._csv_parser = CsvParser()
self._json_parser = JsonParser()
async def parse(self, file_name: str, file_content: bytes) -> list[ParsedSheet]:
ext = file_name.rsplit(".", maxsplit=1)[-1].lower() if "." in file_name else ""
try:
if ext == "xlsx":
return await self._xlsx_parser.parse(file_content)
if ext == "xls":
return await self._xls_parser.parse(file_content)
if ext == "csv":
return await self._csv_parser.parse(file_content)
if ext == "json":
return await self._json_parser.parse(file_content)
except FileParsingError:
raise
except Exception as exc:
raise FileParsingError(f"Failed to parse file '{file_name}': {exc}") from exc
raise FileParsingError(
f"Unsupported file format: '.{ext}'. Supported: xlsx, xls, csv, json"
)

View File

@ -0,0 +1,74 @@
from __future__ import annotations
import io
from typing import Any
from openpyxl import load_workbook
from src.application.ports.output.file_parser import ParsedSheet
class XlsxParser:
"""Parse .xlsx files using openpyxl."""
async def parse(self, file_content: bytes) -> list[ParsedSheet]:
wb = load_workbook(filename=io.BytesIO(file_content), read_only=True, data_only=True)
sheets: list[ParsedSheet] = []
for sheet_name in wb.sheetnames:
ws = wb[sheet_name]
rows_iter = ws.iter_rows(values_only=True)
# First row = headers
header_row = next(rows_iter, None)
if header_row is None:
continue
columns = [str(h).strip() if h is not None else f"column_{i}" for i, h in enumerate(header_row)]
rows: list[dict[str, Any]] = []
for row in rows_iter:
if all(cell is None for cell in row):
continue
row_dict: dict[str, Any] = {}
for idx, cell_value in enumerate(row):
if idx < len(columns):
row_dict[columns[idx]] = cell_value
rows.append(row_dict)
sheets.append(ParsedSheet(sheet_name=sheet_name, columns=columns, rows=rows))
wb.close()
return sheets
class XlsParser:
"""Parse .xls files using xlrd."""
async def parse(self, file_content: bytes) -> list[ParsedSheet]:
import xlrd
wb = xlrd.open_workbook(file_contents=file_content)
sheets: list[ParsedSheet] = []
for sheet_idx in range(wb.nsheets):
ws = wb.sheet_by_index(sheet_idx)
if ws.nrows == 0:
continue
columns = [
str(ws.cell_value(0, col)).strip() or f"column_{col}"
for col in range(ws.ncols)
]
rows: list[dict[str, Any]] = []
for row_idx in range(1, ws.nrows):
row_dict: dict[str, Any] = {}
for col_idx in range(ws.ncols):
if col_idx < len(columns):
row_dict[columns[col_idx]] = ws.cell_value(row_idx, col_idx)
rows.append(row_dict)
sheets.append(ParsedSheet(sheet_name=ws.name, columns=columns, rows=rows))
return sheets

View File

@ -0,0 +1,38 @@
from __future__ import annotations
import os
from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
DATABASE_URL = os.getenv(
"DATABASE_URL",
"postgresql+asyncpg://postgres:postgres@localhost:5432/data_db",
)
engine = create_async_engine(
DATABASE_URL,
echo=False,
pool_size=10,
max_overflow=20,
)
async_session_factory = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
async def get_session() -> AsyncGenerator[AsyncSession, None]:
async with async_session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise

View File

@ -0,0 +1,133 @@
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from shared.types import DataStructureType, FieldType
from src.domain.entities.column import Column
from src.domain.entities.dataset import DataSet
from src.domain.repositories.dataset_repository import DataSetRepository
from .models import ColumnModel, DataSetModel
class DataSetRepositoryImpl(DataSetRepository):
def __init__(self, session: AsyncSession) -> None:
self._session = session
# ---- mapping helpers ----
@staticmethod
def _to_domain(model: DataSetModel) -> DataSet:
columns = [
Column(
name=cm.name,
field_type=FieldType(cm.field_type),
sample_values=cm.sample_values or [],
ordinal=cm.ordinal,
)
for cm in model.columns_rel
]
return DataSet(
id=model.id,
created_at=model.created_at,
updated_at=model.updated_at,
file_name=model.file_name,
sheet_name=model.sheet_name,
columns=columns,
row_count=model.row_count,
data_structure=(
DataStructureType(model.data_structure)
if model.data_structure
else None
),
raw_data=model.raw_data or [],
)
@staticmethod
def _to_model(entity: DataSet) -> DataSetModel:
model = DataSetModel(
id=entity.id,
file_name=entity.file_name,
sheet_name=entity.sheet_name,
row_count=entity.row_count,
data_structure=(
entity.data_structure.value if entity.data_structure else None
),
raw_data=entity.raw_data,
created_at=entity.created_at,
updated_at=entity.updated_at,
)
model.columns_rel = [
ColumnModel(
id=uuid.uuid4(),
dataset_id=entity.id,
name=col.name,
field_type=col.field_type.value,
sample_values=col.sample_values,
ordinal=col.ordinal,
)
for col in entity.columns
]
return model
# ---- repository interface ----
async def find_by_id(self, entity_id: uuid.UUID) -> Optional[DataSet]:
stmt = select(DataSetModel).where(DataSetModel.id == entity_id)
result = await self._session.execute(stmt)
model = result.scalar_one_or_none()
if model is None:
return None
return self._to_domain(model)
async def find_all(self) -> list[DataSet]:
stmt = select(DataSetModel).order_by(DataSetModel.created_at.desc())
result = await self._session.execute(stmt)
models = result.scalars().all()
return [self._to_domain(m) for m in models]
async def save(self, entity: DataSet) -> DataSet:
entity.touch()
existing = await self._session.get(DataSetModel, entity.id)
if existing is not None:
existing.file_name = entity.file_name
existing.sheet_name = entity.sheet_name
existing.row_count = entity.row_count
existing.data_structure = (
entity.data_structure.value if entity.data_structure else None
)
existing.raw_data = entity.raw_data
existing.updated_at = entity.updated_at
# Replace columns
existing.columns_rel.clear()
for col in entity.columns:
existing.columns_rel.append(
ColumnModel(
id=uuid.uuid4(),
dataset_id=entity.id,
name=col.name,
field_type=col.field_type.value,
sample_values=col.sample_values,
ordinal=col.ordinal,
)
)
await self._session.flush()
return entity
model = self._to_model(entity)
self._session.add(model)
await self._session.flush()
return entity
async def delete(self, entity_id: uuid.UUID) -> None:
model = await self._session.get(DataSetModel, entity_id)
if model is not None:
await self._session.delete(model)
await self._session.flush()

View File

@ -0,0 +1,66 @@
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from sqlalchemy import (
DateTime,
ForeignKey,
Integer,
String,
Text,
Uuid,
)
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
class Base(DeclarativeBase):
pass
class DataSetModel(Base):
__tablename__ = "datasets"
id: Mapped[uuid.UUID] = mapped_column(Uuid, primary_key=True, default=uuid.uuid4)
file_name: Mapped[str] = mapped_column(String(255), nullable=False)
sheet_name: Mapped[str | None] = mapped_column(String(255), nullable=True)
row_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
data_structure: Mapped[str | None] = mapped_column(String(64), nullable=True)
raw_data: Mapped[list] = mapped_column(JSONB, nullable=False, default=list)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
nullable=False,
default=lambda: datetime.now(timezone.utc),
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
nullable=False,
default=lambda: datetime.now(timezone.utc),
onupdate=lambda: datetime.now(timezone.utc),
)
columns_rel: Mapped[list[ColumnModel]] = relationship(
"ColumnModel",
back_populates="dataset",
cascade="all, delete-orphan",
order_by="ColumnModel.ordinal",
lazy="selectin",
)
class ColumnModel(Base):
__tablename__ = "columns"
id: Mapped[uuid.UUID] = mapped_column(Uuid, primary_key=True, default=uuid.uuid4)
dataset_id: Mapped[uuid.UUID] = mapped_column(
Uuid, ForeignKey("datasets.id", ondelete="CASCADE"), nullable=False
)
name: Mapped[str] = mapped_column(String(255), nullable=False)
field_type: Mapped[str] = mapped_column(String(32), nullable=False, default="text")
sample_values: Mapped[list] = mapped_column(JSONB, nullable=False, default=list)
ordinal: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
dataset: Mapped[DataSetModel] = relationship(
"DataSetModel", back_populates="columns_rel"
)

View File

@ -0,0 +1,31 @@
from __future__ import annotations
from src.application.dto.dataset_response import ColumnInfo, DataSetResponse
from src.domain.entities.dataset import DataSet
class DataSetPresenter:
"""Maps DataSet domain entities to API response DTOs."""
@staticmethod
def to_response(entity: DataSet) -> DataSetResponse:
return DataSetResponse(
id=str(entity.id),
file_name=entity.file_name,
sheet_name=entity.sheet_name,
columns=[
ColumnInfo(
name=col.name,
field_type=col.field_type.value,
sample_values=col.sample_values,
ordinal=col.ordinal,
)
for col in entity.columns
],
row_count=entity.row_count,
data_structure=(
entity.data_structure.value if entity.data_structure else None
),
created_at=entity.created_at,
updated_at=entity.updated_at,
)

View File

@ -0,0 +1,6 @@
from __future__ import annotations
from .dataset_response import ColumnInfo, DataSetResponse
from .import_result import ImportResult
__all__ = ["ColumnInfo", "DataSetResponse", "ImportResult"]

View File

@ -0,0 +1,24 @@
from __future__ import annotations
from datetime import datetime
from typing import Any
from pydantic import BaseModel
class ColumnInfo(BaseModel):
name: str
field_type: str
sample_values: list[Any] = []
ordinal: int
class DataSetResponse(BaseModel):
id: str
file_name: str
sheet_name: str | None = None
columns: list[ColumnInfo]
row_count: int
data_structure: str | None = None
created_at: datetime
updated_at: datetime

View File

@ -0,0 +1,21 @@
from __future__ import annotations
from typing import Any
from pydantic import BaseModel
class ImportColumnInfo(BaseModel):
name: str
field_type: str
ordinal: int
class ImportResult(BaseModel):
dataset_id: str
file_name: str
sheet_name: str | None = None
row_count: int
columns: list[ImportColumnInfo]
data_structure: str | None = None
suggestions: list[str] = []

View File

@ -0,0 +1,7 @@
from __future__ import annotations
from .get_dataset import IGetDataSetUseCase
from .import_data import IImportDataUseCase
from .list_datasets import IListDataSetsUseCase
__all__ = ["IGetDataSetUseCase", "IImportDataUseCase", "IListDataSetsUseCase"]

View File

@ -0,0 +1,11 @@
from __future__ import annotations
import uuid
from abc import ABC, abstractmethod
from src.application.dto.dataset_response import DataSetResponse
class IGetDataSetUseCase(ABC):
@abstractmethod
async def execute(self, dataset_id: uuid.UUID) -> DataSetResponse: ...

View File

@ -0,0 +1,10 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from src.application.dto.import_result import ImportResult
class IImportDataUseCase(ABC):
@abstractmethod
async def execute(self, file_name: str, file_content: bytes) -> list[ImportResult]: ...

View File

@ -0,0 +1,10 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from src.application.dto.dataset_response import DataSetResponse
class IListDataSetsUseCase(ABC):
@abstractmethod
async def execute(self) -> list[DataSetResponse]: ...

View File

@ -0,0 +1,5 @@
from __future__ import annotations
from .file_parser import IFileParser, ParsedSheet
__all__ = ["IFileParser", "ParsedSheet"]

View File

@ -0,0 +1,17 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any
@dataclass
class ParsedSheet:
sheet_name: str
columns: list[str]
rows: list[dict[str, Any]]
class IFileParser(ABC):
@abstractmethod
async def parse(self, file_name: str, file_content: bytes) -> list[ParsedSheet]: ...

View File

@ -0,0 +1,13 @@
from __future__ import annotations
from .delete_dataset_usecase import DeleteDataSetUseCase
from .get_dataset_usecase import GetDataSetUseCase
from .import_data_usecase import ImportDataUseCase
from .list_datasets_usecase import ListDataSetsUseCase
__all__ = [
"DeleteDataSetUseCase",
"GetDataSetUseCase",
"ImportDataUseCase",
"ListDataSetsUseCase",
]

View File

@ -0,0 +1,18 @@
from __future__ import annotations
import uuid
from shared.exceptions import EntityNotFoundError
from src.domain.repositories.dataset_repository import DataSetRepository
class DeleteDataSetUseCase:
def __init__(self, repository: DataSetRepository) -> None:
self._repository = repository
async def execute(self, dataset_id: uuid.UUID) -> None:
existing = await self._repository.find_by_id(dataset_id)
if existing is None:
raise EntityNotFoundError("DataSet", str(dataset_id))
await self._repository.delete(dataset_id)

View File

@ -0,0 +1,39 @@
from __future__ import annotations
import uuid
from shared.exceptions import EntityNotFoundError
from src.application.dto.dataset_response import ColumnInfo, DataSetResponse
from src.application.ports.input.get_dataset import IGetDataSetUseCase
from src.domain.repositories.dataset_repository import DataSetRepository
class GetDataSetUseCase(IGetDataSetUseCase):
def __init__(self, repository: DataSetRepository) -> None:
self._repository = repository
async def execute(self, dataset_id: uuid.UUID) -> DataSetResponse:
dataset = await self._repository.find_by_id(dataset_id)
if dataset is None:
raise EntityNotFoundError("DataSet", str(dataset_id))
return DataSetResponse(
id=str(dataset.id),
file_name=dataset.file_name,
sheet_name=dataset.sheet_name,
columns=[
ColumnInfo(
name=col.name,
field_type=col.field_type.value,
sample_values=col.sample_values,
ordinal=col.ordinal,
)
for col in dataset.columns
],
row_count=dataset.row_count,
data_structure=(
dataset.data_structure.value if dataset.data_structure else None
),
created_at=dataset.created_at,
updated_at=dataset.updated_at,
)

Some files were not shown because too many files have changed in this diff Show More