-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathsqlalchemy_.py
More file actions
30 lines (23 loc) · 985 Bytes
/
sqlalchemy_.py
File metadata and controls
30 lines (23 loc) · 985 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from __future__ import annotations
from typing import Any
from ...core.exceptions import IntegrationMissingError
from ...core.ports import DBPort
try:
from sqlalchemy import create_engine, text as sa_text
from sqlalchemy.engine import Engine
except ImportError:
create_engine = None # type: ignore[assignment]
sa_text = None # type: ignore[assignment]
Engine = None # type: ignore[assignment,misc]
class SQLAlchemyDB(DBPort):
"""DBPort implementation backed by SQLAlchemy 2.x."""
def __init__(self, url: str) -> None:
if create_engine is None:
raise IntegrationMissingError(
"sqlalchemy", extra="sqlalchemy", hint="pip install sqlalchemy"
)
self._engine: Engine = create_engine(url)
def execute(self, sql: str) -> list[dict[str, Any]]:
with self._engine.connect() as conn:
result = conn.execute(sa_text(sql))
return [dict(row._mapping) for row in result]