forked from Python-Ensemble-Toolbox/PET
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdata_tools.py
More file actions
233 lines (170 loc) · 7.91 KB
/
data_tools.py
File metadata and controls
233 lines (170 loc) · 7.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
__author__ = 'Mathias Methlie Nilsen'
import numpy as np
import pandas as pd
__all__ = [
'combine_ensemble_predictions',
'en_pred_to_pred_data',
'merge_dataframes',
'multilevel_to_singlelevel_columns',
'dataframe_to_series',
'series_to_dataframe',
'series_to_matrix',
'dataframe_to_matrix'
]
def combine_ensemble_predictions(en_pred, dataypes, true_order) -> pd.DataFrame:
    """
    Combine an ensemble of predictions into one DataFrame where each cell
    holds the ensemble values for a single (index, datatype) pair.

    Parameters
    ----------
    en_pred : list | tuple | np.ndarray
        One entry per ensemble member. Supported member layouts:
        - list/tuple/ndarray of dicts (one dict per index entry, keyed by datatype),
        - dict mapping datatype -> 1-D array aligned with ``index``,
        - pd.DataFrame indexed like ``index`` with datatype columns.
    dataypes : sequence of str
        Datatype keys/columns to extract. (Parameter name keeps its original
        spelling for backward compatibility with existing callers.)
    true_order : tuple
        ``(index_name, index)``: the name and the values of the result index.

    Returns
    -------
    pd.DataFrame
        Indexed by ``index``; each cell is a squeezed array of per-member
        values (NaN where no member has data).

    Raises
    ------
    ValueError
        If ``en_pred`` is not iterable, a requested key is missing, or the
        element layout is not one of the supported forms.
    """
    index_name, index = true_order

    # Initialize empty DataFrame
    df = pd.DataFrame(columns=dataypes, index=index)
    df.index.name = index_name

    # Check en_pred is iterable
    if not isinstance(en_pred, (list, tuple, np.ndarray)):
        raise ValueError('en_pred must be a list, tuple, or ndarray of ensemble predictions.')

    #----------------------------------------------------------------------------------------------
    if all(isinstance(el, (list, tuple, np.ndarray)) for el in en_pred):
        if all(isinstance(el, dict) for el in en_pred[0]):
            # Nested layout: en_pred[member][step][datatype]
            pred_data = en_pred_to_pred_data(en_pred)

            # Fill in DataFrame
            for i, ind in enumerate(index):
                for key in dataypes:
                    if key not in pred_data[i]:
                        raise ValueError(f'Key {key} not found in pred_data at index {i}.')
                    if pred_data[i][key] is not None:
                        df.at[ind, key] = np.squeeze(pred_data[i][key])
                    else:
                        df.at[ind, key] = np.nan
        else:
            raise ValueError('Unsupported nested structure in en_pred.')
    #----------------------------------------------------------------------------------------------
    elif all(isinstance(el, dict) for el in en_pred):
        # Combine dicts to one dict with concatenated arrays (one column per member)
        pred_data_dict = {}
        for key in en_pred[0].keys():
            member_list = []
            for el in en_pred:
                member_data = el[key][:, np.newaxis]
                member_list.append(member_data)
            pred_data_dict[key] = np.concatenate(tuple(member_list), axis=1)

        # Fill in DataFrame
        for i, ind in enumerate(index):
            for key in dataypes:
                if key not in pred_data_dict:
                    raise ValueError(f'Key {key} not found in pred_data_dict.')
                if pred_data_dict[key] is not None:
                    df.at[ind, key] = np.squeeze(pred_data_dict[key][i, :])
                else:
                    df.at[ind, key] = np.nan
    #----------------------------------------------------------------------------------------------
    elif all(isinstance(el, pd.DataFrame) for el in en_pred):
        # Fill in DataFrame
        for i, ind in enumerate(index):
            for key in dataypes:
                if key not in en_pred[0].columns:
                    raise ValueError(f'Key {key} not found in DataFrame columns.')
                member_data = []
                for el in en_pred:
                    member_data.append(el.at[ind, key])
                df.at[ind, key] = np.squeeze(np.array(member_data))
    #----------------------------------------------------------------------------------------------
    else:
        # Previously fell through and returned an all-NaN frame; fail loudly instead.
        raise ValueError('Unsupported element types in en_pred: expected dicts, DataFrames, '
                         'or nested sequences of dicts.')
    return df
