-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdatabase.py
More file actions
576 lines (518 loc) · 24.9 KB
/
database.py
File metadata and controls
576 lines (518 loc) · 24.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
# ruff: noqa: S608
import gc
import platform
from datetime import datetime
from typing import Optional
import duckdb
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import sessionmaker
from tqdm.auto import tqdm
from biasanalyzer.models import CohortDefinition
from biasanalyzer.sql import (
AGE_DISTRIBUTION_QUERY,
AGE_STATS_QUERY,
ETHNICITY_STATS_QUERY,
GENDER_DISTRIBUTION_QUERY,
GENDER_STATS_QUERY,
RACE_STATS_QUERY,
)
from biasanalyzer.utils import build_concept_hierarchy, find_roots, notify_users, print_hierarchy
class BiasDatabase:
    """Singleton DuckDB-backed store for cohort definitions and cohort membership.

    On first construction it creates a ``biasanalyzer`` schema holding the
    cohort_definition and cohort tables and, when an OMOP CDM database URL is
    supplied, attaches that database (PostgreSQL or a DuckDB file) read-only
    under the ``omop`` alias so statistics queries can join against OMOP tables.
    """

    # SQL templates (from biasanalyzer.sql) keyed by the variable they summarize.
    distribution_queries = {
        "age": AGE_DISTRIBUTION_QUERY,
        "gender": GENDER_DISTRIBUTION_QUERY,
    }
    stats_queries = {
        "age": AGE_STATS_QUERY,
        "gender": GENDER_STATS_QUERY,
        "race": RACE_STATS_QUERY,
        "ethnicity": ETHNICITY_STATS_QUERY,
    }
    _instance = None  # indicating a singleton with only one instance of the class ever created

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialize(*args, **kwargs)  # Initialize only once
        return cls._instance

    def _safe_attach(self, alias: str, url: str, type_clause: str = ""):
        """Attach the database at *url* under *alias*, detaching any stale attachment first."""
        try:
            self.conn.execute(f"DETACH DATABASE {alias}")
        except (duckdb.BinderException, duckdb.CatalogException):
            # Alias was not attached yet - nothing to detach.
            pass
        if type_clause:  # attaching to postgreSQL OMOP DB
            self.conn.execute(f"ATTACH '{url}' AS {alias} {type_clause}")
        else:  # attaching to duckdb OMOP DB - adding READ_ONLY is critical to get it working on Windows
            self.conn.execute(f"ATTACH '{url}' AS {alias} (READ_ONLY)")

    def _initialize(self, db_url, omop_db_url=None):
        """One-time setup: connect, create schema/tables, and attach the OMOP database.

        :param db_url: DuckDB database path (by default, duckdb uses in memory database)
        :param omop_db_url: optional OMOP CDM database URL; ``postgresql://`` URLs and
            ``.duckdb`` files are supported
        :raises ValueError: if omop_db_url is neither PostgreSQL nor a DuckDB file
        """
        # by default, duckdb uses in memory database
        self.conn = duckdb.connect(db_url)
        self.schema = "biasanalyzer"
        self.omop_alias = "omop"
        self.conn.execute(f"CREATE SCHEMA IF NOT EXISTS {self.schema}")
        self.omop_cdm_db_url = omop_db_url
        if omop_db_url is not None:
            if omop_db_url.startswith("postgresql://"):
                # omop db is postgreSQL
                self.load_postgres_extension()
                self._safe_attach(self.omop_alias, self.omop_cdm_db_url, "(TYPE postgres)")
            elif omop_db_url.endswith(".duckdb"):
                self._safe_attach(self.omop_alias, self.omop_cdm_db_url)
            else:
                raise ValueError("Unsupported OMOP database backend")
        # Set self.schema as default schema
        self.conn.execute(f"SET schema '{self.schema}'")
        self._create_cohort_definition_table()
        self._create_cohort_table()

    def _create_cohort_definition_table(self):
        """Create the cohort_definition table and its id sequence if they do not exist."""
        try:
            self.conn.execute(f"CREATE SEQUENCE {self.schema}.id_sequence START 1")
        except duckdb.Error as e:
            # DuckDB has no CREATE SEQUENCE IF NOT EXISTS pre-check here; tolerate reruns.
            if "already exists" in str(e).lower():
                notify_users("Sequence already exists, skipping creation.")
            else:
                raise
        self.conn.execute(f"""
            CREATE TABLE IF NOT EXISTS {self.schema}.cohort_definition (
                id INTEGER DEFAULT nextval('{self.schema}.id_sequence'),
                name VARCHAR NOT NULL,
                description VARCHAR,
                created_date DATE,
                creation_info VARCHAR,
                created_by VARCHAR,
                PRIMARY KEY (id)
            )
        """)
        notify_users("Cohort Definition table created.")

    def _create_cohort_table(self):
        """Create the cohort membership table and its lookup index if they do not exist."""
        self.conn.execute(f"""
            CREATE TABLE IF NOT EXISTS {self.schema}.cohort (
                subject_id VARCHAR NOT NULL,
                cohort_definition_id INTEGER,
                cohort_start_date DATE,
                cohort_end_date DATE,
                FOREIGN KEY (cohort_definition_id) REFERENCES {self.schema}.cohort_definition(id)
            )
        """)
        try:
            self.conn.execute(f"""
                CREATE INDEX idx_cohort_dates ON {self.schema}.cohort (cohort_definition_id, cohort_start_date,
                cohort_end_date);
            """)
        except duckdb.Error as e:
            if "already exists" in str(e).lower():
                notify_users("Index already exists, skipping creation.")
            else:
                raise
        notify_users("Cohort table created.")

    def load_postgres_extension(self):
        """Install and load DuckDB's postgres extension so PostgreSQL DBs can be attached."""
        self.conn.execute("INSTALL postgres;")
        self.conn.execute("LOAD postgres;")

    def create_cohort_definition(self, cohort_definition: CohortDefinition, progress_obj=None):
        """Insert a cohort definition row and return its auto-generated id.

        :param cohort_definition: metadata describing the cohort
        :param progress_obj: optional tqdm-like object; when given, the success message
            is written through it instead of notify_users
        :return: id of the newly created cohort definition
        """
        self.conn.execute(
            f"""
            INSERT INTO {self.schema}.cohort_definition (name, description, created_date, creation_info, created_by)
            VALUES (?, ?, ?, ?, ?)
            """,
            (
                cohort_definition.name,
                cohort_definition.description,
                cohort_definition.created_date or datetime.now(),
                cohort_definition.creation_info,
                cohort_definition.created_by,
            ),
        )
        if progress_obj is None:
            notify_users("Cohort definition inserted successfully.")  # pragma: no cover
        else:
            progress_obj.write("Cohort definition inserted successfully.")
        # NOTE(review): assumes no concurrent inserts on this connection - the
        # highest id is taken to be the row just inserted.
        self.conn.execute(f"SELECT id from {self.schema}.cohort_definition ORDER BY id DESC LIMIT 1")
        created_cohort_id = self.conn.fetchone()[0]
        return created_cohort_id

    # Method to insert cohort data in bulk from a dataframe
    def create_cohort_in_bulk(self, cohort_df: pd.DataFrame):
        """Bulk-insert cohort membership rows from *cohort_df* into the cohort table."""
        # make duckdb to treat cohort_df dataframe as a virtual table named "cohort_df"
        self.conn.register("cohort_df", cohort_df)
        self.conn.execute(f"""
            INSERT INTO {self.schema}.cohort (subject_id, cohort_definition_id, cohort_start_date, cohort_end_date)
            SELECT subject_id, cohort_definition_id, cohort_start_date, cohort_end_date FROM cohort_df
        """)

    def get_cohort_definition(self, cohort_definition_id):
        """Return the cohort definition with the given id as a dict, or {} if not found."""
        # Bind the id as a parameter rather than interpolating it into the SQL text.
        rows = self._execute_query(
            f"""
            SELECT id, name, description, created_date, creation_info, created_by
            FROM {self.schema}.cohort_definition
            WHERE id = ?
            """,
            (cohort_definition_id,),
        )
        return rows[0] if rows else {}

    def get_cohort(self, cohort_definition_id):
        """Return all membership rows of the cohort with the given id as a list of dicts."""
        return self._execute_query(
            f"""
            SELECT subject_id, cohort_definition_id, cohort_start_date, cohort_end_date
            FROM {self.schema}.cohort
            WHERE cohort_definition_id = ?
            """,
            (cohort_definition_id,),
        )

    def _execute_query(self, query_str, params=None):
        """Execute *query_str* (with optional bind *params*) and return rows as dicts."""
        results = self.conn.execute(query_str, params) if params is not None else self.conn.execute(query_str)
        headers = [desc[0] for desc in results.description]
        rows = results.fetchall()
        return [dict(zip(headers, row)) for row in rows]

    def get_cohort_basic_stats(self, cohort_definition_id: int, variable=""):
        """
        Get aggregation statistics for a cohort from the cohort table.
        :param cohort_definition_id: cohort definition id representing the cohort
        :param variable: optional with an empty string as default. If empty, basic stats of
        the cohort are returned; If set to a specific variable such as age, gender, race,
        the stats of the specified variable in the cohort are returned
        :return: cohort stats corresponding to the specified variable, or None on error
        """
        try:
            if variable:
                query_str = self.__class__.stats_queries.get(variable)
                if query_str is None:
                    raise ValueError(
                        f"Statistics for variable '{variable}' is not available. "
                        f"Valid variables are {self.__class__.stats_queries.keys()}"
                    )
                # Template queries come from biasanalyzer.sql and take the id via .format().
                stats_query = query_str.format(
                    ba_schema=self.schema, omop=self.omop_alias, cohort_definition_id=cohort_definition_id
                )
                return self._execute_query(stats_query)
            # Query the cohort data to get basic statistics; the cohort id is bound
            # as a parameter instead of being interpolated into the SQL.
            stats_query = f"""
                WITH cohort_Duration AS (
                    SELECT
                        subject_id,
                        cohort_start_date,
                        cohort_end_date,
                        cohort_end_date - cohort_start_date AS duration_days
                    FROM
                        {self.schema}.cohort
                    WHERE cohort_definition_id = ?
                )
                SELECT
                    COUNT(*) AS total_count,
                    MIN(cohort_start_date) AS earliest_start_date,
                    MAX(cohort_start_date) AS latest_start_date,
                    MIN(cohort_end_date) AS earliest_end_date,
                    MAX(cohort_end_date) AS latest_end_date,
                    MIN(duration_days) AS min_duration_days,
                    MAX(duration_days) AS max_duration_days,
                    ROUND(AVG(duration_days), 2) AS avg_duration_days,
                    CAST(PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY duration_days) AS INT) AS median_duration,
                    ROUND(STDDEV(duration_days), 2) AS stddev_duration
                FROM cohort_Duration
            """
            return self._execute_query(stats_query, (cohort_definition_id,))
        except Exception as e:
            notify_users(f"Error computing cohort basic statistics: {e}", level="error")
            return None

    @property
    def cohort_distribution_variables(self):
        """Names of the variables for which distribution queries are available."""
        return self.__class__.distribution_queries.keys()

    def get_cohort_distributions(self, cohort_definition_id: int, variable: str):
        """
        Get distribution statistics for a cohort from the cohort table.

        :param cohort_definition_id: cohort definition id representing the cohort
        :param variable: one of cohort_distribution_variables (e.g. "age", "gender")
        :return: list of distribution rows, or None on error
        """
        try:
            query_str = self.__class__.distribution_queries.get(variable)
            if not query_str:
                raise ValueError(
                    f"Distribution for variable '{variable}' is not available. "
                    f"Valid variables are {self.__class__.distribution_queries.keys()}"
                )
            query = query_str.format(
                ba_schema=self.schema, omop=self.omop_alias, cohort_definition_id=cohort_definition_id
            )
            return self._execute_query(query)
        except Exception as e:
            notify_users(f"Error computing cohort {variable} distributions: {e}", level="error")
            return None

    def get_cohort_concept_stats(
        self,
        cohort_definition_id: int,
        qry_builder,
        concept_type="condition_occurrence",
        filter_count=0,
        vocab=None,
        print_concept_hierarchy=False,
    ):
        """
        Get concept statistics for a cohort from the cohort table.

        :param cohort_definition_id: cohort definition id representing the cohort
        :param qry_builder: object providing build_concept_prevalence_query()
        :param concept_type: OMOP domain table to compute prevalence over
        :param filter_count: minimum in-cohort count for a concept to be included
        :param vocab: optional OMOP vocabulary_id to restrict concepts to; validated
            against the attached OMOP concept table
        :param print_concept_hierarchy: when True, also print the concept hierarchy
        :return: dict mapping concept_type to the list of concept-prevalence rows
        :raises ValueError: on an invalid vocab or any failure computing the stats
        """
        concept_stats = {}
        try:
            # validate input vocab if it is not None
            if vocab is not None:
                valid_vocabs = self._execute_query(f"SELECT distinct vocabulary_id FROM {self.omop_alias}.concept")
                valid_vocab_ids = [row["vocabulary_id"] for row in valid_vocabs]
                if vocab not in valid_vocab_ids:
                    err_msg = (
                        f"input {vocab} is not a valid vocabulary in OMOP. "
                        f"Supported vocabulary ids are: {valid_vocab_ids}"
                    )
                    notify_users(err_msg, level="error")
                    raise ValueError(err_msg)
            query = qry_builder.build_concept_prevalence_query(
                self.schema, self.omop_alias, concept_type, cohort_definition_id, filter_count, vocab
            )
            concept_stats[concept_type] = self._execute_query(query)
            cs_df = pd.DataFrame(concept_stats[concept_type])
            if not cs_df.empty:
                # Combine concept_name and prevalence into a "details" column
                cs_df["details"] = cs_df.apply(
                    lambda row: f"{row['concept_name']} (Code: {row['concept_code']}, "
                    f"Count: {row['count_in_cohort']}, Prevalence: {row['prevalence']:.3%})",
                    axis=1,
                )
                if print_concept_hierarchy:
                    # Self-edges (ancestor == descendant) carry the node details but
                    # are excluded from the parent/child structure.
                    filtered_cs_df = cs_df[cs_df["ancestor_concept_id"] != cs_df["descendant_concept_id"]]
                    roots = find_roots(filtered_cs_df)
                    hierarchy = build_concept_hierarchy(filtered_cs_df)
                    notify_users(f"cohort concept hierarchy for {concept_type} with root concept ids {roots}:")
                    for root in roots:
                        root_detail = cs_df[
                            (cs_df["ancestor_concept_id"] == root) & (cs_df["descendant_concept_id"] == root)
                        ]["details"].iloc[0]
                        print_hierarchy(hierarchy, parent=root, level=0, parent_details=root_detail)
            return concept_stats
        except Exception as e:
            raise ValueError("Error computing cohort concept stats") from e

    def close(self):
        """Close the DuckDB connection and reset the singleton so a new one can be created."""
        if self.conn:
            self.conn.close()
            BiasDatabase._instance = None
            notify_users("Connection to BiasDatabase closed.")
