Skip to content
88 changes: 56 additions & 32 deletions containers/airflow/dags/brasil/sinan/dengue.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def update_dengue(egh_conn: dict):
import pandas as pd

from sqlalchemy import create_engine, text
from sqlalchemy.exc import ProgrammingError
from pysus.online_data import parquets_to_dataframe
from pysus.ftp.databases.sinan import SINAN

Expand All @@ -49,6 +50,31 @@ def update_dengue(egh_conn: dict):
tablename = "sinan_dengue_m"
files = sinan.get_files(dis_code=dis_code)


def insert_parquets(parquet_dir: str, year: int):
"""
Insert parquet dir into database using its chunks. Delete the chunk
and the directory after insertion.
"""
for parquet in os.listdir(parquet_dir):
file = os.path.join(parquet_dir, parquet)
df = pd.read_parquet(str(file), engine='fastparquet')
df.columns = df.columns.str.lower()
df['year'] = year
df['prelim'] = False
df.to_sql(
name=tablename,
con=create_engine(egh_conn['URI']),
schema="brasil",
if_exists='append',
index=False
)
del df
os.remove(file)
logging.debug(f"{file} inserted into db")
os.rmdir(parquets.path)


f_stage = {}
for file in files:
code, year = sinan.format(file)
Expand Down Expand Up @@ -88,22 +114,21 @@ def update_dengue(egh_conn: dict):

parquets = sinan.download(sinan.get_files(dis_code, year))

for parquet in os.listdir(parquets.path):
file = os.path.join(parquets.path, parquet)
df = pd.read_parquet(str(file), engine='fastparquet')
df.columns = df.columns.str.lower()
df['year'] = year
df['prelim'] = False
df.to_sql(
name=tablename,
con=create_engine(egh_conn['URI']),
schema="brasil",
if_exists='append',
index=False
)
del df
os.remove(file)
logging.debug(f"{file} inserted into db")
try:
insert_parquets(parquets.path, year)
except ProgrammingError as error:
if str(error).startswith("(psycopg2.errors.UndefinedColumn)"):
# Include new columns to table
column_name = str(error).split('"')[1]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that obtaining the missing column name from the error message is not a good approach, because if psycopg2 changes the wording in their error messages it will break our code. I think we should instead look at the list of column names of the parquet files and compare them with the columns in the current schema. From the difference in these lists, which can be efficiently obtained as list(set(cols1)-set(cols2)), we can then create the alter table query adding the new columns to the database table. With this approach, we don't even need to rely on an exception being raised. This determination of the missing columns can be done before the first insert.

with create_engine(egh_conn['URI']).connect() as conn:
conn.execute(text(
f'ALTER TABLE brasil.{tablename}'
f' ADD COLUMN {column_name} TEXT'
))
conn.commit()
logging.warning(f"Column {column_name} added into {tablename}")
insert_parquets(parquets.path, year)

os.rmdir(parquets.path)

for year in f_stage['prelim']:
Expand All @@ -116,22 +141,21 @@ def update_dengue(egh_conn: dict):

parquets = sinan.download(sinan.get_files(dis_code, year))

for parquet in os.listdir(parquets.path):
file = os.path.join(parquets.path, parquet)
df = pd.read_parquet(str(file), engine='fastparquet')
df.columns = df.columns.str.lower()
df['year'] = year
df['prelim'] = True
df.to_sql(
name=tablename,
con=create_engine(egh_conn['URI']),
schema="brasil",
if_exists='append',
index=False
)
del df
os.remove(file)
logging.debug(f"{file} inserted into db")
try:
insert_parquets(parquets.path, year)
except ProgrammingError as error:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above

if str(error).startswith("(psycopg2.errors.UndefinedColumn)"):
# Include new columns to table
column_name = str(error).split('"')[1]
with create_engine(egh_conn['URI']).connect() as conn:
conn.execute(text(
f'ALTER TABLE brasil.{tablename}'
f' ADD COLUMN {column_name} TEXT'
))
conn.commit()
logging.warning(f"Column {column_name} added into {tablename}")
insert_parquets(parquets.path, year)

os.rmdir(parquets.path)

update_dengue(CONN)