Merge branch 'master' into ttl_tests_fixes

Conflicts:
	tests/test_job.py
main
glaslos 10 years ago
commit 24d5e08992

@ -1,3 +1,25 @@
### 0.5.0
(Jan 30th, 2015)
- RQ workers can now be paused and resumed using `rq suspend` and
`rq resume` commands. Thanks Jonathan Tushman!
- Jobs that are being performed are now stored in `StartedJobRegistry`
for monitoring purposes. This also prevents currently active jobs from
being orphaned/lost in the case of hard shutdowns.
- You can now monitor finished jobs by checking `FinishedJobRegistry`.
Thanks Nic Cope for helping!
- Jobs with unmet dependencies are now created with `deferred` as their
status. You can monitor deferred jobs by checking `DeferredJobRegistry`.
- It is now possible to enqueue a job at the beginning of queue using
`queue.enqueue(func, at_front=True)`. Thanks Travis Johnson!
- Command line scripts have all been refactored to use `click`. Thanks Lyon Zhang!
- Added a new `SimpleWorker` that does not fork when executing jobs.
Useful for testing purposes. Thanks Cal Leeming!
- Added `--queue-class` and `--job-class` arguments to `rqworker` script.
Thanks David Bonner!
- Many other minor bug fixes and enhancements.
### 0.4.6 ### 0.4.6
(May 21st, 2014) (May 21st, 2014)

@ -33,7 +33,7 @@ def count_words_at_url(url):
You do use the excellent [requests][r] package, don't you? You do use the excellent [requests][r] package, don't you?
Then, create a RQ queue: Then, create an RQ queue:
```python ```python
from rq import Queue, use_connection from rq import Queue, use_connection

@ -16,22 +16,27 @@ from rq import Connection, get_failed_queue, Queue
from rq.contrib.legacy import cleanup_ghosts from rq.contrib.legacy import cleanup_ghosts
from rq.exceptions import InvalidJobOperationError from rq.exceptions import InvalidJobOperationError
from rq.utils import import_attribute from rq.utils import import_attribute
from rq.suspension import suspend as connection_suspend, resume as connection_resume, is_suspended from rq.suspension import (suspend as connection_suspend,
resume as connection_resume, is_suspended)
from .helpers import (read_config_file, refresh, setup_loghandlers_from_args, from .helpers import (get_redis_from_config, read_config_file, refresh,
show_both, show_queues, show_workers) setup_loghandlers_from_args, show_both, show_queues,
show_workers)
url_option = click.option('--url', '-u', envvar='RQ_REDIS_URL', url_option = click.option('--url', '-u', envvar='RQ_REDIS_URL',
help='URL describing Redis connection details.') help='URL describing Redis connection details.')
config_option = click.option('--config', '-c', help='Module containing RQ settings.') config_option = click.option('--config', '-c',
help='Module containing RQ settings.')
def connect(url, config=None): def connect(url, config=None):
if url:
return StrictRedis.from_url(url)
settings = read_config_file(config) if config else {} settings = read_config_file(config) if config else {}
url = url or settings.get('REDIS_URL') return get_redis_from_config(settings)
return StrictRedis.from_url(url or 'redis://localhost:6379/0')
@click.group() @click.group()
@ -148,7 +153,6 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
settings = read_config_file(config) if config else {} settings = read_config_file(config) if config else {}
# Worker specific default arguments # Worker specific default arguments
url = url or settings.get('REDIS_URL')
queues = queues or settings.get('QUEUES', ['default']) queues = queues or settings.get('QUEUES', ['default'])
sentry_dsn = sentry_dsn or settings.get('SENTRY_DSN') sentry_dsn = sentry_dsn or settings.get('SENTRY_DSN')
@ -158,7 +162,7 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
setup_loghandlers_from_args(verbose, quiet) setup_loghandlers_from_args(verbose, quiet)
conn = connect(url) conn = connect(url, config)
cleanup_ghosts(conn) cleanup_ghosts(conn)
worker_class = import_attribute(worker_class) worker_class = import_attribute(worker_class)
queue_class = import_attribute(queue_class) queue_class = import_attribute(queue_class)

