From eb92d688a8fc81ac56fb101606f8f2458460f753 Mon Sep 17 00:00:00 2001 From: Pierre Mdawar Date: Fri, 24 Apr 2020 04:06:36 +0300 Subject: [PATCH 01/12] Add the queue to the Redis queues set when scheduling a job (#1238) * Add the queue to the queues set when scheduling a job * Fix the registry properties docstrings --- rq/queue.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index 4316653..0aebe8f 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -192,25 +192,25 @@ class Queue(object): @property def started_job_registry(self): - """Returns this queue's FailedJobRegistry.""" + """Returns this queue's StartedJobRegistry.""" from rq.registry import StartedJobRegistry return StartedJobRegistry(queue=self, job_class=self.job_class) @property def finished_job_registry(self): - """Returns this queue's FailedJobRegistry.""" + """Returns this queue's FinishedJobRegistry.""" from rq.registry import FinishedJobRegistry return FinishedJobRegistry(queue=self) @property def deferred_job_registry(self): - """Returns this queue's FailedJobRegistry.""" + """Returns this queue's DeferredJobRegistry.""" from rq.registry import DeferredJobRegistry return DeferredJobRegistry(queue=self, job_class=self.job_class) @property def scheduled_job_registry(self): - """Returns this queue's FailedJobRegistry.""" + """Returns this queue's ScheduledJobRegistry.""" from rq.registry import ScheduledJobRegistry return ScheduledJobRegistry(queue=self, job_class=self.job_class) @@ -380,7 +380,7 @@ class Queue(object): (f, timeout, description, result_ttl, ttl, failure_ttl, depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs) - + return self.enqueue_call( func=f, args=args, kwargs=kwargs, timeout=timeout, result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, @@ -401,6 +401,8 @@ class Queue(object): registry = ScheduledJobRegistry(queue=self) with self.connection.pipeline() as pipeline: + # Add Queue key set + pipeline.sadd(self.redis_queues_keys, self.key) job.save(pipeline=pipeline) registry.schedule(job, datetime, pipeline=pipeline) pipeline.execute() From b2be17417f529f51ec079f812762bc0f67526969 Mon Sep 17 00:00:00 2001 From: Prajjwal Nijhara Date: Sun, 3 May 2020 16:05:01 +0530 Subject: [PATCH 02/12] Fix some code quality issues (#1235) --- .deepsource.toml | 12 ++++++++++++ MANIFEST.in | 1 + rq/cli/helpers.py | 2 +- rq/scheduler.py | 4 ++-- 4 files changed, 16 insertions(+), 3 deletions(-) create mode 100644 .deepsource.toml diff --git a/.deepsource.toml b/.deepsource.toml new file mode 100644 index 0000000..def09e2 --- /dev/null +++ b/.deepsource.toml @@ -0,0 +1,12 @@ +version = 1 + +test_patterns = ["tests/**"] + +exclude_patterns = ["examples/**"] + +[[analyzers]] +name = "python" +enabled = true + + [analyzers.meta] + runtime_version = "3.x.x" \ No newline at end of file diff --git a/MANIFEST.in b/MANIFEST.in index 537813f..bba9d9a 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,2 +1,3 @@ include LICENSE +include *.toml recursive-exclude tests * diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index d27fbb0..8d3357c 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -143,7 +143,7 @@ def show_workers(queues, raw, by_queue, queue_class, worker_class): queue_dict[queue] = worker_class.all(queue=queue) if queue_dict: - max_length = max([len(q.name) for q, in queue_dict.keys()]) + max_length = max(len(q.name) for q, in queue_dict.keys()) else: max_length = 0 diff --git a/rq/scheduler.py b/rq/scheduler.py index f269ea9..cc1b999 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -40,7 +40,7 @@ class RQScheduler(object): def __init__(self, queues, connection, interval=1): self._queue_names = set(parse_names(queues)) - self._acquired_locks = set([]) + self._acquired_locks = set() self._scheduled_job_registries = [] self.lock_acquisition_time = None self.connection = connection @@ -68,7 +68,7 @@ class RQScheduler(object): def acquire_locks(self, auto_start=False): """Returns names of queue it successfully acquires lock on""" - successful_locks = set([]) + successful_locks = set() pid = os.getpid() logging.info("Trying to acquire locks for %s", ", ".join(self._queue_names)) for name in self._queue_names: From a922a553cb70e66b740934c86f97529328d54f68 Mon Sep 17 00:00:00 2001 From: rmartin48 Date: Sun, 10 May 2020 20:34:53 +1000 Subject: [PATCH 03/12] Always set job.started_at in monitor_work_horse (#1242) Co-authored-by: Russell Martin --- rq/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index 7e090f5..d192462 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -680,7 +680,7 @@ class Worker(object): """ ret_val = None - job.started_at = job.started_at or utcnow() + job.started_at = utcnow() while True: try: with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException): From 9fb80424efe35b615d279c2a5c330e2166e32c71 Mon Sep 17 00:00:00 2001 From: Vincent Jacques Date: Sun, 10 May 2020 12:35:21 +0200 Subject: [PATCH 04/12] Fix typo in scheduling doc (#1245) --- docs/docs/scheduling.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/docs/scheduling.md b/docs/docs/scheduling.md index 11fb7f1..7ecff03 100644 --- a/docs/docs/scheduling.md +++ b/docs/docs/scheduling.md @@ -53,7 +53,7 @@ from somewhere import say_hello queue = Queue(name='default', connection=Redis()) # Schedules job to be run in 10 seconds -job = queue.enqueue_at(timedelta(seconds=10), say_hello) +job = queue.enqueue_in(timedelta(seconds=10), say_hello) ``` Jobs that are scheduled for execution are not placed in the queue, but they are From d8dea02081bde62c39dbcb7190b9245b468e657b Mon Sep 17 00:00:00 2001 From: grayshirt Date: Sun, 10 May 2020 05:35:52 -0500 Subject: [PATCH 05/12] Parse job_id as keyword argument to delay() (#1236) (#1243) --- rq/decorators.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rq/decorators.py b/rq/decorators.py index a84289f..e8c1f37 100644 --- a/rq/decorators.py +++ b/rq/decorators.py @@ -51,6 +51,7 @@ class job(object): # noqa queue = self.queue depends_on = kwargs.pop('depends_on', None) + job_id = kwargs.pop('job_id', None) at_front = kwargs.pop('at_front', False) if not depends_on: @@ -61,7 +62,7 @@ class job(object): # noqa return queue.enqueue_call(f, args=args, kwargs=kwargs, timeout=self.timeout, result_ttl=self.result_ttl, - ttl=self.ttl, depends_on=depends_on, at_front=at_front, + ttl=self.ttl, depends_on=depends_on, job_id=job_id, at_front=at_front, meta=self.meta, description=self.description, failure_ttl=self.failure_ttl) f.delay = delay return f From 9d6f38df0e7629962004f82021e0ed45dd488e2c Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sun, 10 May 2020 17:40:07 +0700 Subject: [PATCH 06/12] Slightly increase job key timeout in monitor_work_horse() --- rq/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index d192462..d4c423d 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -689,7 +689,7 @@ class Worker(object): except HorseMonitorTimeoutException: # Horse has not exited yet and is still running. # Send a heartbeat to keep the worker alive. - self.heartbeat(self.job_monitoring_interval + 5) + self.heartbeat(self.job_monitoring_interval + 30) # Kill the job from this side if something is really wrong (interpreter lock/etc). if job.timeout != -1 and (utcnow() - job.started_at).total_seconds() > (job.timeout + 1): From 6ab6a0a57319cf496bb55028cd0c152692f6ae55 Mon Sep 17 00:00:00 2001 From: Michael Angeletti Date: Tue, 12 May 2020 20:55:11 -0400 Subject: [PATCH 07/12] Remove extraneous try/except (#1247) The exception handling block was raising the caught exception in-place, which caused the original traceback info to be lost. Rather than replace `raise e` with `raise`, I simply removed the whole try / except, since no action was being taken in the except block. --- rq/worker.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index d4c423d..2ebfcec 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -747,15 +747,10 @@ class Worker(object): # that are different from the worker. random.seed() - try: - self.setup_work_horse_signals() - self._is_horse = True - self.log = logger - self.perform_job(job, queue) - except Exception as e: # noqa - # Horse does not terminate properly - raise e - os._exit(1) + self.setup_work_horse_signals() + self._is_horse = True + self.log = logger + self.perform_job(job, queue) # os._exit() is the way to exit from childs after a fork(), in # contrast to the regular sys.exit() From 02eb983e90aeb8e878e5c0f9989b7b701b8ad21a Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Wed, 13 May 2020 08:10:29 +0700 Subject: [PATCH 08/12] Bump version to 1.4.0 --- CHANGES.md | 8 ++++++++ rq/version.py | 2 +- setup.py | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 5b71d17..3dd3076 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,11 @@ +### RQ 1.4.0 (2020-05-13) +* Custom serializer is now supported. Thanks @solababs! +* `delay()` now accepts `job_id` argument. Thanks @grayshirt! +* Fixed a bug that may cause early termination of scheduled or requeued jobs. Thanks @rmartin48! +* When a job is scheduled, always add queue name to a set containing active RQ queue names. Thanks @mdawar! +* Added `--sentry-ca-certs` and `--sentry-debug` parameters to `rq worker` CLI. Thanks @kichawa! +* Jobs cleaned up by `StartedJobRegistry` are given an exception info. Thanks @selwin! + ### RQ 1.3.0 (2020-03-09) * Support for infinite job timeout. Thanks @theY4Kman! * Added `__main__` file so you can now do `python -m rq.cli`. Thanks @bbayles! diff --git a/rq/version.py b/rq/version.py index f75766e..8199df9 100644 --- a/rq/version.py +++ b/rq/version.py @@ -2,4 +2,4 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -VERSION = '1.3.0' +VERSION = '1.4.0' diff --git a/setup.py b/setup.py index 8df1b72..046b3a8 100644 --- a/setup.py +++ b/setup.py @@ -35,7 +35,7 @@ setup( 'redis >= 3.0.0', 'click >= 5.0' ], - python_requires='>=2.7', + python_requires='>=3.4', entry_points={ 'console_scripts': [ 'rq = rq.cli:main', From 5859339a51e5f5d902d1d07603f41d95008e1065 Mon Sep 17 00:00:00 2001 From: Bo Bayles Date: Fri, 15 May 2020 19:35:08 -0500 Subject: [PATCH 09/12] Avoid deprecation warnings on redis-py 3.5.0 hmset (#1253) --- rq/compat/__init__.py | 11 ++++++++++- rq/job.py | 5 +++-- rq/worker.py | 4 ++-- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/rq/compat/__init__.py b/rq/compat/__init__.py index 3f8b3aa..d62c89c 100644 --- a/rq/compat/__init__.py +++ b/rq/compat/__init__.py @@ -103,4 +103,13 @@ except ImportError: def dst(self, dt): return timedelta(0) - utc = UTC() \ No newline at end of file + utc = UTC() + + +def hmset(pipe_or_connection, name, mapping): + # redis-py versions 3.5.0 and above accept a mapping parameter for hset + try: + return pipe_or_connection.hset(name, mapping=mapping) + # earlier versions require hmset to be used + except TypeError: + return pipe_or_connection.hmset(name, mapping) diff --git a/rq/job.py b/rq/job.py index 15e706b..2c1251d 100644 --- a/rq/job.py +++ b/rq/job.py @@ -7,7 +7,8 @@ import warnings import zlib from uuid import uuid4 -from rq.compat import as_text, decode_redis_hash, string_types, text_type +from rq.compat import (as_text, decode_redis_hash, hmset, string_types, + text_type) from .connections import resolve_connection from .exceptions import NoSuchJobError @@ -547,7 +548,7 @@ class Job(object): key = self.key connection = pipeline if pipeline is not None else self.connection - connection.hmset(key, self.to_dict(include_meta=include_meta)) + hmset(connection, key, self.to_dict(include_meta=include_meta)) def save_meta(self): """Stores job meta from the job instance to the corresponding Redis key.""" diff --git a/rq/worker.py b/rq/worker.py index 2ebfcec..91ffc23 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -23,7 +23,7 @@ except ImportError: from redis import WatchError from . import worker_registration -from .compat import PY2, as_text, string_types, text_type +from .compat import PY2, as_text, hmset, string_types, text_type from .connections import get_current_connection, push_connection, pop_connection from .defaults import (DEFAULT_RESULT_TTL, @@ -268,7 +268,7 @@ class Worker(object): now = utcnow() now_in_string = utcformat(now) self.birth_date = now - p.hmset(key, { + hmset(p, key, mapping={ 'birth': now_in_string, 'last_heartbeat': now_in_string, 'queues': queues, From f0846a764545228d0551ec51c4c7441f2aa2a65a Mon Sep 17 00:00:00 2001 From: Bo Bayles Date: Sat, 16 May 2020 05:42:07 -0500 Subject: [PATCH 10/12] Use pickle.HIGHEST_PROTOCOL by default (#1254) --- rq/serializers.py | 8 +++++++- tests/test_serializers.py | 14 ++++++++++++-- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/rq/serializers.py b/rq/serializers.py index c4b0e54..27f892f 100644 --- a/rq/serializers.py +++ b/rq/serializers.py @@ -1,9 +1,15 @@ +from functools import partial import pickle from .compat import string_types from .utils import import_attribute +class DefaultSerializer: + dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL) + loads = pickle.loads + + def resolve_serializer(serializer): """This function checks the user defined serializer for ('dumps', 'loads') methods It returns a default pickle serializer if not found else it returns a MySerializer @@ -11,7 +17,7 @@ def resolve_serializer(serializer): Also accepts a string path to serializer that will be loaded as the serializer """ if not serializer: - return pickle + return DefaultSerializer if isinstance(serializer, string_types): serializer = import_attribute(serializer) diff --git a/tests/test_serializers.py b/tests/test_serializers.py index 58d093f..1e3e671 100644 --- a/tests/test_serializers.py +++ b/tests/test_serializers.py @@ -4,10 +4,11 @@ from __future__ import (absolute_import, division, print_function, import json import pickle +import pickletools import queue import unittest -from rq.serializers import resolve_serializer +from rq.serializers import DefaultSerializer, resolve_serializer class TestSerializers(unittest.TestCase): @@ -15,7 +16,16 @@ class TestSerializers(unittest.TestCase): """Ensure function resolve_serializer works correctly""" serializer = resolve_serializer(None) self.assertIsNotNone(serializer) - self.assertEqual(serializer, pickle) + self.assertEqual(serializer, DefaultSerializer) + + # Test round trip with default serializer + test_data = {'test': 'data'} + serialized_data = serializer.dumps(test_data) + self.assertEqual(serializer.loads(serialized_data), test_data) + self.assertEqual( + next(pickletools.genops(serialized_data))[1], + pickle.HIGHEST_PROTOCOL + ) # Test using json serializer serializer = resolve_serializer(json) From 4d450ec94d4c09f28723af20aa1ca88bc109738a Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 16 May 2020 17:46:55 +0700 Subject: [PATCH 11/12] Bump version to 1.4.1 --- CHANGES.md | 4 ++++ rq/version.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 3dd3076..3204f99 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,7 @@ +### RQ 1.4.1 (2020-05-16) +* Default serializer now uses `pickle.HIGHEST_PROTOCOL` for backward compatibility reasons. Thanks @bbayles! +* Avoid deprecation warnings on redis-py >= 3.5.0. Thanks @bbayles! + ### RQ 1.4.0 (2020-05-13) * Custom serializer is now supported. Thanks @solababs! * `delay()` now accepts `job_id` argument. Thanks @grayshirt! diff --git a/rq/version.py b/rq/version.py index 8199df9..4b20c03 100644 --- a/rq/version.py +++ b/rq/version.py @@ -2,4 +2,4 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -VERSION = '1.4.0' +VERSION = '1.4.1' From 52e426f40ffee046f06269fc2e7593707230e9f6 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 16 May 2020 20:41:36 +0700 Subject: [PATCH 12/12] Remove Python 2.7 from setup.py --- setup.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/setup.py b/setup.py index 046b3a8..c35e40d 100644 --- a/setup.py +++ b/setup.py @@ -65,8 +65,6 @@ setup( 'Operating System :: MacOS', 'Operating System :: Unix', 'Programming Language :: Python', - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3.4', 'Programming Language :: Python :: 3.5',