From f3e924cdd160cb99f138782b0c4a67620184e0a2 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sun, 8 Nov 2020 21:02:21 +0700 Subject: [PATCH] Added job.worker_name (#1375) * Added job.worker_name * Fix compatibility with Redis server 3.x * Document job.worker_name * Removed some Python 2 compatibility stuff. * Remove unused codes --- docs/docs/jobs.md | 1 + rq/compat/__init__.py | 68 +++++++++-------------------------------- rq/job.py | 26 ++++++++++++++-- rq/queue.py | 7 +++-- rq/registry.py | 6 ++-- rq/worker.py | 37 +++++----------------- tests/fixtures.py | 7 ++--- tests/test_cli.py | 5 ++- tests/test_job.py | 33 ++++++++++---------- tests/test_queue.py | 9 ++---- tests/test_scheduler.py | 67 ++++++++++++++++++---------------------- tests/test_worker.py | 24 +++++++++------ 12 files changed, 122 insertions(+), 168 deletions(-) diff --git a/docs/docs/jobs.md b/docs/docs/jobs.md index 7ede815..3bc1ab0 100644 --- a/docs/docs/jobs.md +++ b/docs/docs/jobs.md @@ -122,6 +122,7 @@ Some interesting job attributes include: * `job.ended_at` * `job.exc_info` stores exception information if job doesn't finish successfully. * `job.last_heartbeat` the latest timestamp that's periodically updated when the job is executing. Can be used to determine if the job is still active. +* `job.worker_name` returns the worker name currently executing this job. If you want to efficiently fetch a large number of jobs, use `Job.fetch_many()`. diff --git a/rq/compat/__init__.py b/rq/compat/__init__.py index 0871dec..fee9c3f 100644 --- a/rq/compat/__init__.py +++ b/rq/compat/__init__.py @@ -45,62 +45,22 @@ except ImportError: PY2 = sys.version_info[0] == 2 -if not PY2: - # Python 3.x and up - text_type = str - string_types = (str,) - def as_text(v): - if v is None: - return None - elif isinstance(v, bytes): - return v.decode('utf-8') - elif isinstance(v, str): - return v - else: - raise ValueError('Unknown type %r' % type(v)) +# Python 3.x and up +text_type = str +string_types = (str,) - def decode_redis_hash(h): - return dict((as_text(k), h[k]) for k in h) -else: - # Python 2.x - def text_type(v): - try: - return unicode(v) # noqa - except Exception: - return unicode(v, "utf-8", errors="ignore") # noqa - string_types = (str, unicode) # noqa +def as_text(v): + if v is None: + return None + elif isinstance(v, bytes): + return v.decode('utf-8') + elif isinstance(v, str): + return v + else: + raise ValueError('Unknown type %r' % type(v)) - def as_text(v): - if v is None: - return None - elif isinstance(v, str): - return v.decode('utf-8') - elif isinstance(v, unicode): # noqa - return v - else: - raise Exception("Input cannot be decoded into literal thing.") - def decode_redis_hash(h): - return h - - -try: - from datetime import timezone - utc = timezone.utc -except ImportError: - # Python 2.x workaround - from datetime import timedelta, tzinfo - - class UTC(tzinfo): - def utcoffset(self, dt): - return timedelta(0) - - def tzname(self, dt): - return "UTC" - - def dst(self, dt): - return timedelta(0) - - utc = UTC() +def decode_redis_hash(h): + return dict((as_text(k), h[k]) for k in h) diff --git a/rq/job.py b/rq/job.py index 868a44b..0f16d3f 100644 --- a/rq/job.py +++ b/rq/job.py @@ -343,6 +343,7 @@ class Job(object): self.result_ttl = None self.failure_ttl = None self.ttl = None + self.worker_name = None self._status = None self._dependency_ids = [] self.meta = {} @@ -479,6 +480,7 @@ class Job(object): self.created_at = str_to_date(obj.get('created_at')) self.origin = as_text(obj.get('origin')) + self.worker_name = obj.get('worker_name').decode() if obj.get('worker_name') else None self.description = as_text(obj.get('description')) self.enqueued_at = str_to_date(obj.get('enqueued_at')) self.started_at = str_to_date(obj.get('started_at')) @@ -500,7 +502,7 @@ class Job(object): self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None self.meta = self.serializer.loads(obj.get('meta')) if obj.get('meta') else {} - + self.retries_left = int(obj.get('retries_left')) if obj.get('retries_left') else None if obj.get('retry_intervals'): self.retry_intervals = json.loads(obj.get('retry_intervals').decode()) @@ -538,8 +540,9 @@ class Job(object): 'started_at': utcformat(self.started_at) if self.started_at else '', 'ended_at': utcformat(self.ended_at) if self.ended_at else '', 'last_heartbeat': utcformat(self.last_heartbeat) if self.last_heartbeat else '', + 'worker_name': self.worker_name or '' } - + if self.retries_left is not None: obj['retries_left'] = self.retries_left if self.retry_intervals is not None: @@ -554,7 +557,7 @@ class Job(object): if self._result is not None: try: obj['result'] = self.serializer.dumps(self._result) - except Exception as e: + except: # noqa obj['result'] = "Unserializable return value" if self.exc_info is not None: obj['exc_info'] = zlib.compress(str(self.exc_info).encode('utf-8')) @@ -694,6 +697,23 @@ class Job(object): assert self is _job_stack.pop() return self._result + def prepare_for_execution(self, worker_name, pipeline): + """Set job metadata before execution begins""" + self.worker_name = worker_name + self.last_heartbeat = utcnow() + self.started_at = self.last_heartbeat + self._status = JobStatus.STARTED + mapping = { + 'last_heartbeat': utcformat(self.last_heartbeat), + 'status': self._status, + 'started_at': utcformat(self.started_at), + 'worker_name': worker_name + } + if self.get_redis_server_version() >= StrictVersion("4.0.0"): + pipeline.hset(self.key, mapping=mapping) + else: + pipeline.hmset(self.key, mapping) + def _execute(self): return self.func(*self.args, **self.kwargs) diff --git a/rq/queue.py b/rq/queue.py index e470dfe..5e98bcd 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -4,12 +4,13 @@ from __future__ import (absolute_import, division, print_function, import uuid import warnings -from datetime import datetime + +from datetime import datetime, timezone from distutils.version import StrictVersion from redis import WatchError -from .compat import as_text, string_types, total_ordering, utc +from .compat import as_text, string_types, total_ordering from .connections import resolve_connection from .defaults import DEFAULT_RESULT_TTL from .exceptions import DequeueTimeout, NoSuchJobError @@ -451,7 +452,7 @@ nd def enqueue_in(self, time_delta, func, *args, **kwargs): """Schedules a job to be executed in a given `timedelta` object""" - return self.enqueue_at(datetime.now(utc) + time_delta, + return self.enqueue_at(datetime.now(timezone.utc) + time_delta, func, *args, **kwargs) def enqueue_job(self, job, pipeline=None, at_front=False): diff --git a/rq/registry.py b/rq/registry.py index 5a5c078..76cf96e 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -1,8 +1,8 @@ import calendar import time -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone -from .compat import as_text, utc +from .compat import as_text from .connections import resolve_connection from .defaults import DEFAULT_FAILURE_TTL from .exceptions import InvalidJobOperation, NoSuchJobError @@ -303,7 +303,7 @@ class ScheduledJobRegistry(BaseRegistry): if not score: raise NoSuchJobError - return datetime.fromtimestamp(score, tz=utc) + return datetime.fromtimestamp(score, tz=timezone.utc) def clean_registries(queue): diff --git a/rq/worker.py b/rq/worker.py index 3b4aade..cbe7073 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -26,7 +26,7 @@ from redis import WatchError from . import worker_registration from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE -from .compat import PY2, as_text, string_types, text_type +from .compat import as_text, string_types, text_type from .connections import get_current_connection, push_connection, pop_connection from .defaults import (DEFAULT_RESULT_TTL, @@ -859,9 +859,7 @@ class Worker(object): registry = StartedJobRegistry(job.origin, self.connection, job_class=self.job_class) registry.add(job, timeout, pipeline=pipeline) - job.set_status(JobStatus.STARTED, pipeline=pipeline) - job.heartbeat(utcnow(), pipeline=pipeline) - pipeline.hset(job.key, 'started_at', utcformat(utcnow())) + job.prepare_for_execution(self.name, pipeline=pipeline) pipeline.execute() msg = 'Processing {0} from {1} since {2}' @@ -883,7 +881,7 @@ class Worker(object): self.connection, job_class=self.job_class ) - + job.worker_name = None # Requeue/reschedule if retry is configured if job.retries_left and job.retries_left > 0: retry = True @@ -943,6 +941,7 @@ class Worker(object): result_ttl = job.get_result_ttl(self.default_result_ttl) if result_ttl != 0: job.set_status(JobStatus.FINISHED, pipeline=pipeline) + job.worker_name = None # Don't clobber the user's meta dictionary! job.save(pipeline=pipeline, include_meta=False) @@ -985,9 +984,7 @@ class Worker(object): except: # NOQA job.ended_at = utcnow() exc_info = sys.exc_info() - exc_string = self._get_safe_exception_string( - traceback.format_exception(*exc_info) - ) + exc_string = ''.join(traceback.format_exception(*exc_info)) self.handle_job_failure(job=job, exc_string=exc_string, queue=queue, started_job_registry=started_job_registry) self.handle_exception(job, *exc_info) @@ -1014,9 +1011,7 @@ class Worker(object): def handle_exception(self, job, *exc_info): """Walks the exception handler stack to delegate exception handling.""" - exc_string = Worker._get_safe_exception_string( - traceback.format_exception(*exc_info) - ) + exc_string = ''.join(traceback.format_exception(*exc_info)) self.log.error(exc_string, exc_info=True, extra={ 'func': job.func_name, 'arguments': job.args, @@ -1037,16 +1032,6 @@ class Worker(object): if not fallthrough: break - @staticmethod - def _get_safe_exception_string(exc_strings): - """Ensure list of exception strings is decoded on Python 2 and joined as one string safely.""" - if sys.version_info[0] < 3: - try: - exc_strings = [exc.decode("utf-8") for exc in exc_strings] - except ValueError: - exc_strings = [exc.decode("latin-1") for exc in exc_strings] - return ''.join(exc_strings) - def push_exc_handler(self, handler_func): """Pushes an exception handler onto the exc handler stack.""" self._exc_handlers.append(handler_func) @@ -1101,16 +1086,14 @@ class Worker(object): class SimpleWorker(Worker): - def main_work_horse(self, *args, **kwargs): - raise NotImplementedError("Test worker does not implement this method") def execute_job(self, job, queue): """Execute job in same thread/process, do not fork()""" # "-1" means that jobs never timeout. In this case, we should _not_ do -1 + 60 = 59. We should just stick to DEFAULT_WORKER_TTL. if job.timeout == -1: - timeout = DEFAULT_WORKER_TTL + timeout = DEFAULT_WORKER_TTL else: - timeout = (job.timeout or DEFAULT_WORKER_TTL) + 60 + timeout = (job.timeout or DEFAULT_WORKER_TTL) + 60 return self.perform_job(job, queue, heartbeat_ttl=timeout) @@ -1124,10 +1107,6 @@ class HerokuWorker(Worker): imminent_shutdown_delay = 6 frame_properties = ['f_code', 'f_lasti', 'f_lineno', 'f_locals', 'f_trace'] - if PY2: - frame_properties.extend( - ['f_exc_traceback', 'f_exc_type', 'f_exc_value', 'f_restricted'] - ) def setup_work_horse_signals(self): """Modified to ignore SIGINT and SIGTERM and only handle SIGRTMIN""" diff --git a/tests/fixtures.py b/tests/fixtures.py index 5d8e8bc..e50ecb0 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -14,7 +14,7 @@ import subprocess from rq import Connection, get_current_job, get_current_connection, Queue from rq.decorators import job -from rq.compat import PY2, text_type +from rq.compat import text_type from rq.worker import HerokuWorker @@ -120,10 +120,7 @@ class CallableObject(object): class UnicodeStringObject(object): def __repr__(self): - if PY2: - return u'é'.encode('utf-8') - else: - return u'é' + return u'é' with Connection(): diff --git a/tests/test_cli.py b/tests/test_cli.py index 1dea1e3..51d4e60 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -2,13 +2,12 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -from datetime import datetime +from datetime import datetime, timezone from click.testing import CliRunner from redis import Redis from rq import Queue -from rq.compat import utc from rq.cli import main from rq.cli.helpers import read_config_file, CliConfig from rq.job import Job @@ -250,7 +249,7 @@ class TestRQCli(RQTestCase): def test_worker_with_scheduler(self): """rq worker -u --with-scheduler""" queue = Queue(connection=self.connection) - queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello) + queue.enqueue_at(datetime(2019, 1, 1, tzinfo=timezone.utc), say_hello) registry = ScheduledJobRegistry(queue=queue) runner = CliRunner() diff --git a/tests/test_job.py b/tests/test_job.py index 2987c5b..f990afd 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -10,7 +10,7 @@ from datetime import datetime, timedelta from redis import WatchError -from rq.compat import PY2, as_text +from rq.compat import as_text from rq.exceptions import NoSuchJobError from rq.job import Job, JobStatus, cancel_job, get_current_job, Retry from rq.queue import Queue @@ -32,18 +32,9 @@ class TestJob(RQTestCase): args=[12, "☃"], kwargs=dict(snowman="☃", null=None), ) - - if not PY2: - # Python 3 - expected_string = "myfunc(12, '☃', null=None, snowman='☃')" - else: - # Python 2 - expected_string = u"myfunc(12, u'\\u2603', null=None, snowman=u'\\u2603')".decode( - 'utf-8') - self.assertEqual( job.description, - expected_string, + "myfunc(12, '☃', null=None, snowman='☃')", ) def test_create_empty_job(self): @@ -222,7 +213,7 @@ class TestJob(RQTestCase): # ... and no other keys are stored self.assertEqual( sorted(self.testconn.hkeys(job.key)), - [b'created_at', b'data', b'description', b'ended_at', b'last_heartbeat', b'started_at']) + [b'created_at', b'data', b'description', b'ended_at', b'last_heartbeat', b'started_at', b'worker_name']) self.assertEqual(job.last_heartbeat, None) self.assertEqual(job.last_heartbeat, None) @@ -439,10 +430,20 @@ class TestJob(RQTestCase): job = Job.create(func=fixtures.say_hello, args=('Lionel',)) job.save() Job.fetch(job.id, connection=self.testconn) - if PY2: - self.assertEqual(job.description, "tests.fixtures.say_hello(u'Lionel')") - else: - self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')") + self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')") + + def test_prepare_for_execution(self): + """job.prepare_for_execution works properly""" + job = Job.create(func=fixtures.say_hello) + job.save() + with self.testconn.pipeline() as pipeline: + job.prepare_for_execution("worker_name", pipeline) + pipeline.execute() + job.refresh() + self.assertEqual(job.worker_name, "worker_name") + self.assertEqual(job.get_status(), JobStatus.STARTED) + self.assertIsNotNone(job.last_heartbeat) + self.assertIsNotNone(job.started_at) def test_job_access_outside_job_fails(self): """The current job is accessible only within a job context.""" diff --git a/tests/test_queue.py b/tests/test_queue.py index 6425c87..9b61b5a 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -3,13 +3,10 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) import json -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from mock.mock import patch from rq import Retry, Queue -from rq.compat import utc -from rq.exceptions import NoSuchJobError - from rq.job import Job, JobStatus from rq.registry import (DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, ScheduledJobRegistry, @@ -364,7 +361,7 @@ class TestQueue(RQTestCase): ) # Explicit args and kwargs should also work with enqueue_at - time = datetime.now(utc) + timedelta(seconds=10) + time = datetime.now(timezone.utc) + timedelta(seconds=10) job = q.enqueue_at(time, echo, job_timeout=2, result_ttl=2, args=[1], kwargs=kwargs) self.assertEqual(job.timeout, 2) self.assertEqual(job.result_ttl, 2) @@ -671,7 +668,7 @@ class TestJobScheduling(RQTestCase): def test_enqueue_at(self): """enqueue_at() creates a job in ScheduledJobRegistry""" queue = Queue(connection=self.testconn) - scheduled_time = datetime.now(utc) + timedelta(seconds=10) + scheduled_time = datetime.now(timezone.utc) + timedelta(seconds=10) job = queue.enqueue_at(scheduled_time, say_hello) registry = ScheduledJobRegistry(queue=queue) self.assertIn(job, registry) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index b1de0e7..348236c 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -1,11 +1,11 @@ import os import time -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from multiprocessing import Process from rq import Queue -from rq.compat import utc, PY2 +from rq.compat import PY2 from rq.exceptions import NoSuchJobError from rq.job import Job, Retry from rq.registry import FinishedJobRegistry, ScheduledJobRegistry @@ -57,8 +57,8 @@ class TestScheduledJobRegistry(RQTestCase): job = Job.create('myfunc', connection=self.testconn) job.save() - dt = datetime(2019, 1, 1, tzinfo=utc) - registry.schedule(job, datetime(2019, 1, 1, tzinfo=utc)) + dt = datetime(2019, 1, 1, tzinfo=timezone.utc) + registry.schedule(job, datetime(2019, 1, 1, tzinfo=timezone.utc)) self.assertEqual(registry.get_scheduled_time(job), dt) # get_scheduled_time() should also work with job ID self.assertEqual(registry.get_scheduled_time(job.id), dt) @@ -74,35 +74,28 @@ class TestScheduledJobRegistry(RQTestCase): job.save() registry = ScheduledJobRegistry(queue=queue) - if PY2: - # On Python 2, datetime needs to have timezone - self.assertRaises(ValueError, registry.schedule, job, datetime(2019, 1, 1)) - registry.schedule(job, datetime(2019, 1, 1, tzinfo=utc)) + from datetime import timezone + # If we pass in a datetime with no timezone, `schedule()` + # assumes local timezone so depending on your local timezone, + # the timestamp maybe different + + # we need to account for the difference between a timezone + # with DST active and without DST active. The time.timezone + # property isn't accurate when time.daylight is non-zero, + # we'll test both. + + # first, time.daylight == 0 (not in DST). + # mock the sitatuoin for American/New_York not in DST (UTC - 5) + # time.timezone = 18000 + # time.daylight = 0 + # time.altzone = 14400 + mock_day = mock.patch('time.daylight', 0) + mock_tz = mock.patch('time.timezone', 18000) + mock_atz = mock.patch('time.altzone', 14400) + with mock_tz, mock_day, mock_atz: + registry.schedule(job, datetime(2019, 1, 1)) self.assertEqual(self.testconn.zscore(registry.key, job.id), - 1546300800) # 2019-01-01 UTC in Unix timestamp - else: - from datetime import timezone - # If we pass in a datetime with no timezone, `schedule()` - # assumes local timezone so depending on your local timezone, - # the timestamp maybe different - - # we need to account for the difference between a timezone - # with DST active and without DST active. The time.timezone - # property isn't accurate when time.daylight is non-zero, - # we'll test both. - - # first, time.daylight == 0 (not in DST). - # mock the sitatuoin for American/New_York not in DST (UTC - 5) - # time.timezone = 18000 - # time.daylight = 0 - # time.altzone = 14400 - mock_day = mock.patch('time.daylight', 0) - mock_tz = mock.patch('time.timezone', 18000) - mock_atz = mock.patch('time.altzone', 14400) - with mock_tz, mock_day, mock_atz: - registry.schedule(job, datetime(2019, 1, 1)) - self.assertEqual(self.testconn.zscore(registry.key, job.id), - 1546300800 + 18000) # 2019-01-01 UTC in Unix timestamp + 1546300800 + 18000) # 2019-01-01 UTC in Unix timestamp # second, time.daylight != 0 (in DST) # mock the sitatuoin for American/New_York not in DST (UTC - 4) @@ -227,7 +220,7 @@ class TestScheduler(RQTestCase): registry = ScheduledJobRegistry(queue=queue) job = Job.create('myfunc', connection=self.testconn) job.save() - registry.schedule(job, datetime(2019, 1, 1, tzinfo=utc)) + registry.schedule(job, datetime(2019, 1, 1, tzinfo=timezone.utc)) scheduler = RQScheduler([queue], connection=self.testconn) scheduler.acquire_locks() scheduler.enqueue_scheduled_jobs() @@ -237,7 +230,7 @@ class TestScheduler(RQTestCase): self.assertEqual(len(registry), 0) # Jobs scheduled in the far future should not be affected - registry.schedule(job, datetime(2100, 1, 1, tzinfo=utc)) + registry.schedule(job, datetime(2100, 1, 1, tzinfo=timezone.utc)) scheduler.enqueue_scheduled_jobs() self.assertEqual(len(queue), 1) @@ -294,7 +287,7 @@ class TestWorker(RQTestCase): p = Process(target=kill_worker, args=(os.getpid(), False, 5)) p.start() - queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello) + queue.enqueue_at(datetime(2019, 1, 1, tzinfo=timezone.utc), say_hello) worker.work(burst=False, with_scheduler=True) p.join(1) self.assertIsNotNone(worker.scheduler) @@ -311,7 +304,7 @@ class TestQueue(RQTestCase): scheduler = RQScheduler([queue], connection=self.testconn) scheduler.acquire_locks() # Jobs created using enqueue_at is put in the ScheduledJobRegistry - job = queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello) + job = queue.enqueue_at(datetime(2019, 1, 1, tzinfo=timezone.utc), say_hello) self.assertEqual(len(queue), 0) self.assertEqual(len(registry), 1) @@ -330,7 +323,7 @@ class TestQueue(RQTestCase): registry = ScheduledJobRegistry(queue=queue) job = queue.enqueue_in(timedelta(seconds=30), say_hello) - now = datetime.now(utc) + now = datetime.now(timezone.utc) scheduled_time = registry.get_scheduled_time(job) # Ensure that job is scheduled roughly 30 seconds from now self.assertTrue( diff --git a/tests/test_worker.py b/tests/test_worker.py index 1ab5865..bf11b92 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -181,6 +181,7 @@ class TestWorker(RQTestCase): 'Expected at least some work done.' ) self.assertEqual(job.result, 'Hi there, Frank!') + self.assertIsNone(job.worker_name) def test_job_times(self): """job times are set correctly.""" @@ -296,7 +297,7 @@ class TestWorker(RQTestCase): enqueued_at_date = str(job.enqueued_at) w = Worker([q]) - w.work(burst=True) # should silently pass + w.work(burst=True) # Postconditions self.assertEqual(q.count, 0) @@ -307,6 +308,7 @@ class TestWorker(RQTestCase): # Check the job job = Job.fetch(job.id) self.assertEqual(job.origin, q.name) + self.assertIsNone(job.worker_name) # Worker name is cleared after failures # Should be the original enqueued_at date, not the date of enqueueing # to the failed queue @@ -373,7 +375,7 @@ class TestWorker(RQTestCase): self.assertEqual(worker.failed_job_count, 2) self.assertEqual(worker.successful_job_count, 2) self.assertEqual(worker.total_working_time, 3.0) - + def test_handle_retry(self): """handle_job_failure() handles retry properly""" connection = self.testconn @@ -409,7 +411,7 @@ class TestWorker(RQTestCase): self.assertEqual([], queue.job_ids) # If a job is no longer retries, it's put in FailedJobRegistry self.assertTrue(job in registry) - + def test_retry_interval(self): """Retries with intervals are scheduled""" connection = self.testconn @@ -586,26 +588,26 @@ class TestWorker(RQTestCase): q = Queue() job = q.enqueue(say_hello, args=('Frank',), result_ttl=10) w = Worker([q]) - self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1)) + self.assertIn(job.get_id().encode(), self.testconn.lrange(q.key, 0, -1)) w.work(burst=True) self.assertNotEqual(self.testconn.ttl(job.key), 0) - self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1)) + self.assertNotIn(job.get_id().encode(), self.testconn.lrange(q.key, 0, -1)) # Job with -1 result_ttl don't expire job = q.enqueue(say_hello, args=('Frank',), result_ttl=-1) w = Worker([q]) - self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1)) + self.assertIn(job.get_id().encode(), self.testconn.lrange(q.key, 0, -1)) w.work(burst=True) self.assertEqual(self.testconn.ttl(job.key), -1) - self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1)) + self.assertNotIn(job.get_id().encode(), self.testconn.lrange(q.key, 0, -1)) # Job with result_ttl = 0 gets deleted immediately job = q.enqueue(say_hello, args=('Frank',), result_ttl=0) w = Worker([q]) - self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1)) + self.assertIn(job.get_id().encode(), self.testconn.lrange(q.key, 0, -1)) w.work(burst=True) self.assertEqual(self.testconn.get(job.key), None) - self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1)) + self.assertNotIn(job.get_id().encode(), self.testconn.lrange(q.key, 0, -1)) def test_worker_sets_job_status(self): """Ensure that worker correctly sets job status.""" @@ -736,6 +738,10 @@ class TestWorker(RQTestCase): self.assertEqual(worker.get_state(), 'busy') self.assertEqual(worker.get_current_job_id(), job.id) + # job status is also updated + self.assertEqual(job._status, JobStatus.STARTED) + self.assertEqual(job.worker_name, worker.name) + def test_prepare_job_execution_inf_timeout(self): """Prepare job execution handles infinite job timeout""" queue = Queue(connection=self.testconn)