From eadc7db29f6ef6bbd016fbbc5882d1266b75e3dd Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Thu, 18 Apr 2013 22:11:43 +0700 Subject: [PATCH 01/16] First stab at writing implementing job dependency. --- rq/job.py | 57 +++++++++++++++++++++++++++++++++++++++++--- rq/queue.py | 29 ++++++++++++++++++---- rq/worker.py | 1 + tests/test_job.py | 35 ++++++++++++++++++++++++++- tests/test_queue.py | 25 +++++++++++++++++++ tests/test_worker.py | 9 +++++++ 6 files changed, 148 insertions(+), 8 deletions(-) diff --git a/rq/job.py b/rq/job.py index a4772a4..2a6efcf 100644 --- a/rq/job.py +++ b/rq/job.py @@ -64,7 +64,7 @@ class Job(object): # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, - result_ttl=None, status=None): + result_ttl=None, status=None, parent=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. """ @@ -87,6 +87,9 @@ class Job(object): job.description = job.get_call_string() job.result_ttl = result_ttl job._status = status + # parent could be job instance or id + if parent is not None: + job._parent_id = parent.id if isinstance(parent, Job) else parent return job @property @@ -119,6 +122,20 @@ class Job(object): def is_started(self): return self.status == Status.STARTED + @property + def parent(self): + """Returns a job's parent. To avoid repeated Redis fetches, we cache + job.parent as job._parent. + """ + if self._parent_id is None: + return None + if hasattr(self, '_parent'): + return self._parent + job = Job.fetch(self._parent_id, connection=self.connection) + job.refresh() + self._parent = job + return job + @property def func(self): func_name = self.func_name @@ -185,6 +202,7 @@ class Job(object): self.timeout = None self.result_ttl = None self._status = None + self._parent_id = None self.meta = {} @@ -208,11 +226,21 @@ class Job(object): """The Redis key that is used to store job hash under.""" return 'rq:job:%s' % (job_id,) + @classmethod + def waitlist_key_for(cls, job_id): + """The Redis key that is used to store job hash under.""" + return 'rq:job:%s:waitlist' % (job_id,) + @property def key(self): """The Redis key that is used to store job hash under.""" return self.key_for(self.id) + @property + def waitlist_key(self): + """The Redis key that is used to store job hash under.""" + return self.waitlist_key_for(self.id) + @property # noqa def job_tuple(self): """Returns the job tuple that encodes the actual function call that @@ -285,6 +313,7 @@ class Job(object): self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self._status = obj.get('status') if obj.get('status') else None + self._parent_id = obj.get('parent_id', None) self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} def save(self, pipeline=None): @@ -315,6 +344,8 @@ class Job(object): obj['result_ttl'] = self.result_ttl if self._status is not None: obj['status'] = self._status + if self._parent_id is not None: + obj['parent_id'] = self._parent_id if self.meta: obj['meta'] = dumps(self.meta) @@ -381,7 +412,26 @@ class Job(object): elif ttl > 0: connection = pipeline if pipeline is not None else self.connection connection.expire(self.key, ttl) + + def register_dependency(self): + """Jobs may have a waitlist. Jobs in this waitlist are enqueued + only if the parent job is successfully performed. We maintain this + waitlist in Redis, with key that looks something like: + + rq:job:job_id:waitlist = ['job_id_1', 'job_id_2'] + This method puts the job on it's parent's waitlist. + """ + # TODO: This can probably be pipelined + self.connection.rpush(Job.waitlist_key_for(self._parent_id), self.id) + + def get_waitlist(self): + """Returns all job ids in the waitlist. + """ + # TODO: This can probably be pipelined + + return self.connection.lrange( + self.waitlist_key, 0, self.connection.llen(self.waitlist_key) - 1) def __str__(self): return '' % (self.id, self.description) @@ -413,10 +463,11 @@ class Job(object): def __setattr__(self, name, value): # Ignore the "private" fields - private_attrs = set(['origin', '_func_name', 'ended_at', + private_attrs = set(('origin', '_func_name', 'ended_at', 'description', '_args', 'created_at', 'enqueued_at', 'connection', '_result', 'result', 'timeout', '_kwargs', 'exc_info', '_id', - 'data', '_instance', 'result_ttl', '_status', 'status', 'meta']) + 'data', '_instance', 'result_ttl', '_status', 'status', + '_parent_id', '_parent', 'parent', 'meta')) if name in private_attrs: object.__setattr__(self, name, value) diff --git a/rq/queue.py b/rq/queue.py index 3bf5d00..f70033a 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -113,7 +113,8 @@ class Queue(object): """Pushes a job ID on the corresponding Redis queue.""" self.connection.rpush(self.key, job_id) - def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None): #noqa + def enqueue_call(self, func, args=None, kwargs=None, timeout=None, + result_ttl=None, after=None): """Creates a job to represent the delayed function call and enqueues it. @@ -123,7 +124,14 @@ class Queue(object): """ timeout = timeout or self._default_timeout job = Job.create(func, args, kwargs, connection=self.connection, - result_ttl=result_ttl, status=Status.QUEUED) + result_ttl=result_ttl, status=Status.QUEUED, + parent=after) + # If job depends on another job to finish, register itself on it's + # parent's waitlist instead of enqueueing it + if after is not None: + job.register_dependency() + job.save() + return job return self.enqueue_job(job, timeout=timeout) def enqueue(self, f, *args, **kwargs): @@ -149,15 +157,18 @@ class Queue(object): # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30) timeout = None result_ttl = None - if 'args' in kwargs or 'kwargs' in kwargs: + after = None + if 'args' in kwargs or 'kwargs' in kwargs or 'after' in kwargs: assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa timeout = kwargs.pop('timeout', None) args = kwargs.pop('args', None) result_ttl = kwargs.pop('result_ttl', None) + after = kwargs.pop('after', None) kwargs = kwargs.pop('kwargs', None) return self.enqueue_call(func=f, args=args, kwargs=kwargs, - timeout=timeout, result_ttl=result_ttl) + timeout=timeout, result_ttl=result_ttl, + after=after) def enqueue_job(self, job, timeout=None, set_meta_data=True): """Enqueues a job for delayed execution. @@ -188,6 +199,16 @@ class Queue(object): job.save() return job + def enqueue_waitlist(self, job): + """Enqueues all jobs in the waitlist and clears it""" + # TODO: can probably be pipelined + job_ids = job.get_waitlist() + for job_id in job.get_waitlist(): + waitlisted_job = Job.fetch(job_id, connection=self.connection) + self.enqueue_job(waitlisted_job) + if job_ids: + self.connection.delete(job.waitlist_key) + def pop_job_id(self): """Pops a given job ID from this Redis queue.""" return self.connection.lpop(self.key) diff --git a/rq/worker.py b/rq/worker.py index 8f1da27..8d43f9f 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -328,6 +328,7 @@ class Worker(object): self.connection.expire(self.key, (job.timeout or 180) + 60) self.fork_and_perform_job(job) self.connection.expire(self.key, self.default_worker_ttl) + queue.enqueue_waitlist(job) did_perform_work = True finally: diff --git a/tests/test_job.py b/tests/test_job.py index 8b1d137..87f649e 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -131,6 +131,22 @@ class TestJob(RQTestCase): self.testconn.hkeys(job.key), ['created_at', 'data', 'description']) + def test_persistence_of_parent_job(self): + """Storing jobs with parent job, either instance or key.""" + parent_job = Job.create(func=some_calculation) + parent_job.save() + job = Job.create(func=some_calculation, parent=parent_job) + job.save() + stored_job = Job.fetch(job.id) + self.assertEqual(stored_job._parent_id, parent_job.id) + self.assertEqual(stored_job.parent, parent_job) + + job = Job.create(func=some_calculation, parent=parent_job.id) + job.save() + stored_job = Job.fetch(job.id) + self.assertEqual(stored_job._parent_id, parent_job.id) + self.assertEqual(stored_job.parent, parent_job) + def test_store_then_fetch(self): """Store, then fetch.""" job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2)) @@ -242,10 +258,27 @@ class TestJob(RQTestCase): job.cleanup(ttl=-1) self.assertEqual(self.testconn.ttl(job.key), -1) - # Jobs with positive TTLs are eventually deleted + # Jobs with positive TTLs are eventually deleted job.cleanup(ttl=100) self.assertEqual(self.testconn.ttl(job.key), 100) # Jobs with 0 TTL are immediately deleted job.cleanup(ttl=0) self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn) + + def test_register_dependency(self): + """Test that jobs updates the correct job waitlist""" + job = Job.create(func=say_hello) + job._parent_id = 'id' + job.save() + job.register_dependency() + self.assertEqual(self.testconn.lpop('rq:job:id:waitlist'), job.id) + + def test_get_waitlist(self): + """Test that all waitlisted job ids are fetched""" + job = Job.create(func=say_hello) + self.assertEqual(job.get_waitlist(), []) + self.testconn.lpush(job.waitlist_key, 'id_1') + self.assertEqual(job.get_waitlist(), ['id_1']) + self.testconn.lpush(job.waitlist_key, 'id_2') + self.assertEqual(job.get_waitlist(), ['id_2', 'id_1']) diff --git a/tests/test_queue.py b/tests/test_queue.py index 061c984..b596004 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -237,6 +237,31 @@ class TestQueue(RQTestCase): job = q.enqueue(say_hello) self.assertEqual(job.status, Status.QUEUED) + def test_enqueue_waitlist(self): + """Enqueueing a waitlist pushes all jobs in waitlist to queue""" + q = Queue() + parent_job = Job.create(func=say_hello) + parent_job.save() + job_1 = Job.create(func=say_hello, parent=parent_job) + job_1.save() + job_1.register_dependency() + job_2 = Job.create(func=say_hello, parent=parent_job) + job_2.save() + job_2.register_dependency() + + # After waitlist is enqueued, job_1 and job_2 should be in queue + self.assertEqual(q.job_ids, []) + q.enqueue_waitlist(parent_job) + self.assertEqual(q.job_ids, [job_1.id, job_2.id]) + self.assertFalse(self.testconn.exists(parent_job.waitlist_key)) + + def test_enqueue_job_with_dependency(self): + """Job with dependency is not queued right away""" + parent_job = Job.create(func=say_hello) + q = Queue() + q.enqueue_call(say_hello, after=parent_job) + self.assertEqual(q.job_ids, []) + class TestFailedQueue(RQTestCase): def test_requeue_job(self): diff --git a/tests/test_worker.py b/tests/test_worker.py index d9e2fe5..143fdbc 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -234,3 +234,12 @@ class TestWorker(RQTestCase): self.assertEqual(job.is_queued, False) self.assertEqual(job.is_finished, False) self.assertEqual(job.is_failed, True) + + def test_job_dependency(self): + q = Queue() + w = Worker([q]) + parent_job = q.enqueue(say_hello) + job = q.enqueue_call(say_hello, after=parent_job) + w.work(burst=True) + job = Job.fetch(job.id) + self.assertEqual(job.status, 'finished') From 6550f866463cf3d25da5a4ee6b5ec5dcf31f01d3 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Thu, 18 Apr 2013 22:18:56 +0700 Subject: [PATCH 02/16] Don't enqueue waitlisted jobs on failed execution. --- rq/queue.py | 3 ++- rq/worker.py | 3 ++- tests/test_worker.py | 9 ++++++++- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index f70033a..31d33c9 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -113,7 +113,7 @@ class Queue(object): """Pushes a job ID on the corresponding Redis queue.""" self.connection.rpush(self.key, job_id) - def enqueue_call(self, func, args=None, kwargs=None, timeout=None, + def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None, after=None): """Creates a job to represent the delayed function call and enqueues it. @@ -123,6 +123,7 @@ class Queue(object): contain options for RQ itself. """ timeout = timeout or self._default_timeout + # TODO: job with dependency shouldn't have "queued" as status job = Job.create(func, args, kwargs, connection=self.connection, result_ttl=result_ttl, status=Status.QUEUED, parent=after) diff --git a/rq/worker.py b/rq/worker.py index 8d43f9f..6d35656 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -328,7 +328,8 @@ class Worker(object): self.connection.expire(self.key, (job.timeout or 180) + 60) self.fork_and_perform_job(job) self.connection.expire(self.key, self.default_worker_ttl) - queue.enqueue_waitlist(job) + if job.status == 'finished': + queue.enqueue_waitlist(job) did_perform_work = True finally: diff --git a/tests/test_worker.py b/tests/test_worker.py index 143fdbc..8615801 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -236,10 +236,17 @@ class TestWorker(RQTestCase): self.assertEqual(job.is_failed, True) def test_job_dependency(self): + """Waitlisted jobs are enqueued only if their parents don't fail""" q = Queue() w = Worker([q]) parent_job = q.enqueue(say_hello) job = q.enqueue_call(say_hello, after=parent_job) w.work(burst=True) job = Job.fetch(job.id) - self.assertEqual(job.status, 'finished') + self.assertEqual(job.status, 'finished') + + parent_job = q.enqueue(div_by_zero) + job = q.enqueue_call(say_hello, after=parent_job) + w.work(burst=True) + job = Job.fetch(job.id) + self.assertNotEqual(job.status, 'finished') From 18ff57ef352f39a8c9077cd33c46b0ec0634e242 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sun, 28 Apr 2013 11:11:36 +0700 Subject: [PATCH 03/16] Avoid race conditions when enqueueing job with dependency. --- rq/exceptions.py | 4 ++++ rq/queue.py | 25 +++++++++++++++++++------ tests/test_queue.py | 9 ++++++++- tests/test_worker.py | 2 +- 4 files changed, 32 insertions(+), 8 deletions(-) diff --git a/rq/exceptions.py b/rq/exceptions.py index 982a580..7f8df37 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -17,3 +17,7 @@ class UnpickleError(Exception): class DequeueTimeout(Exception): pass + + +class EnqueueError(Exception): + pass \ No newline at end of file diff --git a/rq/queue.py b/rq/queue.py index 31d33c9..da0154a 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,10 +1,12 @@ import times from .connections import resolve_connection from .job import Job, Status -from .exceptions import (NoSuchJobError, UnpickleError, - InvalidJobOperationError, DequeueTimeout) +from .exceptions import (DequeueTimeout, EnqueueError, InvalidJobOperationError, + NoSuchJobError, UnpickleError) from .compat import total_ordering +from redis import WatchError + def get_failed_queue(connection=None): """Returns a handle to the special failed queue.""" @@ -127,12 +129,23 @@ class Queue(object): job = Job.create(func, args, kwargs, connection=self.connection, result_ttl=result_ttl, status=Status.QUEUED, parent=after) - # If job depends on another job to finish, register itself on it's + + # If job depends on an unfinished job, register itself on it's # parent's waitlist instead of enqueueing it if after is not None: - job.register_dependency() - job.save() - return job + with self.connection.pipeline() as pipe: + try: + pipe.watch(after.key) + if after.status != Status.FINISHED: + job.register_dependency() + job.save() + return job + except WatchError: + raise EnqueueError( + 'Parent job (%s) modified during enqueue process. ' + + 'Bailing out to avoid race conditions' % after.id + ) + return self.enqueue_job(job, timeout=timeout) def enqueue(self, f, *args, **kwargs): diff --git a/tests/test_queue.py b/tests/test_queue.py index b596004..9304e42 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -256,12 +256,19 @@ class TestQueue(RQTestCase): self.assertFalse(self.testconn.exists(parent_job.waitlist_key)) def test_enqueue_job_with_dependency(self): - """Job with dependency is not queued right away""" + """Test enqueueing job with dependency""" + # Job with unfinished dependency is not immediately enqueued parent_job = Job.create(func=say_hello) q = Queue() q.enqueue_call(say_hello, after=parent_job) self.assertEqual(q.job_ids, []) + # Jobs dependent on finished jobs are immediately enqueued + parent_job.status = 'finished' + parent_job.save() + job = q.enqueue_call(say_hello, after=parent_job) + self.assertEqual(q.job_ids, [job.id]) + class TestFailedQueue(RQTestCase): def test_requeue_job(self): diff --git a/tests/test_worker.py b/tests/test_worker.py index 8615801..7e4bd2e 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -236,7 +236,7 @@ class TestWorker(RQTestCase): self.assertEqual(job.is_failed, True) def test_job_dependency(self): - """Waitlisted jobs are enqueued only if their parents don't fail""" + """Enqueue waitlisted jobs only if their parents don't fail""" q = Queue() w = Worker([q]) parent_job = q.enqueue(say_hello) From 0dfb0413831c2df1df451fa50ffafdc85e0cbd78 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sun, 28 Apr 2013 11:20:39 +0700 Subject: [PATCH 04/16] Simplify enqueue_waitlist by using lpop. --- rq/job.py | 8 -------- rq/queue.py | 8 ++++---- tests/test_job.py | 9 --------- 3 files changed, 4 insertions(+), 21 deletions(-) diff --git a/rq/job.py b/rq/job.py index 2a6efcf..fbaa424 100644 --- a/rq/job.py +++ b/rq/job.py @@ -425,14 +425,6 @@ class Job(object): # TODO: This can probably be pipelined self.connection.rpush(Job.waitlist_key_for(self._parent_id), self.id) - def get_waitlist(self): - """Returns all job ids in the waitlist. - """ - # TODO: This can probably be pipelined - - return self.connection.lrange( - self.waitlist_key, 0, self.connection.llen(self.waitlist_key) - 1) - def __str__(self): return '' % (self.id, self.description) diff --git a/rq/queue.py b/rq/queue.py index da0154a..e06904d 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -216,12 +216,12 @@ class Queue(object): def enqueue_waitlist(self, job): """Enqueues all jobs in the waitlist and clears it""" # TODO: can probably be pipelined - job_ids = job.get_waitlist() - for job_id in job.get_waitlist(): + while True: + job_id = self.connection.lpop(job.waitlist_key) + if job_id is None: + break waitlisted_job = Job.fetch(job_id, connection=self.connection) self.enqueue_job(waitlisted_job) - if job_ids: - self.connection.delete(job.waitlist_key) def pop_job_id(self): """Pops a given job ID from this Redis queue.""" diff --git a/tests/test_job.py b/tests/test_job.py index 87f649e..cafc8ad 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -273,12 +273,3 @@ class TestJob(RQTestCase): job.save() job.register_dependency() self.assertEqual(self.testconn.lpop('rq:job:id:waitlist'), job.id) - - def test_get_waitlist(self): - """Test that all waitlisted job ids are fetched""" - job = Job.create(func=say_hello) - self.assertEqual(job.get_waitlist(), []) - self.testconn.lpush(job.waitlist_key, 'id_1') - self.assertEqual(job.get_waitlist(), ['id_1']) - self.testconn.lpush(job.waitlist_key, 'id_2') - self.assertEqual(job.get_waitlist(), ['id_2', 'id_1']) From 2e826e2b1f822a1c084c8239014160809162b1d2 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sun, 28 Apr 2013 11:27:50 +0700 Subject: [PATCH 05/16] Internally renamed the term "parent" to "dependency". --- rq/job.py | 40 ++++++++++++++++++++-------------------- rq/queue.py | 2 +- tests/test_job.py | 14 +++++++------- tests/test_queue.py | 4 ++-- 4 files changed, 30 insertions(+), 30 deletions(-) diff --git a/rq/job.py b/rq/job.py index fbaa424..9f5d272 100644 --- a/rq/job.py +++ b/rq/job.py @@ -64,7 +64,7 @@ class Job(object): # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, - result_ttl=None, status=None, parent=None): + result_ttl=None, status=None, dependency=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. """ @@ -87,9 +87,9 @@ class Job(object): job.description = job.get_call_string() job.result_ttl = result_ttl job._status = status - # parent could be job instance or id - if parent is not None: - job._parent_id = parent.id if isinstance(parent, Job) else parent + # dependency could be job instance or id + if dependency is not None: + job._dependency_id = dependency.id if isinstance(dependency, Job) else dependency return job @property @@ -123,17 +123,17 @@ class Job(object): return self.status == Status.STARTED @property - def parent(self): - """Returns a job's parent. To avoid repeated Redis fetches, we cache - job.parent as job._parent. + def dependency(self): + """Returns a job's dependency. To avoid repeated Redis fetches, we cache + job.dependency as job._dependency. """ - if self._parent_id is None: + if self._dependency_id is None: return None - if hasattr(self, '_parent'): - return self._parent - job = Job.fetch(self._parent_id, connection=self.connection) + if hasattr(self, '_dependency'): + return self._dependency + job = Job.fetch(self._dependency_id, connection=self.connection) job.refresh() - self._parent = job + self._dependency = job return job @property @@ -202,7 +202,7 @@ class Job(object): self.timeout = None self.result_ttl = None self._status = None - self._parent_id = None + self._dependency_id = None self.meta = {} @@ -313,7 +313,7 @@ class Job(object): self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self._status = obj.get('status') if obj.get('status') else None - self._parent_id = obj.get('parent_id', None) + self._dependency_id = obj.get('dependency_id', None) self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} def save(self, pipeline=None): @@ -344,8 +344,8 @@ class Job(object): obj['result_ttl'] = self.result_ttl if self._status is not None: obj['status'] = self._status - if self._parent_id is not None: - obj['parent_id'] = self._parent_id + if self._dependency_id is not None: + obj['dependency_id'] = self._dependency_id if self.meta: obj['meta'] = dumps(self.meta) @@ -415,15 +415,15 @@ class Job(object): def register_dependency(self): """Jobs may have a waitlist. Jobs in this waitlist are enqueued - only if the parent job is successfully performed. We maintain this + only if the dependency job is successfully performed. We maintain this waitlist in Redis, with key that looks something like: rq:job:job_id:waitlist = ['job_id_1', 'job_id_2'] - This method puts the job on it's parent's waitlist. + This method puts the job on it's dependency's waitlist. """ # TODO: This can probably be pipelined - self.connection.rpush(Job.waitlist_key_for(self._parent_id), self.id) + self.connection.rpush(Job.waitlist_key_for(self._dependency_id), self.id) def __str__(self): return '' % (self.id, self.description) @@ -459,7 +459,7 @@ class Job(object): 'description', '_args', 'created_at', 'enqueued_at', 'connection', '_result', 'result', 'timeout', '_kwargs', 'exc_info', '_id', 'data', '_instance', 'result_ttl', '_status', 'status', - '_parent_id', '_parent', 'parent', 'meta')) + '_dependency_id', '_dependency', 'dependency', 'meta')) if name in private_attrs: object.__setattr__(self, name, value) diff --git a/rq/queue.py b/rq/queue.py index e06904d..486af1a 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -128,7 +128,7 @@ class Queue(object): # TODO: job with dependency shouldn't have "queued" as status job = Job.create(func, args, kwargs, connection=self.connection, result_ttl=result_ttl, status=Status.QUEUED, - parent=after) + dependency=after) # If job depends on an unfinished job, register itself on it's # parent's waitlist instead of enqueueing it diff --git a/tests/test_job.py b/tests/test_job.py index cafc8ad..83a6eae 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -135,17 +135,17 @@ class TestJob(RQTestCase): """Storing jobs with parent job, either instance or key.""" parent_job = Job.create(func=some_calculation) parent_job.save() - job = Job.create(func=some_calculation, parent=parent_job) + job = Job.create(func=some_calculation, dependency=parent_job) job.save() stored_job = Job.fetch(job.id) - self.assertEqual(stored_job._parent_id, parent_job.id) - self.assertEqual(stored_job.parent, parent_job) + self.assertEqual(stored_job._dependency_id, parent_job.id) + self.assertEqual(stored_job.dependency, parent_job) - job = Job.create(func=some_calculation, parent=parent_job.id) + job = Job.create(func=some_calculation, dependency=parent_job.id) job.save() stored_job = Job.fetch(job.id) - self.assertEqual(stored_job._parent_id, parent_job.id) - self.assertEqual(stored_job.parent, parent_job) + self.assertEqual(stored_job._dependency_id, parent_job.id) + self.assertEqual(stored_job.dependency, parent_job) def test_store_then_fetch(self): """Store, then fetch.""" @@ -269,7 +269,7 @@ class TestJob(RQTestCase): def test_register_dependency(self): """Test that jobs updates the correct job waitlist""" job = Job.create(func=say_hello) - job._parent_id = 'id' + job._dependency_id = 'id' job.save() job.register_dependency() self.assertEqual(self.testconn.lpop('rq:job:id:waitlist'), job.id) diff --git a/tests/test_queue.py b/tests/test_queue.py index 9304e42..83a129e 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -242,10 +242,10 @@ class TestQueue(RQTestCase): q = Queue() parent_job = Job.create(func=say_hello) parent_job.save() - job_1 = Job.create(func=say_hello, parent=parent_job) + job_1 = Job.create(func=say_hello, dependency=parent_job) job_1.save() job_1.register_dependency() - job_2 = Job.create(func=say_hello, parent=parent_job) + job_2 = Job.create(func=say_hello, dependency=parent_job) job_2.save() job_2.register_dependency() From 6ee45597ca589708ed5d4f8f68b84e4eb1925dab Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Tue, 7 May 2013 18:26:08 +0700 Subject: [PATCH 06/16] Don't fail if job dependency is modified during enqueue process. --- rq/exceptions.py | 5 +---- rq/queue.py | 27 ++++++++++++++------------- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/rq/exceptions.py b/rq/exceptions.py index 7f8df37..25e4f0e 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -15,9 +15,6 @@ class UnpickleError(Exception): super(UnpickleError, self).__init__(message, inner_exception) self.raw_data = raw_data + class DequeueTimeout(Exception): pass - - -class EnqueueError(Exception): - pass \ No newline at end of file diff --git a/rq/queue.py b/rq/queue.py index 0e8bdb7..8a60102 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,7 +1,7 @@ import times from .connections import resolve_connection from .job import Job, Status -from .exceptions import (DequeueTimeout, EnqueueError, InvalidJobOperationError, +from .exceptions import (DequeueTimeout, InvalidJobOperationError, NoSuchJobError, UnpickleError) from .compat import total_ordering @@ -145,20 +145,21 @@ class Queue(object): dependency=after) # If job depends on an unfinished job, register itself on it's - # parent's waitlist instead of enqueueing it + # parent's waitlist instead of enqueueing it. + # If WatchError is raised in the process, that means something else is + # modifying the dependency. In this case we simply retry if after is not None: with self.connection.pipeline() as pipe: - try: - pipe.watch(after.key) - if after.status != Status.FINISHED: - job.register_dependency() - job.save() - return job - except WatchError: - raise EnqueueError( - 'Parent job (%s) modified during enqueue process. ' + - 'Bailing out to avoid race conditions' % after.id - ) + while True: + try: + pipe.watch(after.key) + if after.status != Status.FINISHED: + job.register_dependency() + job.save() + return job + break + except WatchError: + continue return self.enqueue_job(job, timeout=timeout) From a0d46c93f375c38ccd55828d1f0645c25c20cb3e Mon Sep 17 00:00:00 2001 From: Devi Date: Mon, 3 Jun 2013 21:46:25 +0530 Subject: [PATCH 07/16] take REDIS_URL from settings if exists --- rq/scripts/__init__.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rq/scripts/__init__.py b/rq/scripts/__init__.py index fc31d09..6c567ac 100644 --- a/rq/scripts/__init__.py +++ b/rq/scripts/__init__.py @@ -30,6 +30,9 @@ def read_config_file(module): def setup_default_arguments(args, settings): """ Sets up args from settings or defaults """ + if args.url is None: + args.url = settings.get('REDIS_URL', 'redis://localhost:6379/0') + if args.host is None: args.host = settings.get('REDIS_HOST', 'localhost') From c4e7c1799471bd521e409145099094e8e5bd49c0 Mon Sep 17 00:00:00 2001 From: Devi Date: Mon, 17 Jun 2013 14:49:53 +0530 Subject: [PATCH 08/16] deprecate use of host/db/port options for Redis --- rq/scripts/__init__.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/rq/scripts/__init__.py b/rq/scripts/__init__.py index 6c567ac..6fc89e4 100644 --- a/rq/scripts/__init__.py +++ b/rq/scripts/__init__.py @@ -1,5 +1,6 @@ import importlib import redis +from warnings import warn from rq import use_connection @@ -31,7 +32,12 @@ def read_config_file(module): def setup_default_arguments(args, settings): """ Sets up args from settings or defaults """ if args.url is None: - args.url = settings.get('REDIS_URL', 'redis://localhost:6379/0') + args.url = settings.get('REDIS_URL') + + if (args.host or args.port or args.socket or args.db or args.password): + warn('Host, port, db, password options for Redis will not be \ + supported in future versions of RQ. \ + Please use `REDIS_URL` or `--url` instead.', DeprecationWarning) if args.host is None: args.host = settings.get('REDIS_HOST', 'localhost') From a29661907484a638ac56ffc71b30c6a4e9ccc146 Mon Sep 17 00:00:00 2001 From: Sylvain Zimmer Date: Fri, 30 Aug 2013 01:44:46 +0200 Subject: [PATCH 09/16] Split Job.dump() and Job.save() --- rq/job.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/rq/job.py b/rq/job.py index fdc4b57..274b631 100644 --- a/rq/job.py +++ b/rq/job.py @@ -291,11 +291,8 @@ class Job(object): self._status = as_text(obj.get('status') if obj.get('status') else None) self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} - def save(self, pipeline=None): - """Persists the current job instance to its corresponding Redis key.""" - key = self.key - connection = pipeline if pipeline is not None else self.connection - + def dump(self): + """Returns a serialization of the current job instance""" obj = {} obj['created_at'] = times.format(self.created_at or times.now(), 'UTC') @@ -322,7 +319,14 @@ class Job(object): if self.meta: obj['meta'] = dumps(self.meta) - connection.hmset(key, obj) + return obj + + def save(self, pipeline=None): + """Persists the current job instance to its corresponding Redis key.""" + key = self.key + connection = pipeline if pipeline is not None else self.connection + + connection.hmset(key, self.dump()) def cancel(self): """Cancels the given job, which will prevent the job from ever being @@ -379,13 +383,13 @@ class Job(object): - If it's a positive number, set the job to expire in X seconds. - If result_ttl is negative, don't set an expiry to it (persist forever) - """ + """ if ttl == 0: self.cancel() elif ttl > 0: connection = pipeline if pipeline is not None else self.connection connection.expire(self.key, ttl) - + def __str__(self): return '' % (self.id, self.description) From 1b558704d390579b4aab6e571b5e49d8236939fc Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 2 Sep 2013 22:51:26 +0200 Subject: [PATCH 10/16] Fix: COMPACT_QUEUE should be a unique key. This fixes #230. Thanks, @sylvinus. --- rq/queue.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rq/queue.py b/rq/queue.py index 2404eea..e5b3b04 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,4 +1,6 @@ import times +import uuid + from .connections import resolve_connection from .job import Job, Status from .exceptions import (NoSuchJobError, UnpickleError, @@ -114,7 +116,7 @@ class Queue(object): """Removes all "dead" jobs from the queue by cycling through it, while guarantueeing FIFO semantics. """ - COMPACT_QUEUE = 'rq:queue:_compact' + COMPACT_QUEUE = 'rq:queue:_compact:{0}'.format(uuid.uuid4()) self.connection.rename(self.key, COMPACT_QUEUE) while True: From 12b6b3200e83636538a6e85d88b2ae02d3ec8999 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 2 Sep 2013 22:54:49 +0200 Subject: [PATCH 11/16] Update changelog. --- CHANGES.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 7cae844..7578083 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,9 @@ - Ability to provide a custom job description (instead of using the default function invocation hint). Thanks, İbrahim. +- Temporary key for the compact queue is now randomly generated, which should + avoid name clashes for concurrent compact actions. + ### 0.3.11 (August 23th, 2013) From 537476b488dc8ad91c105907f04b9939f5f206b0 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 2 Sep 2013 23:02:24 +0200 Subject: [PATCH 12/16] PEP8ify. --- rq/job.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/rq/job.py b/rq/job.py index 274b631..a168738 100644 --- a/rq/job.py +++ b/rq/job.py @@ -4,7 +4,7 @@ import times from uuid import uuid4 try: from cPickle import loads, dumps, UnpicklingError -except ImportError: # noqa +except ImportError: # noqa from pickle import loads, dumps, UnpicklingError # noqa from .local import LocalStack from .connections import resolve_connection @@ -16,8 +16,9 @@ def enum(name, *sequential, **named): values = dict(zip(sequential, range(len(sequential))), **named) return type(name, (), values) -Status = enum('Status', QUEUED='queued', FINISHED='finished', FAILED='failed', - STARTED='started') +Status = enum('Status', + QUEUED='queued', FINISHED='finished', FAILED='failed', + STARTED='started') def unpickle(pickled_string): @@ -287,7 +288,7 @@ class Job(object): self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa self.exc_info = obj.get('exc_info') self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None - self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa + self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self._status = as_text(obj.get('status') if obj.get('status') else None) self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} @@ -354,7 +355,6 @@ class Job(object): assert self.id == _job_stack.pop() return self._result - def get_ttl(self, default_ttl=None): """Returns ttl for a job that determines how long a job and its result will be persisted. In the future, this method will also be responsible @@ -390,7 +390,6 @@ class Job(object): connection = pipeline if pipeline is not None else self.connection connection.expire(self.key, ttl) - def __str__(self): return '' % (self.id, self.description) From 4d92079694f33ae14b4ce15e78db06cbb3badda0 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 2 Sep 2013 23:05:18 +0200 Subject: [PATCH 13/16] PEP8ify. --- rq/connections.py | 14 ++++++-------- rq/decorators.py | 6 +++--- rq/exceptions.py | 1 + rq/local.py | 7 ++++--- rq/timeouts.py | 2 +- rq/utils.py | 7 +++---- rq/worker.py | 38 ++++++++++++++++---------------------- 7 files changed, 34 insertions(+), 41 deletions(-) diff --git a/rq/connections.py b/rq/connections.py index f4a72e4..ee07070 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -18,8 +18,8 @@ def Connection(connection=None): finally: popped = pop_connection() assert popped == connection, \ - 'Unexpected Redis connection was popped off the stack. ' \ - 'Check your Redis connection setup.' + 'Unexpected Redis connection was popped off the stack. ' \ + 'Check your Redis connection setup.' def push_connection(redis): @@ -37,7 +37,7 @@ def use_connection(redis=None): use of use_connection() and stacked connection contexts. """ assert len(_connection_stack) <= 1, \ - 'You should not mix Connection contexts with use_connection().' + 'You should not mix Connection contexts with use_connection().' release_local(_connection_stack) if redis is None: @@ -61,13 +61,11 @@ def resolve_connection(connection=None): connection = get_current_connection() if connection is None: - raise NoRedisConnectionException( - 'Could not resolve a Redis connection.') + raise NoRedisConnectionException('Could not resolve a Redis connection.') return connection _connection_stack = LocalStack() -__all__ = ['Connection', - 'get_current_connection', 'push_connection', 'pop_connection', - 'use_connection'] +__all__ = ['Connection', 'get_current_connection', 'push_connection', + 'pop_connection', 'use_connection'] diff --git a/rq/decorators.py b/rq/decorators.py index d57b6cc..b433904 100644 --- a/rq/decorators.py +++ b/rq/decorators.py @@ -4,10 +4,10 @@ from .connections import resolve_connection from .worker import DEFAULT_RESULT_TTL from rq.compat import string_types -class job(object): +class job(object): def __init__(self, queue, connection=None, timeout=None, - result_ttl=DEFAULT_RESULT_TTL): + result_ttl=DEFAULT_RESULT_TTL): """A decorator that adds a ``delay`` method to the decorated function, which in turn creates a RQ job when called. Accepts a required ``queue`` argument that can be either a ``Queue`` instance or a string @@ -32,6 +32,6 @@ class job(object): else: queue = self.queue return queue.enqueue_call(f, args=args, kwargs=kwargs, - timeout=self.timeout, result_ttl=self.result_ttl) + timeout=self.timeout, result_ttl=self.result_ttl) f.delay = delay return f diff --git a/rq/exceptions.py b/rq/exceptions.py index 982a580..25e4f0e 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -15,5 +15,6 @@ class UnpickleError(Exception): super(UnpickleError, self).__init__(message, inner_exception) self.raw_data = raw_data + class DequeueTimeout(Exception): pass diff --git a/rq/local.py b/rq/local.py index 555a6d1..61f896f 100644 --- a/rq/local.py +++ b/rq/local.py @@ -13,13 +13,13 @@ # current thread ident. try: from greenlet import getcurrent as get_ident -except ImportError: # noqa +except ImportError: # noqa try: from thread import get_ident # noqa - except ImportError: # noqa + except ImportError: # noqa try: from _thread import get_ident # noqa - except ImportError: # noqa + except ImportError: # noqa from dummy_thread import get_ident # noqa @@ -119,6 +119,7 @@ class LocalStack(object): def _get__ident_func__(self): return self._local.__ident_func__ + def _set__ident_func__(self, value): # noqa object.__setattr__(self._local, '__ident_func__', value) __ident_func__ = property(_get__ident_func__, _set__ident_func__) diff --git a/rq/timeouts.py b/rq/timeouts.py index d26528a..f1e1848 100644 --- a/rq/timeouts.py +++ b/rq/timeouts.py @@ -33,7 +33,7 @@ class death_penalty_after(object): def handle_death_penalty(self, signum, frame): raise JobTimeoutException('Job exceeded maximum timeout ' - 'value (%d seconds).' % self._timeout) + 'value (%d seconds).' % self._timeout) def setup_death_penalty(self): """Sets up an alarm signal and a signal handler that raises diff --git a/rq/utils.py b/rq/utils.py index 44bbe65..8219c9f 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -16,8 +16,7 @@ def gettermsize(): def ioctl_GWINSZ(fd): try: import fcntl, termios, struct # noqa - cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, - '1234')) + cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')) except: return None return cr @@ -53,7 +52,7 @@ class _Colorizer(object): self.codes["overline"] = esc + "06m" dark_colors = ["black", "darkred", "darkgreen", "brown", "darkblue", - "purple", "teal", "lightgray"] + "purple", "teal", "lightgray"] light_colors = ["darkgray", "red", "green", "yellow", "blue", "fuchsia", "turquoise", "white"] @@ -139,7 +138,7 @@ class ColorizingStreamHandler(logging.StreamHandler): def __init__(self, exclude=None, *args, **kwargs): self.exclude = exclude - if is_python_version((2,6)): + if is_python_version((2, 6)): logging.StreamHandler.__init__(self, *args, **kwargs) else: super(ColorizingStreamHandler, self).__init__(*args, **kwargs) diff --git a/rq/worker.py b/rq/worker.py index 3ba4250..64f6501 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -43,9 +43,9 @@ def iterable(x): def compact(l): return [x for x in l if x is not None] -_signames = dict((getattr(signal, signame), signame) \ - for signame in dir(signal) \ - if signame.startswith('SIG') and '_' not in signame) +_signames = dict((getattr(signal, signame), signame) + for signame in dir(signal) + if signame.startswith('SIG') and '_' not in signame) def signal_name(signum): @@ -68,8 +68,8 @@ class Worker(object): if connection is None: connection = get_current_connection() reported_working = connection.smembers(cls.redis_workers_keys) - workers = [cls.find_by_key(as_text(key), connection) for key in - reported_working] + workers = [cls.find_by_key(as_text(key), connection) + for key in reported_working] return compact(workers) @classmethod @@ -95,13 +95,12 @@ class Worker(object): worker._state = connection.hget(worker.key, 'state') or '?' if queues: worker.queues = [Queue(queue, connection=connection) - for queue in queues.split(',')] + for queue in queues.split(',')] return worker - def __init__(self, queues, name=None, - default_result_ttl=DEFAULT_RESULT_TTL, connection=None, - exc_handler=None, default_worker_ttl=DEFAULT_WORKER_TTL): # noqa + default_result_ttl=DEFAULT_RESULT_TTL, connection=None, + exc_handler=None, default_worker_ttl=DEFAULT_WORKER_TTL): # noqa if connection is None: connection = get_current_connection() self.connection = connection @@ -193,9 +192,8 @@ class Worker(object): self.log.debug('Registering birth of worker %s' % (self.name,)) if self.connection.exists(self.key) and \ not self.connection.hexists(self.key, 'death'): - raise ValueError( - 'There exists an active worker named \'%s\' ' - 'already.' % (self.name,)) + raise ValueError('There exists an active worker named \'%s\' ' + 'already.' % (self.name,)) key = self.key now = time.time() queues = ','.join(self.queue_names()) @@ -304,8 +302,8 @@ class Worker(object): qnames = self.queue_names() self.procline('Listening on %s' % ','.join(qnames)) self.log.info('') - self.log.info('*** Listening on %s...' % \ - green(', '.join(qnames))) + self.log.info('*** Listening on %s...' % + green(', '.join(qnames))) timeout = None if burst else max(1, self.default_worker_ttl - 60) try: result = self.dequeue_job_and_maintain_ttl(timeout) @@ -324,7 +322,7 @@ class Worker(object): # Use the public setter here, to immediately update Redis job.status = Status.STARTED self.log.info('%s: %s (%s)' % (green(queue.name), - blue(job.description), job.id)) + blue(job.description), job.id)) self.connection.expire(self.key, (job.timeout or 180) + 60) self.fork_and_perform_job(job) @@ -336,19 +334,17 @@ class Worker(object): self.register_death() return did_perform_work - def dequeue_job_and_maintain_ttl(self, timeout): while True: try: return Queue.dequeue_any(self.queues, timeout, - connection=self.connection) + connection=self.connection) except DequeueTimeout: pass self.log.debug('Sending heartbeat to prevent worker timeout.') self.connection.expire(self.key, self.default_worker_ttl) - def fork_and_perform_job(self, job): """Spawns a work horse to perform the actual work and passes it a job. The worker will wait for the work horse and make sure it executes @@ -443,12 +439,10 @@ class Worker(object): return True - def handle_exception(self, job, *exc_info): """Walks the exception handler stack to delegate exception handling.""" - exc_string = ''.join( - traceback.format_exception_only(*exc_info[:2]) + - traceback.format_exception(*exc_info)) + exc_string = ''.join(traceback.format_exception_only(*exc_info[:2]) + + traceback.format_exception(*exc_info)) self.log.error(exc_string) for handler in reversed(self._exc_handlers): From eb5bb6329c573bd4df65554ddac96543fa11370f Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 2 Sep 2013 23:12:01 +0200 Subject: [PATCH 14/16] PEP8ify. --- rq/scripts/__init__.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/rq/scripts/__init__.py b/rq/scripts/__init__.py index 6fc89e4..575a8cb 100644 --- a/rq/scripts/__init__.py +++ b/rq/scripts/__init__.py @@ -6,27 +6,28 @@ from rq import use_connection def add_standard_arguments(parser): parser.add_argument('--config', '-c', default=None, - help='Module containing RQ settings.') + help='Module containing RQ settings.') parser.add_argument('--url', '-u', default=None, - help='URL describing Redis connection details. ' - 'Overrides other connection arguments if supplied.') + help='URL describing Redis connection details. ' + 'Overrides other connection arguments if supplied.') parser.add_argument('--host', '-H', default=None, - help='The Redis hostname (default: localhost)') + help='The Redis hostname (default: localhost)') parser.add_argument('--port', '-p', default=None, - help='The Redis portnumber (default: 6379)') + help='The Redis portnumber (default: 6379)') parser.add_argument('--db', '-d', type=int, default=None, - help='The Redis database (default: 0)') + help='The Redis database (default: 0)') parser.add_argument('--password', '-a', default=None, - help='The Redis password (default: None)') + help='The Redis password (default: None)') parser.add_argument('--socket', '-s', default=None, - help='The Redis Unix socket') + help='The Redis Unix socket') + def read_config_file(module): """Reads all UPPERCASE variables defined in the given module file.""" settings = importlib.import_module(module) return dict([(k, v) - for k, v in settings.__dict__.items() - if k.upper() == k]) + for k, v in settings.__dict__.items() + if k.upper() == k]) def setup_default_arguments(args, settings): @@ -35,9 +36,9 @@ def setup_default_arguments(args, settings): args.url = settings.get('REDIS_URL') if (args.host or args.port or args.socket or args.db or args.password): - warn('Host, port, db, password options for Redis will not be \ - supported in future versions of RQ. \ - Please use `REDIS_URL` or `--url` instead.', DeprecationWarning) + warn('Host, port, db, password options for Redis will not be ' + 'supported in future versions of RQ. ' + 'Please use `REDIS_URL` or `--url` instead.', DeprecationWarning) if args.host is None: args.host = settings.get('REDIS_HOST', 'localhost') @@ -63,5 +64,5 @@ def setup_redis(args): redis_conn = redis.StrictRedis.from_url(args.url) else: redis_conn = redis.StrictRedis(host=args.host, port=args.port, db=args.db, - password=args.password, unix_socket_path=args.socket) + password=args.password, unix_socket_path=args.socket) use_connection(redis_conn) From fd44ad39d49e1ea978384d1480653781fa75175c Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Tue, 3 Sep 2013 08:03:50 +0700 Subject: [PATCH 15/16] Python 3 fixes for job dependency stuff. --- rq/job.py | 2 +- rq/queue.py | 2 +- tests/test_job.py | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/rq/job.py b/rq/job.py index 7eaf532..a10c340 100644 --- a/rq/job.py +++ b/rq/job.py @@ -318,7 +318,7 @@ class Job(object): self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self._status = as_text(obj.get('status') if obj.get('status') else None) - self._dependency_id = obj.get('dependency_id', None) + self._dependency_id = as_text(obj.get('dependency_id', None)) self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} def dump(self): diff --git a/rq/queue.py b/rq/queue.py index 79442ac..e9f3c7f 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -240,7 +240,7 @@ class Queue(object): """Enqueues all jobs in the waitlist and clears it""" # TODO: can probably be pipelined while True: - job_id = self.connection.lpop(job.waitlist_key) + job_id = as_text(self.connection.lpop(job.waitlist_key)) if job_id is None: break waitlisted_job = Job.fetch(job_id, connection=self.connection) diff --git a/tests/test_job.py b/tests/test_job.py index 13ef368..bc70d5e 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -7,6 +7,7 @@ try: from cPickle import loads except ImportError: from pickle import loads +from rq.compat import as_text from rq.job import Job, get_current_job from rq.exceptions import NoSuchJobError, UnpickleError from rq.queue import Queue @@ -288,4 +289,4 @@ class TestJob(RQTestCase): job._dependency_id = 'id' job.save() job.register_dependency() - self.assertEqual(self.testconn.lpop('rq:job:id:waitlist'), job.id) + self.assertEqual(as_text(self.testconn.lpop('rq:job:id:waitlist')), job.id) From 2da46565aa9f0a8172b1d30b093e294d9fd8ea1d Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 3 Sep 2013 14:18:16 +0200 Subject: [PATCH 16/16] Add job dependencies to the changelog. --- CHANGES.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 7578083..e305be8 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,8 @@ -### 0.3.12 +### 0.4.0 (not released yet) +- Job dependencies! Thanks, Selwin. + - Ability to provide a custom job description (instead of using the default function invocation hint). Thanks, İbrahim.