Sync jobs should save job exceptions and results (#1799)

* Sync jobs should save job exceptions and results

* Make job.handle_success() and job.handle_failure() private methods
Branch: main
Selwin Ong authored 2 years ago; committed by GitHub
parent 27cbf48ad4
commit 83fa0adf15
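With this change, a queue created with `is_async=False` persists job results and exceptions the same way worker-executed jobs do. A minimal sketch of the new behavior, assuming a local Redis server (5.0+ so results are stored in Redis streams):

```python
from redis import Redis
from rq import Queue

# Illustrative only: enqueue() on a sync queue runs the job inline and,
# after this commit, also persists its outcome.
queue = Queue(is_async=False, connection=Redis())
job = queue.enqueue(len, [1, 2, 3])   # executes immediately
print(job.return_value())             # 3 -- the result was saved
print(job.latest_result().type)       # Result.Type.SUCCESSFUL
```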

@@ -1224,7 +1224,7 @@ class Job:
"""
return default_ttl if self.ttl is None else self.ttl
def get_result_ttl(self, default_ttl: Optional[int] = None) -> Optional[int]:
def get_result_ttl(self, default_ttl: int) -> int:
"""Returns ttl for a job that determines how long a jobs result will
be persisted. In the future, this method will also be responsible
for determining ttl for repeated jobs.
@@ -1287,6 +1287,52 @@ class Job:
self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer
)
@property
def finished_job_registry(self):
from .registry import FinishedJobRegistry
return FinishedJobRegistry(
self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer
)
def _handle_success(self, result_ttl: int, pipeline: 'Pipeline'):
"""Saves and cleanup job after successful execution"""
# self.log.debug('Setting job %s status to finished', self.id)
self.set_status(JobStatus.FINISHED, pipeline=pipeline)
# Result should be saved in job hash only if server
# doesn't support Redis streams
include_result = not self.supports_redis_streams
# Don't clobber user's meta dictionary!
self.save(pipeline=pipeline, include_meta=False, include_result=include_result)
# Result creation should eventually be moved to job.save() after support
# for Redis < 5.0 is dropped. job.save(include_result=...) is used to test
# for backward compatibility
if self.supports_redis_streams:
from .results import Result
Result.create(
self, Result.Type.SUCCESSFUL, return_value=self._result, ttl=result_ttl, pipeline=pipeline
)
if result_ttl != 0:
finished_job_registry = self.finished_job_registry
finished_job_registry.add(self, result_ttl, pipeline)
def _handle_failure(self, exc_string: str, pipeline: 'Pipeline'):
failed_job_registry = self.failed_job_registry
# Exception should be saved in job hash if server
# doesn't support Redis streams
_save_exc_to_job = not self.supports_redis_streams
failed_job_registry.add(
self,
ttl=self.failure_ttl,
exc_string=exc_string,
pipeline=pipeline,
_save_exc_to_job=_save_exc_to_job,
)
if self.supports_redis_streams:
from .results import Result
Result.create_failure(self, self.failure_ttl, exc_string=exc_string, pipeline=pipeline)
def get_retry_interval(self) -> int:
"""Returns the desired retry interval.
If the number of retries is greater than the length of intervals, the first
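Both new helpers assume a caller-managed pipeline, so status, result, and registry writes land in a single round trip. A sketch of the calling convention this commit adopts (`connection`, `job`, and `result_ttl` are assumed to be in scope):

```python
# Sketch: drive the new private helpers through one Redis pipeline so
# status, result, and registry updates are applied together.
with connection.pipeline() as pipeline:
    job._handle_success(result_ttl=result_ttl, pipeline=pipeline)
    job.cleanup(result_ttl, pipeline=pipeline)
    pipeline.execute()
```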

@@ -1,7 +1,9 @@
import uuid
import logging
import sys
import traceback
import uuid
import warnings
import logging
from collections import namedtuple
from datetime import datetime, timezone, timedelta
from functools import total_ordering
@@ -769,9 +771,11 @@ class Queue:
Job: _description_
"""
job.perform()
job.set_status(JobStatus.FINISHED)
job.save(include_meta=False)
job.cleanup(job.get_result_ttl(default_ttl=DEFAULT_RESULT_TTL))
result_ttl = job.get_result_ttl(default_ttl=DEFAULT_RESULT_TTL)
with self.connection.pipeline() as pipeline:
job._handle_success(result_ttl=result_ttl, pipeline=pipeline)
job.cleanup(result_ttl, pipeline=pipeline)
pipeline.execute()
return job
@classmethod
@@ -1024,7 +1028,12 @@ class Queue:
try:
job = self.run_job(job)
except: # noqa
job.set_status(JobStatus.FAILED)
with self.connection.pipeline() as pipeline:
job.set_status(JobStatus.FAILED, pipeline=pipeline)
exc_string = ''.join(traceback.format_exception(*sys.exc_info()))
job._handle_failure(exc_string, pipeline)
pipeline.execute()
if job.failure_callback:
job.failure_callback(job, self.connection, *sys.exc_info())
else:
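Because the synchronous failure path now records the formatted traceback before any failure callback runs, callers can inspect the stored outcome afterwards. A hedged sketch using the `div_by_zero` fixture from the tests below (reading `exc_string` off the `Result` object is an assumption about its attributes):

```python
from redis import Redis
from rq import Queue
from rq.results import Result
from tests.fixtures import div_by_zero

queue = Queue(is_async=False, connection=Redis())
job = queue.enqueue(div_by_zero)          # raises inline; the exception is caught
result = job.latest_result()
assert result.type == Result.Type.FAILED  # the failure was persisted
print(result.exc_string)                  # formatted traceback (assumed attribute)
```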

@@ -9,18 +9,15 @@ import time
import traceback
import warnings
from typing import TYPE_CHECKING, Type, List, Dict, Any
if TYPE_CHECKING:
from redis import Redis
from redis.client import Pipeline
from datetime import timedelta
from enum import Enum
from uuid import uuid4
from random import shuffle
from typing import Callable, List, Optional
from typing import Callable, List, Optional, TYPE_CHECKING, Type
if TYPE_CHECKING:
from redis import Redis
from redis.client import Pipeline
try:
from signal import SIGKILL
@@ -47,8 +44,7 @@ from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentEx
from .job import Job, JobStatus
from .logutils import setup_loghandlers
from .queue import Queue
from .registry import FailedJobRegistry, StartedJobRegistry, clean_registries
from .results import Result
from .registry import StartedJobRegistry, clean_registries
from .scheduler import RQScheduler
from .suspension import is_suspended
from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
@@ -139,7 +135,7 @@ class Worker:
worker_keys = get_keys(queue=queue, connection=connection)
workers = [
cls.find_by_key(
as_text(key), connection=connection, job_class=job_class, queue_class=queue_class, serializer=serializer
key, connection=connection, job_class=job_class, queue_class=queue_class, serializer=serializer
)
for key in worker_keys
]
@@ -1081,21 +1077,7 @@ class Worker:
started_job_registry.remove(job, pipeline=pipeline)
if not self.disable_default_exception_handler and not retry:
failed_job_registry = FailedJobRegistry(
job.origin, job.connection, job_class=self.job_class, serializer=job.serializer
)
# Exception should be saved in job hash if server
# doesn't support Redis streams
_save_exc_to_job = not self.supports_redis_streams
failed_job_registry.add(
job,
ttl=job.failure_ttl,
exc_string=exc_string,
pipeline=pipeline,
_save_exc_to_job=_save_exc_to_job,
)
if self.supports_redis_streams:
Result.create_failure(job, job.failure_ttl, exc_string=exc_string, pipeline=pipeline)
job._handle_failure(exc_string, pipeline=pipeline)
with suppress(redis.exceptions.ConnectionError):
pipeline.execute()
@@ -1142,19 +1124,8 @@
result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0:
self.log.debug('Setting job %s status to finished', job.id)
job.set_status(JobStatus.FINISHED, pipeline=pipeline)
# Result should be saved in job hash only if server
# doesn't support Redis streams
include_result = not self.supports_redis_streams
# Don't clobber user's meta dictionary!
job.save(pipeline=pipeline, include_meta=False, include_result=include_result)
if self.supports_redis_streams:
Result.create(
job, Result.Type.SUCCESSFUL, return_value=job._result, ttl=result_ttl, pipeline=pipeline
)
finished_job_registry = queue.finished_job_registry
finished_job_registry.add(job, result_ttl, pipeline)
self.log.debug(f"Saving job {job.id}'s successful execution result")
job._handle_success(result_ttl, pipeline=pipeline)
job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False)
self.log.debug('Removing job %s from StartedJobRegistry', job.id)
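Net effect on the worker: `handle_job_success` reduces to registry bookkeeping plus a single call into the job. A condensed sketch of the resulting flow, using the names as they appear in the hunk above:

```python
# Condensed sketch of Worker.handle_job_success after this commit:
with self.connection.pipeline() as pipeline:
    result_ttl = job.get_result_ttl(self.default_result_ttl)
    if result_ttl != 0:
        job._handle_success(result_ttl, pipeline=pipeline)  # status, save, Result.create
    job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False)
    started_job_registry.remove(job, pipeline=pipeline)
    pipeline.execute()
```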

@@ -51,7 +51,7 @@ def unregister(worker: 'Worker', pipeline: Optional['Pipeline'] = None):
connection.execute()
def get_keys(queue: Optional['Queue'] = None, connection: Optional['Redis'] = None) -> Set[Any]:
def get_keys(queue: Optional['Queue'] = None, connection: Optional['Redis'] = None) -> Set[str]:
"""Returns a list of worker keys for a given queue.
Args:

@@ -529,11 +529,9 @@ class TestJob(RQTestCase):
job = Job.create(func=fixtures.say_hello, result_ttl=job_result_ttl)
job.save()
self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), job_result_ttl)
self.assertEqual(job.get_result_ttl(), job_result_ttl)
job = Job.create(func=fixtures.say_hello)
job.save()
self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), default_ttl)
self.assertEqual(job.get_result_ttl(), None)
def test_get_job_ttl(self):
"""Getting job TTL."""

@@ -14,7 +14,7 @@ from rq.results import Result, get_key
from rq.utils import get_version, utcnow
from rq.worker import Worker
from .fixtures import say_hello
from .fixtures import say_hello, div_by_zero
@unittest.skipIf(get_version(Redis()) < (5, 0, 0), 'Skip if Redis server < 5.0')
@@ -123,23 +123,25 @@ class TestScheduledJobRegistry(RQTestCase):
self.assertEqual(job.result, 'Success')
with patch('rq.worker.Worker.supports_redis_streams', new_callable=PropertyMock) as mock:
mock.return_value = False
worker = Worker([queue])
worker.register_birth()
job = queue.enqueue(say_hello)
job._result = 'Success'
job.started_at = utcnow()
job.ended_at = job.started_at + timedelta(seconds=0.75)
# If `save_result_to_job` = True, result will be saved to job
# hash, simulating older versions of RQ
worker.handle_job_success(job, queue, registry)
payload = self.connection.hgetall(job.key)
self.assertTrue(b'result' in payload.keys())
# Delete all new result objects so we only have result stored in job hash,
# this should simulate a job that was executed in an earlier RQ version
self.assertEqual(job.result, 'Success')
with patch('rq.job.Job.supports_redis_streams', new_callable=PropertyMock) as job_mock:
job_mock.return_value = False
mock.return_value = False
worker = Worker([queue])
worker.register_birth()
job = queue.enqueue(say_hello)
job._result = 'Success'
job.started_at = utcnow()
job.ended_at = job.started_at + timedelta(seconds=0.75)
# If `save_result_to_job` = True, result will be saved to job
# hash, simulating older versions of RQ
worker.handle_job_success(job, queue, registry)
payload = self.connection.hgetall(job.key)
self.assertTrue(b'result' in payload.keys())
# Delete all new result objects so we only have result stored in job hash,
# this should simulate a job that was executed in an earlier RQ version
self.assertEqual(job.result, 'Success')
def test_job_failed_result_fallback(self):
"""Changes to job.result failure handling should be backwards compatible."""
@@ -164,26 +166,28 @@ class TestScheduledJobRegistry(RQTestCase):
self.assertEqual(job.exc_info, 'Error')
with patch('rq.worker.Worker.supports_redis_streams', new_callable=PropertyMock) as mock:
mock.return_value = False
worker = Worker([queue])
worker.register_birth()
job = queue.enqueue(say_hello)
job.started_at = utcnow()
job.ended_at = job.started_at + timedelta(seconds=0.75)
# If `save_result_to_job` = True, result will be saved to job
# hash, simulating older versions of RQ
worker.handle_job_failure(job, exc_string='Error', queue=queue,
started_job_registry=registry)
payload = self.connection.hgetall(job.key)
self.assertTrue(b'exc_info' in payload.keys())
# Delete all new result objects so we only have result stored in job hash,
# this should simulate a job that was executed in an earlier RQ version
Result.delete_all(job)
job = Job.fetch(job.id, connection=self.connection)
self.assertEqual(job.exc_info, 'Error')
with patch('rq.job.Job.supports_redis_streams', new_callable=PropertyMock) as job_mock:
job_mock.return_value = False
mock.return_value = False
worker = Worker([queue])
worker.register_birth()
job = queue.enqueue(say_hello)
job.started_at = utcnow()
job.ended_at = job.started_at + timedelta(seconds=0.75)
# If `save_result_to_job` = True, result will be saved to job
# hash, simulating older versions of RQ
worker.handle_job_failure(job, exc_string='Error', queue=queue,
started_job_registry=registry)
payload = self.connection.hgetall(job.key)
self.assertTrue(b'exc_info' in payload.keys())
# Delete all new result objects so we only have result stored in job hash,
# this should simulate a job that was executed in an earlier RQ version
Result.delete_all(job)
job = Job.fetch(job.id, connection=self.connection)
self.assertEqual(job.exc_info, 'Error')
def test_job_return_value(self):
"""Test job.return_value"""
@@ -199,3 +203,14 @@ class TestScheduledJobRegistry(RQTestCase):
# Returns None if latest result is a failure
Result.create_failure(job, ttl=10, exc_string='exception')
self.assertIsNone(job.return_value(refresh=True))
def test_job_return_value_sync(self):
"""Test job.return_value when queue.is_async=False"""
queue = Queue(connection=self.connection, is_async=False)
job = queue.enqueue(say_hello)
# The job ran synchronously, so a result is immediately available
self.assertIsNotNone(job.return_value())
job = queue.enqueue(div_by_zero)
self.assertEqual(job.latest_result().type, Result.Type.FAILED)
