diff --git a/rq/compat/__init__.py b/rq/compat/__init__.py index 9514aff..81f4aff 100644 --- a/rq/compat/__init__.py +++ b/rq/compat/__init__.py @@ -13,10 +13,9 @@ def is_python_version(*versions): return False -# functools.total_ordering is only available from Python 2.7 and 3.2 -if is_python_version((2, 7), (3, 2)): +try: from functools import total_ordering -else: +except ImportError: def total_ordering(cls): # noqa """Class decorator that fills in missing ordering methods""" convert = { @@ -36,7 +35,7 @@ else: roots = set(dir(cls)) & set(convert) if not roots: raise ValueError('must define at least one ordering operation: < > <= >=') # noqa - root = max(roots) # prefer __lt__ to __le__ to __gt__ to __ge__ + root = max(roots) # prefer __lt__ to __le__ to __gt__ to __ge__ for opname, opfunc in convert[root]: if opname not in roots: opfunc.__name__ = str(opname) diff --git a/rq/job.py b/rq/job.py index e77b085..79101a7 100644 --- a/rq/job.py +++ b/rq/job.py @@ -25,7 +25,6 @@ except ImportError: # noqa # pragma: no cover dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL) loads = pickle.loads - JobStatus = enum( 'JobStatus', QUEUED='queued', @@ -183,6 +182,10 @@ class Job(object): def is_started(self): return self.get_status() == JobStatus.STARTED + @property + def is_deferred(self): + return self.get_status() == JobStatus.DEFERRED + @property def dependency(self): """Returns a job's dependency. To avoid repeated Redis fetches, we cache @@ -457,7 +460,6 @@ class Job(object): # Fallback to uncompressed string self.exc_info = as_text(raw_exc_info) - def to_dict(self, include_meta=True): """ Returns a serialization of the current job instance @@ -545,28 +547,28 @@ class Job(object): self.cancel(pipeline=pipeline) connection = pipeline if pipeline is not None else self.connection - if self.get_status() == JobStatus.FINISHED: + if self.is_finished: from .registry import FinishedJobRegistry registry = FinishedJobRegistry(self.origin, connection=self.connection, job_class=self.__class__) registry.remove(self, pipeline=pipeline) - elif self.get_status() == JobStatus.DEFERRED: + elif self.is_deferred: from .registry import DeferredJobRegistry registry = DeferredJobRegistry(self.origin, connection=self.connection, job_class=self.__class__) registry.remove(self, pipeline=pipeline) - elif self.get_status() == JobStatus.STARTED: + elif self.is_started: from .registry import StartedJobRegistry registry = StartedJobRegistry(self.origin, connection=self.connection, job_class=self.__class__) registry.remove(self, pipeline=pipeline) - elif self.get_status() == JobStatus.FAILED: + elif self.is_failed: from .queue import get_failed_queue failed_queue = get_failed_queue(connection=self.connection, job_class=self.__class__) diff --git a/rq/worker.py b/rq/worker.py index bdb5a0f..506c8ab 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -430,7 +430,7 @@ class Worker(object): raise StopRequested() def handle_warm_shutdown_request(self): - self.log.warning('Warm shut down requested') + self.log.info('Warm shut down requested') def check_for_suspension(self, burst): """Check to see if workers have been suspended by `rq suspend`""" @@ -469,7 +469,7 @@ class Worker(object): self._install_signal_handlers() did_perform_work = False self.register_birth() - self.log.info("RQ worker {0!r} started, version {1}".format(self.key, VERSION)) + self.log.info("RQ worker %r started, version %s", self.key, VERSION) self.set_state(WorkerStatus.STARTED) qnames = self.queue_names() self.log.info('*** Listening on %s...', green(', '.join(qnames))) @@ -491,7 +491,7 @@ class Worker(object): result = self.dequeue_job_and_maintain_ttl(timeout) if result is None: if burst: - self.log.info("RQ worker {0!r} done, quitting".format(self.key)) + self.log.info("RQ worker %r done, quitting", self.key) break job, queue = result @@ -526,12 +526,12 @@ class Worker(object): job, queue = result if self.log_job_description: - self.log.info('{0}: {1} ({2})'.format(green(queue.name), - blue(job.description), - job.id)) + self.log.info('%s: %s (%s)', green(queue.name), + blue(job.description), + job.id) else: - self.log.info('{0}:{1}'.format(green(queue.name), - job.id)) + self.log.info('%s:%s', green(queue.name), + job.id) break except DequeueTimeout: @@ -652,9 +652,9 @@ class Worker(object): # Unhandled failure: move the job to the failed queue self.log.warning(( - 'Moving job to {0!r} queue ' - '(work-horse terminated unexpectedly; waitpid returned {1})' - ).format(self.failed_queue.name, ret_val)) + 'Moving job to %r queue ' + '(work-horse terminated unexpectedly; waitpid returned %s)' + ), self.failed_queue.name, ret_val) self.failed_queue.quarantine( job, exc_info=( @@ -827,7 +827,7 @@ class Worker(object): finally: pop_connection() - self.log.info('{0}: {1} ({2})'.format(green(job.origin), blue('Job OK'), job.id)) + self.log.info('%s: %s (%s)', green(job.origin), blue('Job OK'), job.id) if rv is not None: log_result = "{0!r}".format(as_text(text_type(rv))) self.log.debug('Result: %s', yellow(log_result)) @@ -837,9 +837,9 @@ class Worker(object): if result_ttl == 0: self.log.info('Result discarded immediately') elif result_ttl > 0: - self.log.info('Result is kept for {0} seconds'.format(result_ttl)) + self.log.info('Result is kept for %s seconds', result_ttl) else: - self.log.warning('Result will never expire, clean up result key manually') + self.log.info('Result will never expire, clean up result key manually') return True @@ -869,7 +869,7 @@ class Worker(object): def move_to_failed_queue(self, job, *exc_info): """Default exception handler: move the job to the failed queue.""" - self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name)) + self.log.warning('Moving job to %r queue', self.failed_queue.name) from .handlers import move_to_failed_queue move_to_failed_queue(job, *exc_info) @@ -904,7 +904,7 @@ class Worker(object): def clean_registries(self): """Runs maintenance jobs on each Queue's registries.""" for queue in self.queues: - self.log.info('Cleaning registries for queue: {0}'.format(queue.name)) + self.log.info('Cleaning registries for queue: %s', queue.name) clean_registries(queue) self.last_cleaned_at = utcnow() @@ -952,7 +952,7 @@ class HerokuWorker(Worker): def handle_warm_shutdown_request(self): """If horse is alive send it SIGRTMIN""" if self.horse_pid != 0: - self.log.warning('Warm shut down requested, sending horse SIGRTMIN signal') + self.log.info('Warm shut down requested, sending horse SIGRTMIN signal') self.kill_horse(sig=signal.SIGRTMIN) else: self.log.warning('Warm shut down requested, no horse found') diff --git a/tests/__init__.py b/tests/__init__.py index c916a30..eacd7a8 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -6,11 +6,10 @@ import logging from redis import Redis from rq import pop_connection, push_connection -from rq.compat import is_python_version -if is_python_version((2, 7), (3, 2)): +try: import unittest -else: +except ImportError: import unittest2 as unittest # noqa diff --git a/tests/test_cli.py b/tests/test_cli.py index c306c81..c7f56af 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -4,7 +4,6 @@ from __future__ import (absolute_import, division, print_function, from click.testing import CliRunner from rq import get_failed_queue, Queue -from rq.compat import is_python_version from rq.job import Job from rq.cli import main from rq.cli.helpers import read_config_file, CliConfig @@ -13,9 +12,9 @@ import pytest from tests import RQTestCase from tests.fixtures import div_by_zero -if is_python_version((2, 7), (3, 2)): +try: from unittest import TestCase -else: +except ImportError: from unittest2 import TestCase # noqa diff --git a/tests/test_worker.py b/tests/test_worker.py index b826f2b..acd3ffa 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -800,8 +800,8 @@ class TestWorker(RQTestCase): w = Worker([q]) job = q.enqueue(say_hello, args=('Frank',), result_ttl=10) w.perform_job(job, q) - mock_logger_info.assert_called_with('Result is kept for 10 seconds') - self.assertIn('Result is kept for 10 seconds', [c[0][0] for c in mock_logger_info.call_args_list]) + mock_logger_info.assert_called_with('Result is kept for %s seconds', 10) + self.assertIn('Result is kept for %s seconds', [c[0][0] for c in mock_logger_info.call_args_list]) @mock.patch('rq.worker.logger.info') def test_log_result_lifespan_false(self, mock_logger_info): @@ -823,7 +823,7 @@ class TestWorker(RQTestCase): w = Worker([q]) job = q.enqueue(say_hello, args=('Frank',), result_ttl=10) w.dequeue_job_and_maintain_ttl(10) - self.assertIn("Frank", mock_logger_info.call_args[0][0]) + self.assertIn("Frank", mock_logger_info.call_args[0][2]) @mock.patch('rq.worker.logger.info') def test_log_job_description_false(self, mock_logger_info): @@ -832,7 +832,7 @@ class TestWorker(RQTestCase): w = Worker([q], log_job_description=False) job = q.enqueue(say_hello, args=('Frank',), result_ttl=10) w.dequeue_job_and_maintain_ttl(10) - self.assertNotIn("Frank", mock_logger_info.call_args[0][0]) + self.assertNotIn("Frank", mock_logger_info.call_args[0][2]) def kill_worker(pid, double_kill):