merging master and fixing conflicts

Bradley Young, commit d56b5424c8 on branch main
@@ -1,3 +1,25 @@
### 0.5.0
(Jan 30th, 2015)
- RQ workers can now be paused and resumed using `rq suspend` and
`rq resume` commands. Thanks Jonathan Tushman!
- Jobs that are being performed are now stored in `StartedJobRegistry`
for monitoring purposes. This also prevents currently active jobs from
being orphaned/lost in the case of hard shutdowns.
- You can now monitor finished jobs by checking `FinishedJobRegistry`.
Thanks Nic Cope for helping!
- Jobs with unmet dependencies are now created with `deferred` as their
status. You can monitor deferred jobs by checking `DeferredJobRegistry`.
- It is now possible to enqueue a job at the beginning of a queue using
`queue.enqueue(func, at_front=True)`; see the sketch below. Thanks Travis Johnson!
- Command line scripts have all been refactored to use `click`. Thanks Lyon Zhang!
- Added a new `SimpleWorker` that does not fork when executing jobs.
Useful for testing purposes. Thanks Cal Leeming!
- Added `--queue-class` and `--job-class` arguments to `rqworker` script.
Thanks David Bonner!
- Many other minor bug fixes and enhancements.
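A quick sketch of the 0.5.0 additions above (`some_func` and the connection details are illustrative, not part of this commit):

```python
from redis import StrictRedis
from rq import Queue
from rq.registry import (DeferredJobRegistry, FinishedJobRegistry,
                         StartedJobRegistry)

def some_func():  # stand-in for any importable function
    return 42

conn = StrictRedis()
q = Queue(connection=conn)

# New in 0.5.0: jump the line by enqueueing at the front of the queue
job = q.enqueue(some_func, at_front=True)

# New registries for monitoring running, finished, and deferred jobs
for registry in (StartedJobRegistry(q.name, connection=conn),
                 FinishedJobRegistry(q.name, connection=conn),
                 DeferredJobRegistry(q.name, connection=conn)):
    print(registry.get_job_ids())
```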
### 0.4.6
(May 21st, 2014)

@@ -33,7 +33,7 @@ def count_words_at_url(url):
You do use the excellent [requests][r] package, don't you?
Then, create a RQ queue:
Then, create an RQ queue:
```python
from rq import Queue, use_connection

@@ -16,22 +16,27 @@ from rq import Connection, get_failed_queue, Queue
from rq.contrib.legacy import cleanup_ghosts
from rq.exceptions import InvalidJobOperationError
from rq.utils import import_attribute
from rq.suspension import suspend as connection_suspend, resume as connection_resume, is_suspended
from rq.suspension import (suspend as connection_suspend,
resume as connection_resume, is_suspended)
from .helpers import (read_config_file, refresh, setup_loghandlers_from_args,
show_both, show_queues, show_workers)
from .helpers import (get_redis_from_config, read_config_file, refresh,
setup_loghandlers_from_args, show_both, show_queues,
show_workers)
url_option = click.option('--url', '-u', envvar='RQ_REDIS_URL',
help='URL describing Redis connection details.')
config_option = click.option('--config', '-c', help='Module containing RQ settings.')
config_option = click.option('--config', '-c',
help='Module containing RQ settings.')
def connect(url, config=None):
if url:
return StrictRedis.from_url(url)
settings = read_config_file(config) if config else {}
url = url or settings.get('REDIS_URL')
return StrictRedis.from_url(url or 'redis://localhost:6379/0')
return get_redis_from_config(settings)
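The resolution order, sketched (the module path for `connect` is assumed from this file's location under `rq/cli`; the config module name is illustrative):

```python
from rq.cli.cli import connect  # path assumed; connect() is defined above

conn = connect('redis://localhost:6379/0')  # explicit URL wins outright
conn = connect(None, config='mysettings')   # REDIS_URL / REDIS_* read from module
conn = connect(None)                        # no URL, no config: localhost defaults
```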
@click.group()
@@ -149,7 +154,6 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
settings = read_config_file(config) if config else {}
# Worker specific default arguments
url = url or settings.get('REDIS_URL')
queues = queues or settings.get('QUEUES', ['default'])
sentry_dsn = sentry_dsn or settings.get('SENTRY_DSN')
@@ -159,7 +163,7 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
setup_loghandlers_from_args(verbose, quiet)
conn = connect(url)
conn = connect(url, config)
cleanup_ghosts(conn)
worker_class = import_attribute(worker_class)
queue_class = import_attribute(queue_class)

@@ -7,10 +7,11 @@ import time
from functools import partial
import click
from redis import StrictRedis
from rq import Queue, Worker
from rq.worker import WorkerStatus
from rq.logutils import setup_loghandlers
from rq.suspension import is_suspended
from rq.worker import WorkerStatus
red = partial(click.style, fg='red')
green = partial(click.style, fg='green')
@@ -25,6 +26,19 @@ def read_config_file(module):
if k.upper() == k])
def get_redis_from_config(settings):
"""Returns a StrictRedis instance from a dictionary of settings."""
if settings.get('REDIS_URL') is not None:
return StrictRedis.from_url(settings['REDIS_URL'])
return StrictRedis(
host=settings.get('REDIS_HOST', 'localhost'),
port=settings.get('REDIS_PORT', 6379),
db=settings.get('REDIS_DB', 0),
password=settings.get('REDIS_PASSWORD', None),
)
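A usage sketch of the new helper (the import path matches the test file added at the bottom of this diff; values are illustrative):

```python
from rq.cli.helpers import get_redis_from_config

# REDIS_URL, when present, takes precedence over the discrete parameters
conn = get_redis_from_config({'REDIS_URL': 'redis://localhost:6379/2'})

# Otherwise the individual REDIS_* settings apply, with localhost defaults
conn = get_redis_from_config({'REDIS_HOST': 'redis.example.com', 'REDIS_DB': 1})
```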
def pad(s, pad_to_length):
"""Pads the given string to the given length."""
return ('%-' + '%ds' % pad_to_length) % (s,)

@@ -12,7 +12,7 @@ from rq.compat import as_text, decode_redis_hash, string_types, text_type
from .connections import resolve_connection
from .exceptions import NoSuchJobError, UnpickleError
from .local import LocalStack
from .utils import import_attribute, utcformat, utcnow, utcparse, enum
from .utils import enum, import_attribute, utcformat, utcnow, utcparse
try:
import cPickle as pickle
@@ -25,9 +25,14 @@ dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
loads = pickle.loads
JobStatus = enum('JobStatus',
QUEUED='queued', FINISHED='finished', FAILED='failed',
STARTED='started')
JobStatus = enum(
'JobStatus',
QUEUED='queued',
FINISHED='finished',
FAILED='failed',
STARTED='started',
DEFERRED='deferred'
)
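Since `enum()` builds a plain class whose attribute values are the given strings, statuses compare directly against strings; for example:

```python
from rq.job import JobStatus

assert JobStatus.DEFERRED == 'deferred'  # the new status added in this commit
assert JobStatus.QUEUED == 'queued'
```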
# Sentinel value to mark that some of our lazily evaluated properties have not
# yet been evaluated.
@@ -83,8 +88,8 @@ class Job(object):
# Job construction
@classmethod
def create(cls, func, args=None, kwargs=None, connection=None,
result_ttl=None, ttl=None, status=None, description=None, depends_on=None, timeout=None,
id=None):
result_ttl=None, ttl=None, status=None, description=None,
depends_on=None, timeout=None, id=None, origin=None):
"""Creates a new Job instance for the given function, arguments, and
keyword arguments.
"""
@@ -102,6 +107,9 @@ class Job(object):
if id is not None:
job.set_id(id)
if origin is not None:
job.origin = origin
# Set the core job tuple properties
job._instance = None
if inspect.ismethod(func):
@@ -506,8 +514,15 @@ class Job(object):
if self.func_name is None:
return None
arg_list = [repr(arg) for arg in self.args]
arg_list += ['%s=%r' % (k, v) for k, v in self.kwargs.items()]
# Python 2/3 compatibility
try:
arg_list = [repr(arg).decode('utf-8') for arg in self.args]
except AttributeError:
arg_list = [repr(arg) for arg in self.args]
kwargs = ['{0}={1!r}'.format(k, v) for k, v in self.kwargs.items()]
# Sort here because Python 3.3 & 3.4 would otherwise produce different call strings
arg_list += sorted(kwargs)
args = ', '.join(arg_list)
return '%s(%s)' % (self.func_name, args)
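To see what the sorting fix buys, here is the same logic re-run standalone (not RQ code; `myfunc` is illustrative):

```python
args = (12, 'foo')
kwargs = {'b': 2, 'a': 1}

arg_list = [repr(arg) for arg in args]
# Without sorted(), dict iteration order varies across Python 3.3/3.4 runs
arg_list += sorted('{0}={1!r}'.format(k, v) for k, v in kwargs.items())
print('myfunc({0})'.format(', '.join(arg_list)))
# myfunc(12, 'foo', a=1, b=2) -- stable on every supported interpreter
```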
@@ -536,8 +551,14 @@ class Job(object):
rq:job:job_id:dependents = {'job_id_1', 'job_id_2'}
This method adds the current job in its dependency's dependents set.
This method adds the job to its dependency's dependents set
and adds the job to DeferredJobRegistry.
"""
from .registry import DeferredJobRegistry
registry = DeferredJobRegistry(self.origin, connection=self.connection)
registry.add(self, pipeline=pipeline)
connection = pipeline if pipeline is not None else self.connection
connection.sadd(Job.dependents_key_for(self._dependency_id), self.id)
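A sketch mirroring the `test_register_dependency` change further down this diff (assumes an active Redis connection; `'parent-id'` is illustrative):

```python
from redis import StrictRedis
from rq import Connection
from rq.job import Job
from rq.registry import DeferredJobRegistry

with Connection(StrictRedis()):
    job = Job.create(func='myfunc', origin='default')  # origin kwarg is new here
    job._dependency_id = 'parent-id'  # normally set via depends_on
    job.save()
    job.register_dependency()
    # The job now sits in rq:job:parent-id:dependents and in the
    # rq:deferred:default sorted set
    print(DeferredJobRegistry('default').get_job_ids())  # [job.id]
```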

@@ -4,15 +4,14 @@ from __future__ import (absolute_import, division, print_function,
import uuid
from .connections import resolve_connection
from .job import Job, JobStatus
from .utils import import_attribute, utcnow
from redis import WatchError
from .compat import as_text, string_types, total_ordering
from .connections import resolve_connection
from .exceptions import (DequeueTimeout, InvalidJobOperationError,
NoSuchJobError, UnpickleError)
from .compat import total_ordering, string_types, as_text
from redis import WatchError
from .job import Job, JobStatus
from .utils import import_attribute, utcnow
def get_failed_queue(connection=None):
@@ -143,9 +142,9 @@ class Queue(object):
job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id
if pipeline is not None:
pipeline.lrem(self.key, 0, job_id)
pipeline.lrem(self.key, 1, job_id)
return self.connection._lrem(self.key, 0, job_id)
return self.connection._lrem(self.key, 1, job_id)
def compact(self):
"""Removes all "dead" jobs from the queue by cycling through it, while
@@ -182,11 +181,11 @@ class Queue(object):
"""
timeout = timeout or self._default_timeout
# TODO: job with dependency shouldn't have "queued" as status
job = self.job_class.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=JobStatus.QUEUED,
description=description, depends_on=depends_on, timeout=timeout,
id=job_id)
job = self.job_class.create(
func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=JobStatus.QUEUED,
description=description, depends_on=depends_on,
timeout=timeout, id=job_id, origin=self.name)
# If job depends on an unfinished job, register itself on its
# parent's dependents instead of enqueueing it.
@@ -200,6 +199,7 @@ class Queue(object):
try:
pipe.watch(depends_on.key)
if depends_on.get_status() != JobStatus.FINISHED:
job.set_status(JobStatus.DEFERRED)
job.register_dependency(pipeline=pipe)
job.save(pipeline=pipe)
pipe.execute()
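End to end, this is the behavior the queue tests below verify (`say_hello` is a fixture from RQ's own test suite; any importable function works):

```python
from redis import StrictRedis
from rq import Connection, Queue
from tests.fixtures import say_hello  # test fixture, illustrative

with Connection(StrictRedis()):
    q = Queue()
    parent = q.enqueue(say_hello)
    child = q.enqueue(say_hello, depends_on=parent)

    child.get_status()  # 'deferred' until parent finishes, then 'queued'
    q.job_ids           # does not contain child.id while it is deferred
```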
@@ -248,24 +248,24 @@ class Queue(object):
description=description, depends_on=depends_on,
job_id=job_id, at_front=at_front)
def enqueue_job(self, job, set_meta_data=True, at_front=False):
def enqueue_job(self, job, at_front=False):
"""Enqueues a job for delayed execution.
If the `set_meta_data` argument is `True` (default), it will update
the properties `origin` and `enqueued_at`.
If Queue is instantiated with `async=False`, the job is executed immediately.
"""
# Add Queue key set
self.connection.sadd(self.redis_queues_keys, self.key)
with self.connection._pipeline() as pipeline:
# Add Queue key set
self.connection.sadd(self.redis_queues_keys, self.key)
job.set_status(JobStatus.QUEUED, pipeline=pipeline)
if set_meta_data:
job.origin = self.name
job.enqueued_at = utcnow()
if job.timeout is None:
job.timeout = self.DEFAULT_TIMEOUT
job.save()
if job.timeout is None:
job.timeout = self.DEFAULT_TIMEOUT
job.save(pipeline=pipeline)
pipeline.execute()
if self._async:
self.push_job_id(job.id, at_front=at_front)
@@ -277,11 +277,16 @@ class Queue(object):
def enqueue_dependents(self, job):
"""Enqueues all jobs in the given job's dependents set and clears it."""
# TODO: can probably be pipelined
from .registry import DeferredJobRegistry
registry = DeferredJobRegistry(self.name, self.connection)
while True:
job_id = as_text(self.connection.spop(job.dependents_key))
if job_id is None:
break
dependent = self.job_class.fetch(job_id, connection=self.connection)
registry.remove(dependent)
self.enqueue_job(dependent)
def pop_job_id(self):
@@ -401,14 +406,20 @@ class FailedQueue(Queue):
def quarantine(self, job, exc_info):
"""Puts the given Job in quarantine (i.e. put it on the failed
queue).
This is different from normal job enqueueing, since certain meta data
must not be overridden (e.g. `origin` or `enqueued_at`) and other meta
data must be inserted (`ended_at` and `exc_info`).
"""
job.ended_at = utcnow()
job.exc_info = exc_info
return self.enqueue_job(job, set_meta_data=False)
with self.connection._pipeline() as pipeline:
# Add Queue key set
self.connection.sadd(self.redis_queues_keys, self.key)
job.ended_at = utcnow()
job.exc_info = exc_info
job.save(pipeline=pipeline)
self.push_job_id(job.id, pipeline=pipeline)
pipeline.execute()
return job
def requeue(self, job_id):
"""Requeues the job with the given job ID."""

@@ -24,7 +24,7 @@ class BaseRegistry(object):
self.cleanup()
return self.connection.zcard(self.key)
def add(self, job, ttl, pipeline=None):
def add(self, job, ttl=0, pipeline=None):
"""Adds a job to a registry with expiry time of now + ttl."""
score = ttl if ttl < 0 else current_timestamp() + ttl
if pipeline is not None:
@@ -108,3 +108,19 @@ class FinishedJobRegistry(BaseRegistry):
"""
score = timestamp if timestamp is not None else current_timestamp()
self.connection.zremrangebyscore(self.key, 0, score)
class DeferredJobRegistry(BaseRegistry):
"""
Registry of deferred jobs (waiting for another job to finish).
"""
def __init__(self, name='default', connection=None):
super(DeferredJobRegistry, self).__init__(name, connection)
self.key = 'rq:deferred:%s' % name
def cleanup(self):
"""This method is only here to prevent errors because this method is
automatically called by `count()` and `get_job_ids()` methods
implemented in BaseRegistry."""
pass
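Monitoring deferred jobs, per the changelog entry above (a sketch):

```python
from redis import StrictRedis
from rq.registry import DeferredJobRegistry

registry = DeferredJobRegistry('default', connection=StrictRedis())
print(registry.get_job_ids())  # ids of jobs still waiting on a dependency
```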

@@ -15,4 +15,4 @@ def suspend(connection, ttl=None):
def resume(connection):
return connection.delete(WORKERS_SUSPENDED)
return connection.delete(WORKERS_SUSPENDED)
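These are the functions behind the new `rq suspend` / `rq resume` commands; a sketch (passing `ttl` to `suspend()` lets the suspension expire on its own):

```python
from redis import StrictRedis
from rq.suspension import is_suspended, resume, suspend

conn = StrictRedis()
suspend(conn)              # workers finish their current job, then stop dequeueing
assert is_suspended(conn)
resume(conn)               # deletes the suspension flag
```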

@@ -9,12 +9,12 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals)
import calendar
import importlib
import datetime
import importlib
import logging
import sys
from .compat import is_python_version, as_text
from .compat import as_text, is_python_version
class _Colorizer(object):
@@ -217,4 +217,4 @@ def enum(name, *sequential, **named):
# On Python 2 type() requires a byte string (which is str() on Python 2).
# On Python 3 it does not matter, so we'll use str(), which acts as
# a no-op.
return type(str(name), (), values)
return type(str(name), (), values)

@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division, print_function,
unicode_literals)
VERSION = '0.4.6'
VERSION = '0.5.0'

@@ -12,7 +12,6 @@ import sys
import time
import traceback
import warnings
from datetime import datetime
from rq.compat import as_text, string_types, text_type
@@ -21,11 +20,11 @@ from .exceptions import DequeueTimeout, NoQueueError
from .job import Job, JobStatus
from .logutils import setup_loghandlers
from .queue import get_failed_queue, Queue
from .timeouts import UnixSignalDeathPenalty
from .utils import import_attribute, make_colorizer, utcformat, utcnow, enum
from .version import VERSION
from .registry import FinishedJobRegistry, StartedJobRegistry
from .suspension import is_suspended
from .timeouts import UnixSignalDeathPenalty
from .utils import enum, import_attribute, make_colorizer, utcformat, utcnow
from .version import VERSION
try:
from procname import setprocname
@@ -54,8 +53,8 @@ def compact(l):
return [x for x in l if x is not None]
_signames = dict((getattr(signal, signame), signame)
for signame in dir(signal)
if signame.startswith('SIG') and '_' not in signame)
for signame in dir(signal)
if signame.startswith('SIG') and '_' not in signame)
def signal_name(signum):
@@ -377,7 +376,6 @@ class Worker(object):
if before_state:
self.set_state(before_state)
def work(self, burst=False):
"""Starts the work loop.
@@ -426,7 +424,6 @@ class Worker(object):
self.register_death()
return did_perform_work
def dequeue_job_and_maintain_ttl(self, timeout):
result = None
qnames = self.queue_names()

@@ -0,0 +1,41 @@
from rq.cli.helpers import get_redis_from_config
from tests import RQTestCase
class TestHelpers(RQTestCase):
def test_get_redis_from_config(self):
"""Ensure Redis connection params are properly parsed"""
settings = {
'REDIS_URL': 'redis://localhost:1/1'
}
# Ensure REDIS_URL is read
redis = get_redis_from_config(settings)
connection_kwargs = redis.connection_pool.connection_kwargs
self.assertEqual(connection_kwargs['db'], 1)
self.assertEqual(connection_kwargs['port'], 1)
settings = {
'REDIS_URL': 'redis://localhost:1/1',
'REDIS_HOST': 'foo',
'REDIS_DB': 2,
'REDIS_PORT': 2,
'REDIS_PASSWORD': 'bar'
}
# Ensure REDIS_URL is preferred
redis = get_redis_from_config(settings)
connection_kwargs = redis.connection_pool.connection_kwargs
self.assertEqual(connection_kwargs['db'], 1)
self.assertEqual(connection_kwargs['port'], 1)
# Ensure we fall back to regular connection parameters
settings['REDIS_URL'] = None
redis = get_redis_from_config(settings)
connection_kwargs = redis.connection_pool.connection_kwargs
self.assertEqual(connection_kwargs['host'], 'foo')
self.assertEqual(connection_kwargs['db'], 2)
self.assertEqual(connection_kwargs['port'], 2)
self.assertEqual(connection_kwargs['password'], 'bar')

@@ -4,17 +4,18 @@ from __future__ import (absolute_import, division, print_function,
from datetime import datetime
from tests import RQTestCase
from tests.fixtures import (access_self, CallableObject, Number, say_hello,
some_calculation)
from tests.helpers import strip_microseconds
from rq.compat import as_text, PY2
from rq.exceptions import NoSuchJobError, UnpickleError
from rq.job import get_current_job, Job
from rq.queue import Queue
from rq.registry import DeferredJobRegistry
from rq.utils import utcformat
from tests import RQTestCase
from tests.fixtures import (access_self, CallableObject, Number, say_hello,
some_calculation)
from tests.helpers import strip_microseconds
try:
from cPickle import loads, dumps
except ImportError:
@@ -22,6 +23,26 @@ except ImportError:
class TestJob(RQTestCase):
def test_unicode(self):
"""Unicode in job description [issue405]"""
job = Job.create(
'myfunc',
args=[12, "☃"],
kwargs=dict(snowman="☃", null=None),
)
try:
# Python 2
test_string = u"myfunc(12, u'\\u2603', null=None, snowman=u'\\u2603')".decode('utf-8')
except AttributeError:
# Python 3
test_string = "myfunc(12, '', null=None, snowman='')"
self.assertEquals(
job.description,
test_string,
)
def test_create_empty_job(self):
"""Creation of new empty jobs."""
job = Job()
@@ -331,12 +352,18 @@ class TestJob(RQTestCase):
self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn)
def test_register_dependency(self):
"""Test that jobs updates the correct job dependents."""
job = Job.create(func=say_hello)
"""Ensure dependency registration works properly."""
origin = 'some_queue'
registry = DeferredJobRegistry(origin, self.testconn)
job = Job.create(func=say_hello, origin=origin)
job._dependency_id = 'id'
job.save()
self.assertEqual(registry.get_job_ids(), [])
job.register_dependency()
self.assertEqual(as_text(self.testconn.spop('rq:job:id:dependents')), job.id)
self.assertEqual(registry.get_job_ids(), [job.id])
def test_cancel(self):
"""job.cancel() deletes itself & dependents mapping from Redis."""

@@ -2,15 +2,16 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from tests import RQTestCase
from tests.fixtures import (div_by_zero, echo, Number, say_hello,
some_calculation)
from rq import get_failed_queue, Queue
from rq.exceptions import InvalidJobOperationError
from rq.job import Job, JobStatus
from rq.registry import DeferredJobRegistry
from rq.worker import Worker
from tests import RQTestCase
from tests.fixtures import (div_by_zero, echo, Number, say_hello,
some_calculation)
class CustomJob(Job):
pass
@@ -117,6 +118,7 @@ class TestQueue(RQTestCase):
# say_hello spec holds which queue this is sent to
job = q.enqueue(say_hello, 'Nick', foo='bar')
job_id = job.id
self.assertEqual(job.origin, q.name)
# Inspect data inside Redis
q_key = 'rq:queue:default'
@@ -131,14 +133,12 @@ class TestQueue(RQTestCase):
job = Job.create(func=say_hello, args=('Nick',), kwargs=dict(foo='bar'))
# Preconditions
self.assertIsNone(job.origin)
self.assertIsNone(job.enqueued_at)
# Action
q.enqueue_job(job)
# Postconditions
self.assertEquals(job.origin, q.name)
self.assertIsNotNone(job.enqueued_at)
def test_pop_job_id(self):
@@ -320,30 +320,36 @@ class TestQueue(RQTestCase):
self.assertEquals(len(Queue.all()), 3)
def test_enqueue_dependents(self):
"""Enqueueing the dependent jobs pushes all jobs in the depends set to the queue."""
"""Enqueueing dependent jobs pushes all jobs in the depends set to the queue
and removes them from DeferredJobQueue."""
q = Queue()
parent_job = Job.create(func=say_hello)
parent_job.save()
job_1 = Job.create(func=say_hello, depends_on=parent_job)
job_1.save()
job_1.register_dependency()
job_2 = Job.create(func=say_hello, depends_on=parent_job)
job_2.save()
job_2.register_dependency()
job_1 = q.enqueue(say_hello, depends_on=parent_job)
job_2 = q.enqueue(say_hello, depends_on=parent_job)
registry = DeferredJobRegistry(q.name, connection=self.testconn)
self.assertEqual(
set(registry.get_job_ids()),
set([job_1.id, job_2.id])
)
# After dependents are enqueued, job_1 and job_2 should be in the queue
self.assertEqual(q.job_ids, [])
q.enqueue_dependents(parent_job)
self.assertEqual(set(q.job_ids), set([job_1.id, job_2.id]))
self.assertEqual(set(q.job_ids), set([job_2.id, job_1.id]))
self.assertFalse(self.testconn.exists(parent_job.dependents_key))
# DeferredJobRegistry should also be empty
self.assertEqual(registry.get_job_ids(), [])
def test_enqueue_job_with_dependency(self):
"""Jobs are enqueued only when their dependencies are finished."""
# Job with unfinished dependency is not immediately enqueued
parent_job = Job.create(func=say_hello)
q = Queue()
q.enqueue_call(say_hello, depends_on=parent_job)
job = q.enqueue_call(say_hello, depends_on=parent_job)
self.assertEqual(q.job_ids, [])
self.assertEqual(job.get_status(), JobStatus.DEFERRED)
# Jobs dependent on finished jobs are immediately enqueued
parent_job.set_status(JobStatus.FINISHED)
@@ -351,6 +357,7 @@ class TestQueue(RQTestCase):
job = q.enqueue_call(say_hello, depends_on=parent_job)
self.assertEqual(q.job_ids, [job.id])
self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
self.assertEqual(job.get_status(), JobStatus.QUEUED)
def test_enqueue_job_with_dependency_by_id(self):
"""Enqueueing jobs should work as expected by id as well as job-objects."""
@@ -368,7 +375,7 @@ class TestQueue(RQTestCase):
self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
def test_enqueue_job_with_dependency_and_timeout(self):
"""Jobs still know their specified timeout after being scheduled as a dependency."""
"""Jobs remember their timeout when enqueued as a dependency."""
# Job with unfinished dependency is not immediately enqueued
parent_job = Job.create(func=say_hello)
q = Queue()

@@ -1,11 +1,13 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from rq.compat import as_text
from rq.job import Job
from rq.queue import FailedQueue, Queue
from rq.utils import current_timestamp
from rq.worker import Worker
from rq.registry import FinishedJobRegistry, StartedJobRegistry
from rq.registry import (DeferredJobRegistry, FinishedJobRegistry,
StartedJobRegistry)
from tests import RQTestCase
from tests.fixtures import div_by_zero, say_hello
@@ -119,7 +121,6 @@ class TestFinishedJobRegistry(RQTestCase):
self.registry.cleanup(timestamp + 20)
self.assertEqual(self.registry.get_job_ids(), ['baz'])
def test_jobs_are_put_in_registry(self):
"""Completed jobs are added to FinishedJobRegistry."""
self.assertEqual(self.registry.get_job_ids(), [])
@@ -135,3 +136,18 @@ class TestFinishedJobRegistry(RQTestCase):
failed_job = queue.enqueue(div_by_zero)
worker.perform_job(failed_job)
self.assertEqual(self.registry.get_job_ids(), [job.id])
class TestDeferredRegistry(RQTestCase):
def setUp(self):
super(TestDeferredRegistry, self).setUp()
self.registry = DeferredJobRegistry(connection=self.testconn)
def test_add(self):
"""Adding a job to DeferredJobsRegistry."""
job = Job()
self.registry.add(job)
job_ids = [as_text(job_id) for job_id in
self.testconn.zrange(self.registry.key, 0, -1)]
self.assertEqual(job_ids, [job.id])

@@ -5,17 +5,17 @@ from __future__ import (absolute_import, division, print_function,
import os
from time import sleep
from rq import get_failed_queue, Queue, Worker, SimpleWorker
from rq.compat import as_text
from rq.job import Job, JobStatus
from rq.registry import StartedJobRegistry
from rq.suspension import suspend, resume
from tests import RQTestCase, slow
from tests.fixtures import (create_file, create_file_after_timeout,
div_by_zero, say_hello, say_pid, do_nothing)
div_by_zero, do_nothing, say_hello, say_pid)
from tests.helpers import strip_microseconds
from rq import get_failed_queue, Queue, SimpleWorker, Worker
from rq.compat import as_text
from rq.job import Job, JobStatus
from rq.registry import StartedJobRegistry
from rq.suspension import resume, suspend
class CustomJob(Job):
pass
@@ -334,7 +334,7 @@ class TestWorker(RQTestCase):
raise
q = Queue()
job = q.enqueue(create_file, SENTINEL_FILE)
q.enqueue(create_file, SENTINEL_FILE)
w = Worker([q])
