|
1 | 1 | from functools import wraps |
2 | 2 | from typing import List, Optional |
3 | 3 | from flask import current_app, redirect, request, url_for, session, abort |
| 4 | +from sqlalchemy import select |
4 | 5 |
|
5 | 6 | from flowapp import __version__, db, validators |
6 | 7 | from flowapp.models import Flowspec4, Flowspec6, RTBH, Whitelist, get_user_nets |
@@ -153,35 +154,35 @@ def get_user_allowed_rule_ids(rule_type: str, user_id: int, user_role_ids: List[ |
153 | 154 | # Admin users can modify any rules |
154 | 155 | if 3 in user_role_ids: |
155 | 156 | if rule_type == "ipv4": |
156 | | - return [r.id for r in db.session.query(Flowspec4.id).all()] |
| 157 | + return list(db.session.scalars(select(Flowspec4.id))) |
157 | 158 | elif rule_type == "ipv6": |
158 | | - return [r.id for r in db.session.query(Flowspec6.id).all()] |
| 159 | + return list(db.session.scalars(select(Flowspec6.id))) |
159 | 160 | elif rule_type == "rtbh": |
160 | | - return [r.id for r in db.session.query(RTBH.id).all()] |
| 161 | + return list(db.session.scalars(select(RTBH.id))) |
161 | 162 | elif rule_type == "whitelist": |
162 | | - return [r.id for r in db.session.query(Whitelist.id).all()] |
| 163 | + return list(db.session.scalars(select(Whitelist.id))) |
163 | 164 | return [] |
164 | 165 |
|
165 | 166 | # Regular users - filter by network ranges |
166 | 167 | net_ranges = get_user_nets(user_id) |
167 | 168 |
|
168 | 169 | if rule_type == "ipv4": |
169 | | - rules = db.session.query(Flowspec4).all() |
| 170 | + rules = db.session.scalars(select(Flowspec4)).all() |
170 | 171 | filtered_rules = validators.filter_rules_in_network(net_ranges, rules) |
171 | 172 | return [r.id for r in filtered_rules] |
172 | 173 |
|
173 | 174 | elif rule_type == "ipv6": |
174 | | - rules = db.session.query(Flowspec6).all() |
| 175 | + rules = db.session.scalars(select(Flowspec6)).all() |
175 | 176 | filtered_rules = validators.filter_rules_in_network(net_ranges, rules) |
176 | 177 | return [r.id for r in filtered_rules] |
177 | 178 |
|
178 | 179 | elif rule_type == "rtbh": |
179 | | - rules = db.session.query(RTBH).all() |
| 180 | + rules = db.session.scalars(select(RTBH)).all() |
180 | 181 | filtered_rules = validators.filter_rtbh_rules(net_ranges, rules) |
181 | 182 | return [r.id for r in filtered_rules] |
182 | 183 |
|
183 | 184 | elif rule_type == "whitelist": |
184 | | - rules = db.session.query(Whitelist).all() |
| 185 | + rules = db.session.scalars(select(Whitelist)).all() |
185 | 186 | filtered_rules = validators.filter_rules_in_network(net_ranges, rules) |
186 | 187 | return [r.id for r in filtered_rules] |
187 | 188 |
|
|
0 commit comments