11from __future__ import annotations
22from abc import ABC , abstractmethod
3+ from collections import defaultdict
34
45from ..nodes .abstract_node import AbstractNode
56from ..common .index .uni_index import UniIndex
67from ..common .index .advanced_index import AdvancedIndex
78from ..core .tuple import TupleState , AbstractTuple
89from ..common .joiner_type import JoinerType
910
10- # --- Start of Bug Fix ---
1111# Defines the logical inverse for each joiner type, which is essential for
1212# creating symmetrical join logic within the BetaNode.
1313JOINER_INVERSES = {
2121 JoinerType .RANGE_CONTAINS : JoinerType .RANGE_WITHIN ,
2222 JoinerType .RANGE_WITHIN : JoinerType .RANGE_CONTAINS ,
2323}
24- # --- End of Bug Fix ---
2524
2625
2726class BetaNode (AbstractNode , ABC ):
2827 """
2928 An abstract base class for all join nodes (Bi, Tri, Quad, etc.).
3029 It contains the common logic for indexing, matching, and propagating tuples.
30+ This version includes reverse indices for O(1) retraction performance.
3131 """
3232 def __init__ (self , node_id , joiner_type , left_index_properties , right_index_properties , scheduler , tuple_pool ):
3333 super ().__init__ (node_id )
3434 self .scheduler = scheduler
3535 self .tuple_pool = tuple_pool
3636 self .joiner_type = joiner_type
37-
38- # --- Start of Bug Fix ---
39- # The join is defined from left to right (e.g., left.key < right.key).
40- # When a new right_tuple arrives, we probe the left_index to find left_tuples
41- # that satisfy the original joiner (e.g., left.key < new_right.key).
42- self .left_index = self ._create_index (left_index_properties , joiner_type )
4337
44- # When a new left_tuple arrives, we must probe the right_index using the
45- # inverse joiner to find right_tuples that satisfy the condition.
46- # e.g., to find 'right' where 'new_left.key < right.key', we must query for
47- # 'right.key > new_left.key'.
38+ self .left_index = self ._create_index (left_index_properties , joiner_type )
4839 inverse_joiner = JOINER_INVERSES .get (joiner_type )
4940 if inverse_joiner is None :
5041 raise ValueError (f"Joiner type { joiner_type } has no defined inverse." )
5142 self .right_index = self ._create_index (right_index_properties , inverse_joiner )
52- # --- End of Bug Fix ---
53-
43+
5444 self .beta_memory = {}
45+ self .left_to_pairs = defaultdict (list )
46+ self .right_to_pairs = defaultdict (list )
5547
5648 def __repr__ (self ) -> str :
5749 """Overrides base representation to include the joiner type."""
5850 return f"<{ self .__class__ .__name__ } id={ self ._node_id } joiner={ self .joiner_type .name } >"
5951
60-
6152 def _create_index (self , props , joiner ):
6253 """Factory method to create the appropriate index based on joiner type."""
6354 if joiner == JoinerType .EQUAL :
@@ -70,40 +61,73 @@ def _create_child_tuple(self, left_tuple: AbstractTuple, right_tuple: AbstractTu
7061 pass
7162
7263 def insert_left (self , left_tuple : AbstractTuple ):
64+ """Handles insertion of a tuple from the left parent."""
7365 self .left_index .put (left_tuple )
7466 key = self .left_index ._index_properties .get_property (left_tuple )
67+ # Probe the opposing index for matches
7568 right_matches = self .right_index .get_matches (key ) if hasattr (self .right_index , 'get_matches' ) else self .right_index .get (key )
7669 for right_tuple in right_matches :
77- self .create_and_schedule_child (left_tuple , right_tuple )
70+ self .create_and_propagate_child (left_tuple , right_tuple )
7871
7972 def insert_right (self , right_tuple : AbstractTuple ):
73+ """Handles insertion of a tuple from the right parent."""
8074 self .right_index .put (right_tuple )
8175 key = self .right_index ._index_properties .get_property (right_tuple )
76+ # Probe the opposing index for matches
8277 left_matches = self .left_index .get_matches (key ) if hasattr (self .left_index , 'get_matches' ) else self .left_index .get (key )
8378 for left_tuple in left_matches :
84- self .create_and_schedule_child (left_tuple , right_tuple )
79+ self .create_and_propagate_child (left_tuple , right_tuple )
8580
8681 def retract_left (self , left_tuple : AbstractTuple ):
82+ """Handles retraction of a tuple from the left parent using the reverse index."""
8783 self .left_index .remove (left_tuple )
88- pairs_to_remove = [p for p in self .beta_memory if p [0 ] == left_tuple ]
89- for pair in pairs_to_remove :
90- self .retract_and_schedule_child (pair [0 ], pair [1 ])
84+
85+ if left_tuple in self .left_to_pairs :
86+ pairs_to_remove = list (self .left_to_pairs [left_tuple ])
87+
88+ for pair in pairs_to_remove :
89+ self .retract_and_propagate_child (pair )
9190
9291 def retract_right (self , right_tuple : AbstractTuple ):
92+ """Handles retraction of a tuple from the right parent using the reverse index."""
9393 self .right_index .remove (right_tuple )
94- pairs_to_remove = [p for p in self .beta_memory if p [1 ] == right_tuple ]
95- for pair in pairs_to_remove :
96- self .retract_and_schedule_child (pair [0 ], pair [1 ])
94+
95+ if right_tuple in self .right_to_pairs :
96+ pairs_to_remove = list (self .right_to_pairs [right_tuple ])
97+
98+ for pair in pairs_to_remove :
99+ self .retract_and_propagate_child (pair )
100+
101+ def create_and_propagate_child (self , left_tuple : AbstractTuple , right_tuple : AbstractTuple ):
102+ """Creates a new child tuple, stores it, updates indices, and schedules propagation."""
103+ pair = (left_tuple , right_tuple )
104+ if pair in self .beta_memory :
105+ return # Avoid creating duplicate children
97106
98- def create_and_schedule_child (self , left_tuple : AbstractTuple , right_tuple : AbstractTuple ):
99107 child = self ._create_child_tuple (left_tuple , right_tuple )
100108 child .node , child .state = self , TupleState .CREATING
101- self .beta_memory [(left_tuple , right_tuple )] = child
109+ self .beta_memory [pair ] = child
110+
111+ self .left_to_pairs [left_tuple ].append (pair )
112+ self .right_to_pairs [right_tuple ].append (pair )
113+
102114 self .scheduler .schedule (child )
103115
104- def retract_and_schedule_child (self , left : AbstractTuple , right : AbstractTuple ):
105- child = self .beta_memory .pop ((left , right ), None )
116+ def retract_and_propagate_child (self , pair : tuple [AbstractTuple , AbstractTuple ]):
117+ """Removes a child tuple, cleans up all indices, and schedules retraction."""
118+ child = self .beta_memory .pop (pair , None )
106119 if child :
120+ left , right = pair
121+ if left in self .left_to_pairs :
122+ self .left_to_pairs [left ].remove (pair )
123+ if not self .left_to_pairs [left ]:
124+ del self .left_to_pairs [left ]
125+
126+ if right in self .right_to_pairs :
127+ self .right_to_pairs [right ].remove (pair )
128+ if not self .right_to_pairs [right ]:
129+ del self .right_to_pairs [right ]
130+
107131 if child .state == TupleState .CREATING :
108132 child .state = TupleState .ABORTING
109133 elif not child .state .is_dirty ():
0 commit comments