Merge remote-tracking branch 'origin/master' into multi-dependencies

main
Selwin Ong 5 years ago
commit 21bf5890c0

@ -0,0 +1,12 @@
version = 1
test_patterns = ["tests/**"]
exclude_patterns = ["examples/**"]
[[analyzers]]
name = "python"
enabled = true
[analyzers.meta]
runtime_version = "3.x.x"

@ -1,3 +1,15 @@
### RQ 1.4.1 (2020-05-16)
* Default serializer now uses `pickle.HIGHEST_PROTOCOL` for backward compatibility reasons. Thanks @bbayles!
* Avoid deprecation warnings on redis-py >= 3.5.0. Thanks @bbayles!
### RQ 1.4.0 (2020-05-13)
* Custom serializer is now supported. Thanks @solababs!
* `delay()` now accepts `job_id` argument. Thanks @grayshirt!
* Fixed a bug that may cause early termination of scheduled or requeued jobs. Thanks @rmartin48!
* When a job is scheduled, always add queue name to a set containing active RQ queue names. Thanks @mdawar!
* Added `--sentry-ca-certs` and `--sentry-debug` parameters to `rq worker` CLI. Thanks @kichawa!
* Jobs cleaned up by `StartedJobRegistry` are given an exception info. Thanks @selwin!
### RQ 1.3.0 (2020-03-09) ### RQ 1.3.0 (2020-03-09)
* Support for infinite job timeout. Thanks @theY4Kman! * Support for infinite job timeout. Thanks @theY4Kman!
* Added `__main__` file so you can now do `python -m rq.cli`. Thanks @bbayles! * Added `__main__` file so you can now do `python -m rq.cli`. Thanks @bbayles!

@ -1,2 +1,3 @@
include LICENSE include LICENSE
include *.toml
recursive-exclude tests * recursive-exclude tests *

@ -53,7 +53,7 @@ from somewhere import say_hello
queue = Queue(name='default', connection=Redis()) queue = Queue(name='default', connection=Redis())
# Schedules job to be run in 10 seconds # Schedules job to be run in 10 seconds
job = queue.enqueue_at(timedelta(seconds=10), say_hello) job = queue.enqueue_in(timedelta(seconds=10), say_hello)
``` ```
Jobs that are scheduled for execution are not placed in the queue, but they are Jobs that are scheduled for execution are not placed in the queue, but they are

@ -143,7 +143,7 @@ def show_workers(queues, raw, by_queue, queue_class, worker_class):
queue_dict[queue] = worker_class.all(queue=queue) queue_dict[queue] = worker_class.all(queue=queue)
if queue_dict: if queue_dict:
max_length = max([len(q.name) for q, in queue_dict.keys()]) max_length = max(len(q.name) for q, in queue_dict.keys())
else: else:
max_length = 0 max_length = 0

@ -104,3 +104,12 @@ except ImportError:
return timedelta(0) return timedelta(0)
utc = UTC() utc = UTC()
def hmset(pipe_or_connection, name, mapping):
# redis-py versions 3.5.0 and above accept a mapping parameter for hset
try:
return pipe_or_connection.hset(name, mapping=mapping)
# earlier versions require hmset to be used
except TypeError:
return pipe_or_connection.hmset(name, mapping)

@ -51,6 +51,7 @@ class job(object): # noqa
queue = self.queue queue = self.queue
depends_on = kwargs.pop('depends_on', None) depends_on = kwargs.pop('depends_on', None)
job_id = kwargs.pop('job_id', None)
at_front = kwargs.pop('at_front', False) at_front = kwargs.pop('at_front', False)
if not depends_on: if not depends_on:
@ -61,7 +62,7 @@ class job(object): # noqa
return queue.enqueue_call(f, args=args, kwargs=kwargs, return queue.enqueue_call(f, args=args, kwargs=kwargs,
timeout=self.timeout, result_ttl=self.result_ttl, timeout=self.timeout, result_ttl=self.result_ttl,
ttl=self.ttl, depends_on=depends_on, at_front=at_front, ttl=self.ttl, depends_on=depends_on, job_id=job_id, at_front=at_front,
meta=self.meta, description=self.description, failure_ttl=self.failure_ttl) meta=self.meta, description=self.description, failure_ttl=self.failure_ttl)
f.delay = delay f.delay = delay
return f return f