def en_pred_to_pred_data(en_pred):
    """
    Reshape nested ensemble predictions from member-major to step-major order.

    Input layout is ``en_pred[member][step][datatype]``. The result is a list
    with one dict per step, mapping each datatype to a 2-D array with one
    column per ensemble member, or to None when no member has data for that
    datatype at that step. Scalar member values are promoted to length-1
    columns before concatenation.
    """
    n_steps = len(en_pred[0])
    datatypes = en_pred[0][0].keys()

    pred_data = []
    for step in range(n_steps):
        step_dict = {}
        for typ in datatypes:
            # Only build the matrix if at least one member reported data here.
            if any(member[step][typ] is not None for member in en_pred):
                columns = []
                for member in en_pred:
                    value = member[step][typ]
                    if isinstance(value, np.ndarray):
                        columns.append(value[:, np.newaxis])
                    else:
                        # Promote scalars (or None) to a (1, 1) column.
                        columns.append(np.array([value])[:, np.newaxis])
                step_dict[typ] = np.concatenate(columns, axis=1)
            else:
                step_dict[typ] = None
        pred_data.append(step_dict)
    return pred_data
def merge_dataframes(en_dfs: list[pd.DataFrame]) -> pd.DataFrame:
    '''
    Combine a list of DataFrames (one per ensemble member) into a single
    DataFrame where each cell contains an array of ensemble values.

    Parameters
    ----------
    en_dfs : list[pd.DataFrame]
        Non-empty list of DataFrames sharing the same index and columns as
        the first element (only the first element's labels are used).

    Returns
    -------
    pd.DataFrame
        Same index/columns as ``en_dfs[0]``; each cell is
        ``np.array([member values]).squeeze().T``.

    Raises
    ------
    ValueError
        If ``en_dfs`` is empty or contains a non-DataFrame element.
    '''
    # Previously an empty list raised a bare IndexError at en_dfs[0].
    if not en_dfs:
        raise ValueError('en_dfs must contain at least one DataFrame.')
    if not all(isinstance(df, pd.DataFrame) for df in en_dfs):
        raise ValueError('All elements in en_dfs must be pandas DataFrames.')

    # Initialize empty DataFrame with same index and columns as the first DataFrame
    template = en_dfs[0]
    df = pd.DataFrame(index=template.index, columns=template.columns)
    df.index.name = template.index.name

    # Loop over each cell and combine ensemble values into arrays
    for idx in df.index:
        for col in df.columns:
            values = [member.at[idx, col] for member in en_dfs]
            df.at[idx, col] = np.array(values).squeeze().T
    return df
def multilevel_to_singlelevel_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Collapse a (key, param) MultiIndex-column DataFrame to one column per key.

    Each output cell is the concatenation of that row's param-arrays for the
    key, in column order. Key order follows first appearance in the columns.
    """
    # Top-level keys in first-appearance order.
    top_keys = pd.Index(df.columns.get_level_values(0)).unique()

    # For every key, concatenate each row's param arrays into one array.
    collapsed = {
        key: [np.concatenate(df[key].iloc[row].values) for row in range(len(df))]
        for key in top_keys
    }

    out = pd.DataFrame(collapsed, index=df.index)
    out.index.name = df.index.name
    return out
def dataframe_to_series(df):
    """
    Flatten a DataFrame into a Series keyed by a (index, 'datatype') MultiIndex.

    Cells are emitted row by row (index order), column by column within each
    row, so the Series preserves the DataFrame's row-major layout.
    """
    pairs = [(row, col) for row in df.index for col in df.columns]
    flat_index = pd.MultiIndex.from_tuples(pairs, names=[df.index.name, 'datatype'])
    flat_values = [df.loc[row, col] for row, col in pairs]
    return pd.Series(flat_values, index=flat_index)
def series_to_dataframe(series):
    """
    Rebuild a DataFrame from a Series with a (index, 'datatype') MultiIndex.

    Rows come from the unique values of the first index level, columns from
    the unique 'datatype' level; cells are assigned one entry at a time.
    """
    columns = series.index.get_level_values('datatype').unique()
    rows = series.index.get_level_values(series.index.names[0]).unique()
    out = pd.DataFrame(index=rows, columns=columns.values)
    for (row_label, datatype), value in series.items():
        out.at[row_label, datatype] = value
    return out
def series_to_matrix(series):
    """Stack the Series' cell values into a single ndarray (one row per cell)."""
    return np.array(series.to_list())
def dataframe_to_matrix(df):
    """
    Convert a DataFrame of (array-valued or scalar) cells to a plain ndarray
    by flattening it row-major into a Series and stacking the cell values.
    """
    return series_to_matrix(dataframe_to_series(df))