File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -160,6 +160,18 @@ def _filter_queues(self, queues):
160160 )
161161 ]
162162
163+ def _worker_perform_secondary_tasks (self ):
164+ # We should queue scheduled tasks every QUEUE_SCHEDULED_TASKS_TIME time
165+ # and expired tasks every REQUEUE_EXPIRED_TASKS_INTERVAL time. Use only
166+ # one Redis query to enter this block since every single worker calls
167+ # this every second.
168+ # XXX: Ideally, we should keep track of workers and take turns.
169+ key = self ._key ("lock" , "secondary_tasks" )
170+ # if self.connection.set(key, "1", ex=1, nx=True):
171+ if True :
172+ self ._worker_queue_scheduled_tasks ()
173+ self ._worker_queue_expired_tasks ()
174+
163175 def _worker_queue_scheduled_tasks (self ):
164176 """
165177 Helper method that takes due tasks from the SCHEDULED queue and puts
@@ -1046,8 +1058,7 @@ def _worker_run(self):
10461058 time .time () - self ._last_task_check > self .config ['SELECT_TIMEOUT' ]
10471059 and not self ._stop_requested
10481060 ):
1049- self ._worker_queue_scheduled_tasks ()
1050- self ._worker_queue_expired_tasks ()
1061+ self ._worker_perform_secondary_tasks ()
10511062 self ._last_task_check = time .time ()
10521063
10531064 def _queue_periodic_tasks (self ):
You can’t perform that action at this time.
0 commit comments