-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathservice_scope_resolver.py
More file actions
384 lines (318 loc) · 14 KB
/
service_scope_resolver.py
File metadata and controls
384 lines (318 loc) · 14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
import logging
from odoo import api, models
# Module-level logger, named after this module's import path.
_logger = logging.getLogger(__name__)
class ScopeResolverService(models.AbstractModel):
    """Service that resolves aggregation scopes to partner IDs.

    A scope is either a ``spp.aggregation.scope`` record or an inline ``dict``
    describing the same information. Record scopes are dispatched by name to a
    ``_resolve_<scope_type>`` method; inline scopes are dispatched through an
    explicit map to the matching ``_resolve_<scope_type>_inline`` method.

    Resolution is best-effort: failures are logged and an empty list is
    returned instead of propagating the exception to the caller.
    """

    _name = "spp.aggregation.scope.resolver"
    _description = "Aggregation Scope Resolver"

    @api.model
    def resolve(self, scope):
        """Resolve a scope to a list of partner IDs.

        :param scope: spp.aggregation.scope record or dict for inline scope
        :returns: List of partner IDs; empty when the scope type has no
            resolver or when the resolver raises
        :rtype: list[int]
        """
        if isinstance(scope, dict):
            return self._resolve_inline(scope)

        scope.ensure_one()
        scope_type = scope.scope_type

        # Name-based dispatch lets other modules contribute resolvers simply by
        # adding a ``_resolve_<scope_type>`` method to this model.
        resolver_method = getattr(self, f"_resolve_{scope_type}", None)
        if resolver_method is None:
            _logger.error("No resolver for scope type: %s", scope_type)
            return []

        try:
            return resolver_method(scope)
        except Exception as e:
            # Deliberate best-effort boundary: log (with traceback) and fall
            # back to an empty result rather than failing the caller.
            scope_name = scope.name
            _logger.error("Error resolving scope %s: %s", scope_name, e, exc_info=True)
            return []

    def _resolve_inline(self, scope_dict):
        """Resolve an inline scope definition (dict).

        The ``scope_type`` key selects the ``*_inline`` resolver; the whole
        dict is passed to it unchanged. No scope record is created.

        :param scope_dict: inline scope definition; must contain ``scope_type``
        :returns: List of partner IDs; empty when ``scope_type`` is missing or
            unknown, or when the resolver raises
        :rtype: list[int]
        """
        scope_type = scope_dict.get("scope_type")
        if not scope_type:
            _logger.error("Inline scope missing scope_type")
            return []

        # Map inline scope type to its resolver
        resolver_map = {
            "cel": self._resolve_cel_inline,
            "area": self._resolve_area_inline,
            "area_tag": self._resolve_area_tag_inline,
            "spatial_polygon": self._resolve_spatial_polygon_inline,
            "spatial_buffer": self._resolve_spatial_buffer_inline,
            "explicit": self._resolve_explicit_inline,
            "all_registrants": self._resolve_all_registrants_inline,
        }
        resolver = resolver_map.get(scope_type)
        if not resolver:
            _logger.error("No resolver for inline scope type: %s", scope_type)
            return []

        try:
            return resolver(scope_dict)
        except Exception as e:
            # Same best-effort policy as resolve(): log with traceback, return [].
            _logger.error("Error resolving inline scope: %s", e, exc_info=True)
            return []

    # -------------------------------------------------------------------------
    # CEL Resolution
    # -------------------------------------------------------------------------
    def _resolve_cel(self, scope):
        """Resolve a CEL expression scope record."""
        return self._resolve_cel_expression(
            scope.cel_expression,
            scope.cel_profile or "registry_individuals",
        )

    def _resolve_cel_inline(self, scope_dict):
        """Resolve an inline CEL scope (keys ``cel_expression``, ``cel_profile``)."""
        return self._resolve_cel_expression(
            scope_dict.get("cel_expression", ""),
            # ``or`` (not a .get default) so an explicit None/"" also falls back,
            # matching the record-based resolver.
            scope_dict.get("cel_profile") or "registry_individuals",
        )

    def _resolve_cel_expression(self, expression, profile):
        """Execute a CEL expression and return the matching partner IDs.

        :param expression: CEL expression; a falsy value yields ``[]``
        :param profile: CEL profile name
        :returns: IDs collected from every batch yielded by the executor, or
            ``[]`` when the executor model is missing or execution fails
        :rtype: list[int]
        """
        # NOTE(review): ``profile`` is accepted but never forwarded to the
        # executor; confirm whether compile_for_batch should receive it.
        if not expression:
            return []

        # Environment.get() returns None only when the model is not installed.
        # An installed model comes back as an EMPTY recordset, which is falsy,
        # so availability must be tested with ``is None`` rather than truthiness.
        executor = self.env.get("spp.cel.executor")
        if executor is None:
            _logger.error("CEL executor not available")
            return []
        executor = executor.sudo()  # nosemgrep: odoo-sudo-without-context

        all_ids = []
        try:
            for batch_ids in executor.compile_for_batch("res.partner", expression, batch_size=5000):
                all_ids.extend(batch_ids)
        except Exception as e:
            # Partial results are discarded on failure.
            _logger.error("CEL execution failed: %s", e, exc_info=True)
            return []
        return all_ids

    # -------------------------------------------------------------------------
    # Area Resolution
    # -------------------------------------------------------------------------
    def _resolve_area(self, scope):
        """Resolve an area scope record to partner IDs."""
        area = scope.area_id
        if not area:
            return []
        return self._resolve_area_ids([area.id], scope.include_child_areas)

    def _resolve_area_inline(self, scope_dict):
        """Resolve an inline area scope.

        Reads ``area_id`` and ``include_child_areas`` (defaults to ``True``).
        """
        area_id = scope_dict.get("area_id")
        if not area_id:
            return []
        include_children = scope_dict.get("include_child_areas", True)
        return self._resolve_area_ids([area_id], include_children)

    def _resolve_area_ids(self, area_ids, include_children=True):
        """Resolve area IDs to partner IDs.

        Returns registrants directly in the given areas, plus individuals
        whose group (household) is in those areas but who lack their own
        area_id assignment.

        :param area_ids: list of ``spp.area`` IDs
        :param include_children: also include every area whose ``parent_path``
            starts with the ``parent_path`` of one of the given areas
        :returns: de-duplicated partner IDs (order not guaranteed)
        :rtype: list[int]
        """
        if not area_ids:
            return []

        # Build area domain (sudo for model reads - callers may be unprivileged)
        if include_children:
            # Collect all parent_path values first, then do a single search using
            # OR-chained domain conditions to avoid N+1 queries inside a loop.
            areas = self.env["spp.area"].sudo().browse(area_ids)  # nosemgrep: odoo-sudo-without-context
            all_area_ids = set(area_ids)
            parent_paths = [area.parent_path for area in areas if area.parent_path]
            if parent_paths:
                domain = ["|"] * (len(parent_paths) - 1)
                # "=like" takes the pattern verbatim, giving an anchored prefix
                # match. Plain "like" wraps the value in %...%, so a root path
                # such as "1/" would also match "11/" or "21/3/".
                domain.extend(("parent_path", "=like", f"{path}%") for path in parent_paths)
                child_areas = self.env["spp.area"].sudo().search(domain)  # nosemgrep: odoo-sudo-without-context
                all_area_ids.update(child_areas.ids)
            area_ids = list(all_area_ids)

        # Find registrants directly in these areas
        domain = [
            ("is_registrant", "=", True),
            ("area_id", "in", area_ids),
        ]
        direct_ids = set(
            self.env["res.partner"]  # nosemgrep: odoo-sudo-without-context, odoo-sudo-on-sensitive-models
            .sudo()
            .search(domain)
            .ids
        )

        # Also find individuals without area_id whose group is in these areas
        Membership = self.env["spp.group.membership"].sudo()  # nosemgrep: odoo-sudo-without-context
        memberships = Membership.search(
            [
                ("group.area_id", "in", area_ids),
                ("individual.area_id", "=", False),
                ("individual.is_registrant", "=", True),
                ("is_ended", "=", False),
            ]
        )
        indirect_ids = set(memberships.mapped("individual").ids)

        return list(direct_ids | indirect_ids)

    # -------------------------------------------------------------------------
    # Area Tag Resolution
    # -------------------------------------------------------------------------
    def _resolve_area_tag(self, scope):
        """Resolve an area tag scope record to partner IDs."""
        tag_ids = scope.area_tag_ids.ids
        if not tag_ids:
            return []
        include_children = scope.include_child_areas
        return self._resolve_area_tag_ids(tag_ids, include_children)

    def _resolve_area_tag_inline(self, scope_dict):
        """Resolve an inline area tag scope.

        Reads ``area_tag_ids`` and ``include_child_areas`` (defaults to ``True``).
        """
        tag_ids = scope_dict.get("area_tag_ids", [])
        if not tag_ids:
            return []
        include_children = scope_dict.get("include_child_areas", True)
        return self._resolve_area_tag_ids(tag_ids, include_children)

    def _resolve_area_tag_ids(self, tag_ids, include_children=True):
        """Resolve area tag IDs to partner IDs.

        Finds the areas carrying any of the tags, then delegates to
        ``_resolve_area_ids``.
        """
        if not tag_ids:
            return []

        # Find areas with these tags (sudo for model reads - callers may be unprivileged)
        areas = self.env["spp.area"].sudo().search([("tag_ids", "in", tag_ids)])  # nosemgrep: odoo-sudo-without-context
        if not areas:
            return []
        return self._resolve_area_ids(areas.ids, include_children)

    # -------------------------------------------------------------------------
    # Spatial Resolution (basic, full PostGIS in bridge module)
    # -------------------------------------------------------------------------
    def _resolve_spatial_polygon(self, scope):
        """Resolve a spatial polygon scope record."""
        geojson = scope.geometry_geojson
        if not geojson:
            return []
        return self._resolve_spatial_polygon_geometry(geojson)

    def _resolve_spatial_polygon_inline(self, scope_dict):
        """Resolve an inline spatial polygon scope (key ``geometry_geojson``)."""
        geojson = scope_dict.get("geometry_geojson")
        if not geojson:
            return []
        return self._resolve_spatial_polygon_geometry(geojson)

    def _resolve_spatial_polygon_geometry(self, geojson_str):
        """Resolve a spatial polygon to partner IDs.

        This is a basic implementation. For full PostGIS support,
        install the spp_aggregation_spatial bridge module.

        :param geojson_str: polygon geometry, passed through to the bridge
        :returns: the bridge's result, or ``[]`` when the bridge is missing
        """
        # Check if PostGIS bridge is available. An installed model is returned
        # as an empty (falsy) recordset, so compare against None explicitly.
        spatial_resolver = self.env.get("spp.aggregation.spatial.resolver")
        if spatial_resolver is not None:
            return spatial_resolver.resolve_polygon(geojson_str)

        # Fallback: no spatial support
        _logger.warning("Spatial polygon scope requires spp_aggregation_spatial module. Returning empty result.")
        return []

    def _resolve_spatial_buffer(self, scope):
        """Resolve a spatial buffer scope record."""
        return self._resolve_spatial_buffer_params(
            scope.buffer_center_latitude,
            scope.buffer_center_longitude,
            scope.buffer_radius_km,
        )

    def _resolve_spatial_buffer_inline(self, scope_dict):
        """Resolve an inline spatial buffer scope."""
        return self._resolve_spatial_buffer_params(
            scope_dict.get("buffer_center_latitude"),
            scope_dict.get("buffer_center_longitude"),
            scope_dict.get("buffer_radius_km"),
        )

    def _resolve_spatial_buffer_params(self, latitude, longitude, radius_km):
        """Resolve a spatial buffer to partner IDs.

        This is a basic implementation. For full PostGIS support,
        install the spp_aggregation_spatial bridge module.

        :param latitude: buffer center latitude; ``None``/``False`` means missing
        :param longitude: buffer center longitude; ``None``/``False`` means missing
        :param radius_km: buffer radius; a falsy value (``None``, ``0``) yields ``[]``
        :returns: the bridge's result, or ``[]`` when input is incomplete or
            the bridge is missing
        """
        # 0.0 is a valid coordinate (equator / prime meridian), so only treat
        # None/False as "not provided"; a plain truthiness test would reject it.
        coordinate_missing = any(value is None or value is False for value in (latitude, longitude))
        if coordinate_missing or not radius_km:
            return []

        # Check if PostGIS bridge is available. An installed model is returned
        # as an empty (falsy) recordset, so compare against None explicitly.
        spatial_resolver = self.env.get("spp.aggregation.spatial.resolver")
        if spatial_resolver is not None:
            return spatial_resolver.resolve_buffer(latitude, longitude, radius_km)

        # Fallback: no spatial support
        _logger.warning("Spatial buffer scope requires spp_aggregation_spatial module. Returning empty result.")
        return []

    # -------------------------------------------------------------------------
    # Simulation Resolution (added by spp_simulation module)
    # -------------------------------------------------------------------------
    # Note: _resolve_simulation method is added by spp_simulation when installed

    # -------------------------------------------------------------------------
    # Explicit Resolution
    # -------------------------------------------------------------------------
    def _resolve_explicit(self, scope):
        """Resolve an explicit ID list scope record."""
        return scope.explicit_partner_ids.ids

    def _resolve_explicit_inline(self, scope_dict):
        """Resolve an inline explicit scope.

        Only the IDs in ``explicit_partner_ids`` that belong to registrants
        are returned.
        """
        partner_ids = scope_dict.get("explicit_partner_ids", [])
        if not partner_ids:
            return []

        # Validate that these are actual registrants (sudo for model reads - callers may be unprivileged)
        valid_ids = (
            self.env["res.partner"]  # nosemgrep: odoo-sudo-without-context, odoo-sudo-on-sensitive-models
            .sudo()
            .search(
                [
                    ("id", "in", partner_ids),
                    ("is_registrant", "=", True),
                ]
            )
            .ids
        )
        return valid_ids

    # -------------------------------------------------------------------------
    # All Registrants Resolution
    # -------------------------------------------------------------------------
    def _resolve_all_registrants_inline(self, scope_dict):
        """Resolve all registrants scope.

        Returns IDs of all registrants in the system. Callers use this
        instead of explicit scope so they don't need to enumerate IDs
        up front; the search is done here in a single query.

        :param scope_dict: inline scope definition (not read by this resolver)
        """
        return (
            self.env["res.partner"]  # nosemgrep: odoo-sudo-without-context, odoo-sudo-on-sensitive-models
            .sudo()
            .search([("is_registrant", "=", True)])
            .ids
        )

    # -------------------------------------------------------------------------
    # Batch Resolution
    # -------------------------------------------------------------------------
    @api.model
    def resolve_multiple(self, scopes):
        """Resolve multiple scopes and return the union of their IDs.

        :param scopes: List of scope records or dicts
        :returns: Combined list of unique partner IDs (order not guaranteed)
        :rtype: list[int]
        """
        all_ids = set()
        for scope in scopes:
            all_ids.update(self.resolve(scope))
        return list(all_ids)

    @api.model
    def resolve_intersect(self, scopes):
        """Resolve multiple scopes and return the intersection of their IDs.

        :param scopes: List of scope records or dicts
        :returns: List of partner IDs present in ALL scopes (order not
            guaranteed); ``[]`` when ``scopes`` is empty
        :rtype: list[int]
        """
        if not scopes:
            return []

        result_ids = None
        for scope in scopes:
            ids = set(self.resolve(scope))
            result_ids = ids if result_ids is None else result_ids & ids

            # Short-circuit if intersection is empty
            if not result_ids:
                return []

        # ``result_ids`` stays None if ``scopes`` was truthy but yielded nothing.
        return list(result_ids or ())