diff --git a/rq/job.py b/rq/job.py index 61b359f..9bc08f1 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1224,7 +1224,7 @@ class Job: """ return default_ttl if self.ttl is None else self.ttl - def get_result_ttl(self, default_ttl: Optional[int] = None) -> Optional[int]: + def get_result_ttl(self, default_ttl: int) -> int: """Returns ttl for a job that determines how long a jobs result will be persisted. In the future, this method will also be responsible for determining ttl for repeated jobs. @@ -1287,6 +1287,52 @@ class Job: self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer ) + @property + def finished_job_registry(self): + from .registry import FinishedJobRegistry + + return FinishedJobRegistry( + self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer + ) + + def _handle_success(self, result_ttl: int, pipeline: 'Pipeline'): + """Saves and cleanup job after successful execution""" + # self.log.debug('Setting job %s status to finished', job.id) + self.set_status(JobStatus.FINISHED, pipeline=pipeline) + # Result should be saved in job hash only if server + # doesn't support Redis streams + include_result = not self.supports_redis_streams + # Don't clobber user's meta dictionary! + self.save(pipeline=pipeline, include_meta=False, include_result=include_result) + # Result creation should eventually be moved to job.save() after support + # for Redis < 5.0 is dropped. job.save(include_result=...) is used to test + # for backward compatibility + if self.supports_redis_streams: + from .results import Result + Result.create( + self, Result.Type.SUCCESSFUL, return_value=self._result, ttl=result_ttl, pipeline=pipeline + ) + + if result_ttl != 0: + finished_job_registry = self.finished_job_registry + finished_job_registry.add(self, result_ttl, pipeline) + + def _handle_failure(self, exc_string: str, pipeline: 'Pipeline'): + failed_job_registry = self.failed_job_registry + # Exception should be saved in job hash if server + # doesn't support Redis streams + _save_exc_to_job = not self.supports_redis_streams + failed_job_registry.add( + self, + ttl=self.failure_ttl, + exc_string=exc_string, + pipeline=pipeline, + _save_exc_to_job=_save_exc_to_job, + ) + if self.supports_redis_streams: + from .results import Result + Result.create_failure(self, self.failure_ttl, exc_string=exc_string, pipeline=pipeline) + def get_retry_interval(self) -> int: """Returns the desired retry interval. If number of retries is bigger than length of intervals, the first diff --git a/rq/queue.py b/rq/queue.py index 1d98492..3b3e7b0 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,7 +1,9 @@ -import uuid +import logging import sys +import traceback +import uuid import warnings -import logging + from collections import namedtuple from datetime import datetime, timezone, timedelta from functools import total_ordering @@ -769,9 +771,11 @@ class Queue: Job: _description_ """ job.perform() - job.set_status(JobStatus.FINISHED) - job.save(include_meta=False) - job.cleanup(job.get_result_ttl(default_ttl=DEFAULT_RESULT_TTL)) + result_ttl = job.get_result_ttl(default_ttl=DEFAULT_RESULT_TTL) + with self.connection.pipeline() as pipeline: + job._handle_success(result_ttl=result_ttl, pipeline=pipeline) + job.cleanup(result_ttl, pipeline=pipeline) + pipeline.execute() return job @classmethod @@ -1024,7 +1028,12 @@ class Queue: try: job = self.run_job(job) except: # noqa - job.set_status(JobStatus.FAILED) + with self.connection.pipeline() as pipeline: + job.set_status(JobStatus.FAILED, pipeline=pipeline) + exc_string = ''.join(traceback.format_exception(*sys.exc_info())) + job._handle_failure(exc_string, pipeline) + pipeline.execute() + if job.failure_callback: job.failure_callback(job, self.connection, *sys.exc_info()) else: diff --git a/rq/worker.py b/rq/worker.py index 87f3e26..6d8cc5e 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -9,18 +9,15 @@ import time import traceback import warnings - -from typing import TYPE_CHECKING, Type, List, Dict, Any - -if TYPE_CHECKING: - from redis import Redis - from redis.client import Pipeline - from datetime import timedelta from enum import Enum from uuid import uuid4 from random import shuffle -from typing import Callable, List, Optional +from typing import Callable, List, Optional, TYPE_CHECKING, Type + +if TYPE_CHECKING: + from redis import Redis + from redis.client import Pipeline try: from signal import SIGKILL @@ -47,8 +44,7 @@ from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentEx from .job import Job, JobStatus from .logutils import setup_loghandlers from .queue import Queue -from .registry import FailedJobRegistry, StartedJobRegistry, clean_registries -from .results import Result +from .registry import StartedJobRegistry, clean_registries from .scheduler import RQScheduler from .suspension import is_suspended from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty @@ -139,7 +135,7 @@ class Worker: worker_keys = get_keys(queue=queue, connection=connection) workers = [ cls.find_by_key( - as_text(key), connection=connection, job_class=job_class, queue_class=queue_class, serializer=serializer + key, connection=connection, job_class=job_class, queue_class=queue_class, serializer=serializer ) for key in worker_keys ] @@ -1081,21 +1077,7 @@ class Worker: started_job_registry.remove(job, pipeline=pipeline) if not self.disable_default_exception_handler and not retry: - failed_job_registry = FailedJobRegistry( - job.origin, job.connection, job_class=self.job_class, serializer=job.serializer - ) - # Exception should be saved in job hash if server - # doesn't support Redis streams - _save_exc_to_job = not self.supports_redis_streams - failed_job_registry.add( - job, - ttl=job.failure_ttl, - exc_string=exc_string, - pipeline=pipeline, - _save_exc_to_job=_save_exc_to_job, - ) - if self.supports_redis_streams: - Result.create_failure(job, job.failure_ttl, exc_string=exc_string, pipeline=pipeline) + job._handle_failure(exc_string, pipeline=pipeline) with suppress(redis.exceptions.ConnectionError): pipeline.execute() @@ -1142,19 +1124,8 @@ class Worker: result_ttl = job.get_result_ttl(self.default_result_ttl) if result_ttl != 0: - self.log.debug('Setting job %s status to finished', job.id) - job.set_status(JobStatus.FINISHED, pipeline=pipeline) - # Result should be saved in job hash only if server - # doesn't support Redis streams - include_result = not self.supports_redis_streams - # Don't clobber user's meta dictionary! - job.save(pipeline=pipeline, include_meta=False, include_result=include_result) - if self.supports_redis_streams: - Result.create( - job, Result.Type.SUCCESSFUL, return_value=job._result, ttl=result_ttl, pipeline=pipeline - ) - finished_job_registry = queue.finished_job_registry - finished_job_registry.add(job, result_ttl, pipeline) + self.log.debug(f"Saving job {job.id}'s successful execution result") + job._handle_success(result_ttl, pipeline=pipeline) job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False) self.log.debug('Removing job %s from StartedJobRegistry', job.id) diff --git a/rq/worker_registration.py b/rq/worker_registration.py index 94c41a1..0f31f1d 100644 --- a/rq/worker_registration.py +++ b/rq/worker_registration.py @@ -51,7 +51,7 @@ def unregister(worker: 'Worker', pipeline: Optional['Pipeline'] = None): connection.execute() -def get_keys(queue: Optional['Queue'] = None, connection: Optional['Redis'] = None) -> Set[Any]: +def get_keys(queue: Optional['Queue'] = None, connection: Optional['Redis'] = None) -> Set[str]: """Returns a list of worker keys for a given queue. Args: diff --git a/tests/test_job.py b/tests/test_job.py index 46fbe73..9d1ceae 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -529,11 +529,9 @@ class TestJob(RQTestCase): job = Job.create(func=fixtures.say_hello, result_ttl=job_result_ttl) job.save() self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), job_result_ttl) - self.assertEqual(job.get_result_ttl(), job_result_ttl) job = Job.create(func=fixtures.say_hello) job.save() self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), default_ttl) - self.assertEqual(job.get_result_ttl(), None) def test_get_job_ttl(self): """Getting job TTL.""" diff --git a/tests/test_results.py b/tests/test_results.py index 526fd66..4f705f5 100644 --- a/tests/test_results.py +++ b/tests/test_results.py @@ -14,7 +14,7 @@ from rq.results import Result, get_key from rq.utils import get_version, utcnow from rq.worker import Worker -from .fixtures import say_hello +from .fixtures import say_hello, div_by_zero @unittest.skipIf(get_version(Redis()) < (5, 0, 0), 'Skip if Redis server < 5.0') @@ -123,23 +123,25 @@ class TestScheduledJobRegistry(RQTestCase): self.assertEqual(job.result, 'Success') with patch('rq.worker.Worker.supports_redis_streams', new_callable=PropertyMock) as mock: - mock.return_value = False - worker = Worker([queue]) - worker.register_birth() - job = queue.enqueue(say_hello) - job._result = 'Success' - job.started_at = utcnow() - job.ended_at = job.started_at + timedelta(seconds=0.75) - - # If `save_result_to_job` = True, result will be saved to job - # hash, simulating older versions of RQ - - worker.handle_job_success(job, queue, registry) - payload = self.connection.hgetall(job.key) - self.assertTrue(b'result' in payload.keys()) - # Delete all new result objects so we only have result stored in job hash, - # this should simulate a job that was executed in an earlier RQ version - self.assertEqual(job.result, 'Success') + with patch('rq.job.Job.supports_redis_streams', new_callable=PropertyMock) as job_mock: + job_mock.return_value = False + mock.return_value = False + worker = Worker([queue]) + worker.register_birth() + job = queue.enqueue(say_hello) + job._result = 'Success' + job.started_at = utcnow() + job.ended_at = job.started_at + timedelta(seconds=0.75) + + # If `save_result_to_job` = True, result will be saved to job + # hash, simulating older versions of RQ + + worker.handle_job_success(job, queue, registry) + payload = self.connection.hgetall(job.key) + self.assertTrue(b'result' in payload.keys()) + # Delete all new result objects so we only have result stored in job hash, + # this should simulate a job that was executed in an earlier RQ version + self.assertEqual(job.result, 'Success') def test_job_failed_result_fallback(self): """Changes to job.result failure handling should be backwards compatible.""" @@ -164,26 +166,28 @@ class TestScheduledJobRegistry(RQTestCase): self.assertEqual(job.exc_info, 'Error') with patch('rq.worker.Worker.supports_redis_streams', new_callable=PropertyMock) as mock: - mock.return_value = False - worker = Worker([queue]) - worker.register_birth() - - job = queue.enqueue(say_hello) - job.started_at = utcnow() - job.ended_at = job.started_at + timedelta(seconds=0.75) - - # If `save_result_to_job` = True, result will be saved to job - # hash, simulating older versions of RQ - - worker.handle_job_failure(job, exc_string='Error', queue=queue, - started_job_registry=registry) - payload = self.connection.hgetall(job.key) - self.assertTrue(b'exc_info' in payload.keys()) - # Delete all new result objects so we only have result stored in job hash, - # this should simulate a job that was executed in an earlier RQ version - Result.delete_all(job) - job = Job.fetch(job.id, connection=self.connection) - self.assertEqual(job.exc_info, 'Error') + with patch('rq.job.Job.supports_redis_streams', new_callable=PropertyMock) as job_mock: + job_mock.return_value = False + mock.return_value = False + worker = Worker([queue]) + worker.register_birth() + + job = queue.enqueue(say_hello) + job.started_at = utcnow() + job.ended_at = job.started_at + timedelta(seconds=0.75) + + # If `save_result_to_job` = True, result will be saved to job + # hash, simulating older versions of RQ + + worker.handle_job_failure(job, exc_string='Error', queue=queue, + started_job_registry=registry) + payload = self.connection.hgetall(job.key) + self.assertTrue(b'exc_info' in payload.keys()) + # Delete all new result objects so we only have result stored in job hash, + # this should simulate a job that was executed in an earlier RQ version + Result.delete_all(job) + job = Job.fetch(job.id, connection=self.connection) + self.assertEqual(job.exc_info, 'Error') def test_job_return_value(self): """Test job.return_value""" @@ -199,3 +203,14 @@ class TestScheduledJobRegistry(RQTestCase): # Returns None if latest result is a failure Result.create_failure(job, ttl=10, exc_string='exception') self.assertIsNone(job.return_value(refresh=True)) + + def test_job_return_value_sync(self): + """Test job.return_value when queue.is_async=False""" + queue = Queue(connection=self.connection, is_async=False) + job = queue.enqueue(say_hello) + + # Returns None when there's no result + self.assertIsNotNone(job.return_value()) + + job = queue.enqueue(div_by_zero) + self.assertEqual(job.latest_result().type, Result.Type.FAILED)