Merge branch 'master' of github.com:nvie/rq

Conflicts:
	tests/test_job.py
Selwin Ong 9 years ago
commit e538512c79

@@ -0,0 +1,6 @@
+Cal Leeming <cal@iops.io> <cal.leeming@simplicitymedialtd.co.uk>
+Mark LaPerriere <marklap@gmail.com> <mark.a.laperriere@disney.com>
+Selwin Ong <selwin.ong@gmail.com> <selwin@ui.co.id>
+Vincent Driessen <me@nvie.com> <vincent@3rdcloud.com>
+Vincent Driessen <me@nvie.com> <vincent@datafox.nl>
+zhangliyong <lyzhang87@gmail.com> <zhangliyong@umeng.com>

@@ -1,3 +1,13 @@
+### 0.5.5
+(August 25th, 2015)
+
+- Add support for `--exception-handler` command line flag
+- Fix compatibility with click>=5.0
+- Fix maximum recursion depth problem for very large queues that contain jobs
+  that all fail
+
 ### 0.5.4
 (July 8th, 2015)
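A note on the new `--exception-handler` flag listed above: it takes a dotted path to a callable that the worker imports and pushes onto its exception handler stack. Below is a minimal sketch of such a handler; the module path and logging choice are made up, while the `(job, exc_type, exc_value, traceback)` signature is rq's documented handler convention.

    # myapp/handlers.py -- hypothetical module holding the handler
    def report_error(job, exc_type, exc_value, traceback):
        # job.id and job.func_name identify the failed job
        print('job %s (%s) failed: %s' % (job.id, job.func_name, exc_value))
        # Returning True lets the next handler in the chain run as well,
        # e.g. rq's default handler that moves the job to the failed queue.
        return True

The worker would then be started with something roughly like `rq worker --exception-handler 'myapp.handlers.report_error'`.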

@@ -260,24 +260,26 @@ class Queue(object):
                                  description=description, depends_on=depends_on,
                                  job_id=job_id, at_front=at_front)
-    def enqueue_job(self, job, at_front=False):
+    def enqueue_job(self, job, pipeline=None, at_front=False):
         """Enqueues a job for delayed execution.
         If Queue is instantiated with async=False, job is executed immediately.
         """
-        with self.connection._pipeline() as pipeline:
+        pipe = pipeline if pipeline is not None else self.connection._pipeline()
         # Add Queue key set
-            self.connection.sadd(self.redis_queues_keys, self.key)
-            job.set_status(JobStatus.QUEUED, pipeline=pipeline)
+        pipe.sadd(self.redis_queues_keys, self.key)
+        job.set_status(JobStatus.QUEUED, pipeline=pipe)
         job.origin = self.name
         job.enqueued_at = utcnow()
         if job.timeout is None:
             job.timeout = self.DEFAULT_TIMEOUT
-            job.save(pipeline=pipeline)
+        job.save(pipeline=pipe)
-            pipeline.execute()
+        if pipeline is None:
+            pipe.execute()
         if self._async:
             self.push_job_id(job.id, at_front=at_front)
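The new `pipeline` argument above lets a caller batch the enqueue bookkeeping with other Redis commands and decide itself when to execute; only when no pipeline is passed does `enqueue_job` execute one of its own. A rough usage sketch, with a made-up `send_report` task:

    from redis import StrictRedis
    from rq import Queue
    from rq.job import Job

    def send_report(address):
        pass  # placeholder task body

    connection = StrictRedis()
    q = Queue('default', connection=connection)

    job = Job.create(func=send_report, args=('ops@example.com',),
                     connection=connection)

    # The queue-set membership, job status and job hash all go through our
    # pipeline; enqueue_job leaves executing it to us.
    with connection.pipeline() as pipe:
        q.enqueue_job(job, pipeline=pipe)
        pipe.execute()

This is exactly what the reworked enqueue_dependents in the next hunk relies on.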
@@ -289,15 +291,20 @@ class Queue(object):
         # TODO: can probably be pipelined
         from .registry import DeferredJobRegistry
-        registry = DeferredJobRegistry(self.name, self.connection)
         while True:
             job_id = as_text(self.connection.spop(job.dependents_key))
             if job_id is None:
                 break
             dependent = self.job_class.fetch(job_id, connection=self.connection)
-            registry.remove(dependent)
-            self.enqueue_job(dependent)
+            registry = DeferredJobRegistry(dependent.origin, self.connection)
+            with self.connection._pipeline() as pipeline:
+                registry.remove(dependent, pipeline=pipeline)
+                if dependent.origin == self.name:
+                    self.enqueue_job(dependent, pipeline=pipeline)
+                else:
+                    queue = Queue(name=dependent.origin, connection=self.connection)
+                    queue.enqueue_job(dependent, pipeline=pipeline)
+                pipeline.execute()
     def pop_job_id(self):
         """Pops a given job ID from this Redis queue."""

@@ -2,4 +2,4 @@
 from __future__ import (absolute_import, division, print_function,
                         unicode_literals)
-VERSION = '0.5.4'
+VERSION = '0.5.5'

@@ -590,7 +590,12 @@ class Worker(object):
         except Exception:
             job.set_status(JobStatus.FAILED, pipeline=pipeline)
             started_job_registry.remove(job, pipeline=pipeline)
-            pipeline.execute()
+            try:
+                pipeline.execute()
+            except Exception:
+                # Ensure that custom exception handlers are called
+                # even if Redis is down
+                pass
             self.handle_exception(job, *sys.exc_info())
             return False
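The try/except around `pipeline.execute()` above means a worker's custom exception handlers still run even when Redis has become unreachable while the failure is being recorded. Handlers can also be stacked programmatically rather than via the new CLI flag; a small sketch, assuming the worker's `push_exc_handler` method and a made-up handler:

    from redis import StrictRedis
    from rq import Queue, Worker

    def notify_on_failure(job, exc_type, exc_value, traceback):
        # Reached even when the status/registry bookkeeping pipeline above
        # could not be written to Redis.
        print('job %s failed: %s' % (job.id, exc_value))
        return True  # fall through to the remaining handlers

    connection = StrictRedis()
    worker = Worker([Queue('default', connection=connection)],
                    connection=connection)
    worker.push_exc_handler(notify_on_failure)
    worker.work()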

@@ -120,4 +120,3 @@ class TestRQCli(RQTestCase):
         self.assertEqual(result.exit_code, 1)
         self.assertIn("Duration must be an integer greater than 1", result.output)

@@ -350,6 +350,45 @@ class TestQueue(RQTestCase):
         # DeferredJobRegistry should also be empty
         self.assertEqual(registry.get_job_ids(), [])

+    def test_enqueue_dependents_on_multiple_queues(self):
+        """Enqueueing dependent jobs on multiple queues pushes each job to its
+        own queue and removes it from that queue's DeferredJobRegistry."""
+        q_1 = Queue("queue_1")
+        q_2 = Queue("queue_2")
+        parent_job = Job.create(func=say_hello)
+        parent_job.save()
+        job_1 = q_1.enqueue(say_hello, depends_on=parent_job)
+        job_2 = q_2.enqueue(say_hello, depends_on=parent_job)
+
+        # Each queue has its own DeferredJobRegistry
+        registry_1 = DeferredJobRegistry(q_1.name, connection=self.testconn)
+        self.assertEqual(
+            set(registry_1.get_job_ids()),
+            set([job_1.id])
+        )
+        registry_2 = DeferredJobRegistry(q_2.name, connection=self.testconn)
+        self.assertEqual(
+            set(registry_2.get_job_ids()),
+            set([job_2.id])
+        )
+
+        # Before the dependents are enqueued, both queues are empty;
+        # afterwards, job_1 should be in queue_1 and job_2 in queue_2
+        self.assertEqual(q_1.job_ids, [])
+        self.assertEqual(q_2.job_ids, [])
+
+        q_1.enqueue_dependents(parent_job)
+        q_2.enqueue_dependents(parent_job)
+
+        self.assertEqual(set(q_1.job_ids), set([job_1.id]))
+        self.assertEqual(set(q_2.job_ids), set([job_2.id]))
+        self.assertFalse(self.testconn.exists(parent_job.dependents_key))
+
+        # DeferredJobRegistry should also be empty
+        self.assertEqual(registry_1.get_job_ids(), [])
+        self.assertEqual(registry_2.get_job_ids(), [])
+
     def test_enqueue_job_with_dependency(self):
         """Jobs are enqueued only when their dependencies are finished."""
         # Job with unfinished dependency is not immediately enqueued
