From 3467868f1f15392970d2d90d15021c7f6fb57e1c Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Tue, 11 Nov 2014 15:09:06 -0500 Subject: [PATCH 01/28] allow depends_on to be a job id or a job itself Other parts of the code (i.e.: the `.create()` method) allow the `depends_on` kwarg to be a `Job` object *or* a job id. This is an attempt to allow that same idea within the `.enqueue_call()` method for a queue. Since this part of the code requires actually knowing the precise redis key for the job that's depended on, my intuition is that a `.fetch()` is the only way to solve this. --- rq/queue.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rq/queue.py b/rq/queue.py index 1694268..ff40e03 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -189,6 +189,8 @@ class Queue(object): # If WatchError is raised in the process, that means something else is # modifying the dependency. In this case we simply retry if depends_on is not None: + if not isinstance(depends_on, Job): + depends_on = Job.fetch(id=depends_on, connection=self.connection) with self.connection.pipeline() as pipe: while True: try: From 14d118624156bfc625f07153a24818a073dcdae7 Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Mon, 17 Nov 2014 15:04:20 -0500 Subject: [PATCH 02/28] use internal job_class for check --- rq/queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/queue.py b/rq/queue.py index ff40e03..ff5f860 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -189,7 +189,7 @@ class Queue(object): # If WatchError is raised in the process, that means something else is # modifying the dependency. 
In this case we simply retry if depends_on is not None: - if not isinstance(depends_on, Job): + if not isinstance(depends_on, self.job_class): depends_on = Job.fetch(id=depends_on, connection=self.connection) with self.connection.pipeline() as pipe: while True: From 629b3929244d12bdd5b54ede5caf6a22f9718620 Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Mon, 17 Nov 2014 15:04:58 -0500 Subject: [PATCH 03/28] add PyCharm .idea folder to .gitignore --- .gitignore | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index ff9c616..25a046e 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,7 @@ /build .tox .vagrant -Vagrantfile \ No newline at end of file +Vagrantfile + +# PyCharm +.idea From 82729c98dcb278c56ead02f5e318210228e3cc02 Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Mon, 17 Nov 2014 15:22:48 -0500 Subject: [PATCH 04/28] test for id based job dependencies --- tests/test_queue.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/tests/test_queue.py b/tests/test_queue.py index e61568e..8990e35 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -352,6 +352,23 @@ class TestQueue(RQTestCase): self.assertEqual(q.job_ids, [job.id]) self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT) + def test_enqueue_job_with_dependency_by_id(self): + """Enqueueing jobs should work as expected by id as well as job-objects.""" + parent_job = Job.create(func=say_hello) + # We need to save the job for the ID to exist in redis + parent_job.save() + + q = Queue() + q.enqueue_call(say_hello, depends_on=parent_job.id) + self.assertEqual(q.job_ids, []) + + # Jobs dependent on finished jobs are immediately enqueued + parent_job.set_status(Status.FINISHED) + parent_job.save() + job = q.enqueue_call(say_hello, depends_on=parent_job.id) + self.assertEqual(q.job_ids, [job.id]) + self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT) + def test_enqueue_job_with_dependency_and_timeout(self): """Jobs still know their
specified timeout after being scheduled as a dependency.""" # Job with unfinished dependency is not immediately enqueued From af4b0436c1b3b397183a5a9d7b1d3f98e3f0fce0 Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Tue, 18 Nov 2014 12:09:14 -0500 Subject: [PATCH 05/28] instantiate a job instead of fetch this removes the need for the depended on job.id already being saved --- rq/queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/queue.py b/rq/queue.py index ff5f860..7dfb0f8 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -190,7 +190,7 @@ class Queue(object): # modifying the dependency. In this case we simply retry if depends_on is not None: if not isinstance(depends_on, self.job_class): - depends_on = Job.fetch(id=depends_on, connection=self.connection) + depends_on = Job(id=depends_on, connection=self.connection) with self.connection.pipeline() as pipe: while True: try: From b14f739dfe888f9fd845a489c398cb5adcd58ab4 Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Tue, 18 Nov 2014 12:09:22 -0500 Subject: [PATCH 06/28] no need for that save anymore --- tests/test_queue.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/test_queue.py b/tests/test_queue.py index 8990e35..ed42204 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -355,9 +355,7 @@ class TestQueue(RQTestCase): def test_enqueue_job_with_dependency_by_id(self): """Enqueueing jobs should work as expected by id as well as job-objects.""" parent_job = Job.create(func=say_hello) - # We need to save the job for the ID to exist in redis - parent_job.save() - + q = Queue() q.enqueue_call(say_hello, depends_on=parent_job.id) self.assertEqual(q.job_ids, []) From 55c541bc5914cc09cb634c815337e533c37180db Mon Sep 17 00:00:00 2001 From: glaslos Date: Mon, 24 Nov 2014 14:12:51 +0100 Subject: [PATCH 07/28] added job ttl to queue.enqueue() --- rq/job.py | 17 ++++++++++++++--- rq/queue.py | 9 +++++---- rq/worker.py | 2 +- tests/test_job.py | 20 
+++++++++++++++----- 4 files changed, 35 insertions(+), 13 deletions(-) diff --git a/rq/job.py b/rq/job.py index e8080f8..a026fda 100644 --- a/rq/job.py +++ b/rq/job.py @@ -92,7 +92,7 @@ class Job(object): # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, - result_ttl=None, status=None, description=None, depends_on=None, timeout=None, + result_ttl=None, job_ttl=None, status=None, description=None, depends_on=None, timeout=None, id=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. @@ -131,6 +131,7 @@ class Job(object): # Extra meta data job.description = description or job.get_call_string() job.result_ttl = result_ttl + job.job_ttl = job_ttl job.timeout = timeout job._status = status @@ -311,6 +312,7 @@ class Job(object): self.exc_info = None self.timeout = None self.result_ttl = None + self.job_ttl = None self._status = None self._dependency_id = None self.meta = {} @@ -455,6 +457,8 @@ class Job(object): connection = pipeline if pipeline is not None else self.connection connection.hmset(key, self.to_dict()) + if self.job_ttl: + connection.expire(key, self.job_ttl) def cancel(self): """Cancels the given job, which will prevent the job from ever being @@ -491,8 +495,15 @@ class Job(object): return self._result def get_ttl(self, default_ttl=None): - """Returns ttl for a job that determines how long a job and its result - will be persisted. In the future, this method will also be responsible + """Returns ttl for a job that determines how long a job will be + persisted. In the future, this method will also be responsible + for determining ttl for repeated jobs. + """ + return default_ttl if self.job_ttl is None else self.job_ttl + + def get_result_ttl(self, default_ttl=None): + """Returns ttl for a job that determines how long a jobs result will + be persisted. In the future, this method will also be responsible for determining ttl for repeated jobs. 
""" return default_ttl if self.result_ttl is None else self.result_ttl diff --git a/rq/queue.py b/rq/queue.py index ff5f860..9a2c9af 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -167,8 +167,8 @@ class Queue(object): connection.rpush(self.key, job_id) def enqueue_call(self, func, args=None, kwargs=None, timeout=None, - result_ttl=None, description=None, depends_on=None, - job_id=None): + result_ttl=None, job_ttl=None, description=None, + depends_on=None, job_id=None): """Creates a job to represent the delayed function call and enqueues it. @@ -180,7 +180,7 @@ class Queue(object): # TODO: job with dependency shouldn't have "queued" as status job = self.job_class.create(func, args, kwargs, connection=self.connection, - result_ttl=result_ttl, status=Status.QUEUED, + result_ttl=result_ttl, job_ttl=job_ttl, status=Status.QUEUED, description=description, depends_on=depends_on, timeout=timeout, id=job_id) @@ -229,6 +229,7 @@ class Queue(object): timeout = kwargs.pop('timeout', None) description = kwargs.pop('description', None) result_ttl = kwargs.pop('result_ttl', None) + job_ttl = kwargs.pop('job_ttl', None) depends_on = kwargs.pop('depends_on', None) job_id = kwargs.pop('job_id', None) @@ -238,7 +239,7 @@ class Queue(object): kwargs = kwargs.pop('kwargs', None) return self.enqueue_call(func=f, args=args, kwargs=kwargs, - timeout=timeout, result_ttl=result_ttl, + timeout=timeout, result_ttl=result_ttl, job_ttl=job_ttl, description=description, depends_on=depends_on, job_id=job_id) diff --git a/rq/worker.py b/rq/worker.py index bf40a65..20cd833 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -508,7 +508,7 @@ class Worker(object): self.set_current_job_id(None, pipeline=pipeline) - result_ttl = job.get_ttl(self.default_result_ttl) + result_ttl = job.get_result_ttl(self.default_result_ttl) if result_ttl != 0: job.ended_at = utcnow() job._status = Status.FINISHED diff --git a/tests/test_job.py b/tests/test_job.py index 28fad40..27cebb0 100644 --- a/tests/test_job.py +++ 
b/tests/test_job.py @@ -290,17 +290,27 @@ class TestJob(RQTestCase): self.assertEqual(job.id, id) self.assertEqual(job.func, access_self) - def test_get_ttl(self): + def test_get_result_ttl(self): + """Getting job result TTL.""" + job_result_ttl = 1 + default_ttl = 2 + job = Job.create(func=say_hello, result_ttl=job_result_ttl) + job.save() + self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), job_result_ttl) + self.assertEqual(job.get_result_ttl(), job_result_ttl) + job = Job.create(func=say_hello) + job.save() + self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), default_ttl) + self.assertEqual(job.get_result_ttl(), None) + + def test_get_job_ttl(self): """Getting job TTL.""" job_ttl = 1 - default_ttl = 2 - job = Job.create(func=say_hello, result_ttl=job_ttl) + job = Job.create(func=say_hello, job_ttl=job_ttl) job.save() - self.assertEqual(job.get_ttl(default_ttl=default_ttl), job_ttl) self.assertEqual(job.get_ttl(), job_ttl) job = Job.create(func=say_hello) job.save() - self.assertEqual(job.get_ttl(default_ttl=default_ttl), default_ttl) self.assertEqual(job.get_ttl(), None) def test_cleanup(self): From 8fa184b86b87528515e9e8a7e0e0afe219466034 Mon Sep 17 00:00:00 2001 From: Nic Cope Date: Tue, 25 Nov 2014 19:57:49 -0500 Subject: [PATCH 08/28] Ensure the FinishedJobRegistry honors an 'infinite' result_ttl of -1 --- rq/registry.py | 4 ++-- tests/test_registry.py | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/rq/registry.py b/rq/registry.py index c59bf88..a1b89b2 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -28,8 +28,8 @@ class BaseRegistry(object): return self.connection.zcard(self.key) def add(self, job, timeout, pipeline=None): - """Adds a job to StartedJobRegistry with expiry time of now + timeout.""" - score = current_timestamp() + timeout + """Adds a job to a registry with expiry time of now + timeout.""" + score = timeout if timeout == -1 else current_timestamp() + timeout if pipeline is not None: return 
pipeline.zadd(self.key, score, job.id) diff --git a/tests/test_registry.py b/tests/test_registry.py index ce4a345..36b9792 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -27,6 +27,10 @@ class TestRegistry(RQTestCase): self.assertLess(self.testconn.zscore(self.registry.key, job.id), timestamp + 1002) + # Ensure that a timeout of -1 results in a score of -1 + self.registry.add(job, -1) + self.assertEqual(self.testconn.zscore(self.registry.key, job.id), -1) + # Ensure that job is properly removed from sorted set self.registry.remove(job) self.assertIsNone(self.testconn.zscore(self.registry.key, job.id)) From 8a3fd91e2ec2bc09af30652f3ed9b8ce846d8922 Mon Sep 17 00:00:00 2001 From: glaslos Date: Wed, 26 Nov 2014 10:12:52 +0100 Subject: [PATCH 09/28] renaming job.job_ttl to job.ttl --- rq/job.py | 12 ++++++------ rq/queue.py | 8 ++++---- tests/test_job.py | 6 +++--- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/rq/job.py b/rq/job.py index a026fda..865ca30 100644 --- a/rq/job.py +++ b/rq/job.py @@ -92,7 +92,7 @@ class Job(object): # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, - result_ttl=None, job_ttl=None, status=None, description=None, depends_on=None, timeout=None, + result_ttl=None, ttl=None, status=None, description=None, depends_on=None, timeout=None, id=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. 
@@ -131,7 +131,7 @@ class Job(object): # Extra meta data job.description = description or job.get_call_string() job.result_ttl = result_ttl - job.job_ttl = job_ttl + job.ttl = ttl job.timeout = timeout job._status = status @@ -312,7 +312,7 @@ class Job(object): self.exc_info = None self.timeout = None self.result_ttl = None - self.job_ttl = None + self.ttl = None self._status = None self._dependency_id = None self.meta = {} @@ -457,8 +457,8 @@ class Job(object): connection = pipeline if pipeline is not None else self.connection connection.hmset(key, self.to_dict()) - if self.job_ttl: - connection.expire(key, self.job_ttl) + if self.ttl: + connection.expire(key, self.ttl) def cancel(self): """Cancels the given job, which will prevent the job from ever being @@ -499,7 +499,7 @@ class Job(object): persisted. In the future, this method will also be responsible for determining ttl for repeated jobs. """ - return default_ttl if self.job_ttl is None else self.job_ttl + return default_ttl if self.ttl is None else self.ttl def get_result_ttl(self, default_ttl=None): """Returns ttl for a job that determines how long a jobs result will diff --git a/rq/queue.py b/rq/queue.py index 9a2c9af..ccd4e6b 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -167,7 +167,7 @@ class Queue(object): connection.rpush(self.key, job_id) def enqueue_call(self, func, args=None, kwargs=None, timeout=None, - result_ttl=None, job_ttl=None, description=None, + result_ttl=None, ttl=None, description=None, depends_on=None, job_id=None): """Creates a job to represent the delayed function call and enqueues it. 
@@ -180,7 +180,7 @@ class Queue(object): # TODO: job with dependency shouldn't have "queued" as status job = self.job_class.create(func, args, kwargs, connection=self.connection, - result_ttl=result_ttl, job_ttl=job_ttl, status=Status.QUEUED, + result_ttl=result_ttl, ttl=ttl, status=Status.QUEUED, description=description, depends_on=depends_on, timeout=timeout, id=job_id) @@ -229,7 +229,7 @@ class Queue(object): timeout = kwargs.pop('timeout', None) description = kwargs.pop('description', None) result_ttl = kwargs.pop('result_ttl', None) - job_ttl = kwargs.pop('job_ttl', None) + ttl = kwargs.pop('ttl', None) depends_on = kwargs.pop('depends_on', None) job_id = kwargs.pop('job_id', None) @@ -239,7 +239,7 @@ class Queue(object): kwargs = kwargs.pop('kwargs', None) return self.enqueue_call(func=f, args=args, kwargs=kwargs, - timeout=timeout, result_ttl=result_ttl, job_ttl=job_ttl, + timeout=timeout, result_ttl=result_ttl, ttl=ttl, description=description, depends_on=depends_on, job_id=job_id) diff --git a/tests/test_job.py b/tests/test_job.py index 27cebb0..34859a7 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -305,10 +305,10 @@ class TestJob(RQTestCase): def test_get_job_ttl(self): """Getting job TTL.""" - job_ttl = 1 - job = Job.create(func=say_hello, job_ttl=job_ttl) + ttl = 1 + job = Job.create(func=say_hello, ttl=ttl) job.save() - self.assertEqual(job.get_ttl(), job_ttl) + self.assertEqual(job.get_ttl(), ttl) job = Job.create(func=say_hello) job.save() self.assertEqual(job.get_ttl(), None) From c6a83eaa824bc8cd4459356afd1569b2ef7f95f7 Mon Sep 17 00:00:00 2001 From: glaslos Date: Mon, 1 Dec 2014 11:37:54 +0100 Subject: [PATCH 10/28] calling self.cleanup to set job expiration --- rq/job.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/rq/job.py b/rq/job.py index 865ca30..8067a01 100644 --- a/rq/job.py +++ b/rq/job.py @@ -457,8 +457,7 @@ class Job(object): connection = pipeline if pipeline is not None else self.connection 
connection.hmset(key, self.to_dict()) - if self.ttl: - connection.expire(key, self.ttl) + self.cleanup(self.ttl) def cancel(self): """Cancels the given job, which will prevent the job from ever being @@ -524,14 +523,16 @@ class Job(object): def cleanup(self, ttl=None, pipeline=None): """Prepare job for eventual deletion (if needed). This method is usually called after successful execution. How long we persist the job and its - result depends on the value of result_ttl: - - If result_ttl is 0, cleanup the job immediately. + result depends on the value of ttl: + - If ttl is 0, cleanup the job immediately. - If it's a positive number, set the job to expire in X seconds. - - If result_ttl is negative, don't set an expiry to it (persist + - If ttl is negative, don't set an expiry to it (persist forever) """ if ttl == 0: self.cancel() + elif not ttl: + return elif ttl > 0: connection = pipeline if pipeline is not None else self.connection connection.expire(self.key, ttl) From 9f699d758d125a2e8b0b3f962039643ac6793e27 Mon Sep 17 00:00:00 2001 From: Nic Cope Date: Mon, 1 Dec 2014 17:19:37 -0500 Subject: [PATCH 11/28] Any negative number should cause a job to live forever in a registry. While the documentation explicitly mentions ttl=-1, this matches better the behaviour of the job.cleanup() method. 
--- rq/registry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/registry.py b/rq/registry.py index a1b89b2..c620eb9 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -29,7 +29,7 @@ class BaseRegistry(object): def add(self, job, timeout, pipeline=None): """Adds a job to a registry with expiry time of now + timeout.""" - score = timeout if timeout == -1 else current_timestamp() + timeout + score = timeout if timeout < 0 else current_timestamp() + timeout if pipeline is not None: return pipeline.zadd(self.key, score, job.id) From 6ab7070a923f6c54c6f253ee6814cdf4dfd0663c Mon Sep 17 00:00:00 2001 From: Nic Cope Date: Mon, 1 Dec 2014 17:23:38 -0500 Subject: [PATCH 12/28] Rename registry timeout to registry ttl This matches job.cleanup() better and (I think) makes the purpose slightly clearer. --- rq/registry.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rq/registry.py b/rq/registry.py index c620eb9..beabbad 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -27,9 +27,9 @@ class BaseRegistry(object): self.cleanup() return self.connection.zcard(self.key) - def add(self, job, timeout, pipeline=None): - """Adds a job to a registry with expiry time of now + timeout.""" - score = timeout if timeout < 0 else current_timestamp() + timeout + def add(self, job, ttl, pipeline=None): + """Adds a job to a registry with expiry time of now + ttl.""" + score = ttl if ttl < 0 else current_timestamp() + ttl if pipeline is not None: return pipeline.zadd(self.key, score, job.id) From fc5295edfcfd589489a56d4f4e4849edc5e063d0 Mon Sep 17 00:00:00 2001 From: Javier Lopez Date: Tue, 2 Dec 2014 11:02:14 +0100 Subject: [PATCH 13/28] Set busy state on parent while waiting for horse --- rq/worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rq/worker.py b/rq/worker.py index bf40a65..51411b2 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -435,7 +435,9 @@ class Worker(object): self.procline('Forked %d at %d' % (child_pid, time.time())) 
while True: try: + self.set_state('busy') os.waitpid(child_pid, 0) + self.set_state('idle') break except OSError as e: # In case we encountered an OSError due to EINTR (which is From 09cab7a90d5898779ca57a433ebcaca5c630ced2 Mon Sep 17 00:00:00 2001 From: Nic Cope Date: Mon, 1 Dec 2014 17:50:30 -0500 Subject: [PATCH 14/28] Allow maximum job age to be specified when cleaning up or getting expired jobs from a registry. This ensures that all jobs cleaned from a started registry end up in the failed queue. --- rq/registry.py | 43 +++++++++++++++++++++++++++--------------- tests/test_registry.py | 17 +++++++++++++++-- 2 files changed, 43 insertions(+), 17 deletions(-) diff --git a/rq/registry.py b/rq/registry.py index beabbad..b4cf43f 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -8,9 +8,6 @@ class BaseRegistry(object): """ Base implementation of job registry, implemented in Redis sorted set. Each job is stored as a key in the registry, scored by expiration time (unix timestamp). - - Jobs with scores are lower than current time is considered "expired" and - should be cleaned up. """ def __init__(self, name='default', connection=None): @@ -39,10 +36,16 @@ class BaseRegistry(object): connection = pipeline if pipeline is not None else self.connection return connection.zrem(self.key, job.id) - def get_expired_job_ids(self): - """Returns job ids whose score are less than current timestamp.""" + def get_expired_job_ids(self, timestamp=None): + """Returns job ids whose score are less than current timestamp. + + Returns ids for jobs with an expiry time earlier than timestamp, + specified as seconds since the Unix epoch. timestamp defaults to call + time if unspecified. 
+ """ + score = timestamp if timestamp is not None else current_timestamp() return [as_text(job_id) for job_id in - self.connection.zrangebyscore(self.key, 0, current_timestamp())] + self.connection.zrangebyscore(self.key, 0, score)] def get_job_ids(self, start=0, end=-1): """Returns list of all job ids.""" @@ -59,24 +62,28 @@ class StartedJobRegistry(BaseRegistry): Jobs are added to registry right before they are executed and removed right after completion (success or failure). - - Jobs whose score are lower than current time is considered "expired". """ def __init__(self, name='default', connection=None): super(StartedJobRegistry, self).__init__(name, connection) self.key = 'rq:wip:%s' % name - def cleanup(self): - """Remove expired jobs from registry and add them to FailedQueue.""" - job_ids = self.get_expired_job_ids() + def cleanup(self, timestamp=None): + """Remove expired jobs from registry and add them to FailedQueue. + + Removes jobs with an expiry time earlier than timestamp, specified as + seconds since the Unix epoch. timestamp defaults to call time if + unspecified. Removed jobs are added to the global failed job queue. + """ + score = timestamp if timestamp is not None else current_timestamp() + job_ids = self.get_expired_job_ids(score) if job_ids: failed_queue = FailedQueue(connection=self.connection) with self.connection.pipeline() as pipeline: for job_id in job_ids: failed_queue.push_job_id(job_id, pipeline=pipeline) - pipeline.zremrangebyscore(self.key, 0, current_timestamp()) + pipeline.zremrangebyscore(self.key, 0, score) pipeline.execute() return job_ids @@ -92,6 +99,12 @@ class FinishedJobRegistry(BaseRegistry): super(FinishedJobRegistry, self).__init__(name, connection) self.key = 'rq:finished:%s' % name - def cleanup(self): - """Remove expired jobs from registry.""" - self.connection.zremrangebyscore(self.key, 0, current_timestamp()) + def cleanup(self, timestamp=None): + """Remove expired jobs from registry. 
+ + Removes jobs with an expiry time earlier than timestamp, specified as + seconds since the Unix epoch. timestamp defaults to call time if + unspecified. + """ + score = timestamp if timestamp is not None else current_timestamp() + self.connection.zremrangebyscore(self.key, 0, score) diff --git a/tests/test_registry.py b/tests/test_registry.py index 36b9792..26470e3 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -48,14 +48,22 @@ class TestRegistry(RQTestCase): self.testconn.zadd(self.registry.key, 1, 'foo') self.testconn.zadd(self.registry.key, timestamp + 10, 'bar') + self.testconn.zadd(self.registry.key, timestamp + 30, 'baz') self.assertEqual(self.registry.get_expired_job_ids(), ['foo']) + self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20), + ['foo', 'bar']) def test_cleanup(self): """Moving expired jobs to FailedQueue.""" failed_queue = FailedQueue(connection=self.testconn) self.assertTrue(failed_queue.is_empty()) - self.testconn.zadd(self.registry.key, 1, 'foo') + self.testconn.zadd(self.registry.key, 2, 'foo') + + self.registry.cleanup(1) + self.assertNotIn('foo', failed_queue.job_ids) + self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), 2) + self.registry.cleanup() self.assertIn('foo', failed_queue.job_ids) self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), None) @@ -103,9 +111,14 @@ class TestFinishedJobRegistry(RQTestCase): timestamp = current_timestamp() self.testconn.zadd(self.registry.key, 1, 'foo') self.testconn.zadd(self.registry.key, timestamp + 10, 'bar') + self.testconn.zadd(self.registry.key, timestamp + 30, 'baz') self.registry.cleanup() - self.assertEqual(self.registry.get_job_ids(), ['bar']) + self.assertEqual(self.registry.get_job_ids(), ['bar', 'baz']) + + self.registry.cleanup(timestamp + 20) + self.assertEqual(self.registry.get_job_ids(), ['baz']) + def test_jobs_are_put_in_registry(self): """Completed jobs are added to FinishedJobRegistry.""" From 
de72f98fbe01d571ac6eea4eca8ecc2dcefbfdc0 Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Mon, 8 Dec 2014 18:17:32 -0500 Subject: [PATCH 15/28] test for workers equality --- tests/test_worker.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/test_worker.py b/tests/test_worker.py index c6d85ff..59f0b26 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -318,3 +318,12 @@ class TestWorker(RQTestCase): 'Expected at least some work done.') self.assertEquals(job.result, 'Hi there, Adam!') self.assertEquals(job.description, '你好 世界!') + + def test_worker_hash_(self): + """Workers are hashed by their .name attribute""" + q = Queue('foo') + w1 = Worker([q], name="worker1") + w2 = Worker([q], name="worker2") + w3 = Worker([q], name="worker1") + worker_set = {w1, w2, w3} + self.assertEquals(len(worker_set), 2) From c301369c2eb45a0969aa32384ddc7115458f5da1 Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Mon, 8 Dec 2014 18:17:55 -0500 Subject: [PATCH 16/28] implement __eq__ and __hash__ for workers --- rq/worker.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/rq/worker.py b/rq/worker.py index bf40a65..d14bf88 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -580,6 +580,14 @@ class Worker(object): """Pops the latest exception handler off of the exc handler stack.""" return self._exc_handlers.pop() + def __eq__(self, other): + if not isinstance(other, self.__class__): + raise TypeError('Cannot compare workers to other types (of workers)') + return self.name == other.name + + def __hash__(self): + return hash(self.name) + class SimpleWorker(Worker): def _install_signal_handlers(self, *args, **kwargs): From 0ad4cb3410483dae10381a781d14e3d485a91a08 Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Mon, 8 Dec 2014 18:29:55 -0500 Subject: [PATCH 17/28] comments --- rq/worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rq/worker.py b/rq/worker.py index d14bf88..a2bbb2a 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -581,11 
+581,13 @@ class Worker(object): return self._exc_handlers.pop() def __eq__(self, other): + """Equality does not take the database/connection into account""" if not isinstance(other, self.__class__): raise TypeError('Cannot compare workers to other types (of workers)') return self.name == other.name def __hash__(self): + """The hash does not take the database/connection into account""" return hash(self.name) From 6ef9177a7d8a34f49483c80dfe3a8097832fe006 Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Tue, 9 Dec 2014 11:13:56 -0500 Subject: [PATCH 18/28] 2.6 compatible set syntax --- tests/test_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_worker.py b/tests/test_worker.py index 59f0b26..6fa127f 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -325,5 +325,5 @@ class TestWorker(RQTestCase): w1 = Worker([q], name="worker1") w2 = Worker([q], name="worker2") w3 = Worker([q], name="worker1") - worker_set = {w1, w2, w3} + worker_set = set([w1, w2, w3]) self.assertEquals(len(worker_set), 2) From d4b72d330d05d2a988de19056e3ec2d5794c1344 Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Tue, 9 Dec 2014 12:04:57 -0500 Subject: [PATCH 19/28] test for skip_queue mechanics --- tests/test_queue.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/test_queue.py b/tests/test_queue.py index ed42204..8bfde6d 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -459,3 +459,13 @@ class TestFailedQueue(RQTestCase): """Ensure custom job class assignment works as expected.""" q = Queue(job_class=CustomJob) self.assertEqual(q.job_class, CustomJob) + + def test_skip_queue(self): + """Ensure the skip_queue option functions""" + q = Queue('foo') + job1 = q.enqueue(say_hello) + job2 = q.enqueue(say_hello) + assert q.dequeue() == job1 + skip_job = q.enqueue(say_hello, skip_queue=True) + assert q.dequeue() == skip_job + assert q.dequeue() == job2 From f60e4884df50db2f65e5a35bd602c4074981cbab Mon Sep 17 
00:00:00 2001 From: Travis Johnson Date: Tue, 9 Dec 2014 12:05:12 -0500 Subject: [PATCH 20/28] a comment typo --- rq/queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/queue.py b/rq/queue.py index 7dfb0f8..a9049dd 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -149,7 +149,7 @@ class Queue(object): def compact(self): """Removes all "dead" jobs from the queue by cycling through it, while - guarantueeing FIFO semantics. + guaranteeing FIFO semantics. """ COMPACT_QUEUE = 'rq:queue:_compact:{0}'.format(uuid.uuid4()) From ac61f502a1ddb83aa56c761a9ef4adddcd222ee6 Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Tue, 9 Dec 2014 12:05:19 -0500 Subject: [PATCH 21/28] skip_queue functionality --- rq/queue.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index a9049dd..ddb31e6 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -161,14 +161,18 @@ class Queue(object): if self.job_class.exists(job_id, self.connection): self.connection.rpush(self.key, job_id) - def push_job_id(self, job_id, pipeline=None): - """Pushes a job ID on the corresponding Redis queue.""" + def push_job_id(self, job_id, pipeline=None, skip_queue=False): + """Pushes a job ID on the corresponding Redis queue. + 'skip_queue' allows you to push the job onto the front instead of the back of the queue""" connection = pipeline if pipeline is not None else self.connection - connection.rpush(self.key, job_id) + if skip_queue: + connection.lpush(self.key, job_id) + else: + connection.rpush(self.key, job_id) def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None, description=None, depends_on=None, - job_id=None): + job_id=None, skip_queue=False): """Creates a job to represent the delayed function call and enqueues it. 
@@ -204,7 +208,7 @@ class Queue(object): except WatchError: continue - return self.enqueue_job(job) + return self.enqueue_job(job, skip_queue=skip_queue) def enqueue(self, f, *args, **kwargs): """Creates a job to represent the delayed function call and enqueues @@ -231,6 +235,7 @@ class Queue(object): result_ttl = kwargs.pop('result_ttl', None) depends_on = kwargs.pop('depends_on', None) job_id = kwargs.pop('job_id', None) + skip_queue = kwargs.pop('skip_queue', False) if 'args' in kwargs or 'kwargs' in kwargs: assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa @@ -240,9 +245,9 @@ class Queue(object): return self.enqueue_call(func=f, args=args, kwargs=kwargs, timeout=timeout, result_ttl=result_ttl, description=description, depends_on=depends_on, - job_id=job_id) + job_id=job_id, skip_queue=skip_queue) - def enqueue_job(self, job, set_meta_data=True): + def enqueue_job(self, job, set_meta_data=True, skip_queue=False): """Enqueues a job for delayed execution. If the `set_meta_data` argument is `True` (default), it will update @@ -262,7 +267,7 @@ class Queue(object): job.save() if self._async: - self.push_job_id(job.id) + self.push_job_id(job.id, skip_queue=skip_queue) else: job.perform() job.save() From db75958ad23d1ae16a028210a04bcd03e3a540b2 Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Fri, 12 Dec 2014 12:25:33 -0500 Subject: [PATCH 22/28] use 'at_front' instead of 'skip_queue' --- rq/queue.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index ddb31e6..847319f 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -161,18 +161,18 @@ class Queue(object): if self.job_class.exists(job_id, self.connection): self.connection.rpush(self.key, job_id) - def push_job_id(self, job_id, pipeline=None, skip_queue=False): + def push_job_id(self, job_id, pipeline=None, at_front=False): """Pushes a job ID on the corresponding Redis queue. 
- 'skip_queue' allows you to push the job onto the front instead of the back of the queue""" + 'at_front' allows you to push the job onto the front instead of the back of the queue""" connection = pipeline if pipeline is not None else self.connection - if skip_queue: + if at_front: connection.lpush(self.key, job_id) else: connection.rpush(self.key, job_id) def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None, description=None, depends_on=None, - job_id=None, skip_queue=False): + job_id=None, at_front=False): """Creates a job to represent the delayed function call and enqueues it. @@ -208,7 +208,7 @@ class Queue(object): except WatchError: continue - return self.enqueue_job(job, skip_queue=skip_queue) + return self.enqueue_job(job, at_front=at_front) def enqueue(self, f, *args, **kwargs): """Creates a job to represent the delayed function call and enqueues @@ -235,7 +235,7 @@ class Queue(object): result_ttl = kwargs.pop('result_ttl', None) depends_on = kwargs.pop('depends_on', None) job_id = kwargs.pop('job_id', None) - skip_queue = kwargs.pop('skip_queue', False) + at_front = kwargs.pop('at_front', False) if 'args' in kwargs or 'kwargs' in kwargs: assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa @@ -245,9 +245,9 @@ class Queue(object): return self.enqueue_call(func=f, args=args, kwargs=kwargs, timeout=timeout, result_ttl=result_ttl, description=description, depends_on=depends_on, - job_id=job_id, skip_queue=skip_queue) + job_id=job_id, at_front=at_front) - def enqueue_job(self, job, set_meta_data=True, skip_queue=False): + def enqueue_job(self, job, set_meta_data=True, at_front=False): """Enqueues a job for delayed execution. 
If the `set_meta_data` argument is `True` (default), it will update @@ -267,7 +267,7 @@ class Queue(object): job.save() if self._async: - self.push_job_id(job.id, skip_queue=skip_queue) + self.push_job_id(job.id, at_front=at_front) else: job.perform() job.save() From 82333d2ad5053876004ff57af209ed7ab447098a Mon Sep 17 00:00:00 2001 From: Jonathan Tushman Date: Thu, 9 Oct 2014 14:09:52 -0400 Subject: [PATCH 23/28] triggering shutdown by setting a redis flag --- .gitignore | 3 -- requirements.txt | 4 +-- rq/cli/cli.py | 45 +++++++++++++++++++++++++-- rq/cli/helpers.py | 7 +++-- rq/job.py | 21 ++++--------- rq/queue.py | 10 +++--- rq/suspension.py | 18 +++++++++++ rq/utils.py | 10 ++++++ rq/worker.py | 74 +++++++++++++++++++++++++++++++++----------- tests/test_cli.py | 52 ++++++++++++++++++++++++++++--- tests/test_queue.py | 12 +++---- tests/test_worker.py | 43 +++++++++++++++++++++---- 12 files changed, 236 insertions(+), 63 deletions(-) create mode 100644 rq/suspension.py diff --git a/.gitignore b/.gitignore index 25a046e..f95cc7b 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,3 @@ .tox .vagrant Vagrantfile - -# PyCharm -.idea diff --git a/requirements.txt b/requirements.txt index 539b9a4..8da152d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -redis -click +redis==2.7.0 +click>=3.0.0 diff --git a/rq/cli/cli.py b/rq/cli/cli.py index 18979dc..afb32ea 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -16,6 +16,7 @@ from rq import Connection, get_failed_queue, Queue from rq.contrib.legacy import cleanup_ghosts from rq.exceptions import InvalidJobOperationError from rq.utils import import_attribute +from rq.suspension import suspend as connection_suspend, resume as connection_resume, is_suspended from .helpers import (read_config_file, refresh, setup_loghandlers_from_args, show_both, show_queues, show_workers) @@ -24,8 +25,12 @@ from .helpers import (read_config_file, refresh, setup_loghandlers_from_args, url_option = click.option('--url', 
'-u', envvar='RQ_REDIS_URL', help='URL describing Redis connection details.') +config_option = click.option('--config', '-c', help='Module containing RQ settings.') -def connect(url): + +def connect(url, config=None): + settings = read_config_file(config) if config else {} + url = url or settings.get('REDIS_URL') return StrictRedis.from_url(url or 'redis://localhost:6379/0') @@ -120,7 +125,7 @@ def info(url, path, interval, raw, only_queues, only_workers, by_queue, queues): @main.command() @url_option -@click.option('--config', '-c', help='Module containing RQ settings.') +@config_option @click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)') @click.option('--name', '-n', help='Specify a different name') @click.option('--worker-class', '-w', default='rq.Worker', help='RQ Worker class to use') @@ -158,7 +163,12 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path, worker_class = import_attribute(worker_class) queue_class = import_attribute(queue_class) + if is_suspended(conn): + click.secho("The worker has been paused, run reset_paused", fg='red') + sys.exit(1) + try: + queues = [queue_class(queue, connection=conn) for queue in queues] w = worker_class(queues, name=name, @@ -178,3 +188,34 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path, except ConnectionError as e: print(e) sys.exit(1) + + +@main.command() +@url_option +@config_option +@click.option('--duration', help='Seconds you want the workers to be suspended. Default is forever.', type=int) +def suspend(url, config, duration): + """Suspends all workers, to resume run `rq resume`""" + if duration is not None and duration < 1: + click.echo("Duration must be an integer greater than 1") + sys.exit(1) + + connection = connect(url, config) + connection_suspend(connection, duration) + + if duration: + msg = """Suspending workers for {0} seconds. 
No new jobs will be started during that time, but then will + automatically resume""".format(duration) + click.echo(msg) + else: + click.echo("Suspending workers. No new jobs will be started. But current jobs will be completed") + + +@main.command() +@url_option +@config_option +def resume(url, config): + """Resumes processing of queues, that where suspended with `rq suspend`""" + connection = connect(url, config) + connection_resume(connection) + click.echo("Resuming workers.") diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index f25cc81..9ea4b58 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -8,7 +8,9 @@ from functools import partial import click from rq import Queue, Worker +from rq.worker import WorkerStatus from rq.logutils import setup_loghandlers +from rq.suspension import is_suspended red = partial(click.style, fg='red') green = partial(click.style, fg='green') @@ -39,8 +41,9 @@ def get_scale(x): def state_symbol(state): symbols = { - 'busy': red('busy'), - 'idle': green('idle'), + WorkerStatus.BUSY: red('busy'), + WorkerStatus.IDLE: green('idle'), + WorkerStatus.SUSPENDED: yellow('suspended'), } try: return symbols[state] diff --git a/rq/job.py b/rq/job.py index e8080f8..7b17492 100644 --- a/rq/job.py +++ b/rq/job.py @@ -12,7 +12,7 @@ from rq.compat import as_text, decode_redis_hash, string_types, text_type from .connections import resolve_connection from .exceptions import NoSuchJobError, UnpickleError from .local import LocalStack -from .utils import import_attribute, utcformat, utcnow, utcparse +from .utils import import_attribute, utcformat, utcnow, utcparse, enum try: import cPickle as pickle @@ -25,16 +25,7 @@ dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL) loads = pickle.loads -def enum(name, *sequential, **named): - values = dict(zip(sequential, range(len(sequential))), **named) - - # NOTE: Yes, we *really* want to cast using str() here. - # On Python 2 type() requires a byte string (which is str() on Python 2). 
- # On Python 3 it does not matter, so we'll use str(), which acts as - # a no-op. - return type(str(name), (), values) - -Status = enum('Status', +JobStatus = enum('JobStatus', QUEUED='queued', FINISHED='finished', FAILED='failed', STARTED='started') @@ -166,19 +157,19 @@ class Job(object): @property def is_finished(self): - return self.get_status() == Status.FINISHED + return self.get_status() == JobStatus.FINISHED @property def is_queued(self): - return self.get_status() == Status.QUEUED + return self.get_status() == JobStatus.QUEUED @property def is_failed(self): - return self.get_status() == Status.FAILED + return self.get_status() == JobStatus.FAILED @property def is_started(self): - return self.get_status() == Status.STARTED + return self.get_status() == JobStatus.STARTED @property def dependency(self): diff --git a/rq/queue.py b/rq/queue.py index ff5f860..61f5575 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -5,7 +5,7 @@ from __future__ import (absolute_import, division, print_function, import uuid from .connections import resolve_connection -from .job import Job, Status +from .job import Job, JobStatus from .utils import import_attribute, utcnow from .exceptions import (DequeueTimeout, InvalidJobOperationError, @@ -180,7 +180,7 @@ class Queue(object): # TODO: job with dependency shouldn't have "queued" as status job = self.job_class.create(func, args, kwargs, connection=self.connection, - result_ttl=result_ttl, status=Status.QUEUED, + result_ttl=result_ttl, status=JobStatus.QUEUED, description=description, depends_on=depends_on, timeout=timeout, id=job_id) @@ -195,7 +195,7 @@ class Queue(object): while True: try: pipe.watch(depends_on.key) - if depends_on.get_status() != Status.FINISHED: + if depends_on.get_status() != JobStatus.FINISHED: job.register_dependency(pipeline=pipe) job.save(pipeline=pipe) pipe.execute() @@ -390,7 +390,7 @@ class Queue(object): class FailedQueue(Queue): def __init__(self, connection=None): - super(FailedQueue, 
self).__init__(Status.FAILED, connection=connection) + super(FailedQueue, self).__init__(JobStatus.FAILED, connection=connection) def quarantine(self, job, exc_info): """Puts the given Job in quarantine (i.e. put it on the failed @@ -417,7 +417,7 @@ class FailedQueue(Queue): if self.remove(job) == 0: raise InvalidJobOperationError('Cannot requeue non-failed jobs.') - job.set_status(Status.QUEUED) + job.set_status(JobStatus.QUEUED) job.exc_info = None q = Queue(job.origin, connection=self.connection) q.enqueue_job(job) diff --git a/rq/suspension.py b/rq/suspension.py new file mode 100644 index 0000000..b734acd --- /dev/null +++ b/rq/suspension.py @@ -0,0 +1,18 @@ +WORKERS_SUSPENDED = 'rq:suspended' + + +def is_suspended(connection): + return connection.exists(WORKERS_SUSPENDED) + + +def suspend(connection, ttl=None): + """ttl = time to live in seconds. Default is no expiration + Note: If you pass in 0 it will invalidate right away + """ + connection.set(WORKERS_SUSPENDED, 1) + if ttl is not None: + connection.expire(WORKERS_SUSPENDED, ttl) + + +def resume(connection): + return connection.delete(WORKERS_SUSPENDED) \ No newline at end of file diff --git a/rq/utils.py b/rq/utils.py index 8233ac6..db56020 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -208,3 +208,13 @@ def first(iterable, default=None, key=None): def current_timestamp(): """Returns current UTC timestamp""" return calendar.timegm(datetime.datetime.utcnow().utctimetuple()) + + +def enum(name, *sequential, **named): + values = dict(zip(sequential, range(len(sequential))), **named) + + # NOTE: Yes, we *really* want to cast using str() here. + # On Python 2 type() requires a byte string (which is str() on Python 2). + # On Python 3 it does not matter, so we'll use str(), which acts as + # a no-op. 
+ return type(str(name), (), values) \ No newline at end of file diff --git a/rq/worker.py b/rq/worker.py index bf40a65..72f1aa4 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -12,18 +12,20 @@ import sys import time import traceback import warnings +from datetime import datetime from rq.compat import as_text, string_types, text_type from .connections import get_current_connection from .exceptions import DequeueTimeout, NoQueueError -from .job import Job, Status +from .job import Job, JobStatus from .logutils import setup_loghandlers from .queue import get_failed_queue, Queue from .timeouts import UnixSignalDeathPenalty -from .utils import import_attribute, make_colorizer, utcformat, utcnow +from .utils import import_attribute, make_colorizer, utcformat, utcnow, enum from .version import VERSION from .registry import FinishedJobRegistry, StartedJobRegistry +from .suspension import is_suspended try: from procname import setprocname @@ -52,8 +54,8 @@ def compact(l): return [x for x in l if x is not None] _signames = dict((getattr(signal, signame), signame) - for signame in dir(signal) - if signame.startswith('SIG') and '_' not in signame) + for signame in dir(signal) + if signame.startswith('SIG') and '_' not in signame) def signal_name(signum): @@ -65,6 +67,12 @@ def signal_name(signum): return 'SIG_UNKNOWN' +WorkerStatus = enum('WorkerStatus', + STARTED='started', SUSPENDED='suspended', BUSY='busy', + IDLE='idle' + ) + + class Worker(object): redis_worker_namespace_prefix = 'rq:worker:' redis_workers_keys = 'rq:workers' @@ -333,6 +341,30 @@ class Worker(object): signal.signal(signal.SIGINT, request_stop) signal.signal(signal.SIGTERM, request_stop) + def check_for_suspension(self, burst): + """Check to see if the workers have been suspended by something like `rq suspend`""" + + before_state = None + notified = False + + while not self.stopped and is_suspended(self.connection): + + if burst: + self.log.info('Suspended in burst mode -- exiting.') + 
self.log.info('Note: There could still be unperformed jobs on the queue') + raise StopRequested + + if not notified: + self.log.info('Worker suspended, use "rq resume" command to resume') + before_state = self.get_state() + self.set_state(WorkerStatus.SUSPENDED) + notified = True + time.sleep(1) + + if before_state: + self.set_state(before_state) + + def work(self, burst=False): """Starts the work loop. @@ -348,15 +380,19 @@ class Worker(object): did_perform_work = False self.register_birth() self.log.info('RQ worker started, version %s' % VERSION) - self.set_state('starting') + self.set_state(WorkerStatus.STARTED) + try: while True: - if self.stopped: - self.log.info('Stopping on request.') - break - - timeout = None if burst else max(1, self.default_worker_ttl - 60) try: + self.check_for_suspension(burst) + + if self.stopped: + self.log.info('Stopping on request.') + break + + timeout = None if burst else max(1, self.default_worker_ttl - 60) + result = self.dequeue_job_and_maintain_ttl(timeout) if result is None: break @@ -367,20 +403,22 @@ class Worker(object): self.execute_job(job) self.heartbeat() - if job.get_status() == Status.FINISHED: + if job.get_status() == JobStatus.FINISHED: queue.enqueue_dependents(job) did_perform_work = True + finally: if not self.is_horse: self.register_death() return did_perform_work + def dequeue_job_and_maintain_ttl(self, timeout): result = None qnames = self.queue_names() - self.set_state('idle') + self.set_state(WorkerStatus.IDLE) self.procline('Listening on %s' % ','.join(qnames)) self.log.info('') self.log.info('*** Listening on %s...' 
% @@ -395,7 +433,7 @@ class Worker(object): if result is not None: job, queue = result self.log.info('%s: %s (%s)' % (green(queue.name), - blue(job.description), job.id)) + blue(job.description), job.id)) break except DequeueTimeout: @@ -477,12 +515,12 @@ class Worker(object): timeout = (job.timeout or 180) + 60 with self.connection._pipeline() as pipeline: - self.set_state('busy', pipeline=pipeline) + self.set_state(WorkerStatus.BUSY, pipeline=pipeline) self.set_current_job_id(job.id, pipeline=pipeline) self.heartbeat(timeout, pipeline=pipeline) registry = StartedJobRegistry(job.origin, self.connection) registry.add(job, timeout, pipeline=pipeline) - job.set_status(Status.STARTED, pipeline=pipeline) + job.set_status(JobStatus.STARTED, pipeline=pipeline) pipeline.execute() self.procline('Processing %s from %s since %s' % ( @@ -511,7 +549,7 @@ class Worker(object): result_ttl = job.get_ttl(self.default_result_ttl) if result_ttl != 0: job.ended_at = utcnow() - job._status = Status.FINISHED + job._status = JobStatus.FINISHED job.save(pipeline=pipeline) finished_job_registry = FinishedJobRegistry(job.origin, self.connection) @@ -523,7 +561,7 @@ class Worker(object): pipeline.execute() except Exception: - job.set_status(Status.FAILED, pipeline=pipeline) + job.set_status(JobStatus.FAILED, pipeline=pipeline) started_job_registry.remove(job, pipeline=pipeline) pipeline.execute() self.handle_exception(job, *sys.exc_info()) @@ -552,7 +590,7 @@ class Worker(object): 'arguments': job.args, 'kwargs': job.kwargs, 'queue': job.origin, - }) + }) for handler in reversed(self._exc_handlers): self.log.debug('Invoking exception handler %s' % (handler,)) diff --git a/tests/test_cli.py b/tests/test_cli.py index a92fb34..3977006 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -26,6 +26,17 @@ class TestCommandLine(TestCase): class TestRQCli(RQTestCase): + + def assert_normal_execution(self, result): + if result.exit_code == 0: + return True + else: + print("Non normal 
execution") + print("Exit Code: {}".format(result.exit_code)) + print("Output: {}".format(result.output)) + print("Exception: {}".format(result.exception)) + self.assertEqual(result.exit_code, 0) + """Test rq_cli script""" def setUp(self): super(TestRQCli, self).setUp() @@ -41,25 +52,58 @@ class TestRQCli(RQTestCase): """rq empty -u failed""" runner = CliRunner() result = runner.invoke(main, ['empty', '-u', self.redis_url, 'failed']) - self.assertEqual(result.exit_code, 0) + self.assert_normal_execution(result) self.assertEqual(result.output.strip(), '1 jobs removed from failed queue') def test_requeue(self): """rq requeue -u --all""" runner = CliRunner() result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--all']) - self.assertEqual(result.exit_code, 0) + self.assert_normal_execution(result) self.assertEqual(result.output.strip(), 'Requeueing 1 jobs from failed queue') def test_info(self): """rq info -u """ runner = CliRunner() result = runner.invoke(main, ['info', '-u', self.redis_url]) - self.assertEqual(result.exit_code, 0) + self.assert_normal_execution(result) self.assertIn('1 queues, 1 jobs total', result.output) def test_worker(self): """rq worker -u -b""" runner = CliRunner() result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b']) - self.assertEqual(result.exit_code, 0) + self.assert_normal_execution(result) + + def test_suspend_and_resume(self): + """rq suspend -u + rq resume -u + """ + runner = CliRunner() + result = runner.invoke(main, ['suspend', '-u', self.redis_url]) + self.assert_normal_execution(result) + + result = runner.invoke(main, ['resume', '-u', self.redis_url]) + self.assert_normal_execution(result) + + def test_suspend_with_ttl(self): + """rq suspend -u --duration=2 + """ + runner = CliRunner() + result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 1]) + self.assert_normal_execution(result) + + def test_suspend_with_invalid_ttl(self): + """rq suspend -u --duration=0 + """ + runner = 
CliRunner() + result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 0]) + + self.assertEqual(result.exit_code, 1) + self.assertIn("Duration must be an integer greater than 1", result.output) + + + + + + diff --git a/tests/test_queue.py b/tests/test_queue.py index 8990e35..c905f13 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -4,7 +4,7 @@ from __future__ import (absolute_import, division, print_function, from rq import get_failed_queue, Queue from rq.exceptions import InvalidJobOperationError -from rq.job import Job, Status +from rq.job import Job, JobStatus from rq.worker import Worker from tests import RQTestCase @@ -262,7 +262,7 @@ class TestQueue(RQTestCase): """Enqueueing a job sets its status to "queued".""" q = Queue() job = q.enqueue(say_hello) - self.assertEqual(job.get_status(), Status.QUEUED) + self.assertEqual(job.get_status(), JobStatus.QUEUED) def test_enqueue_explicit_args(self): """enqueue() works for both implicit/explicit args.""" @@ -346,7 +346,7 @@ class TestQueue(RQTestCase): self.assertEqual(q.job_ids, []) # Jobs dependent on finished jobs are immediately enqueued - parent_job.set_status(Status.FINISHED) + parent_job.set_status(JobStatus.FINISHED) parent_job.save() job = q.enqueue_call(say_hello, depends_on=parent_job) self.assertEqual(q.job_ids, [job.id]) @@ -363,7 +363,7 @@ class TestQueue(RQTestCase): self.assertEqual(q.job_ids, []) # Jobs dependent on finished jobs are immediately enqueued - parent_job.set_status(Status.FINISHED) + parent_job.set_status(JobStatus.FINISHED) parent_job.save() job = q.enqueue_call(say_hello, depends_on=parent_job.id) self.assertEqual(q.job_ids, [job.id]) @@ -379,7 +379,7 @@ class TestQueue(RQTestCase): self.assertEqual(job.timeout, 123) # Jobs dependent on finished jobs are immediately enqueued - parent_job.set_status(Status.FINISHED) + parent_job.set_status(JobStatus.FINISHED) parent_job.save() job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123) 
self.assertEqual(q.job_ids, [job.id]) @@ -441,7 +441,7 @@ class TestFailedQueue(RQTestCase): get_failed_queue().requeue(job.id) job = Job.fetch(job.id) - self.assertEqual(job.get_status(), Status.QUEUED) + self.assertEqual(job.get_status(), JobStatus.QUEUED) def test_enqueue_preserves_result_ttl(self): """Enqueueing persists result_ttl.""" diff --git a/tests/test_worker.py b/tests/test_worker.py index c6d85ff..75f30aa 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -6,8 +6,9 @@ import os from rq import get_failed_queue, Queue, Worker, SimpleWorker from rq.compat import as_text -from rq.job import Job, Status +from rq.job import Job, JobStatus from rq.registry import StartedJobRegistry +from rq.suspension import suspend, resume from tests import RQTestCase, slow from tests.fixtures import (create_file, create_file_after_timeout, @@ -222,14 +223,14 @@ class TestWorker(RQTestCase): w = Worker([q]) job = q.enqueue(say_hello) - self.assertEqual(job.get_status(), Status.QUEUED) + self.assertEqual(job.get_status(), JobStatus.QUEUED) self.assertEqual(job.is_queued, True) self.assertEqual(job.is_finished, False) self.assertEqual(job.is_failed, False) w.work(burst=True) job = Job.fetch(job.id) - self.assertEqual(job.get_status(), Status.FINISHED) + self.assertEqual(job.get_status(), JobStatus.FINISHED) self.assertEqual(job.is_queued, False) self.assertEqual(job.is_finished, True) self.assertEqual(job.is_failed, False) @@ -238,7 +239,7 @@ class TestWorker(RQTestCase): job = q.enqueue(div_by_zero, args=(1,)) w.work(burst=True) job = Job.fetch(job.id) - self.assertEqual(job.get_status(), Status.FAILED) + self.assertEqual(job.get_status(), JobStatus.FAILED) self.assertEqual(job.is_queued, False) self.assertEqual(job.is_finished, False) self.assertEqual(job.is_failed, True) @@ -251,13 +252,13 @@ class TestWorker(RQTestCase): job = q.enqueue_call(say_hello, depends_on=parent_job) w.work(burst=True) job = Job.fetch(job.id) - self.assertEqual(job.get_status(), 
Status.FINISHED) + self.assertEqual(job.get_status(), JobStatus.FINISHED) parent_job = q.enqueue(div_by_zero) job = q.enqueue_call(say_hello, depends_on=parent_job) w.work(burst=True) job = Job.fetch(job.id) - self.assertNotEqual(job.get_status(), Status.FINISHED) + self.assertNotEqual(job.get_status(), JobStatus.FINISHED) def test_get_current_job(self): """Ensure worker.get_current_job() works properly""" @@ -318,3 +319,33 @@ class TestWorker(RQTestCase): 'Expected at least some work done.') self.assertEquals(job.result, 'Hi there, Adam!') self.assertEquals(job.description, '你好 世界!') + + def test_pause_worker_execution(self): + """Test Pause Worker Execution""" + + SENTINEL_FILE = '/tmp/rq-tests.txt' + + try: + # Remove the sentinel if it is leftover from a previous test run + os.remove(SENTINEL_FILE) + except OSError as e: + if e.errno != 2: + raise + + q = Queue() + job = q.enqueue(create_file, SENTINEL_FILE) + + w = Worker([q]) + + suspend(self.testconn) + + w.work(burst=True) + assert q.count == 1 + + # Should not have created evidence of execution + self.assertEquals(os.path.exists(SENTINEL_FILE), False) + + resume(self.testconn) + w.work(burst=True) + assert q.count == 0 + self.assertEquals(os.path.exists(SENTINEL_FILE), True) From 2667088b686ff55972a146efc05374f9f846b363 Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Sun, 14 Dec 2014 23:02:47 -0500 Subject: [PATCH 24/28] changed the kwarg, but forgot to change the test --- tests/test_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_queue.py b/tests/test_queue.py index 8bfde6d..85bbc00 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -466,6 +466,6 @@ class TestFailedQueue(RQTestCase): job1 = q.enqueue(say_hello) job2 = q.enqueue(say_hello) assert q.dequeue() == job1 - skip_job = q.enqueue(say_hello, skip_queue=True) + skip_job = q.enqueue(say_hello, at_front=True) assert q.dequeue() == skip_job assert q.dequeue() == job2 From 
ccd41396f61ba213c6b32252c1aeff57c9cda7a4 Mon Sep 17 00:00:00 2001 From: Jonathan Tushman Date: Mon, 15 Dec 2014 10:04:13 -0500 Subject: [PATCH 25/28] adding suspend with duration test --- rq/cli/cli.py | 2 +- tests/test_worker.py | 25 +++++++++++++++++++++++-- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/rq/cli/cli.py b/rq/cli/cli.py index afb32ea..8dd0b2b 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -164,7 +164,7 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path, queue_class = import_attribute(queue_class) if is_suspended(conn): - click.secho("The worker has been paused, run reset_paused", fg='red') + click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red') sys.exit(1) try: diff --git a/tests/test_worker.py b/tests/test_worker.py index 75f30aa..cebfdc4 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -3,6 +3,7 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) import os +from time import sleep from rq import get_failed_queue, Queue, Worker, SimpleWorker from rq.compat import as_text @@ -12,7 +13,7 @@ from rq.suspension import suspend, resume from tests import RQTestCase, slow from tests.fixtures import (create_file, create_file_after_timeout, - div_by_zero, say_hello, say_pid) + div_by_zero, say_hello, say_pid, do_nothing) from tests.helpers import strip_microseconds @@ -320,7 +321,7 @@ class TestWorker(RQTestCase): self.assertEquals(job.result, 'Hi there, Adam!') self.assertEquals(job.description, '你好 世界!') - def test_pause_worker_execution(self): + def test_suspend_worker_execution(self): """Test Pause Worker Execution""" SENTINEL_FILE = '/tmp/rq-tests.txt' @@ -349,3 +350,23 @@ class TestWorker(RQTestCase): w.work(burst=True) assert q.count == 0 self.assertEquals(os.path.exists(SENTINEL_FILE), True) + + def test_suspend_with_duration(self): + q = Queue() + for _ in xrange(5): + q.enqueue(do_nothing) + + w = Worker([q]) + 
+ # This suspends workers for working for 2 second + suspend(self.testconn, 2) + + # So when this burst of work happens the queue should remain at 5 + w.work(burst=True) + assert q.count == 5 + + sleep(3) + + # The suspension should be expired now, and a burst of work should now clear the queue + w.work(burst=True) + assert q.count == 0 From 60c27d5a2768a34a42f19ab92f3278bcf17e468d Mon Sep 17 00:00:00 2001 From: Jonathan Tushman Date: Mon, 15 Dec 2014 10:35:19 -0500 Subject: [PATCH 26/28] fixing xrange for python3 compatability --- tests/test_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_worker.py b/tests/test_worker.py index fd9d0bd..d3b8f41 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -353,7 +353,7 @@ class TestWorker(RQTestCase): def test_suspend_with_duration(self): q = Queue() - for _ in xrange(5): + for _ in range(5): q.enqueue(do_nothing) w = Worker([q]) From 543bcbc60b117e424995483cb3dcfe8dbfbbd77e Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 21 Jan 2015 16:12:08 -0800 Subject: [PATCH 27/28] Explicitly cast map() result to a list, for Python 3 compat. --- rq/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 3ba297a..929f91d 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -166,11 +166,11 @@ class Worker(object): def queue_names(self): """Returns the queue names of this worker's queues.""" - return map(lambda q: q.name, self.queues) + return list(map(lambda q: q.name, self.queues)) def queue_keys(self): """Returns the Redis keys representing this worker's queues.""" - return map(lambda q: q.key, self.queues) + return list(map(lambda q: q.key, self.queues)) @property def name(self): From dc09676ee03b978691d5d3c0bc725ab30bc0077a Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Fri, 23 Jan 2015 13:58:25 +0700 Subject: [PATCH 28/28] Minor copywriting fixes. 
--- rq/worker.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 929f91d..b971a24 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -67,10 +67,13 @@ def signal_name(signum): return 'SIG_UNKNOWN' -WorkerStatus = enum('WorkerStatus', - STARTED='started', SUSPENDED='suspended', BUSY='busy', - IDLE='idle' - ) +WorkerStatus = enum( + 'WorkerStatus', + STARTED='started', + SUSPENDED='suspended', + BUSY='busy', + IDLE='idle' +) class Worker(object): @@ -342,7 +345,7 @@ class Worker(object): signal.signal(signal.SIGTERM, request_stop) def check_for_suspension(self, burst): - """Check to see if the workers have been suspended by something like `rq suspend`""" + """Check to see if workers have been suspended by `rq suspend`""" before_state = None notified = False @@ -350,8 +353,8 @@ class Worker(object): while not self.stopped and is_suspended(self.connection): if burst: - self.log.info('Suspended in burst mode -- exiting.') - self.log.info('Note: There could still be unperformed jobs on the queue') + self.log.info('Suspended in burst mode -- exiting.' + 'Note: There could still be unperformed jobs on the queue') raise StopRequested if not notified: