Merge branch 'master' of github.com:rq/rq

Selwin Ong 6 years ago
commit 42d22c5220

@@ -13,10 +13,9 @@ def is_python_version(*versions):
     return False


-# functools.total_ordering is only available from Python 2.7 and 3.2
-if is_python_version((2, 7), (3, 2)):
+try:
     from functools import total_ordering
-else:
+except ImportError:
     def total_ordering(cls):  # noqa
         """Class decorator that fills in missing ordering methods"""
         convert = {
@@ -36,7 +35,7 @@ else:
         roots = set(dir(cls)) & set(convert)
         if not roots:
             raise ValueError('must define at least one ordering operation: < > <= >=')  # noqa
         root = max(roots)  # prefer __lt__ to __le__ to __gt__ to __ge__
         for opname, opfunc in convert[root]:
             if opname not in roots:
                 opfunc.__name__ = str(opname)
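
Aside, not part of the diff: the hunk above swaps an interpreter-version check for feature detection. functools.total_ordering fills in the missing rich-comparison methods from __eq__ plus one ordering method, which is what the fallback decorator re-implements for old interpreters. A minimal, illustrative usage sketch with a made-up Version class:

from functools import total_ordering

@total_ordering
class Version(object):
    """Toy class: defines __eq__ and __lt__; the decorator supplies the rest."""
    def __init__(self, number):
        self.number = number

    def __eq__(self, other):
        return self.number == other.number

    def __lt__(self, other):
        return self.number < other.number

assert Version(1) < Version(2)    # defined directly
assert Version(2) >= Version(1)   # synthesized by total_ordering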

@@ -25,7 +25,6 @@ except ImportError:  # noqa  # pragma: no cover
     dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
     loads = pickle.loads


 JobStatus = enum(
     'JobStatus',
     QUEUED='queued',
@@ -183,6 +182,10 @@ class Job(object):
     def is_started(self):
         return self.get_status() == JobStatus.STARTED

+    @property
+    def is_deferred(self):
+        return self.get_status() == JobStatus.DEFERRED
+
     @property
     def dependency(self):
         """Returns a job's dependency. To avoid repeated Redis fetches, we cache
@@ -457,7 +460,6 @@ class Job(object):
                 # Fallback to uncompressed string
                 self.exc_info = as_text(raw_exc_info)

     def to_dict(self, include_meta=True):
         """
         Returns a serialization of the current job instance
@@ -545,28 +547,28 @@ class Job(object):
             self.cancel(pipeline=pipeline)

         connection = pipeline if pipeline is not None else self.connection

-        if self.get_status() == JobStatus.FINISHED:
+        if self.is_finished:
             from .registry import FinishedJobRegistry
             registry = FinishedJobRegistry(self.origin,
                                            connection=self.connection,
                                            job_class=self.__class__)
             registry.remove(self, pipeline=pipeline)

-        elif self.get_status() == JobStatus.DEFERRED:
+        elif self.is_deferred:
             from .registry import DeferredJobRegistry
             registry = DeferredJobRegistry(self.origin,
                                            connection=self.connection,
                                            job_class=self.__class__)
             registry.remove(self, pipeline=pipeline)

-        elif self.get_status() == JobStatus.STARTED:
+        elif self.is_started:
             from .registry import StartedJobRegistry
             registry = StartedJobRegistry(self.origin,
                                           connection=self.connection,
                                           job_class=self.__class__)
             registry.remove(self, pipeline=pipeline)

-        elif self.get_status() == JobStatus.FAILED:
+        elif self.is_failed:
             from .queue import get_failed_queue
             failed_queue = get_failed_queue(connection=self.connection,
                                             job_class=self.__class__)
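
Aside, not part of the diff: delete() now uses the is_finished / is_deferred / is_started / is_failed properties, including the is_deferred property added earlier in this commit, instead of comparing get_status() against JobStatus members. A hedged usage sketch, assuming a local Redis at the default address and reusing the say_hello fixture that appears elsewhere in this diff:

from redis import Redis
from rq import Queue
from rq.job import JobStatus
from tests.fixtures import say_hello

q = Queue(connection=Redis())
parent = q.enqueue(say_hello, 'parent')
# A job whose dependency has not finished is held back as deferred.
child = q.enqueue(say_hello, 'child', depends_on=parent)

# The two checks are equivalent; the new property is just shorthand.
assert child.get_status() == JobStatus.DEFERRED
assert child.is_deferred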

@@ -430,7 +430,7 @@ class Worker(object):
             raise StopRequested()

     def handle_warm_shutdown_request(self):
-        self.log.warning('Warm shut down requested')
+        self.log.info('Warm shut down requested')

     def check_for_suspension(self, burst):
         """Check to see if workers have been suspended by `rq suspend`"""
@@ -469,7 +469,7 @@ class Worker(object):
         self._install_signal_handlers()
         did_perform_work = False
         self.register_birth()
-        self.log.info("RQ worker {0!r} started, version {1}".format(self.key, VERSION))
+        self.log.info("RQ worker %r started, version %s", self.key, VERSION)
         self.set_state(WorkerStatus.STARTED)
         qnames = self.queue_names()
         self.log.info('*** Listening on %s...', green(', '.join(qnames)))
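
Aside, not part of the diff: the worker hunks replace eager str.format() calls with logging's %-style arguments, so interpolation is deferred until a handler actually emits the record, and callers (and tests) can inspect the raw format string and its arguments separately. A small self-contained sketch; the logger name and values are placeholders:

import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger('example.worker')

key = 'rq:worker:example'
version = '0.12.0'

# Eager: the message string is built even if INFO is filtered out.
log.info("RQ worker {0!r} started, version {1}".format(key, version))

# Lazy: logging only interpolates when the record is actually handled.
log.info("RQ worker %r started, version %s", key, version)
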
@@ -491,7 +491,7 @@ class Worker(object):
                 result = self.dequeue_job_and_maintain_ttl(timeout)
                 if result is None:
                     if burst:
-                        self.log.info("RQ worker {0!r} done, quitting".format(self.key))
+                        self.log.info("RQ worker %r done, quitting", self.key)
                     break

                 job, queue = result
@@ -526,12 +526,12 @@ class Worker(object):
                     job, queue = result
                     if self.log_job_description:
-                        self.log.info('{0}: {1} ({2})'.format(green(queue.name),
-                                                               blue(job.description),
-                                                               job.id))
+                        self.log.info('%s: %s (%s)', green(queue.name),
+                                      blue(job.description),
+                                      job.id)
                     else:
-                        self.log.info('{0}:{1}'.format(green(queue.name),
-                                                        job.id))
+                        self.log.info('%s:%s', green(queue.name),
+                                      job.id)
                 break
             except DequeueTimeout:
@@ -652,9 +652,9 @@ class Worker(object):
                 # Unhandled failure: move the job to the failed queue
                 self.log.warning((
-                    'Moving job to {0!r} queue '
-                    '(work-horse terminated unexpectedly; waitpid returned {1})'
-                ).format(self.failed_queue.name, ret_val))
+                    'Moving job to %r queue '
+                    '(work-horse terminated unexpectedly; waitpid returned %s)'
+                ), self.failed_queue.name, ret_val)
                 self.failed_queue.quarantine(
                     job,
                     exc_info=(
@@ -827,7 +827,7 @@ class Worker(object):
         finally:
             pop_connection()

-        self.log.info('{0}: {1} ({2})'.format(green(job.origin), blue('Job OK'), job.id))
+        self.log.info('%s: %s (%s)', green(job.origin), blue('Job OK'), job.id)
         if rv is not None:
             log_result = "{0!r}".format(as_text(text_type(rv)))
             self.log.debug('Result: %s', yellow(log_result))
@@ -837,9 +837,9 @@ class Worker(object):
         if result_ttl == 0:
             self.log.info('Result discarded immediately')
         elif result_ttl > 0:
-            self.log.info('Result is kept for {0} seconds'.format(result_ttl))
+            self.log.info('Result is kept for %s seconds', result_ttl)
         else:
-            self.log.warning('Result will never expire, clean up result key manually')
+            self.log.info('Result will never expire, clean up result key manually')

         return True
@@ -869,7 +869,7 @@ class Worker(object):
     def move_to_failed_queue(self, job, *exc_info):
         """Default exception handler: move the job to the failed queue."""
-        self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name))
+        self.log.warning('Moving job to %r queue', self.failed_queue.name)
         from .handlers import move_to_failed_queue
         move_to_failed_queue(job, *exc_info)
@@ -904,7 +904,7 @@ class Worker(object):
     def clean_registries(self):
         """Runs maintenance jobs on each Queue's registries."""
         for queue in self.queues:
-            self.log.info('Cleaning registries for queue: {0}'.format(queue.name))
+            self.log.info('Cleaning registries for queue: %s', queue.name)
             clean_registries(queue)
         self.last_cleaned_at = utcnow()
@@ -952,7 +952,7 @@ class HerokuWorker(Worker):
     def handle_warm_shutdown_request(self):
         """If horse is alive send it SIGRTMIN"""
         if self.horse_pid != 0:
-            self.log.warning('Warm shut down requested, sending horse SIGRTMIN signal')
+            self.log.info('Warm shut down requested, sending horse SIGRTMIN signal')
             self.kill_horse(sig=signal.SIGRTMIN)
         else:
             self.log.warning('Warm shut down requested, no horse found')

@@ -6,11 +6,10 @@ import logging
 from redis import Redis

 from rq import pop_connection, push_connection
-from rq.compat import is_python_version

-if is_python_version((2, 7), (3, 2)):
+try:
     import unittest
-else:
+except ImportError:
     import unittest2 as unittest  # noqa

@@ -4,7 +4,6 @@ from __future__ import (absolute_import, division, print_function,
 from click.testing import CliRunner
 from rq import get_failed_queue, Queue
-from rq.compat import is_python_version
 from rq.job import Job
 from rq.cli import main
 from rq.cli.helpers import read_config_file, CliConfig
@@ -13,9 +12,9 @@ import pytest
 from tests import RQTestCase
 from tests.fixtures import div_by_zero

-if is_python_version((2, 7), (3, 2)):
+try:
     from unittest import TestCase
-else:
+except ImportError:
     from unittest2 import TestCase  # noqa

@@ -800,8 +800,8 @@ class TestWorker(RQTestCase):
         w = Worker([q])
         job = q.enqueue(say_hello, args=('Frank',), result_ttl=10)
         w.perform_job(job, q)
-        mock_logger_info.assert_called_with('Result is kept for 10 seconds')
-        self.assertIn('Result is kept for 10 seconds', [c[0][0] for c in mock_logger_info.call_args_list])
+        mock_logger_info.assert_called_with('Result is kept for %s seconds', 10)
+        self.assertIn('Result is kept for %s seconds', [c[0][0] for c in mock_logger_info.call_args_list])

     @mock.patch('rq.worker.logger.info')
     def test_log_result_lifespan_false(self, mock_logger_info):
@@ -823,7 +823,7 @@ class TestWorker(RQTestCase):
         w = Worker([q])
         job = q.enqueue(say_hello, args=('Frank',), result_ttl=10)
         w.dequeue_job_and_maintain_ttl(10)
-        self.assertIn("Frank", mock_logger_info.call_args[0][0])
+        self.assertIn("Frank", mock_logger_info.call_args[0][2])

     @mock.patch('rq.worker.logger.info')
     def test_log_job_description_false(self, mock_logger_info):
@@ -832,7 +832,7 @@ class TestWorker(RQTestCase):
         w = Worker([q], log_job_description=False)
         job = q.enqueue(say_hello, args=('Frank',), result_ttl=10)
         w.dequeue_job_and_maintain_ttl(10)
-        self.assertNotIn("Frank", mock_logger_info.call_args[0][0])
+        self.assertNotIn("Frank", mock_logger_info.call_args[0][2])

 def kill_worker(pid, double_kill):
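
Aside, not part of the diff: because the worker now passes format arguments to the logger instead of a pre-built string, the mocked logger.info receives a tuple of positional arguments. Index 0 is the format string and the interpolated values follow; the job description (the value containing "Frank") therefore sits at index 2. A minimal sketch using unittest.mock; the literal values below are placeholders:

from unittest import mock  # on Python 2 this was the external 'mock' package

logger = mock.Mock()
logger.info('%s: %s (%s)', 'default', "say_hello('Frank')", 'some-job-id')

args = logger.info.call_args[0]   # (format string, queue name, description, job id)
assert args[0] == '%s: %s (%s)'
assert 'Frank' in args[2]         # description is at positional index 2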
