diff --git a/.deepsource.toml b/.deepsource.toml new file mode 100644 index 0000000..def09e2 --- /dev/null +++ b/.deepsource.toml @@ -0,0 +1,12 @@ +version = 1 + +test_patterns = ["tests/**"] + +exclude_patterns = ["examples/**"] + +[[analyzers]] +name = "python" +enabled = true + + [analyzers.meta] + runtime_version = "3.x.x" \ No newline at end of file diff --git a/CHANGES.md b/CHANGES.md index 5b71d17..3204f99 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,15 @@ +### RQ 1.4.1 (2020-05-16) +* Default serializer now uses `pickle.HIGHEST_PROTOCOL` for backward compatibility reasons. Thanks @bbayles! +* Avoid deprecation warnings on redis-py >= 3.5.0. Thanks @bbayles! + +### RQ 1.4.0 (2020-05-13) +* Custom serializer is now supported. Thanks @solababs! +* `delay()` now accepts `job_id` argument. Thanks @grayshirt! +* Fixed a bug that may cause early termination of scheduled or requeued jobs. Thanks @rmartin48! +* When a job is scheduled, always add queue name to a set containing active RQ queue names. Thanks @mdawar! +* Added `--sentry-ca-certs` and `--sentry-debug` parameters to `rq worker` CLI. Thanks @kichawa! +* Jobs cleaned up by `StartedJobRegistry` are given an exception info. Thanks @selwin! + ### RQ 1.3.0 (2020-03-09) * Support for infinite job timeout. Thanks @theY4Kman! * Added `__main__` file so you can now do `python -m rq.cli`. Thanks @bbayles! diff --git a/MANIFEST.in b/MANIFEST.in index 537813f..bba9d9a 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,2 +1,3 @@ include LICENSE +include *.toml recursive-exclude tests * diff --git a/docs/docs/scheduling.md b/docs/docs/scheduling.md index 11fb7f1..7ecff03 100644 --- a/docs/docs/scheduling.md +++ b/docs/docs/scheduling.md @@ -53,7 +53,7 @@ from somewhere import say_hello queue = Queue(name='default', connection=Redis()) # Schedules job to be run in 10 seconds -job = queue.enqueue_at(timedelta(seconds=10), say_hello) +job = queue.enqueue_in(timedelta(seconds=10), say_hello) ``` Jobs that are scheduled for execution are not placed in the queue, but they are diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index d27fbb0..8d3357c 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -143,7 +143,7 @@ def show_workers(queues, raw, by_queue, queue_class, worker_class): queue_dict[queue] = worker_class.all(queue=queue) if queue_dict: - max_length = max([len(q.name) for q, in queue_dict.keys()]) + max_length = max(len(q.name) for q, in queue_dict.keys()) else: max_length = 0 diff --git a/rq/compat/__init__.py b/rq/compat/__init__.py index 3f8b3aa..d62c89c 100644 --- a/rq/compat/__init__.py +++ b/rq/compat/__init__.py @@ -103,4 +103,13 @@ except ImportError: def dst(self, dt): return timedelta(0) - utc = UTC() \ No newline at end of file + utc = UTC() + + +def hmset(pipe_or_connection, name, mapping): + # redis-py versions 3.5.0 and above accept a mapping parameter for hset + try: + return pipe_or_connection.hset(name, mapping=mapping) + # earlier versions require hmset to be used + except TypeError: + return pipe_or_connection.hmset(name, mapping) diff --git a/rq/decorators.py b/rq/decorators.py index a84289f..e8c1f37 100644 --- a/rq/decorators.py +++ b/rq/decorators.py @@ -51,6 +51,7 @@ class job(object): # noqa queue = self.queue depends_on = kwargs.pop('depends_on', None) + job_id = kwargs.pop('job_id', None) at_front = kwargs.pop('at_front', False) if not depends_on: @@ -61,7 +62,7 @@ class job(object): # noqa return queue.enqueue_call(f, args=args, kwargs=kwargs, timeout=self.timeout, result_ttl=self.result_ttl, - ttl=self.ttl, depends_on=depends_on, at_front=at_front, + ttl=self.ttl, depends_on=depends_on, job_id=job_id, at_front=at_front, meta=self.meta, description=self.description, failure_ttl=self.failure_ttl) f.delay = delay return f diff --git a/rq/job.py b/rq/job.py index f0f28ec..c7a67d1 100644 --- a/rq/job.py +++ b/rq/job.py @@ -4,12 +4,13 @@ from __future__ import (absolute_import, division, print_function, import inspect import warnings +import zlib + from functools import partial from uuid import uuid4 -import zlib - -from rq.compat import as_text, decode_redis_hash, string_types, text_type +from rq.compat import (as_text, decode_redis_hash, hmset, string_types, + text_type) from .connections import resolve_connection from .exceptions import NoSuchJobError from .local import LocalStack @@ -557,7 +558,7 @@ class Job(object): key = self.key connection = pipeline if pipeline is not None else self.connection - connection.hmset(key, self.to_dict(include_meta=include_meta)) + hmset(connection, key, self.to_dict(include_meta=include_meta)) def save_meta(self): """Stores job meta from the job instance to the corresponding Redis key.""" diff --git a/rq/queue.py b/rq/queue.py index bfaa0fa..0b7afd7 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -191,25 +191,25 @@ class Queue(object): @property def started_job_registry(self): - """Returns this queue's FailedJobRegistry.""" + """Returns this queue's StartedJobRegistry.""" from rq.registry import StartedJobRegistry return StartedJobRegistry(queue=self, job_class=self.job_class) @property def finished_job_registry(self): - """Returns this queue's FailedJobRegistry.""" + """Returns this queue's FinishedJobRegistry.""" from rq.registry import FinishedJobRegistry return FinishedJobRegistry(queue=self) @property def deferred_job_registry(self): - """Returns this queue's FailedJobRegistry.""" + """Returns this queue's DeferredJobRegistry.""" from rq.registry import DeferredJobRegistry return DeferredJobRegistry(queue=self, job_class=self.job_class) @property def scheduled_job_registry(self): - """Returns this queue's FailedJobRegistry.""" + """Returns this queue's ScheduledJobRegistry.""" from rq.registry import ScheduledJobRegistry return ScheduledJobRegistry(queue=self, job_class=self.job_class) @@ -400,6 +400,8 @@ nd registry = ScheduledJobRegistry(queue=self) with self.connection.pipeline() as pipeline: + # Add Queue key set + pipeline.sadd(self.redis_queues_keys, self.key) job.save(pipeline=pipeline) registry.schedule(job, datetime, pipeline=pipeline) pipeline.execute() diff --git a/rq/scheduler.py b/rq/scheduler.py index f269ea9..cc1b999 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -40,7 +40,7 @@ class RQScheduler(object): def __init__(self, queues, connection, interval=1): self._queue_names = set(parse_names(queues)) - self._acquired_locks = set([]) + self._acquired_locks = set() self._scheduled_job_registries = [] self.lock_acquisition_time = None self.connection = connection @@ -68,7 +68,7 @@ class RQScheduler(object): def acquire_locks(self, auto_start=False): """Returns names of queue it successfully acquires lock on""" - successful_locks = set([]) + successful_locks = set() pid = os.getpid() logging.info("Trying to acquire locks for %s", ", ".join(self._queue_names)) for name in self._queue_names: diff --git a/rq/serializers.py b/rq/serializers.py index c4b0e54..27f892f 100644 --- a/rq/serializers.py +++ b/rq/serializers.py @@ -1,9 +1,15 @@ +from functools import partial import pickle from .compat import string_types from .utils import import_attribute +class DefaultSerializer: + dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL) + loads = pickle.loads + + def resolve_serializer(serializer): """This function checks the user defined serializer for ('dumps', 'loads') methods It returns a default pickle serializer if not found else it returns a MySerializer @@ -11,7 +17,7 @@ def resolve_serializer(serializer): Also accepts a string path to serializer that will be loaded as the serializer """ if not serializer: - return pickle + return DefaultSerializer if isinstance(serializer, string_types): serializer = import_attribute(serializer) diff --git a/rq/version.py b/rq/version.py index f75766e..4b20c03 100644 --- a/rq/version.py +++ b/rq/version.py @@ -2,4 +2,4 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -VERSION = '1.3.0' +VERSION = '1.4.1' diff --git a/rq/worker.py b/rq/worker.py index 7e090f5..91ffc23 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -23,7 +23,7 @@ except ImportError: from redis import WatchError from . import worker_registration -from .compat import PY2, as_text, string_types, text_type +from .compat import PY2, as_text, hmset, string_types, text_type from .connections import get_current_connection, push_connection, pop_connection from .defaults import (DEFAULT_RESULT_TTL, @@ -268,7 +268,7 @@ class Worker(object): now = utcnow() now_in_string = utcformat(now) self.birth_date = now - p.hmset(key, { + hmset(p, key, mapping={ 'birth': now_in_string, 'last_heartbeat': now_in_string, 'queues': queues, @@ -680,7 +680,7 @@ class Worker(object): """ ret_val = None - job.started_at = job.started_at or utcnow() + job.started_at = utcnow() while True: try: with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException): @@ -689,7 +689,7 @@ class Worker(object): except HorseMonitorTimeoutException: # Horse has not exited yet and is still running. # Send a heartbeat to keep the worker alive. - self.heartbeat(self.job_monitoring_interval + 5) + self.heartbeat(self.job_monitoring_interval + 30) # Kill the job from this side if something is really wrong (interpreter lock/etc). if job.timeout != -1 and (utcnow() - job.started_at).total_seconds() > (job.timeout + 1): @@ -747,15 +747,10 @@ class Worker(object): # that are different from the worker. random.seed() - try: - self.setup_work_horse_signals() - self._is_horse = True - self.log = logger - self.perform_job(job, queue) - except Exception as e: # noqa - # Horse does not terminate properly - raise e - os._exit(1) + self.setup_work_horse_signals() + self._is_horse = True + self.log = logger + self.perform_job(job, queue) # os._exit() is the way to exit from childs after a fork(), in # contrast to the regular sys.exit() diff --git a/setup.py b/setup.py index 8df1b72..c35e40d 100644 --- a/setup.py +++ b/setup.py @@ -35,7 +35,7 @@ setup( 'redis >= 3.0.0', 'click >= 5.0' ], - python_requires='>=2.7', + python_requires='>=3.4', entry_points={ 'console_scripts': [ 'rq = rq.cli:main', @@ -65,8 +65,6 @@ setup( 'Operating System :: MacOS', 'Operating System :: Unix', 'Programming Language :: Python', - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3.4', 'Programming Language :: Python :: 3.5', diff --git a/tests/test_serializers.py b/tests/test_serializers.py index 58d093f..1e3e671 100644 --- a/tests/test_serializers.py +++ b/tests/test_serializers.py @@ -4,10 +4,11 @@ from __future__ import (absolute_import, division, print_function, import json import pickle +import pickletools import queue import unittest -from rq.serializers import resolve_serializer +from rq.serializers import DefaultSerializer, resolve_serializer class TestSerializers(unittest.TestCase): @@ -15,7 +16,16 @@ class TestSerializers(unittest.TestCase): """Ensure function resolve_serializer works correctly""" serializer = resolve_serializer(None) self.assertIsNotNone(serializer) - self.assertEqual(serializer, pickle) + self.assertEqual(serializer, DefaultSerializer) + + # Test round trip with default serializer + test_data = {'test': 'data'} + serialized_data = serializer.dumps(test_data) + self.assertEqual(serializer.loads(serialized_data), test_data) + self.assertEqual( + next(pickletools.genops(serialized_data))[1], + pickle.HIGHEST_PROTOCOL + ) # Test using json serializer serializer = resolve_serializer(json)