@ -4,12 +4,13 @@ from __future__ import (absolute_import, division, print_function,
import inspect import inspect
import warnings import warnings
import zlib
from functools import partial from functools import partial
from uuid import uuid4 from uuid import uuid4
import zlib from rq.compat import (as_text, decode_redis_hash, hmset, string_types,
text_type)
from rq.compat import as_text, decode_redis_hash, string_types, text_type
from .connections import resolve_connection from .connections import resolve_connection
from .exceptions import NoSuchJobError from .exceptions import NoSuchJobError
from .local import LocalStack from .local import LocalStack
@ -557,7 +558,7 @@ class Job(object):
key = self.key key = self.key
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
connection.hmset(key, self.to_dict(include_meta=include_meta)) hmset(connection, key, self.to_dict(include_meta=include_meta))
def save_meta(self): def save_meta(self):
"""Stores job meta from the job instance to the corresponding Redis key.""" """Stores job meta from the job instance to the corresponding Redis key."""

@ -191,25 +191,25 @@ class Queue(object):
@property @property
def started_job_registry(self): def started_job_registry(self):
"""Returns this queue's FailedJobRegistry.""" """Returns this queue's StartedJobRegistry."""
from rq.registry import StartedJobRegistry from rq.registry import StartedJobRegistry
return StartedJobRegistry(queue=self, job_class=self.job_class) return StartedJobRegistry(queue=self, job_class=self.job_class)
@property @property
def finished_job_registry(self): def finished_job_registry(self):
"""Returns this queue's FailedJobRegistry.""" """Returns this queue's FinishedJobRegistry."""
from rq.registry import FinishedJobRegistry from rq.registry import FinishedJobRegistry
return FinishedJobRegistry(queue=self) return FinishedJobRegistry(queue=self)
@property @property
def deferred_job_registry(self): def deferred_job_registry(self):
"""Returns this queue's FailedJobRegistry.""" """Returns this queue's DeferredJobRegistry."""
from rq.registry import DeferredJobRegistry from rq.registry import DeferredJobRegistry
return DeferredJobRegistry(queue=self, job_class=self.job_class) return DeferredJobRegistry(queue=self, job_class=self.job_class)
@property @property
def scheduled_job_registry(self): def scheduled_job_registry(self):
"""Returns this queue's FailedJobRegistry.""" """Returns this queue's ScheduledJobRegistry."""
from rq.registry import ScheduledJobRegistry from rq.registry import ScheduledJobRegistry
return ScheduledJobRegistry(queue=self, job_class=self.job_class) return ScheduledJobRegistry(queue=self, job_class=self.job_class)
@ -400,6 +400,8 @@ nd
registry = ScheduledJobRegistry(queue=self) registry = ScheduledJobRegistry(queue=self)
with self.connection.pipeline() as pipeline: with self.connection.pipeline() as pipeline:
# Add Queue key set
pipeline.sadd(self.redis_queues_keys, self.key)
job.save(pipeline=pipeline) job.save(pipeline=pipeline)
registry.schedule(job, datetime, pipeline=pipeline) registry.schedule(job, datetime, pipeline=pipeline)
pipeline.execute() pipeline.execute()

@ -40,7 +40,7 @@ class RQScheduler(object):
def __init__(self, queues, connection, interval=1): def __init__(self, queues, connection, interval=1):
self._queue_names = set(parse_names(queues)) self._queue_names = set(parse_names(queues))
self._acquired_locks = set([]) self._acquired_locks = set()
self._scheduled_job_registries = [] self._scheduled_job_registries = []
self.lock_acquisition_time = None self.lock_acquisition_time = None
self.connection = connection self.connection = connection
@ -68,7 +68,7 @@ class RQScheduler(object):
def acquire_locks(self, auto_start=False): def acquire_locks(self, auto_start=False):
"""Returns names of queue it successfully acquires lock on""" """Returns names of queue it successfully acquires lock on"""
successful_locks = set([]) successful_locks = set()
pid = os.getpid() pid = os.getpid()
logging.info("Trying to acquire locks for %s", ", ".join(self._queue_names)) logging.info("Trying to acquire locks for %s", ", ".join(self._queue_names))
for name in self._queue_names: for name in self._queue_names:

@ -1,9 +1,15 @@
from functools import partial
import pickle import pickle
from .compat import string_types from .compat import string_types
from .utils import import_attribute from .utils import import_attribute
class DefaultSerializer:
dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
loads = pickle.loads
def resolve_serializer(serializer): def resolve_serializer(serializer):
"""This function checks the user defined serializer for ('dumps', 'loads') methods """This function checks the user defined serializer for ('dumps', 'loads') methods
It returns a default pickle serializer if not found else it returns a MySerializer It returns a default pickle serializer if not found else it returns a MySerializer
@ -11,7 +17,7 @@ def resolve_serializer(serializer):
Also accepts a string path to serializer that will be loaded as the serializer Also accepts a string path to serializer that will be loaded as the serializer
""" """
if not serializer: if not serializer:
return pickle return DefaultSerializer
if isinstance(serializer, string_types): if isinstance(serializer, string_types):
serializer = import_attribute(serializer) serializer = import_attribute(serializer)

@ -2,4 +2,4 @@
from __future__ import (absolute_import, division, print_function, from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
VERSION = '1.3.0' VERSION = '1.4.1'

@ -23,7 +23,7 @@ except ImportError:
from redis import WatchError from redis import WatchError
from . import worker_registration from . import worker_registration
from .compat import PY2, as_text, string_types, text_type from .compat import PY2, as_text, hmset, string_types, text_type
from .connections import get_current_connection, push_connection, pop_connection from .connections import get_current_connection, push_connection, pop_connection
from .defaults import (DEFAULT_RESULT_TTL, from .defaults import (DEFAULT_RESULT_TTL,
@ -268,7 +268,7 @@ class Worker(object):
now = utcnow() now = utcnow()
now_in_string = utcformat(now) now_in_string = utcformat(now)
self.birth_date = now self.birth_date = now
p.hmset(key, { hmset(p, key, mapping={
'birth': now_in_string, 'birth': now_in_string,
'last_heartbeat': now_in_string, 'last_heartbeat': now_in_string,
'queues': queues, 'queues': queues,
@ -680,7 +680,7 @@ class Worker(object):
""" """
ret_val = None ret_val = None
job.started_at = job.started_at or utcnow() job.started_at = utcnow()
while True: while True:
try: try:
with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException): with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException):
@ -689,7 +689,7 @@ class Worker(object):
except HorseMonitorTimeoutException: except HorseMonitorTimeoutException:
# Horse has not exited yet and is still running. # Horse has not exited yet and is still running.
# Send a heartbeat to keep the worker alive. # Send a heartbeat to keep the worker alive.
self.heartbeat(self.job_monitoring_interval + 5) self.heartbeat(self.job_monitoring_interval + 30)
# Kill the job from this side if something is really wrong (interpreter lock/etc). # Kill the job from this side if something is really wrong (interpreter lock/etc).
if job.timeout != -1 and (utcnow() - job.started_at).total_seconds() > (job.timeout + 1): if job.timeout != -1 and (utcnow() - job.started_at).total_seconds() > (job.timeout + 1):
@ -747,15 +747,10 @@ class Worker(object):
# that are different from the worker. # that are different from the worker.
random.seed() random.seed()
try: self.setup_work_horse_signals()
self.setup_work_horse_signals() self._is_horse = True
self._is_horse = True self.log = logger
self.log = logger self.perform_job(job, queue)
self.perform_job(job, queue)
except Exception as e: # noqa
# Horse does not terminate properly
raise e
os._exit(1)
# os._exit() is the way to exit from childs after a fork(), in # os._exit() is the way to exit from childs after a fork(), in
# contrast to the regular sys.exit() # contrast to the regular sys.exit()

@ -35,7 +35,7 @@ setup(
'redis >= 3.0.0', 'redis >= 3.0.0',
'click >= 5.0' 'click >= 5.0'
], ],
python_requires='>=2.7', python_requires='>=3.4',
entry_points={ entry_points={
'console_scripts': [ 'console_scripts': [
'rq = rq.cli:main', 'rq = rq.cli:main',
@ -65,8 +65,6 @@ setup(
'Operating System :: MacOS', 'Operating System :: MacOS',
'Operating System :: Unix', 'Operating System :: Unix',
'Programming Language :: Python', 'Programming Language :: Python',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.4', 'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.5',

@ -4,10 +4,11 @@ from __future__ import (absolute_import, division, print_function,
import json import json
import pickle import pickle
import pickletools
import queue import queue
import unittest import unittest
from rq.serializers import resolve_serializer from rq.serializers import DefaultSerializer, resolve_serializer
class TestSerializers(unittest.TestCase): class TestSerializers(unittest.TestCase):
@ -15,7 +16,16 @@ class TestSerializers(unittest.TestCase):
"""Ensure function resolve_serializer works correctly""" """Ensure function resolve_serializer works correctly"""
serializer = resolve_serializer(None) serializer = resolve_serializer(None)
self.assertIsNotNone(serializer) self.assertIsNotNone(serializer)
self.assertEqual(serializer, pickle) self.assertEqual(serializer, DefaultSerializer)
# Test round trip with default serializer
test_data = {'test': 'data'}
serialized_data = serializer.dumps(test_data)
self.assertEqual(serializer.loads(serialized_data), test_data)
self.assertEqual(
next(pickletools.genops(serialized_data))[1],
pickle.HIGHEST_PROTOCOL
)
# Test using json serializer # Test using json serializer
serializer = resolve_serializer(json) serializer = resolve_serializer(json)

Loading…
Cancel
Save