diff --git a/.gitignore b/.gitignore
index 25a046e..f95cc7b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,6 +10,3 @@
 .tox
 .vagrant
 Vagrantfile
-
-# PyCharm
-.idea
diff --git a/CHANGES.md b/CHANGES.md
index 9097cb9..12f273a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,3 +1,25 @@
+### 0.5.0
+(Jan 30th, 2015)
+
+- RQ workers can now be paused and resumed using the `rq suspend` and
+  `rq resume` commands. Thanks Jonathan Tushman!
+- Jobs that are being performed are now stored in `StartedJobRegistry`
+  for monitoring purposes. This also prevents currently active jobs from
+  being orphaned/lost in the case of hard shutdowns.
+- You can now monitor finished jobs by checking `FinishedJobRegistry`.
+  Thanks Nic Cope for helping!
+- Jobs with unmet dependencies are now created with `deferred` as their
+  status. You can monitor deferred jobs by checking `DeferredJobRegistry`.
+- It is now possible to enqueue a job at the beginning of the queue using
+  `queue.enqueue(func, at_front=True)`. Thanks Travis Johnson!
+- Command line scripts have all been refactored to use `click`. Thanks Lyon Zhang!
+- Added a new `SimpleWorker` that does not fork when executing jobs.
+  Useful for testing purposes. Thanks Cal Leeming!
+- Added `--queue-class` and `--job-class` arguments to the `rqworker` script.
+  Thanks David Bonner!
+- Many other minor bug fixes and enhancements.
+
+
 ### 0.4.6
 (May 21st, 2014)
 
diff --git a/README.md b/README.md
index 8b83f68..2b1fa71 100644
--- a/README.md
+++ b/README.md
@@ -33,7 +33,7 @@ def count_words_at_url(url):
 
 You do use the excellent [requests][r] package, don't you?
 
-Then, create a RQ queue:
+Then, create an RQ queue:
 
 ```python
 from rq import Queue, use_connection
diff --git a/requirements.txt b/requirements.txt
index 539b9a4..8da152d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,2 @@
-redis
-click
+redis==2.7.0
+click>=3.0.0
diff --git a/rq/cli/cli.py b/rq/cli/cli.py
index 18979dc..8dd0b2b 100755
--- a/rq/cli/cli.py
+++ b/rq/cli/cli.py
@@ -16,6 +16,7 @@ from rq import Connection, get_failed_queue, Queue
 from rq.contrib.legacy import cleanup_ghosts
 from rq.exceptions import InvalidJobOperationError
 from rq.utils import import_attribute
+from rq.suspension import suspend as connection_suspend, resume as connection_resume, is_suspended
 
 from .helpers import (read_config_file, refresh, setup_loghandlers_from_args,
                       show_both, show_queues, show_workers)
@@ -24,8 +25,12 @@ from .helpers import (read_config_file, refresh, setup_loghandlers_from_args,
 url_option = click.option('--url', '-u', envvar='RQ_REDIS_URL',
                           help='URL describing Redis connection details.')
 
+config_option = click.option('--config', '-c', help='Module containing RQ settings.')
+
 
-def connect(url):
+def connect(url, config=None):
+    settings = read_config_file(config) if config else {}
+    url = url or settings.get('REDIS_URL')
     return StrictRedis.from_url(url or 'redis://localhost:6379/0')
@@ -120,7 +125,7 @@ def info(url, path, interval, raw, only_queues, only_workers, by_queue, queues):
 
 @main.command()
 @url_option
-@click.option('--config', '-c', help='Module containing RQ settings.')
+@config_option
 @click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
 @click.option('--name', '-n', help='Specify a different name')
 @click.option('--worker-class', '-w', default='rq.Worker', help='RQ Worker class to use')
@@ -158,7 +163,12 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
     worker_class = import_attribute(worker_class)
     queue_class = import_attribute(queue_class)
 
+    if is_suspended(conn):
+        click.secho('RQ is currently suspended; run "rq resume" to resume job execution', fg='red')
+        sys.exit(1)
+
     try:
+        queues = [queue_class(queue, connection=conn) for queue in queues]
         w = worker_class(queues,
                         name=name,
@@ -178,3 +188,34 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
     except ConnectionError as e:
         print(e)
         sys.exit(1)
+
+
+@main.command()
+@url_option
+@config_option
+@click.option('--duration', help='Seconds you want the workers to be suspended. Default is forever.', type=int)
+def suspend(url, config, duration):
+    """Suspends all workers. To resume, run `rq resume`."""
+    if duration is not None and duration < 1:
+        click.echo("Duration must be an integer greater than 0")
+        sys.exit(1)
+
+    connection = connect(url, config)
+    connection_suspend(connection, duration)
+
+    if duration:
+        msg = """Suspending workers for {0} seconds. No new jobs will be started during that time,
+        but work will automatically resume afterwards""".format(duration)
+        click.echo(msg)
+    else:
+        click.echo("Suspending workers. No new jobs will be started, but current jobs will be completed")
+
+
+@main.command()
+@url_option
+@config_option
+def resume(url, config):
+    """Resumes processing of queues that were suspended with `rq suspend`"""
+    connection = connect(url, config)
+    connection_resume(connection)
+    click.echo("Resuming workers.")
diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py
index f25cc81..0730d17 100644
--- a/rq/cli/helpers.py
+++ b/rq/cli/helpers.py
@@ -9,6 +9,7 @@ from functools import partial
 import click
 from rq import Queue, Worker
 from rq.logutils import setup_loghandlers
+from rq.worker import WorkerStatus
 
 red = partial(click.style, fg='red')
 green = partial(click.style, fg='green')
@@ -39,8 +40,9 @@ def get_scale(x):
 
 def state_symbol(state):
     symbols = {
-        'busy': red('busy'),
-        'idle': green('idle'),
+        WorkerStatus.BUSY: red('busy'),
+        WorkerStatus.IDLE: green('idle'),
+        WorkerStatus.SUSPENDED: yellow('suspended'),
     }
     try:
         return symbols[state]
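Reviewer note: the new `suspend` and `resume` commands are thin wrappers around the `rq.suspension` module introduced later in this patch. A minimal sketch of the equivalent programmatic calls, assuming a Redis server on localhost:

```python
from redis import StrictRedis

from rq.suspension import is_suspended, resume, suspend

connection = StrictRedis()  # assumes a local Redis at localhost:6379

suspend(connection)              # what `rq suspend` does under the hood
assert is_suspended(connection)  # workers now refuse to start new jobs

resume(connection)               # what `rq resume` does under the hood
assert not is_suspended(connection)
```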
diff --git a/rq/job.py b/rq/job.py
index 8067a01..981694c 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -12,7 +12,7 @@ from rq.compat import as_text, decode_redis_hash, string_types, text_type
 from .connections import resolve_connection
 from .exceptions import NoSuchJobError, UnpickleError
 from .local import LocalStack
-from .utils import import_attribute, utcformat, utcnow, utcparse
+from .utils import enum, import_attribute, utcformat, utcnow, utcparse
 
 try:
     import cPickle as pickle
@@ -25,18 +25,14 @@ dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
 loads = pickle.loads
 
 
-def enum(name, *sequential, **named):
-    values = dict(zip(sequential, range(len(sequential))), **named)
-
-    # NOTE: Yes, we *really* want to cast using str() here.
-    # On Python 2 type() requires a byte string (which is str() on Python 2).
-    # On Python 3 it does not matter, so we'll use str(), which acts as
-    # a no-op.
-    return type(str(name), (), values)
-
-Status = enum('Status',
-              QUEUED='queued', FINISHED='finished', FAILED='failed',
-              STARTED='started')
+JobStatus = enum(
+    'JobStatus',
+    QUEUED='queued',
+    FINISHED='finished',
+    FAILED='failed',
+    STARTED='started',
+    DEFERRED='deferred'
+)
 
 # Sentinel value to mark that some of our lazily evaluated properties have not
 # yet been evaluated.
@@ -92,8 +88,8 @@ class Job(object):
     # Job construction
     @classmethod
     def create(cls, func, args=None, kwargs=None, connection=None,
-               result_ttl=None, ttl=None, status=None, description=None, depends_on=None, timeout=None,
-               id=None):
+               result_ttl=None, ttl=None, status=None, description=None,
+               depends_on=None, timeout=None, id=None, origin=None):
         """Creates a new Job instance for the given function, arguments, and
         keyword arguments.
         """
@@ -111,6 +107,9 @@ class Job(object):
         if id is not None:
             job.set_id(id)
 
+        if origin is not None:
+            job.origin = origin
+
         # Set the core job tuple properties
         job._instance = None
         if inspect.ismethod(func):
@@ -167,19 +166,19 @@ class Job(object):
 
     @property
     def is_finished(self):
-        return self.get_status() == Status.FINISHED
+        return self.get_status() == JobStatus.FINISHED
 
     @property
     def is_queued(self):
-        return self.get_status() == Status.QUEUED
+        return self.get_status() == JobStatus.QUEUED
 
     @property
     def is_failed(self):
-        return self.get_status() == Status.FAILED
+        return self.get_status() == JobStatus.FAILED
 
     @property
    def is_started(self):
-        return self.get_status() == Status.STARTED
+        return self.get_status() == JobStatus.STARTED
 
     @property
     def dependency(self):
@@ -545,8 +544,14 @@ class Job(object):
 
             rq:job:job_id:dependents = {'job_id_1', 'job_id_2'}
 
-        This method adds the current job in its dependency's dependents set.
+        This method adds the job to its dependency's dependents set
+        and adds the job to DeferredJobRegistry.
         """
+        from .registry import DeferredJobRegistry
+
+        registry = DeferredJobRegistry(self.origin, connection=self.connection)
+        registry.add(self, pipeline=pipeline)
+
         connection = pipeline if pipeline is not None else self.connection
         connection.sadd(Job.dependents_key_for(self._dependency_id), self.id)
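Reviewer note: `JobStatus` is now built by the shared `enum()` helper, so its members are plain strings. A small sketch of how the status API reads after this change, assuming a local Redis; `say_hello` here is a stand-in task:

```python
from redis import StrictRedis

from rq import Queue
from rq.job import JobStatus

def say_hello():  # stand-in task; only needs to be importable for a worker
    return 'hello'

q = Queue(connection=StrictRedis())
job = q.enqueue(say_hello)

# enum() builds a plain class whose attributes are ordinary strings,
# so both spellings of the comparison hold
assert job.get_status() == JobStatus.QUEUED == 'queued'
assert job.is_queued and not job.is_finished
```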
diff --git a/rq/queue.py b/rq/queue.py
index a91b4a2..3f59a50 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -4,15 +4,14 @@ from __future__ import (absolute_import, division, print_function,
 
 import uuid
 
-from .connections import resolve_connection
-from .job import Job, Status
-from .utils import import_attribute, utcnow
+from redis import WatchError
 
+from .compat import as_text, string_types, total_ordering
+from .connections import resolve_connection
 from .exceptions import (DequeueTimeout, InvalidJobOperationError,
                          NoSuchJobError, UnpickleError)
-from .compat import total_ordering, string_types, as_text
-
-from redis import WatchError
+from .job import Job, JobStatus
+from .utils import import_attribute, utcnow
 
 
 def get_failed_queue(connection=None):
@@ -149,7 +148,7 @@ class Queue(object):
 
     def compact(self):
         """Removes all "dead" jobs from the queue by cycling through it, while
-        guarantueeing FIFO semantics.
+        guaranteeing FIFO semantics.
         """
         COMPACT_QUEUE = 'rq:queue:_compact:{0}'.format(uuid.uuid4())
 
@@ -161,14 +160,18 @@ class Queue(object):
             if self.job_class.exists(job_id, self.connection):
                 self.connection.rpush(self.key, job_id)
 
-    def push_job_id(self, job_id, pipeline=None):
-        """Pushes a job ID on the corresponding Redis queue."""
+    def push_job_id(self, job_id, pipeline=None, at_front=False):
+        """Pushes a job ID on the corresponding Redis queue.
+        'at_front' allows you to push the job onto the front instead of the back of the queue."""
         connection = pipeline if pipeline is not None else self.connection
-        connection.rpush(self.key, job_id)
+        if at_front:
+            connection.lpush(self.key, job_id)
+        else:
+            connection.rpush(self.key, job_id)
 
     def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
                      result_ttl=None, ttl=None, description=None,
-                     depends_on=None, job_id=None):
+                     depends_on=None, job_id=None, at_front=False):
         """Creates a job to represent the delayed function call and enqueues
         it.
@@ -178,11 +181,11 @@ class Queue(object):
         """
         timeout = timeout or self._default_timeout
 
-        # TODO: job with dependency shouldn't have "queued" as status
-        job = self.job_class.create(func, args, kwargs, connection=self.connection,
-                                    result_ttl=result_ttl, ttl=ttl, status=Status.QUEUED,
-                                    description=description, depends_on=depends_on, timeout=timeout,
-                                    id=job_id)
+        job = self.job_class.create(
+            func, args, kwargs, connection=self.connection,
+            result_ttl=result_ttl, status=JobStatus.QUEUED,
+            description=description, depends_on=depends_on,
+            timeout=timeout, id=job_id, origin=self.name)
 
         # If job depends on an unfinished job, register itself on its
         # parent's dependents instead of enqueueing it.
@@ -195,7 +198,8 @@ class Queue(object):
             while True:
                 try:
                     pipe.watch(depends_on.key)
-                    if depends_on.get_status() != Status.FINISHED:
+                    if depends_on.get_status() != JobStatus.FINISHED:
+                        job.set_status(JobStatus.DEFERRED)
                         job.register_dependency(pipeline=pipe)
                         job.save(pipeline=pipe)
                         pipe.execute()
@@ -204,7 +208,7 @@ class Queue(object):
                 except WatchError:
                     continue
 
-        return self.enqueue_job(job)
+        return self.enqueue_job(job, at_front=at_front)
 
     def enqueue(self, f, *args, **kwargs):
         """Creates a job to represent the delayed function call and enqueues
@@ -232,6 +236,7 @@ class Queue(object):
         ttl = kwargs.pop('ttl', None)
         depends_on = kwargs.pop('depends_on', None)
         job_id = kwargs.pop('job_id', None)
+        at_front = kwargs.pop('at_front', False)
 
         if 'args' in kwargs or 'kwargs' in kwargs:
             assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.'  # noqa
@@ -241,29 +246,29 @@ class Queue(object):
         return self.enqueue_call(func=f, args=args, kwargs=kwargs,
                                  timeout=timeout, result_ttl=result_ttl, ttl=ttl,
                                  description=description, depends_on=depends_on,
-                                 job_id=job_id)
+                                 job_id=job_id, at_front=at_front)
 
-    def enqueue_job(self, job, set_meta_data=True):
+    def enqueue_job(self, job, at_front=False):
         """Enqueues a job for delayed execution.
 
-        If the `set_meta_data` argument is `True` (default), it will update
-        the properties `origin` and `enqueued_at`.
-
         If Queue is instantiated with async=False, job is executed immediately.
         """
-        # Add Queue key set
-        self.connection.sadd(self.redis_queues_keys, self.key)
+        with self.connection._pipeline() as pipeline:
+            # Add Queue key set
+            self.connection.sadd(self.redis_queues_keys, self.key)
+            job.set_status(JobStatus.QUEUED, pipeline=pipeline)
 
-        if set_meta_data:
             job.origin = self.name
             job.enqueued_at = utcnow()
 
-        if job.timeout is None:
-            job.timeout = self.DEFAULT_TIMEOUT
-        job.save()
+            if job.timeout is None:
+                job.timeout = self.DEFAULT_TIMEOUT
+            job.save(pipeline=pipeline)
+
+            pipeline.execute()
 
         if self._async:
-            self.push_job_id(job.id)
+            self.push_job_id(job.id, at_front=at_front)
         else:
             job.perform()
             job.save()
@@ -272,11 +277,16 @@ class Queue(object):
     def enqueue_dependents(self, job):
         """Enqueues all jobs in the given job's dependents set and clears it."""
         # TODO: can probably be pipelined
+        from .registry import DeferredJobRegistry
+
+        registry = DeferredJobRegistry(self.name, self.connection)
+
         while True:
             job_id = as_text(self.connection.spop(job.dependents_key))
             if job_id is None:
                 break
             dependent = self.job_class.fetch(job_id, connection=self.connection)
+            registry.remove(dependent)
             self.enqueue_job(dependent)
 
     def pop_job_id(self):
@@ -391,19 +401,25 @@ class Queue(object):
 
 class FailedQueue(Queue):
     def __init__(self, connection=None):
-        super(FailedQueue, self).__init__(Status.FAILED, connection=connection)
+        super(FailedQueue, self).__init__(JobStatus.FAILED, connection=connection)
 
     def quarantine(self, job, exc_info):
         """Puts the given Job in quarantine (i.e. put it on the failed queue).
-
-        This is different from normal job enqueueing, since certain meta data
-        must not be overridden (e.g. `origin` or `enqueued_at`) and other meta
-        data must be inserted (`ended_at` and `exc_info`).
         """
-        job.ended_at = utcnow()
-        job.exc_info = exc_info
-        return self.enqueue_job(job, set_meta_data=False)
+
+        with self.connection._pipeline() as pipeline:
+            # Add Queue key set
+            self.connection.sadd(self.redis_queues_keys, self.key)
+
+            job.ended_at = utcnow()
+            job.exc_info = exc_info
+            job.save(pipeline=pipeline)
+
+            self.push_job_id(job.id, pipeline=pipeline)
+            pipeline.execute()
+
+        return job
 
     def requeue(self, job_id):
         """Requeues the job with the given job ID."""
@@ -418,7 +434,7 @@ class FailedQueue(Queue):
         if self.remove(job) == 0:
             raise InvalidJobOperationError('Cannot requeue non-failed jobs.')
 
-        job.set_status(Status.QUEUED)
+        job.set_status(JobStatus.QUEUED)
         job.exc_info = None
         q = Queue(job.origin, connection=self.connection)
         q.enqueue_job(job)
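Reviewer note: a short sketch of the new `at_front` behaviour (an `LPUSH` instead of an `RPUSH` under the hood), assuming a local Redis; `task` is a hypothetical importable function:

```python
from redis import StrictRedis

from rq import Queue

def task():  # hypothetical task; must be importable for a worker to run it
    pass

q = Queue('example', connection=StrictRedis())
first = q.enqueue(task)
second = q.enqueue(task)
urgent = q.enqueue(task, at_front=True)  # LPUSHed to the head of the list

# The queue now yields: urgent, first, second
assert q.job_ids == [urgent.id, first.id, second.id]
```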
""" - # Add Queue key set - self.connection.sadd(self.redis_queues_keys, self.key) + with self.connection._pipeline() as pipeline: + # Add Queue key set + self.connection.sadd(self.redis_queues_keys, self.key) + job.set_status(JobStatus.QUEUED, pipeline=pipeline) - if set_meta_data: job.origin = self.name job.enqueued_at = utcnow() - if job.timeout is None: - job.timeout = self.DEFAULT_TIMEOUT - job.save() + if job.timeout is None: + job.timeout = self.DEFAULT_TIMEOUT + job.save(pipeline=pipeline) + + pipeline.execute() if self._async: - self.push_job_id(job.id) + self.push_job_id(job.id, at_front=at_front) else: job.perform() job.save() @@ -272,11 +277,16 @@ class Queue(object): def enqueue_dependents(self, job): """Enqueues all jobs in the given job's dependents set and clears it.""" # TODO: can probably be pipelined + from .registry import DeferredJobRegistry + + registry = DeferredJobRegistry(self.name, self.connection) + while True: job_id = as_text(self.connection.spop(job.dependents_key)) if job_id is None: break dependent = self.job_class.fetch(job_id, connection=self.connection) + registry.remove(dependent) self.enqueue_job(dependent) def pop_job_id(self): @@ -391,19 +401,25 @@ class Queue(object): class FailedQueue(Queue): def __init__(self, connection=None): - super(FailedQueue, self).__init__(Status.FAILED, connection=connection) + super(FailedQueue, self).__init__(JobStatus.FAILED, connection=connection) def quarantine(self, job, exc_info): """Puts the given Job in quarantine (i.e. put it on the failed queue). - - This is different from normal job enqueueing, since certain meta data - must not be overridden (e.g. `origin` or `enqueued_at`) and other meta - data must be inserted (`ended_at` and `exc_info`). """ - job.ended_at = utcnow() - job.exc_info = exc_info - return self.enqueue_job(job, set_meta_data=False) + + with self.connection._pipeline() as pipeline: + # Add Queue key set + self.connection.sadd(self.redis_queues_keys, self.key) + + job.ended_at = utcnow() + job.exc_info = exc_info + job.save(pipeline=pipeline) + + self.push_job_id(job.id, pipeline=pipeline) + pipeline.execute() + + return job def requeue(self, job_id): """Requeues the job with the given job ID.""" @@ -418,7 +434,7 @@ class FailedQueue(Queue): if self.remove(job) == 0: raise InvalidJobOperationError('Cannot requeue non-failed jobs.') - job.set_status(Status.QUEUED) + job.set_status(JobStatus.QUEUED) job.exc_info = None q = Queue(job.origin, connection=self.connection) q.enqueue_job(job) diff --git a/rq/registry.py b/rq/registry.py index b4cf43f..08798eb 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -24,7 +24,7 @@ class BaseRegistry(object): self.cleanup() return self.connection.zcard(self.key) - def add(self, job, ttl, pipeline=None): + def add(self, job, ttl=0, pipeline=None): """Adds a job to a registry with expiry time of now + ttl.""" score = ttl if ttl < 0 else current_timestamp() + ttl if pipeline is not None: @@ -108,3 +108,19 @@ class FinishedJobRegistry(BaseRegistry): """ score = timestamp if timestamp is not None else current_timestamp() self.connection.zremrangebyscore(self.key, 0, score) + + +class DeferredJobRegistry(BaseRegistry): + """ + Registry of deferred jobs (waiting for another job to finish). 
+ """ + + def __init__(self, name='default', connection=None): + super(DeferredJobRegistry, self).__init__(name, connection) + self.key = 'rq:deferred:%s' % name + + def cleanup(self): + """This method is only here to prevent errors because this method is + automatically called by `count()` and `get_job_ids()` methods + implemented in BaseRegistry.""" + pass diff --git a/rq/suspension.py b/rq/suspension.py new file mode 100644 index 0000000..93152b9 --- /dev/null +++ b/rq/suspension.py @@ -0,0 +1,18 @@ +WORKERS_SUSPENDED = 'rq:suspended' + + +def is_suspended(connection): + return connection.exists(WORKERS_SUSPENDED) + + +def suspend(connection, ttl=None): + """ttl = time to live in seconds. Default is no expiration + Note: If you pass in 0 it will invalidate right away + """ + connection.set(WORKERS_SUSPENDED, 1) + if ttl is not None: + connection.expire(WORKERS_SUSPENDED, ttl) + + +def resume(connection): + return connection.delete(WORKERS_SUSPENDED) diff --git a/rq/utils.py b/rq/utils.py index 8233ac6..3e44a98 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -9,12 +9,12 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) import calendar -import importlib import datetime +import importlib import logging import sys -from .compat import is_python_version, as_text +from .compat import as_text, is_python_version class _Colorizer(object): @@ -208,3 +208,13 @@ def first(iterable, default=None, key=None): def current_timestamp(): """Returns current UTC timestamp""" return calendar.timegm(datetime.datetime.utcnow().utctimetuple()) + + +def enum(name, *sequential, **named): + values = dict(zip(sequential, range(len(sequential))), **named) + + # NOTE: Yes, we *really* want to cast using str() here. + # On Python 2 type() requires a byte string (which is str() on Python 2). + # On Python 3 it does not matter, so we'll use str(), which acts as + # a no-op. 
diff --git a/rq/version.py b/rq/version.py
index 28d738f..0d96ce8 100644
--- a/rq/version.py
+++ b/rq/version.py
@@ -1,4 +1,5 @@
 # -*- coding: utf-8 -*-
 from __future__ import (absolute_import, division, print_function,
                         unicode_literals)
-VERSION = '0.4.6'
+
+VERSION = '0.5.0'
diff --git a/rq/worker.py b/rq/worker.py
index 73347c7..c863b56 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -17,13 +17,14 @@ from rq.compat import as_text, string_types, text_type
 
 from .connections import get_current_connection
 from .exceptions import DequeueTimeout, NoQueueError
-from .job import Job, Status
+from .job import Job, JobStatus
 from .logutils import setup_loghandlers
 from .queue import get_failed_queue, Queue
+from .registry import FinishedJobRegistry, StartedJobRegistry
+from .suspension import is_suspended
 from .timeouts import UnixSignalDeathPenalty
-from .utils import import_attribute, make_colorizer, utcformat, utcnow, utcparse
+from .utils import enum, import_attribute, make_colorizer, utcformat, utcnow, utcparse
 from .version import VERSION
-from .registry import FinishedJobRegistry, StartedJobRegistry
 
 try:
     from procname import setprocname
@@ -65,6 +66,15 @@ def signal_name(signum):
         return 'SIG_UNKNOWN'
 
 
+WorkerStatus = enum(
+    'WorkerStatus',
+    STARTED='started',
+    SUSPENDED='suspended',
+    BUSY='busy',
+    IDLE='idle'
+)
+
+
 class Worker(object):
     redis_worker_namespace_prefix = 'rq:worker:'
     redis_workers_keys = 'rq:workers'
@@ -158,11 +168,11 @@ class Worker(object):
 
     def queue_names(self):
         """Returns the queue names of this worker's queues."""
-        return map(lambda q: q.name, self.queues)
+        return list(map(lambda q: q.name, self.queues))
 
     def queue_keys(self):
         """Returns the Redis keys representing this worker's queues."""
-        return map(lambda q: q.key, self.queues)
+        return list(map(lambda q: q.key, self.queues))
 
     @property
     def name(self):
@@ -348,6 +358,29 @@ class Worker(object):
         signal.signal(signal.SIGINT, request_stop)
         signal.signal(signal.SIGTERM, request_stop)
 
+    def check_for_suspension(self, burst):
+        """Check to see if workers have been suspended by `rq suspend`"""
+
+        before_state = None
+        notified = False
+
+        while not self.stopped and is_suspended(self.connection):
+
+            if burst:
+                self.log.info('Suspended in burst mode -- exiting. '
+                              'Note: there could still be unperformed jobs on the queue')
+                raise StopRequested
+
+            if not notified:
+                self.log.info('Worker suspended, run "rq resume" to resume')
+                before_state = self.get_state()
+                self.set_state(WorkerStatus.SUSPENDED)
+                notified = True
+            time.sleep(1)
+
+        if before_state:
+            self.set_state(before_state)
+
     def work(self, burst=False):
         """Starts the work loop.
@@ -363,15 +396,19 @@ class Worker(object):
         did_perform_work = False
         self.register_birth()
         self.log.info('RQ worker started, version %s' % VERSION)
-        self.set_state('starting')
+        self.set_state(WorkerStatus.STARTED)
+
         try:
             while True:
-                if self.stopped:
-                    self.log.info('Stopping on request.')
-                    break
-
-                timeout = None if burst else max(1, self.default_worker_ttl - 60)
                 try:
+                    self.check_for_suspension(burst)
+
+                    if self.stopped:
+                        self.log.info('Stopping on request.')
+                        break
+
+                    timeout = None if burst else max(1, self.default_worker_ttl - 60)
+
                     result = self.dequeue_job_and_maintain_ttl(timeout)
                     if result is None:
                         break
@@ -382,10 +419,11 @@ class Worker(object):
                     self.execute_job(job)
                     self.heartbeat()
 
-                    if job.get_status() == Status.FINISHED:
+                    if job.get_status() == JobStatus.FINISHED:
                         queue.enqueue_dependents(job)
 
                     did_perform_work = True
+
         finally:
             if not self.is_horse:
                 self.register_death()
@@ -395,7 +433,7 @@ class Worker(object):
         result = None
         qnames = self.queue_names()
 
-        self.set_state('idle')
+        self.set_state(WorkerStatus.IDLE)
         self.procline('Listening on %s' % ','.join(qnames))
         self.log.info('')
         self.log.info('*** Listening on %s...' %
@@ -410,7 +448,7 @@ class Worker(object):
                 if result is not None:
                     job, queue = result
                     self.log.info('%s: %s (%s)' % (green(queue.name),
-                                  blue(job.description), job.id))
+                                                   blue(job.description), job.id))
                     break
 
             except DequeueTimeout:
@@ -450,7 +488,9 @@ class Worker(object):
         self.procline('Forked %d at %d' % (child_pid, time.time()))
         while True:
             try:
+                self.set_state(WorkerStatus.BUSY)
                 os.waitpid(child_pid, 0)
+                self.set_state(WorkerStatus.IDLE)
                 break
             except OSError as e:
                 # In case we encountered an OSError due to EINTR (which is
@@ -492,12 +532,12 @@ class Worker(object):
         timeout = (job.timeout or 180) + 60
 
         with self.connection._pipeline() as pipeline:
-            self.set_state('busy', pipeline=pipeline)
+            self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
             self.set_current_job_id(job.id, pipeline=pipeline)
             self.heartbeat(timeout, pipeline=pipeline)
             registry = StartedJobRegistry(job.origin, self.connection)
             registry.add(job, timeout, pipeline=pipeline)
-            job.set_status(Status.STARTED, pipeline=pipeline)
+            job.set_status(JobStatus.STARTED, pipeline=pipeline)
             pipeline.execute()
 
         self.procline('Processing %s from %s since %s' % (
@@ -526,7 +566,7 @@ class Worker(object):
                 result_ttl = job.get_result_ttl(self.default_result_ttl)
                 if result_ttl != 0:
                     job.ended_at = utcnow()
-                    job._status = Status.FINISHED
+                    job._status = JobStatus.FINISHED
                     job.save(pipeline=pipeline)
 
                     finished_job_registry = FinishedJobRegistry(job.origin, self.connection)
@@ -538,7 +578,7 @@ class Worker(object):
                 pipeline.execute()
 
         except Exception:
-            job.set_status(Status.FAILED, pipeline=pipeline)
+            job.set_status(JobStatus.FAILED, pipeline=pipeline)
             started_job_registry.remove(job, pipeline=pipeline)
             pipeline.execute()
             self.handle_exception(job, *sys.exc_info())
@@ -567,7 +607,7 @@ class Worker(object):
             'arguments': job.args,
             'kwargs': job.kwargs,
             'queue': job.origin,
-            })
+        })
 
         for handler in reversed(self._exc_handlers):
             self.log.debug('Invoking exception handler %s' % (handler,))
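Reviewer note: putting the worker-side pieces together, a suspended burst worker exits without dequeueing anything, and a resumed one drains the queue and records successes in `FinishedJobRegistry`. A sketch rather than a test, assuming a local Redis and that `task` is importable by the forked work horse:

```python
from redis import StrictRedis

from rq import Queue, Worker
from rq.registry import FinishedJobRegistry
from rq.suspension import resume, suspend

def task():  # hypothetical task
    pass

conn = StrictRedis()
q = Queue(connection=conn)
job = q.enqueue(task)

suspend(conn)
Worker([q], connection=conn).work(burst=True)  # check_for_suspension() stops the loop
assert q.count == 1                            # nothing was performed

resume(conn)
Worker([q], connection=conn).work(burst=True)  # drains the queue
assert q.count == 0
assert job.id in FinishedJobRegistry(q.name, connection=conn).get_job_ids()
```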
diff --git a/tests/test_cli.py b/tests/test_cli.py
index a92fb34..f1b7fd4 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -26,6 +26,17 @@ class TestCommandLine(TestCase):
 
 
 class TestRQCli(RQTestCase):
+
+    def assert_normal_execution(self, result):
+        if result.exit_code == 0:
+            return True
+        else:
+            print("Abnormal execution:")
+            print("Exit code: {}".format(result.exit_code))
+            print("Output: {}".format(result.output))
+            print("Exception: {}".format(result.exception))
+            self.assertEqual(result.exit_code, 0)
+
     """Test rq_cli script"""
     def setUp(self):
         super(TestRQCli, self).setUp()
@@ -41,25 +52,52 @@ class TestRQCli(RQTestCase):
         """rq empty -u <url> failed"""
         runner = CliRunner()
         result = runner.invoke(main, ['empty', '-u', self.redis_url, 'failed'])
-        self.assertEqual(result.exit_code, 0)
+        self.assert_normal_execution(result)
         self.assertEqual(result.output.strip(), '1 jobs removed from failed queue')
 
     def test_requeue(self):
         """rq requeue -u <url> --all"""
         runner = CliRunner()
         result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--all'])
-        self.assertEqual(result.exit_code, 0)
+        self.assert_normal_execution(result)
         self.assertEqual(result.output.strip(), 'Requeueing 1 jobs from failed queue')
 
     def test_info(self):
         """rq info -u <url>"""
         runner = CliRunner()
         result = runner.invoke(main, ['info', '-u', self.redis_url])
-        self.assertEqual(result.exit_code, 0)
+        self.assert_normal_execution(result)
         self.assertIn('1 queues, 1 jobs total', result.output)
 
     def test_worker(self):
         """rq worker -u <url> -b"""
         runner = CliRunner()
         result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b'])
-        self.assertEqual(result.exit_code, 0)
+        self.assert_normal_execution(result)
+
+    def test_suspend_and_resume(self):
+        """rq suspend -u <url>
+        rq resume -u <url>
+        """
+        runner = CliRunner()
+        result = runner.invoke(main, ['suspend', '-u', self.redis_url])
+        self.assert_normal_execution(result)
+
+        result = runner.invoke(main, ['resume', '-u', self.redis_url])
+        self.assert_normal_execution(result)
+
+    def test_suspend_with_ttl(self):
+        """rq suspend -u <url> --duration=1
+        """
+        runner = CliRunner()
+        result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 1])
+        self.assert_normal_execution(result)
+
+    def test_suspend_with_invalid_ttl(self):
+        """rq suspend -u <url> --duration=0
+        """
+        runner = CliRunner()
+        result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 0])
+
+        self.assertEqual(result.exit_code, 1)
+        self.assertIn("Duration must be an integer greater than 0", result.output)
print("Exception: {}".format(result.exception)) + self.assertEqual(result.exit_code, 0) + """Test rq_cli script""" def setUp(self): super(TestRQCli, self).setUp() @@ -41,25 +52,52 @@ class TestRQCli(RQTestCase): """rq empty -u failed""" runner = CliRunner() result = runner.invoke(main, ['empty', '-u', self.redis_url, 'failed']) - self.assertEqual(result.exit_code, 0) + self.assert_normal_execution(result) self.assertEqual(result.output.strip(), '1 jobs removed from failed queue') def test_requeue(self): """rq requeue -u --all""" runner = CliRunner() result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--all']) - self.assertEqual(result.exit_code, 0) + self.assert_normal_execution(result) self.assertEqual(result.output.strip(), 'Requeueing 1 jobs from failed queue') def test_info(self): """rq info -u """ runner = CliRunner() result = runner.invoke(main, ['info', '-u', self.redis_url]) - self.assertEqual(result.exit_code, 0) + self.assert_normal_execution(result) self.assertIn('1 queues, 1 jobs total', result.output) def test_worker(self): """rq worker -u -b""" runner = CliRunner() result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b']) - self.assertEqual(result.exit_code, 0) + self.assert_normal_execution(result) + + def test_suspend_and_resume(self): + """rq suspend -u + rq resume -u + """ + runner = CliRunner() + result = runner.invoke(main, ['suspend', '-u', self.redis_url]) + self.assert_normal_execution(result) + + result = runner.invoke(main, ['resume', '-u', self.redis_url]) + self.assert_normal_execution(result) + + def test_suspend_with_ttl(self): + """rq suspend -u --duration=2 + """ + runner = CliRunner() + result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 1]) + self.assert_normal_execution(result) + + def test_suspend_with_invalid_ttl(self): + """rq suspend -u --duration=0 + """ + runner = CliRunner() + result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 0]) + + self.assertEqual(result.exit_code, 1) + self.assertIn("Duration must be an integer greater than 1", result.output) diff --git a/tests/test_job.py b/tests/test_job.py index 34859a7..e117ea9 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -4,17 +4,18 @@ from __future__ import (absolute_import, division, print_function, from datetime import datetime +from tests import RQTestCase +from tests.fixtures import (access_self, CallableObject, Number, say_hello, + some_calculation) +from tests.helpers import strip_microseconds + from rq.compat import as_text, PY2 from rq.exceptions import NoSuchJobError, UnpickleError from rq.job import get_current_job, Job from rq.queue import Queue +from rq.registry import DeferredJobRegistry from rq.utils import utcformat -from tests import RQTestCase -from tests.fixtures import (access_self, CallableObject, Number, say_hello, - some_calculation) -from tests.helpers import strip_microseconds - try: from cPickle import loads, dumps except ImportError: @@ -331,12 +332,18 @@ class TestJob(RQTestCase): self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn) def test_register_dependency(self): - """Test that jobs updates the correct job dependents.""" - job = Job.create(func=say_hello) + """Ensure dependency registration works properly.""" + origin = 'some_queue' + registry = DeferredJobRegistry(origin, self.testconn) + + job = Job.create(func=say_hello, origin=origin) job._dependency_id = 'id' job.save() + + self.assertEqual(registry.get_job_ids(), []) job.register_dependency() 
diff --git a/tests/test_queue.py b/tests/test_queue.py
index ed42204..e4e9253 100644
--- a/tests/test_queue.py
+++ b/tests/test_queue.py
@@ -2,15 +2,16 @@
 from __future__ import (absolute_import, division, print_function,
                         unicode_literals)
 
-from rq import get_failed_queue, Queue
-from rq.exceptions import InvalidJobOperationError
-from rq.job import Job, Status
-from rq.worker import Worker
-
 from tests import RQTestCase
 from tests.fixtures import (div_by_zero, echo, Number, say_hello,
                             some_calculation)
 
+from rq import get_failed_queue, Queue
+from rq.exceptions import InvalidJobOperationError
+from rq.job import Job, JobStatus
+from rq.registry import DeferredJobRegistry
+from rq.worker import Worker
+
 
 class CustomJob(Job):
     pass
@@ -117,6 +118,7 @@ class TestQueue(RQTestCase):
         # say_hello spec holds which queue this is sent to
         job = q.enqueue(say_hello, 'Nick', foo='bar')
         job_id = job.id
+        self.assertEqual(job.origin, q.name)
 
         # Inspect data inside Redis
         q_key = 'rq:queue:default'
@@ -131,14 +133,12 @@ class TestQueue(RQTestCase):
         job = Job.create(func=say_hello, args=('Nick',), kwargs=dict(foo='bar'))
 
         # Preconditions
-        self.assertIsNone(job.origin)
         self.assertIsNone(job.enqueued_at)
 
         # Action
         q.enqueue_job(job)
 
         # Postconditions
-        self.assertEquals(job.origin, q.name)
         self.assertIsNotNone(job.enqueued_at)
 
     def test_pop_job_id(self):
@@ -262,7 +262,7 @@ class TestQueue(RQTestCase):
         """Enqueueing a job sets its status to "queued"."""
         q = Queue()
         job = q.enqueue(say_hello)
-        self.assertEqual(job.get_status(), Status.QUEUED)
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)
 
     def test_enqueue_explicit_args(self):
         """enqueue() works for both implicit/explicit args."""
@@ -320,37 +320,44 @@ class TestQueue(RQTestCase):
         self.assertEquals(len(Queue.all()), 3)
 
     def test_enqueue_dependents(self):
-        """Enqueueing the dependent jobs pushes all jobs in the depends set to the queue."""
+        """Enqueueing dependent jobs pushes all jobs in the depends set to the queue
+        and removes them from DeferredJobRegistry."""
         q = Queue()
         parent_job = Job.create(func=say_hello)
         parent_job.save()
-        job_1 = Job.create(func=say_hello, depends_on=parent_job)
-        job_1.save()
-        job_1.register_dependency()
-        job_2 = Job.create(func=say_hello, depends_on=parent_job)
-        job_2.save()
-        job_2.register_dependency()
+        job_1 = q.enqueue(say_hello, depends_on=parent_job)
+        job_2 = q.enqueue(say_hello, depends_on=parent_job)
+
+        registry = DeferredJobRegistry(q.name, connection=self.testconn)
+        self.assertEqual(
+            set(registry.get_job_ids()),
+            set([job_1.id, job_2.id])
+        )
 
         # After the dependents are enqueued, job_1 and job_2 should be in the queue
         self.assertEqual(q.job_ids, [])
         q.enqueue_dependents(parent_job)
-        self.assertEqual(set(q.job_ids), set([job_1.id, job_2.id]))
+        self.assertEqual(set(q.job_ids), set([job_2.id, job_1.id]))
         self.assertFalse(self.testconn.exists(parent_job.dependents_key))
 
+        # DeferredJobRegistry should also be empty
+        self.assertEqual(registry.get_job_ids(), [])
+
     def test_enqueue_job_with_dependency(self):
         """Jobs are enqueued only when their dependencies are finished."""
         # Job with unfinished dependency is not immediately enqueued
         parent_job = Job.create(func=say_hello)
         q = Queue()
-        q.enqueue_call(say_hello, depends_on=parent_job)
+        job = q.enqueue_call(say_hello, depends_on=parent_job)
         self.assertEqual(q.job_ids, [])
+        self.assertEqual(job.get_status(), JobStatus.DEFERRED)
 
         # Jobs dependent on finished jobs are immediately enqueued
-        parent_job.set_status(Status.FINISHED)
+        parent_job.set_status(JobStatus.FINISHED)
         parent_job.save()
         job = q.enqueue_call(say_hello, depends_on=parent_job)
         self.assertEqual(q.job_ids, [job.id])
         self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)
 
     def test_enqueue_job_with_dependency_by_id(self):
         """Enqueueing jobs should work as expected by id as well as job-objects."""
@@ -361,14 +368,14 @@ class TestQueue(RQTestCase):
         self.assertEqual(q.job_ids, [])
 
         # Jobs dependent on finished jobs are immediately enqueued
-        parent_job.set_status(Status.FINISHED)
+        parent_job.set_status(JobStatus.FINISHED)
         parent_job.save()
         job = q.enqueue_call(say_hello, depends_on=parent_job.id)
         self.assertEqual(q.job_ids, [job.id])
         self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
 
     def test_enqueue_job_with_dependency_and_timeout(self):
-        """Jobs still know their specified timeout after being scheduled as a dependency."""
+        """Jobs remember their timeout when enqueued as a dependency."""
         # Job with unfinished dependency is not immediately enqueued
         parent_job = Job.create(func=say_hello)
         q = Queue()
@@ -377,7 +384,7 @@ class TestQueue(RQTestCase):
         self.assertEqual(job.timeout, 123)
 
         # Jobs dependent on finished jobs are immediately enqueued
-        parent_job.set_status(Status.FINISHED)
+        parent_job.set_status(JobStatus.FINISHED)
         parent_job.save()
         job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123)
         self.assertEqual(q.job_ids, [job.id])
@@ -439,7 +446,7 @@ class TestFailedQueue(RQTestCase):
         get_failed_queue().requeue(job.id)
 
         job = Job.fetch(job.id)
-        self.assertEqual(job.get_status(), Status.QUEUED)
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)
 
     def test_enqueue_preserves_result_ttl(self):
         """Enqueueing persists result_ttl."""
@@ -459,3 +466,13 @@ class TestFailedQueue(RQTestCase):
         """Ensure custom job class assignment works as expected."""
         q = Queue(job_class=CustomJob)
         self.assertEqual(q.job_class, CustomJob)
+
+    def test_skip_queue(self):
+        """Ensure the at_front option really puts a job at the front of the queue."""
+        q = Queue('foo')
+        job1 = q.enqueue(say_hello)
+        job2 = q.enqueue(say_hello)
+        assert q.dequeue() == job1
+        skip_job = q.enqueue(say_hello, at_front=True)
+        assert q.dequeue() == skip_job
+        assert q.dequeue() == job2
diff --git a/tests/test_registry.py b/tests/test_registry.py
index 26470e3..f54b315 100644
--- a/tests/test_registry.py
+++ b/tests/test_registry.py
@@ -1,11 +1,13 @@
 # -*- coding: utf-8 -*-
 from __future__ import absolute_import
 
+from rq.compat import as_text
 from rq.job import Job
 from rq.queue import FailedQueue, Queue
 from rq.utils import current_timestamp
 from rq.worker import Worker
-from rq.registry import FinishedJobRegistry, StartedJobRegistry
+from rq.registry import (DeferredJobRegistry, FinishedJobRegistry,
+                         StartedJobRegistry)
 
 from tests import RQTestCase
 from tests.fixtures import div_by_zero, say_hello
@@ -119,7 +121,6 @@ class TestFinishedJobRegistry(RQTestCase):
         self.registry.cleanup(timestamp + 20)
         self.assertEqual(self.registry.get_job_ids(), ['baz'])
 
-
     def test_jobs_are_put_in_registry(self):
         """Completed jobs are added to FinishedJobRegistry."""
         self.assertEqual(self.registry.get_job_ids(), [])
@@ -135,3 +136,18 @@ class TestFinishedJobRegistry(RQTestCase):
         failed_job = queue.enqueue(div_by_zero)
         worker.perform_job(failed_job)
         self.assertEqual(self.registry.get_job_ids(), [job.id])
+
+
+class TestDeferredRegistry(RQTestCase):
+
+    def setUp(self):
+        super(TestDeferredRegistry, self).setUp()
+        self.registry = DeferredJobRegistry(connection=self.testconn)
+
+    def test_add(self):
+        """Adding a job to DeferredJobRegistry."""
+        job = Job()
+        self.registry.add(job)
+        job_ids = [as_text(job_id) for job_id in
+                   self.testconn.zrange(self.registry.key, 0, -1)]
+        self.assertEqual(job_ids, [job.id])
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 6fa127f..c47a62e 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -3,17 +3,19 @@ from __future__ import (absolute_import, division, print_function,
                         unicode_literals)
 
 import os
-
-from rq import get_failed_queue, Queue, Worker, SimpleWorker
-from rq.compat import as_text
-from rq.job import Job, Status
-from rq.registry import StartedJobRegistry
+from time import sleep
 
 from tests import RQTestCase, slow
 from tests.fixtures import (create_file, create_file_after_timeout,
-                            div_by_zero, say_hello, say_pid)
+                            div_by_zero, do_nothing, say_hello, say_pid)
 from tests.helpers import strip_microseconds
 
+from rq import get_failed_queue, Queue, SimpleWorker, Worker
+from rq.compat import as_text
+from rq.job import Job, JobStatus
+from rq.registry import StartedJobRegistry
+from rq.suspension import resume, suspend
+
 
 class CustomJob(Job):
     pass
@@ -222,14 +224,14 @@ class TestWorker(RQTestCase):
         w = Worker([q])
 
         job = q.enqueue(say_hello)
-        self.assertEqual(job.get_status(), Status.QUEUED)
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)
         self.assertEqual(job.is_queued, True)
         self.assertEqual(job.is_finished, False)
         self.assertEqual(job.is_failed, False)
 
         w.work(burst=True)
         job = Job.fetch(job.id)
-        self.assertEqual(job.get_status(), Status.FINISHED)
+        self.assertEqual(job.get_status(), JobStatus.FINISHED)
         self.assertEqual(job.is_queued, False)
         self.assertEqual(job.is_finished, True)
         self.assertEqual(job.is_failed, False)
@@ -238,7 +240,7 @@ class TestWorker(RQTestCase):
         job = q.enqueue(div_by_zero, args=(1,))
         w.work(burst=True)
         job = Job.fetch(job.id)
-        self.assertEqual(job.get_status(), Status.FAILED)
+        self.assertEqual(job.get_status(), JobStatus.FAILED)
         self.assertEqual(job.is_queued, False)
         self.assertEqual(job.is_finished, False)
         self.assertEqual(job.is_failed, True)
@@ -251,13 +253,13 @@ class TestWorker(RQTestCase):
         job = q.enqueue_call(say_hello, depends_on=parent_job)
         w.work(burst=True)
         job = Job.fetch(job.id)
-        self.assertEqual(job.get_status(), Status.FINISHED)
+        self.assertEqual(job.get_status(), JobStatus.FINISHED)
 
         parent_job = q.enqueue(div_by_zero)
         job = q.enqueue_call(say_hello, depends_on=parent_job)
         w.work(burst=True)
         job = Job.fetch(job.id)
-        self.assertNotEqual(job.get_status(), Status.FINISHED)
+        self.assertNotEqual(job.get_status(), JobStatus.FINISHED)
 
     def test_get_current_job(self):
         """Ensure worker.get_current_job() works properly"""
@@ -319,6 +321,56 @@ class TestWorker(RQTestCase):
         self.assertEquals(job.result, 'Hi there, Adam!')
         self.assertEquals(job.description, '你好 世界!')
 
+    def test_suspend_worker_execution(self):
+        """Test that a worker does not execute jobs while suspended."""
+
+        SENTINEL_FILE = '/tmp/rq-tests.txt'
+
+        try:
+            # Remove the sentinel if it is leftover from a previous test run
+            os.remove(SENTINEL_FILE)
+        except OSError as e:
+            if e.errno != 2:
+                raise
+
+        q = Queue()
+        q.enqueue(create_file, SENTINEL_FILE)
+
+        w = Worker([q])
+
+        suspend(self.testconn)
+
+        w.work(burst=True)
+        assert q.count == 1
+
+        # Should not have created evidence of execution
+        self.assertEquals(os.path.exists(SENTINEL_FILE), False)
+
+        resume(self.testconn)
+        w.work(burst=True)
+        assert q.count == 0
+        self.assertEquals(os.path.exists(SENTINEL_FILE), True)
+
+    def test_suspend_with_duration(self):
+        q = Queue()
+        for _ in range(5):
+            q.enqueue(do_nothing)
+
+        w = Worker([q])
+
+        # This suspends the workers for 2 seconds
+        suspend(self.testconn, 2)
+
+        # So this burst of work should leave the queue untouched at 5 jobs
+        w.work(burst=True)
+        assert q.count == 5
+
+        sleep(3)
+
+        # The suspension has expired by now, so a burst of work should clear the queue
+        w.work(burst=True)
+        assert q.count == 0
+
     def test_worker_hash_(self):
         """Workers are hashed by their .name attribute"""
         q = Queue('foo')