@ -7,10 +7,11 @@ import time
from functools import partial from functools import partial
import click import click
from redis import StrictRedis
from rq import Queue, Worker from rq import Queue, Worker
from rq.worker import WorkerStatus
from rq.logutils import setup_loghandlers from rq.logutils import setup_loghandlers
from rq.suspension import is_suspended from rq.worker import WorkerStatus
red = partial(click.style, fg='red') red = partial(click.style, fg='red')
green = partial(click.style, fg='green') green = partial(click.style, fg='green')
@ -25,6 +26,19 @@ def read_config_file(module):
if k.upper() == k]) if k.upper() == k])
def get_redis_from_config(settings):
"""Returns a StrictRedis instance from a dictionary of settings."""
if settings.get('REDIS_URL') is not None:
return StrictRedis.from_url(settings['REDIS_URL'])
return StrictRedis(
host=settings.get('REDIS_HOST', 'localhost'),
port=settings.get('REDIS_PORT', 6379),
db=settings.get('REDIS_DB', 0),
password=settings.get('REDIS_PASSWORD', None),
)
def pad(s, pad_to_length): def pad(s, pad_to_length):
"""Pads the given string to the given length.""" """Pads the given string to the given length."""
return ('%-' + '%ds' % pad_to_length) % (s,) return ('%-' + '%ds' % pad_to_length) % (s,)

@ -12,7 +12,7 @@ from rq.compat import as_text, decode_redis_hash, string_types, text_type
from .connections import resolve_connection from .connections import resolve_connection
from .exceptions import NoSuchJobError, UnpickleError from .exceptions import NoSuchJobError, UnpickleError
from .local import LocalStack from .local import LocalStack
from .utils import import_attribute, utcformat, utcnow, utcparse, enum from .utils import enum, import_attribute, utcformat, utcnow, utcparse
try: try:
import cPickle as pickle import cPickle as pickle
@ -514,8 +514,15 @@ class Job(object):
if self.func_name is None: if self.func_name is None:
return None return None
arg_list = [repr(arg) for arg in self.args] # Python 2/3 compatibility
arg_list += ['%s=%r' % (k, v) for k, v in self.kwargs.items()] try:
arg_list = [repr(arg).decode('utf-8') for arg in self.args]
except AttributeError:
arg_list = [repr(arg) for arg in self.args]
kwargs = ['{0}={1!r}'.format(k, v) for k, v in self.kwargs.items()]
# Sort here because python 3.3 & 3.4 makes different call_string
arg_list += sorted(kwargs)
args = ', '.join(arg_list) args = ', '.join(arg_list)
return '%s(%s)' % (self.func_name, args) return '%s(%s)' % (self.func_name, args)

