From e6c32edad9da044b66deb9d9a612f4acb02e2090 Mon Sep 17 00:00:00 2001 From: Mark LaPerriere Date: Fri, 19 Dec 2014 14:17:17 -0500 Subject: [PATCH 01/14] add birth_date and death_date properties to Worker --- rq/worker.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index 7283d56..73347c7 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -21,7 +21,7 @@ from .job import Job, Status from .logutils import setup_loghandlers from .queue import get_failed_queue, Queue from .timeouts import UnixSignalDeathPenalty -from .utils import import_attribute, make_colorizer, utcformat, utcnow +from .utils import import_attribute, make_colorizer, utcformat, utcnow, utcparse from .version import VERSION from .registry import FinishedJobRegistry, StartedJobRegistry @@ -235,6 +235,21 @@ class Worker(object): p.expire(self.key, 60) p.execute() + @property + def birth_date(self): + """Fetches birth date from Redis.""" + birth_timestamp = self.connection.hget(self.key, 'birth') + if birth_timestamp: + return utcparse(birth_timestamp) + + @property + def death_date(self): + """Fetches death date from Redis.""" + death_timestamp = self.connection.hget(self.key, 'death') + if death_timestamp: + return utcparse(death_timestamp) + + def set_state(self, state, pipeline=None): self._state = state connection = pipeline if pipeline is not None else self.connection From 008d72ff09e77e8186b81955ebac9509fd85b701 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Fri, 30 Jan 2015 14:39:42 +0700 Subject: [PATCH 02/14] Documented new features in 0.5.0. --- CHANGES.md | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 9097cb9..12f273a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,25 @@ +### 0.5.0 +(Jan 30th, 2015) + +- RQ workers can now be paused and resumed using `rq suspend` and + `rq resume` commands. Thanks Jonathan Tushman! +- Jobs that are being performed are now stored in `StartedJobRegistry` + for monitoring purposes. This also prevents currently active jobs from + being orphaned/lost in the case of hard shutdowns. +- You can now monitor finished jobs by checking `FinishedJobRegistry`. + Thanks Nic Cope for helping! +- Jobs with unmet dependencies are now created with `deferred` as their + status. You can monitor deferred jobs by checking `DeferredJobRegistry`. +- It is now possible to enqueue a job at the beginning of queue using + `queue.enqueue(func, at_front=True)`. Thanks Travis Johnson! +- Command line scripts have all been refactored to use `click`. Thanks Lyon Zhang! +- Added a new `SimpleWorker` that does not fork when executing jobs. + Useful for testing purposes. Thanks Cal Leeming! +- Added `--queue-class` and `--job-class` arguments to `rqworker` script. + Thanks David Bonner! +- Many other minor bug fixes and enhancements. + + ### 0.4.6 (May 21st, 2014) From e8c3b96a5a9f4f28a3fc8a42eb330b91af627b7f Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 30 Jan 2015 09:17:34 +0100 Subject: [PATCH 03/14] Bump to 0.5.0. --- rq/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/version.py b/rq/version.py index 28d738f..30d837a 100644 --- a/rq/version.py +++ b/rq/version.py @@ -1,4 +1,4 @@ # -*- coding: utf-8 -*- from __future__ import (absolute_import, division, print_function, unicode_literals) -VERSION = '0.4.6' +VERSION = '0.5.0' From fac2b10309a7148b13cd259e6770f2695c126628 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 30 Jan 2015 09:22:53 +0100 Subject: [PATCH 04/14] Auto-sort imports using isort. --- rq/cli/helpers.py | 3 +-- rq/job.py | 2 +- rq/queue.py | 11 +++++------ rq/utils.py | 4 ++-- rq/worker.py | 6 +++--- tests/test_job.py | 12 ++++++------ tests/test_queue.py | 8 ++++---- tests/test_worker.py | 14 +++++++------- 8 files changed, 29 insertions(+), 31 deletions(-) diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index 9ea4b58..0730d17 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -8,9 +8,8 @@ from functools import partial import click from rq import Queue, Worker -from rq.worker import WorkerStatus from rq.logutils import setup_loghandlers -from rq.suspension import is_suspended +from rq.worker import WorkerStatus red = partial(click.style, fg='red') green = partial(click.style, fg='green') diff --git a/rq/job.py b/rq/job.py index d815c5b..981694c 100644 --- a/rq/job.py +++ b/rq/job.py @@ -12,7 +12,7 @@ from rq.compat import as_text, decode_redis_hash, string_types, text_type from .connections import resolve_connection from .exceptions import NoSuchJobError, UnpickleError from .local import LocalStack -from .utils import import_attribute, utcformat, utcnow, utcparse, enum +from .utils import enum, import_attribute, utcformat, utcnow, utcparse try: import cPickle as pickle diff --git a/rq/queue.py b/rq/queue.py index e2c358c..9815f7e 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -4,15 +4,14 @@ from __future__ import (absolute_import, division, print_function, import uuid -from .connections import resolve_connection -from .job import Job, JobStatus -from .utils import import_attribute, utcnow +from redis import WatchError +from .compat import as_text, string_types, total_ordering +from .connections import resolve_connection from .exceptions import (DequeueTimeout, InvalidJobOperationError, NoSuchJobError, UnpickleError) -from .compat import total_ordering, string_types, as_text - -from redis import WatchError +from .job import Job, JobStatus +from .utils import import_attribute, utcnow def get_failed_queue(connection=None): diff --git a/rq/utils.py b/rq/utils.py index db56020..7bcb392 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -9,12 +9,12 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) import calendar -import importlib import datetime +import importlib import logging import sys -from .compat import is_python_version, as_text +from .compat import as_text, is_python_version class _Colorizer(object): diff --git a/rq/worker.py b/rq/worker.py index b971a24..0c53409 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -21,11 +21,11 @@ from .exceptions import DequeueTimeout, NoQueueError from .job import Job, JobStatus from .logutils import setup_loghandlers from .queue import get_failed_queue, Queue -from .timeouts import UnixSignalDeathPenalty -from .utils import import_attribute, make_colorizer, utcformat, utcnow, enum -from .version import VERSION from .registry import FinishedJobRegistry, StartedJobRegistry from .suspension import is_suspended +from .timeouts import UnixSignalDeathPenalty +from .utils import enum, import_attribute, make_colorizer, utcformat, utcnow +from .version import VERSION try: from procname import setprocname diff --git a/tests/test_job.py b/tests/test_job.py index b7854b3..89adf6d 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -4,18 +4,18 @@ from __future__ import (absolute_import, division, print_function, from datetime import datetime +from tests import RQTestCase +from tests.fixtures import (access_self, CallableObject, Number, say_hello, + some_calculation) +from tests.helpers import strip_microseconds + from rq.compat import as_text, PY2 from rq.exceptions import NoSuchJobError, UnpickleError from rq.job import get_current_job, Job -from rq.registry import DeferredJobRegistry from rq.queue import Queue +from rq.registry import DeferredJobRegistry from rq.utils import utcformat -from tests import RQTestCase -from tests.fixtures import (access_self, CallableObject, Number, say_hello, - some_calculation) -from tests.helpers import strip_microseconds - try: from cPickle import loads, dumps except ImportError: diff --git a/tests/test_queue.py b/tests/test_queue.py index 369b3f1..175a1c0 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -2,16 +2,16 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) +from tests import RQTestCase +from tests.fixtures import (div_by_zero, echo, Number, say_hello, + some_calculation) + from rq import get_failed_queue, Queue from rq.exceptions import InvalidJobOperationError from rq.job import Job, JobStatus from rq.registry import DeferredJobRegistry from rq.worker import Worker -from tests import RQTestCase -from tests.fixtures import (div_by_zero, echo, Number, say_hello, - some_calculation) - class CustomJob(Job): pass diff --git a/tests/test_worker.py b/tests/test_worker.py index d3b8f41..8a35d03 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -5,17 +5,17 @@ from __future__ import (absolute_import, division, print_function, import os from time import sleep -from rq import get_failed_queue, Queue, Worker, SimpleWorker -from rq.compat import as_text -from rq.job import Job, JobStatus -from rq.registry import StartedJobRegistry -from rq.suspension import suspend, resume - from tests import RQTestCase, slow from tests.fixtures import (create_file, create_file_after_timeout, - div_by_zero, say_hello, say_pid, do_nothing) + div_by_zero, do_nothing, say_hello, say_pid) from tests.helpers import strip_microseconds +from rq import get_failed_queue, Queue, SimpleWorker, Worker +from rq.compat import as_text +from rq.job import Job, JobStatus +from rq.registry import StartedJobRegistry +from rq.suspension import resume, suspend + class CustomJob(Job): pass From b8d425b318567073a8343b8f884547b62ced810d Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 30 Jan 2015 09:23:07 +0100 Subject: [PATCH 05/14] Various PEP8 fixes. --- rq/queue.py | 1 - rq/suspension.py | 2 +- rq/utils.py | 2 +- rq/version.py | 1 + rq/worker.py | 7 ++----- tests/test_cli.py | 6 ------ tests/test_job.py | 2 +- tests/test_queue.py | 2 +- tests/test_worker.py | 2 +- 9 files changed, 8 insertions(+), 17 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index 9815f7e..3f59a50 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -253,7 +253,6 @@ class Queue(object): If Queue is instantiated with async=False, job is executed immediately. """ - with self.connection._pipeline() as pipeline: # Add Queue key set self.connection.sadd(self.redis_queues_keys, self.key) diff --git a/rq/suspension.py b/rq/suspension.py index b734acd..93152b9 100644 --- a/rq/suspension.py +++ b/rq/suspension.py @@ -15,4 +15,4 @@ def suspend(connection, ttl=None): def resume(connection): - return connection.delete(WORKERS_SUSPENDED) \ No newline at end of file + return connection.delete(WORKERS_SUSPENDED) diff --git a/rq/utils.py b/rq/utils.py index 7bcb392..3e44a98 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -217,4 +217,4 @@ def enum(name, *sequential, **named): # On Python 2 type() requires a byte string (which is str() on Python 2). # On Python 3 it does not matter, so we'll use str(), which acts as # a no-op. - return type(str(name), (), values) \ No newline at end of file + return type(str(name), (), values) diff --git a/rq/version.py b/rq/version.py index 30d837a..0d96ce8 100644 --- a/rq/version.py +++ b/rq/version.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- from __future__ import (absolute_import, division, print_function, unicode_literals) + VERSION = '0.5.0' diff --git a/rq/worker.py b/rq/worker.py index 0c53409..201dfed 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -12,7 +12,6 @@ import sys import time import traceback import warnings -from datetime import datetime from rq.compat import as_text, string_types, text_type @@ -54,8 +53,8 @@ def compact(l): return [x for x in l if x is not None] _signames = dict((getattr(signal, signame), signame) - for signame in dir(signal) - if signame.startswith('SIG') and '_' not in signame) + for signame in dir(signal) + if signame.startswith('SIG') and '_' not in signame) def signal_name(signum): @@ -367,7 +366,6 @@ class Worker(object): if before_state: self.set_state(before_state) - def work(self, burst=False): """Starts the work loop. @@ -416,7 +414,6 @@ class Worker(object): self.register_death() return did_perform_work - def dequeue_job_and_maintain_ttl(self, timeout): result = None qnames = self.queue_names() diff --git a/tests/test_cli.py b/tests/test_cli.py index 3977006..f1b7fd4 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -101,9 +101,3 @@ class TestRQCli(RQTestCase): self.assertEqual(result.exit_code, 1) self.assertIn("Duration must be an integer greater than 1", result.output) - - - - - - diff --git a/tests/test_job.py b/tests/test_job.py index 89adf6d..e117ea9 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -339,7 +339,7 @@ class TestJob(RQTestCase): job = Job.create(func=say_hello, origin=origin) job._dependency_id = 'id' job.save() - + self.assertEqual(registry.get_job_ids(), []) job.register_dependency() self.assertEqual(as_text(self.testconn.spop('rq:job:id:dependents')), job.id) diff --git a/tests/test_queue.py b/tests/test_queue.py index 175a1c0..e4e9253 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -340,7 +340,7 @@ class TestQueue(RQTestCase): self.assertFalse(self.testconn.exists(parent_job.dependents_key)) # DeferredJobRegistry should also be empty - self.assertEqual(registry.get_job_ids(), []) + self.assertEqual(registry.get_job_ids(), []) def test_enqueue_job_with_dependency(self): """Jobs are enqueued only when their dependencies are finished.""" diff --git a/tests/test_worker.py b/tests/test_worker.py index 8a35d03..c47a62e 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -334,7 +334,7 @@ class TestWorker(RQTestCase): raise q = Queue() - job = q.enqueue(create_file, SENTINEL_FILE) + q.enqueue(create_file, SENTINEL_FILE) w = Worker([q]) From 1018330cea8ac5e6fe407860f012c6c90e279e62 Mon Sep 17 00:00:00 2001 From: Leonid Shvechikov Date: Sun, 1 Feb 2015 10:37:21 +0300 Subject: [PATCH 06/14] Tiny fix of the indefinite article before "RQ" "An" should be used here because "RQ" sounds like "ar-kiu". --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 8b83f68..2b1fa71 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ def count_words_at_url(url): You do use the excellent [requests][r] package, don't you? -Then, create a RQ queue: +Then, create an RQ queue: ```python from rq import Queue, use_connection From af5a8624a69fc3f528a0489984a272ff79a4ef0b Mon Sep 17 00:00:00 2001 From: Mark LaPerriere Date: Thu, 5 Feb 2015 23:05:10 -0500 Subject: [PATCH 07/14] add tests for birth and death worker methods --- tests/test_worker.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/test_worker.py b/tests/test_worker.py index c47a62e..e89f39b 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -379,3 +379,23 @@ class TestWorker(RQTestCase): w3 = Worker([q], name="worker1") worker_set = set([w1, w2, w3]) self.assertEquals(len(worker_set), 2) + + def test_worker_sets_birth(self): + """Ensure worker correctly sets worker birth date.""" + q = Queue() + w = Worker([q]) + + birth_date = w.birth_date + self.assertIsNotNone(birth_date) + self.assertEquals(type(birth_date).__name__, 'datetime') + + def test_worker_sets_death(self): + """Ensure worker correctly sets worker birth date.""" + q = Queue() + w = Worker([q]) + + w.register_death() + + death_date = w.death_date + self.assertIsNotNone(death_date) + self.assertEquals(type(death_date).__name__, 'datetime') From 450c5969aa4afd0550c130e12c55213b111a1108 Mon Sep 17 00:00:00 2001 From: Mark LaPerriere Date: Thu, 5 Feb 2015 23:12:33 -0500 Subject: [PATCH 08/14] call register birth in the register birth test --- tests/test_worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_worker.py b/tests/test_worker.py index e89f39b..65edead 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -385,6 +385,8 @@ class TestWorker(RQTestCase): q = Queue() w = Worker([q]) + w.register_birth() + birth_date = w.birth_date self.assertIsNotNone(birth_date) self.assertEquals(type(birth_date).__name__, 'datetime') From a4ca4704ace1c1e027543ecaf1f154ad561016c6 Mon Sep 17 00:00:00 2001 From: Mark LaPerriere Date: Thu, 5 Feb 2015 23:17:41 -0500 Subject: [PATCH 09/14] convert redis returned bytes into text/string --- rq/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index c863b56..6c351c8 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -250,14 +250,14 @@ class Worker(object): """Fetches birth date from Redis.""" birth_timestamp = self.connection.hget(self.key, 'birth') if birth_timestamp: - return utcparse(birth_timestamp) + return utcparse(as_text(birth_timestamp)) @property def death_date(self): """Fetches death date from Redis.""" death_timestamp = self.connection.hget(self.key, 'death') if death_timestamp: - return utcparse(death_timestamp) + return utcparse(as_text(death_timestamp)) def set_state(self, state, pipeline=None): From c155918d6de1a781a3805e6f1e1b6f866c9a3a88 Mon Sep 17 00:00:00 2001 From: glaslos Date: Wed, 18 Feb 2015 15:36:39 +0100 Subject: [PATCH 10/14] lrem only first occurrence of the job id (we expect only one) --- rq/queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index 3f59a50..988a54e 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -142,9 +142,9 @@ class Queue(object): job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id if pipeline is not None: - pipeline.lrem(self.key, 0, job_id) + pipeline.lrem(self.key, 1, job_id) - return self.connection._lrem(self.key, 0, job_id) + return self.connection._lrem(self.key, 1, job_id) def compact(self): """Removes all "dead" jobs from the queue by cycling through it, while From 100522237187d3aef45a54b9bdf4e3ff095fa173 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Mon, 23 Feb 2015 17:28:51 +0700 Subject: [PATCH 11/14] Restored the ability to specify connection params in config --- rq/cli/cli.py | 20 ++++++++++++-------- rq/cli/helpers.py | 15 +++++++++++++++ tests/test_helpers.py | 41 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 68 insertions(+), 8 deletions(-) create mode 100644 tests/test_helpers.py diff --git a/rq/cli/cli.py b/rq/cli/cli.py index 8dd0b2b..d0a5eff 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -16,22 +16,27 @@ from rq import Connection, get_failed_queue, Queue from rq.contrib.legacy import cleanup_ghosts from rq.exceptions import InvalidJobOperationError from rq.utils import import_attribute -from rq.suspension import suspend as connection_suspend, resume as connection_resume, is_suspended +from rq.suspension import (suspend as connection_suspend, + resume as connection_resume, is_suspended) -from .helpers import (read_config_file, refresh, setup_loghandlers_from_args, - show_both, show_queues, show_workers) +from .helpers import (get_redis_from_config, read_config_file, refresh, + setup_loghandlers_from_args, show_both, show_queues, + show_workers) url_option = click.option('--url', '-u', envvar='RQ_REDIS_URL', help='URL describing Redis connection details.') -config_option = click.option('--config', '-c', help='Module containing RQ settings.') +config_option = click.option('--config', '-c', + help='Module containing RQ settings.') def connect(url, config=None): + if url: + return StrictRedis.from_url(url) + settings = read_config_file(config) if config else {} - url = url or settings.get('REDIS_URL') - return StrictRedis.from_url(url or 'redis://localhost:6379/0') + return get_redis_from_config(settings) @click.group() @@ -148,7 +153,6 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path, settings = read_config_file(config) if config else {} # Worker specific default arguments - url = url or settings.get('REDIS_URL') queues = queues or settings.get('QUEUES', ['default']) sentry_dsn = sentry_dsn or settings.get('SENTRY_DSN') @@ -158,7 +162,7 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path, setup_loghandlers_from_args(verbose, quiet) - conn = connect(url) + conn = connect(url, config) cleanup_ghosts(conn) worker_class = import_attribute(worker_class) queue_class = import_attribute(queue_class) diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index 0730d17..d5c54a1 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -7,6 +7,8 @@ import time from functools import partial import click +from redis import StrictRedis + from rq import Queue, Worker from rq.logutils import setup_loghandlers from rq.worker import WorkerStatus @@ -24,6 +26,19 @@ def read_config_file(module): if k.upper() == k]) +def get_redis_from_config(settings): + """Returns a StrictRedis instance from a dictionary of settings.""" + if settings.get('REDIS_URL') is not None: + return StrictRedis.from_url(settings['REDIS_URL']) + + return StrictRedis( + host=settings.get('REDIS_HOST', 'localhost'), + port=settings.get('REDIS_PORT', 6379), + db=settings.get('REDIS_DB', 0), + password=settings.get('REDIS_PASSWORD', None), + ) + + def pad(s, pad_to_length): """Pads the given string to the given length.""" return ('%-' + '%ds' % pad_to_length) % (s,) diff --git a/tests/test_helpers.py b/tests/test_helpers.py new file mode 100644 index 0000000..b43f13b --- /dev/null +++ b/tests/test_helpers.py @@ -0,0 +1,41 @@ +from rq.cli.helpers import get_redis_from_config + +from tests import RQTestCase + + +class TestHelpers(RQTestCase): + + def test_get_redis_from_config(self): + """Ensure Redis connection params are properly parsed""" + settings = { + 'REDIS_URL': 'redis://localhost:1/1' + } + + # Ensure REDIS_URL is read + redis = get_redis_from_config(settings) + connection_kwargs = redis.connection_pool.connection_kwargs + self.assertEqual(connection_kwargs['db'], 1) + self.assertEqual(connection_kwargs['port'], 1) + + settings = { + 'REDIS_URL': 'redis://localhost:1/1', + 'REDIS_HOST': 'foo', + 'REDIS_DB': 2, + 'REDIS_PORT': 2, + 'REDIS_PASSWORD': 'bar' + } + + # Ensure REDIS_URL is preferred + redis = get_redis_from_config(settings) + connection_kwargs = redis.connection_pool.connection_kwargs + self.assertEqual(connection_kwargs['db'], 1) + self.assertEqual(connection_kwargs['port'], 1) + + # Ensure fall back to regular connection parameters + settings['REDIS_URL'] = None + redis = get_redis_from_config(settings) + connection_kwargs = redis.connection_pool.connection_kwargs + self.assertEqual(connection_kwargs['host'], 'foo') + self.assertEqual(connection_kwargs['db'], 2) + self.assertEqual(connection_kwargs['port'], 2) + self.assertEqual(connection_kwargs['password'], 'bar') From 219f21b6378bbd66a13e997876c7c7416b27ef46 Mon Sep 17 00:00:00 2001 From: alternativshik Date: Thu, 25 Sep 2014 10:47:17 +0700 Subject: [PATCH 12/14] Allow non-ASCII characters in arguments get_call_string() failed if any arguments contained non-ASCII strings. Fixes #406 --- rq/job.py | 11 +++++++++-- tests/test_job.py | 20 ++++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/rq/job.py b/rq/job.py index 981694c..5dbbcf5 100644 --- a/rq/job.py +++ b/rq/job.py @@ -514,8 +514,15 @@ class Job(object): if self.func_name is None: return None - arg_list = [repr(arg) for arg in self.args] - arg_list += ['%s=%r' % (k, v) for k, v in self.kwargs.items()] + # Python 2/3 compatibility + try: + arg_list = [repr(arg).decode('utf-8') for arg in self.args] + except AttributeError: + arg_list = [repr(arg) for arg in self.args] + + kwargs = ['{0}={1!r}'.format(k, v) for k, v in self.kwargs.items()] + # Sort here because python 3.3 & 3.4 makes different call_string + arg_list += sorted(kwargs) args = ', '.join(arg_list) return '%s(%s)' % (self.func_name, args) diff --git a/tests/test_job.py b/tests/test_job.py index e117ea9..d2e7f44 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -23,6 +23,26 @@ except ImportError: class TestJob(RQTestCase): + def test_unicode(self): + """Unicode in job description [issue405]""" + job = Job.create( + 'myfunc', + args=[12, "☃"], + kwargs=dict(snowman="☃", null=None), + ) + + try: + # Python 2 + test_string = u"myfunc(12, u'\\u2603', null=None, snowman=u'\\u2603')".decode('utf-8') + except AttributeError: + # Python 3 + test_string = "myfunc(12, '☃', null=None, snowman='☃')" + + self.assertEquals( + job.description, + test_string, + ) + def test_create_empty_job(self): """Creation of new empty jobs.""" job = Job() From 3dfd044767706ff6e2d0169a13d237d66e1154a6 Mon Sep 17 00:00:00 2001 From: Taras Semenenko Date: Wed, 25 Feb 2015 14:32:37 +0300 Subject: [PATCH 13/14] Add missed `multi` command after `watch` `watch` command should be used in conjunction with `multi` command which was missed in enqueuing of job with dependencies. Fix #487 --- rq/queue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/rq/queue.py b/rq/queue.py index 988a54e..f5a8887 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -199,6 +199,7 @@ class Queue(object): try: pipe.watch(depends_on.key) if depends_on.get_status() != JobStatus.FINISHED: + pipe.multi() job.set_status(JobStatus.DEFERRED) job.register_dependency(pipeline=pipe) job.save(pipeline=pipe) From 636a537fa7bb4fe31e3c5e30d79e2dfe4a460e45 Mon Sep 17 00:00:00 2001 From: Mark LaPerriere Date: Thu, 26 Feb 2015 09:41:20 -0500 Subject: [PATCH 14/14] updates addressing @selwin comments for PR #465 --- rq/worker.py | 4 ++-- tests/test_worker.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 6c351c8..a32c6d9 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -249,14 +249,14 @@ class Worker(object): def birth_date(self): """Fetches birth date from Redis.""" birth_timestamp = self.connection.hget(self.key, 'birth') - if birth_timestamp: + if birth_timestamp is not None: return utcparse(as_text(birth_timestamp)) @property def death_date(self): """Fetches death date from Redis.""" death_timestamp = self.connection.hget(self.key, 'death') - if death_timestamp: + if death_timestamp is not None: return utcparse(as_text(death_timestamp)) diff --git a/tests/test_worker.py b/tests/test_worker.py index 65edead..6f89f4a 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -392,7 +392,7 @@ class TestWorker(RQTestCase): self.assertEquals(type(birth_date).__name__, 'datetime') def test_worker_sets_death(self): - """Ensure worker correctly sets worker birth date.""" + """Ensure worker correctly sets worker death date.""" q = Queue() w = Worker([q])