class OMOPCDMDatabase:
    """Singleton read-only handle to an OMOP CDM database.

    URLs ending in ``.duckdb`` open a native DuckDB connection (read-only on
    Windows); anything else is treated as a SQLAlchemy URL (PostgreSQL) with
    read-only transactions enforced.
    """

    _instance = None  # indicating a singleton with only one instance of the class ever created
    _database_type = None  # "duckdb" or "postgresql" once initialized

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialize(*args, **kwargs)  # Initialize only once
        return cls._instance

    def _initialize(self, db_url):
        """Connect to the OMOP database at *db_url* exactly once (read-only)."""
        if db_url.endswith(".duckdb"):
            # close any potential global connections if any
            for obj in gc.get_objects():  # pragma: no cover
                if isinstance(obj, duckdb.DuckDBPyConnection):
                    try:
                        obj.close()
                    except Exception as e:
                        notify_users(f"failed to close the lingering duckdb connection before opening a new one: {e}")
            # Handle DuckDB connection
            try:
                if platform.system().lower() == "windows":  # pragma: no cover
                    # it is critical to set duckdb connection to be read-only on windows platform
                    self.engine = duckdb.connect(db_url, read_only=True)
                else:
                    self.engine = duckdb.connect(db_url)
                notify_users(f"Connected to the DuckDB database: {db_url}.")
            except duckdb.Error as e:  # pragma: no cover
                notify_users(f"Failed to connect to DuckDB: {e}", level="error")
            self.Session = self.engine  # Use engine directly for DuckDB
            self._database_type = "duckdb"
        else:  # pragma: no cover
            # Handle PostgreSQL connection
            try:
                self.engine = create_engine(
                    db_url,
                    echo=False,
                    connect_args={"options": "-c default_transaction_read_only=on"},  # Enforce read-only transactions
                )
                self.Session = sessionmaker(bind=self.engine)
                notify_users("Connected to the OMOP CDM database (read-only).")
                self._database_type = "postgresql"
            except SQLAlchemyError as e:
                notify_users(f"Failed to connect to the database: {e}", level="error")

    def get_session(self):
        """Return the shared DuckDB connection, or a fresh SQLAlchemy session for PostgreSQL."""
        if self._database_type == "duckdb":
            return self.engine
        else:  # pragma: no cover
            # postgresql connection: provide a new session for read-only queries
            return self.Session()

    def execute_query(self, query, params=None):
        """Execute *query* with optional bind *params* and return rows as a list of dicts.

        Returns [] when the database reports an error (reported via notify_users).
        """
        # Initialized up front so the SQLAlchemyError cleanup below cannot hit an
        # unbound name if get_session() itself raises.
        omop_session = None
        try:
            if self._database_type == "duckdb":
                # DuckDB query execution: run the query once and read the column
                # names from the cursor before fetchall() consumes it (previously
                # the query was executed a second time just to get the headers).
                cursor = self.engine.execute(query, params)
                headers = [desc[0] for desc in cursor.description]
                results = cursor.fetchall()
            else:  # pragma: no cover
                # PostgreSQL query execution
                omop_session = self.get_session()
                query = text(query)
                results = omop_session.execute(query, params) if params else omop_session.execute(query)
                headers = results.keys()
                results = results.fetchall()
                omop_session.close()
            return [dict(zip(headers, row)) for row in results]
        except duckdb.Error as e:
            notify_users(f"Error executing query: {e}", level="error")
            return []
        except SQLAlchemyError as e:  # pragma: no cover
            notify_users(f"Error executing query: {e}", level="error")
            if omop_session:
                omop_session.close()
            return []

    def get_domains_and_vocabularies(self) -> list:
        """Return all distinct (domain_id, vocabulary_id) pairs from the concept table."""
        query = """
        SELECT distinct domain_id, vocabulary_id FROM concept order by domain_id, vocabulary_id
        """
        return self.execute_query(query)

    def get_concepts(self, search_term: str, domain: Optional[str], vocab: Optional[str]) -> list:
        """Search concepts whose name matches *search_term* as a whole word or exactly.

        :param search_term: case-insensitive term to look for in concept_name
        :param domain: optional domain_id filter
        :param vocab: optional vocabulary_id filter
            NOTE(review): when domain is None, vocab is assumed to be non-None - confirm callers.
        :return: matching concept rows ordered by concept_id
        """
        search_term_exact = search_term.lower()
        # The padded variants match the term at the start, end, or middle of a
        # name as a space-delimited word.
        search_term_suffix = f"{search_term_exact} "
        search_term_prefix = f" {search_term_exact}"
        search_term_prefix_suffix = f" {search_term_exact} "
        if self._database_type == "duckdb":
            # Use positional parameters and ? as placeholder to meet duckdb syntax requirement
            base_query = """
                SELECT concept_id, concept_name, valid_start_date, valid_end_date, domain_id, vocabulary_id \
                FROM concept
                WHERE {condition_str} \
                AND (
                    LOWER (concept_name) = ? \
                    OR
                    LOWER (concept_name) LIKE '%' || ? \
                    OR
                    LOWER (concept_name) LIKE ? || '%' \
                    OR
                    LOWER (concept_name) LIKE '%' || ? || '%'
                )
                ORDER BY concept_id \
            """
            if domain is not None and vocab is not None:
                condition_str = "domain_id = ? AND vocabulary_id = ?"
                params = [
                    domain,
                    vocab,
                    search_term_exact,
                    search_term_prefix,
                    search_term_suffix,
                    search_term_prefix_suffix,
                ]
            elif domain is None:
                condition_str = "vocabulary_id = ?"
                params = [vocab, search_term_exact, search_term_prefix, search_term_suffix, search_term_prefix_suffix]
            else:
                condition_str = "domain_id = ?"
                params = [domain, search_term_exact, search_term_prefix, search_term_suffix, search_term_prefix_suffix]
        else:  # pragma: no cover
            # Use named parameters with :param_name syntax for SQLAlchemy/PostgreSQL
            base_query = """
                SELECT concept_id, concept_name, valid_start_date, valid_end_date, domain_id, vocabulary_id \
                FROM concept
                WHERE {condition_str} \
                AND (
                    LOWER (concept_name) = :search_term_exact \
                    OR
                    LOWER (concept_name) LIKE '%' || :search_term_prefix \
                    OR
                    LOWER (concept_name) LIKE :search_term_suffix || '%' \
                    OR
                    LOWER (concept_name) LIKE '%' || :search_term_prefix_suffix || '%'
                )
                ORDER BY concept_id \
            """
            params = {
                "search_term_exact": search_term_exact,
                "search_term_prefix": search_term_prefix,
                "search_term_suffix": search_term_suffix,
                "search_term_prefix_suffix": search_term_prefix_suffix,
            }
            if domain is not None and vocab is not None:
                condition_str = "domain_id = :domain AND vocabulary_id = :vocabulary"
                params["domain"] = domain
                params["vocabulary"] = vocab
            elif domain is None:
                condition_str = "vocabulary_id = :vocabulary"
                params["vocabulary"] = vocab
            else:
                condition_str = "domain_id = :domain"
                params["domain"] = domain
        query = base_query.format(condition_str=condition_str)
        return self.execute_query(query, params=params)

    def get_concept_hierarchy(self, concept_id: int):
        """
        Retrieves the full concept hierarchy (ancestors and descendants) for a given concept_id
        and organizes it into a nested dictionary to represent the tree structure.

        :param concept_id: OMOP concept id to build the hierarchy around
        :return: (reverse_hierarchy[concept_id], hierarchy[concept_id]) - the parent
            tree and the children tree of the specified concept
        :raises ValueError: if concept_id is not an int (guards against SQL injection,
            since the id is inlined into the recursive query below)
        """
        if not isinstance(concept_id, int):
            # this check is important to avoid SQL injection risk
            raise ValueError("concept_id must be an integer")
        stages = ["Queried concept hierarchy", "Fetched concept details", "Built hierarchy tree"]
        progress = tqdm(total=len(stages), desc="Concept Hierarchy", unit="stage")
        progress.set_postfix_str(stages[0])
        # Inline the concept_id directly into the query
        query = f"""
        WITH RECURSIVE concept_hierarchy AS (
            SELECT ancestor_concept_id, descendant_concept_id, min_levels_of_separation
            FROM concept_ancestor
            WHERE ancestor_concept_id = {concept_id} OR descendant_concept_id = {concept_id}
            UNION
            SELECT ca.ancestor_concept_id, ca.descendant_concept_id, ca.min_levels_of_separation
            FROM concept_ancestor ca
            JOIN concept_hierarchy ch ON ca.ancestor_concept_id = ch.descendant_concept_id
        )
        SELECT ancestor_concept_id, descendant_concept_id
        FROM concept_hierarchy
        WHERE min_levels_of_separation > 0
        """
        results = self.execute_query(query)
        progress.update(1)
        progress.set_postfix_str(stages[1])
        # Collect all unique concept IDs involved in the hierarchy using set comprehension
        concept_ids = {row["ancestor_concept_id"] for row in results} | {
            row["descendant_concept_id"] for row in results
        }
        # Fetch details of each concept
        concept_details = {}
        if concept_ids:
            # Convert set of integers to comma-separated string; safe because every
            # id came back from the query above as an integer column.
            concept_ids_str = ", ".join(str(cid) for cid in concept_ids)
            query = f"""
            SELECT concept_id, concept_name, vocabulary_id, concept_code
            FROM concept
            WHERE concept_id IN ({concept_ids_str})
            """
            result = self.execute_query(query)
            concept_details = {row["concept_id"]: row for row in result}
        progress.update(1)
        progress.set_postfix_str(stages[2])
        # Build the hierarchy tree using a dictionary; node dicts are shared between
        # parents so the structure forms a DAG of nested entries.
        hierarchy = {}
        reverse_hierarchy = {}
        for row in results:
            ancestor_id = row["ancestor_concept_id"]
            descendant_id = row["descendant_concept_id"]
            ancestor_entry = hierarchy.setdefault(
                ancestor_id, {"details": concept_details[ancestor_id], "children": []}
            )
            descendant_entry = hierarchy.setdefault(
                descendant_id, {"details": concept_details[descendant_id], "children": []}
            )
            ancestor_entry["children"].append(descendant_entry)
            desc_entry_rev = reverse_hierarchy.setdefault(
                descendant_id, {"details": concept_details[descendant_id], "parents": []}
            )
            ancestor_entry_rev = reverse_hierarchy.setdefault(
                ancestor_id, {"details": concept_details[ancestor_id], "parents": []}
            )
            desc_entry_rev["parents"].append(ancestor_entry_rev)
        progress.update(1)
        progress.close()
        # Return the parent hierarchy and children hierarchy of the specified concept
        return reverse_hierarchy[concept_id], hierarchy[concept_id]

    def close(self):
        """Close/dispose the underlying connection and reset the singleton."""
        if isinstance(self.engine, duckdb.DuckDBPyConnection):
            self.engine.close()
        else:
            self.engine.dispose()  # pragma: no cover
        OMOPCDMDatabase._instance = None
        notify_users("Connection to the OMOP CDM database closed.")