@ -4,15 +4,14 @@ from __future__ import (absolute_import, division, print_function,
import uuid import uuid
from .connections import resolve_connection from redis import WatchError
from .job import Job, JobStatus
from .utils import import_attribute, utcnow
from .compat import as_text, string_types, total_ordering
from .connections import resolve_connection
from .exceptions import (DequeueTimeout, InvalidJobOperationError, from .exceptions import (DequeueTimeout, InvalidJobOperationError,
NoSuchJobError, UnpickleError) NoSuchJobError, UnpickleError)
from .compat import total_ordering, string_types, as_text from .job import Job, JobStatus
from .utils import import_attribute, utcnow
from redis import WatchError
def get_failed_queue(connection=None): def get_failed_queue(connection=None):
@ -143,9 +142,9 @@ class Queue(object):
job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id
if pipeline is not None: if pipeline is not None:
pipeline.lrem(self.key, 0, job_id) pipeline.lrem(self.key, 1, job_id)
return self.connection._lrem(self.key, 0, job_id) return self.connection._lrem(self.key, 1, job_id)
def compact(self): def compact(self):
"""Removes all "dead" jobs from the queue by cycling through it, while """Removes all "dead" jobs from the queue by cycling through it, while
@ -200,6 +199,7 @@ class Queue(object):
try: try:
pipe.watch(depends_on.key) pipe.watch(depends_on.key)
if depends_on.get_status() != JobStatus.FINISHED: if depends_on.get_status() != JobStatus.FINISHED:
pipe.multi()
job.set_status(JobStatus.DEFERRED) job.set_status(JobStatus.DEFERRED)
job.register_dependency(pipeline=pipe) job.register_dependency(pipeline=pipe)
job.save(pipeline=pipe) job.save(pipeline=pipe)
@ -254,7 +254,6 @@ class Queue(object):
If Queue is instantiated with async=False, job is executed immediately. If Queue is instantiated with async=False, job is executed immediately.
""" """
with self.connection._pipeline() as pipeline: with self.connection._pipeline() as pipeline:
# Add Queue key set # Add Queue key set
self.connection.sadd(self.redis_queues_keys, self.key) self.connection.sadd(self.redis_queues_keys, self.key)

@ -15,4 +15,4 @@ def suspend(connection, ttl=None):
def resume(connection): def resume(connection):
return connection.delete(WORKERS_SUSPENDED) return connection.delete(WORKERS_SUSPENDED)

@ -9,12 +9,12 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
import calendar import calendar
import importlib
import datetime import datetime
import importlib
import logging import logging
import sys import sys
from .compat import is_python_version, as_text from .compat import as_text, is_python_version
class _Colorizer(object): class _Colorizer(object):
@ -217,4 +217,4 @@ def enum(name, *sequential, **named):
# On Python 2 type() requires a byte string (which is str() on Python 2). # On Python 2 type() requires a byte string (which is str() on Python 2).
# On Python 3 it does not matter, so we'll use str(), which acts as # On Python 3 it does not matter, so we'll use str(), which acts as
# a no-op. # a no-op.
return type(str(name), (), values) return type(str(name), (), values)

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from __future__ import (absolute_import, division, print_function, from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
VERSION = '0.4.6'
VERSION = '0.5.0'

@ -12,7 +12,6 @@ import sys
import time import time
import traceback import traceback
import warnings import warnings
from datetime import datetime
from rq.compat import as_text, string_types, text_type from rq.compat import as_text, string_types, text_type
@ -21,11 +20,11 @@ from .exceptions import DequeueTimeout, NoQueueError
from .job import Job, JobStatus from .job import Job, JobStatus
from .logutils import setup_loghandlers from .logutils import setup_loghandlers
from .queue import get_failed_queue, Queue from .queue import get_failed_queue, Queue
from .timeouts import UnixSignalDeathPenalty
from .utils import import_attribute, make_colorizer, utcformat, utcnow, enum
from .version import VERSION
from .registry import FinishedJobRegistry, StartedJobRegistry from .registry import FinishedJobRegistry, StartedJobRegistry
from .suspension import is_suspended from .suspension import is_suspended
from .timeouts import UnixSignalDeathPenalty
from .utils import enum, import_attribute, make_colorizer, utcformat, utcnow, utcparse
from .version import VERSION
try: try:
from procname import setprocname from procname import setprocname
@ -54,8 +53,8 @@ def compact(l):
return [x for x in l if x is not None] return [x for x in l if x is not None]
_signames = dict((getattr(signal, signame), signame) _signames = dict((getattr(signal, signame), signame)
for signame in dir(signal) for signame in dir(signal)
if signame.startswith('SIG') and '_' not in signame) if signame.startswith('SIG') and '_' not in signame)
def signal_name(signum): def signal_name(signum):
@ -246,6 +245,21 @@ class Worker(object):
p.expire(self.key, 60) p.expire(self.key, 60)
p.execute() p.execute()
@property
def birth_date(self):
"""Fetches birth date from Redis."""
birth_timestamp = self.connection.hget(self.key, 'birth')
if birth_timestamp is not None:
return utcparse(as_text(birth_timestamp))
@property
def death_date(self):
"""Fetches death date from Redis."""
death_timestamp = self.connection.hget(self.key, 'death')
if death_timestamp is not None:
return utcparse(as_text(death_timestamp))
def set_state(self, state, pipeline=None): def set_state(self, state, pipeline=None):
self._state = state self._state = state
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
@ -367,7 +381,6 @@ class Worker(object):
if before_state: if before_state:
self.set_state(before_state) self.set_state(before_state)
def work(self, burst=False): def work(self, burst=False):
"""Starts the work loop. """Starts the work loop.
@ -416,7 +429,6 @@ class Worker(object):
self.register_death() self.register_death()
return did_perform_work return did_perform_work
def dequeue_job_and_maintain_ttl(self, timeout): def dequeue_job_and_maintain_ttl(self, timeout):
result = None result = None
qnames = self.queue_names() qnames = self.queue_names()

@ -101,9 +101,3 @@ class TestRQCli(RQTestCase):
self.assertEqual(result.exit_code, 1) self.assertEqual(result.exit_code, 1)
self.assertIn("Duration must be an integer greater than 1", result.output) self.assertIn("Duration must be an integer greater than 1", result.output)

@ -0,0 +1,41 @@
from rq.cli.helpers import get_redis_from_config
from tests import RQTestCase
class TestHelpers(RQTestCase):
def test_get_redis_from_config(self):
"""Ensure Redis connection params are properly parsed"""
settings = {
'REDIS_URL': 'redis://localhost:1/1'
}
# Ensure REDIS_URL is read
redis = get_redis_from_config(settings)
connection_kwargs = redis.connection_pool.connection_kwargs
self.assertEqual(connection_kwargs['db'], 1)
self.assertEqual(connection_kwargs['port'], 1)
settings = {
'REDIS_URL': 'redis://localhost:1/1',
'REDIS_HOST': 'foo',
'REDIS_DB': 2,
'REDIS_PORT': 2,
'REDIS_PASSWORD': 'bar'
}
# Ensure REDIS_URL is preferred
redis = get_redis_from_config(settings)
connection_kwargs = redis.connection_pool.connection_kwargs
self.assertEqual(connection_kwargs['db'], 1)
self.assertEqual(connection_kwargs['port'], 1)
# Ensure fall back to regular connection parameters
settings['REDIS_URL'] = None
redis = get_redis_from_config(settings)
connection_kwargs = redis.connection_pool.connection_kwargs
self.assertEqual(connection_kwargs['host'], 'foo')
self.assertEqual(connection_kwargs['db'], 2)
self.assertEqual(connection_kwargs['port'], 2)
self.assertEqual(connection_kwargs['password'], 'bar')

@ -4,11 +4,16 @@ from __future__ import (absolute_import, division, print_function,
from datetime import datetime from datetime import datetime
from tests import RQTestCase
from tests.fixtures import (access_self, CallableObject, Number, say_hello,
some_calculation)
from tests.helpers import strip_microseconds
from rq.compat import as_text, PY2 from rq.compat import as_text, PY2
from rq.exceptions import NoSuchJobError, UnpickleError from rq.exceptions import NoSuchJobError, UnpickleError
from rq.job import get_current_job, Job from rq.job import get_current_job, Job
from rq.registry import DeferredJobRegistry
from rq.queue import Queue from rq.queue import Queue
from rq.registry import DeferredJobRegistry
from rq.utils import utcformat from rq.utils import utcformat
from tests import RQTestCase from tests import RQTestCase
@ -23,6 +28,26 @@ except ImportError:
class TestJob(RQTestCase): class TestJob(RQTestCase):
def test_unicode(self):
"""Unicode in job description [issue405]"""
job = Job.create(
'myfunc',
args=[12, ""],
kwargs=dict(snowman="", null=None),
)
try:
# Python 2
test_string = u"myfunc(12, u'\\u2603', null=None, snowman=u'\\u2603')".decode('utf-8')
except AttributeError:
# Python 3
test_string = "myfunc(12, '', null=None, snowman='')"
self.assertEquals(
job.description,
test_string,
)
def test_create_empty_job(self): def test_create_empty_job(self):
"""Creation of new empty jobs.""" """Creation of new empty jobs."""
job = Job() job = Job()
@ -355,7 +380,7 @@ class TestJob(RQTestCase):
job = Job.create(func=say_hello, origin=origin) job = Job.create(func=say_hello, origin=origin)
job._dependency_id = 'id' job._dependency_id = 'id'
job.save() job.save()
self.assertEqual(registry.get_job_ids(), []) self.assertEqual(registry.get_job_ids(), [])
job.register_dependency() job.register_dependency()
self.assertEqual(as_text(self.testconn.spop('rq:job:id:dependents')), job.id) self.assertEqual(as_text(self.testconn.spop('rq:job:id:dependents')), job.id)

@ -2,16 +2,16 @@
from __future__ import (absolute_import, division, print_function, from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
from tests import RQTestCase
from tests.fixtures import (div_by_zero, echo, Number, say_hello,
some_calculation)
from rq import get_failed_queue, Queue from rq import get_failed_queue, Queue
from rq.exceptions import InvalidJobOperationError from rq.exceptions import InvalidJobOperationError
from rq.job import Job, JobStatus from rq.job import Job, JobStatus
from rq.registry import DeferredJobRegistry from rq.registry import DeferredJobRegistry
from rq.worker import Worker from rq.worker import Worker
from tests import RQTestCase
from tests.fixtures import (div_by_zero, echo, Number, say_hello,
some_calculation)
class CustomJob(Job): class CustomJob(Job):
pass pass
@ -340,7 +340,7 @@ class TestQueue(RQTestCase):
self.assertFalse(self.testconn.exists(parent_job.dependents_key)) self.assertFalse(self.testconn.exists(parent_job.dependents_key))
# DeferredJobRegistry should also be empty # DeferredJobRegistry should also be empty
self.assertEqual(registry.get_job_ids(), []) self.assertEqual(registry.get_job_ids(), [])
def test_enqueue_job_with_dependency(self): def test_enqueue_job_with_dependency(self):
"""Jobs are enqueued only when their dependencies are finished.""" """Jobs are enqueued only when their dependencies are finished."""

@ -5,17 +5,17 @@ from __future__ import (absolute_import, division, print_function,
import os import os
from time import sleep from time import sleep
from rq import get_failed_queue, Queue, Worker, SimpleWorker
from rq.compat import as_text
from rq.job import Job, JobStatus
from rq.registry import StartedJobRegistry
from rq.suspension import suspend, resume
from tests import RQTestCase, slow from tests import RQTestCase, slow
from tests.fixtures import (create_file, create_file_after_timeout, from tests.fixtures import (create_file, create_file_after_timeout,
div_by_zero, say_hello, say_pid, do_nothing) div_by_zero, do_nothing, say_hello, say_pid)
from tests.helpers import strip_microseconds from tests.helpers import strip_microseconds
from rq import get_failed_queue, Queue, SimpleWorker, Worker
from rq.compat import as_text
from rq.job import Job, JobStatus
from rq.registry import StartedJobRegistry
from rq.suspension import resume, suspend
class CustomJob(Job): class CustomJob(Job):
pass pass
@ -334,7 +334,7 @@ class TestWorker(RQTestCase):
raise raise
q = Queue() q = Queue()
job = q.enqueue(create_file, SENTINEL_FILE) q.enqueue(create_file, SENTINEL_FILE)
w = Worker([q]) w = Worker([q])
@ -379,3 +379,25 @@ class TestWorker(RQTestCase):
w3 = Worker([q], name="worker1") w3 = Worker([q], name="worker1")
worker_set = set([w1, w2, w3]) worker_set = set([w1, w2, w3])
self.assertEquals(len(worker_set), 2) self.assertEquals(len(worker_set), 2)
def test_worker_sets_birth(self):
"""Ensure worker correctly sets worker birth date."""
q = Queue()
w = Worker([q])
w.register_birth()
birth_date = w.birth_date
self.assertIsNotNone(birth_date)
self.assertEquals(type(birth_date).__name__, 'datetime')
def test_worker_sets_death(self):
"""Ensure worker correctly sets worker death date."""
q = Queue()
w = Worker([q])
w.register_death()
death_date = w.death_date
self.assertIsNotNone(death_date)
self.assertEquals(type(death_date).__name__, 'datetime')

Loading…
Cancel
Save