Multiple results using Redis Streams (#1725)

* WIP job results

* Result can now be saved

* Successfully saved and restored result

* result.save() should accept pipeline

* Successful results are saved

* Failures are now saved properly too.

* Added test for Result.get_latest()

* Checkpoint

* Got Result.all() to work

* Added Result.count(), Result.delete()

* Backward compatibility for job.result and job.exc_info

* Added some typing

* More typing stuff

* Fixed typing in job.py

* More typing updates

* Only keep the last 10 results

* Documented job.results()

* Got results test to pass

* Don't run test_results.py on Redis server < 5.0

* Fixed mock import on some Python versions

* Remove Redis 3 from test matrix

* Jobs should never use the new Result implementation if server is < 5.0

* Results should only be created if Redis streams are supported.

* Added back Redis 3 to test matrix

* Fixed job.supports_redis_streams

* Fixed worker test

* Updated docs.
Committed to main by Selwin Ong (commit 0691b4d46e, parent 09856f9924)

@ -39,11 +39,11 @@ q = Queue(connection=redis_conn) # no args implies the default queue
# Delay execution of count_words_at_url('http://nvie.com')
job = q.enqueue(count_words_at_url, 'http://nvie.com')
print(job.result) # => None
print(job.result) # => None # Changed to job.return_value() in RQ >= 1.12.0
# Now, wait a while, until the worker is finished
time.sleep(2)
print(job.result) # => 889
print(job.result) # => 889 # Changed to job.return_value() in RQ >= 1.12.0
```
If you want to put the work on a specific queue, simply specify its name:
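For instance, a minimal sketch (the queue name `'low'` here is just an arbitrary example, reusing `count_words_at_url` from above):

```python
from redis import Redis
from rq import Queue

# Enqueue onto a named queue instead of the default one
q = Queue('low', connection=Redis())
q.enqueue(count_words_at_url, 'http://nvie.com')
```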
@ -377,7 +377,7 @@ def add(x, y):
job = add.delay(3, 4)
time.sleep(1)
print(job.result)
print(job.result) # Changed to job.return_value() in RQ >= 1.12.0
```

@ -11,19 +11,19 @@ solving a problem, but are getting back a few in return.
Python functions may have return values, so jobs can have them, too. If a job
returns a non-`None` return value, the worker will write that return value back
to the job's Redis hash under the `result` key. The job's Redis hash itself
will expire after 500 seconds by default after the job is finished.
The party that enqueued the job gets back a `Job` instance as a result of the
enqueueing itself. Such a `Job` object is a proxy object that is tied to the
job's ID, to be able to poll for results.
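For example, the enqueuing side can later poll by job ID (a small sketch, assuming RQ >= 1.12 where `job.return_value()` replaces the deprecated `job.result`):

```python
from redis import Redis
from rq.job import Job

job = Job.fetch('my_job_id', connection=Redis())
print(job.get_status())     # e.g. 'queued', 'started' or 'finished'
print(job.return_value())   # None until a successful result exists
```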
**On the return value's TTL**
### Return Value TTL
Return values are written back to Redis with a limited lifetime (via a Redis
expiring key), which is merely to avoid ever-growing Redis databases.
From RQ >= 0.3.1, The TTL value of the job result can be specified using the
The TTL value of the job result can be specified using the
`result_ttl` keyword argument to `enqueue()` and `enqueue_call()` calls. It
can also be used to disable the expiry altogether. You are then responsible
for cleaning up jobs yourself, so be careful when using this.
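For instance (a brief sketch; `result_ttl=-1` is how expiry is disabled, reusing `count_words_at_url` from the earlier example):

```python
from redis import Redis
from rq import Queue

q = Queue(connection=Redis())

# Keep the return value around for one day
q.enqueue(count_words_at_url, 'http://nvie.com', result_ttl=86400)

# Keep it forever; cleaning up the job is then your responsibility
q.enqueue(count_words_at_url, 'http://nvie.com', result_ttl=-1)
```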
@ -113,3 +113,44 @@ low.enqueue(really_really_slow, job_timeout=3600) # 1 hr
Individual jobs can still specify an alternative timeout, as workers will
respect these.
## Job Results
_New in version 1.12.0._
If a job is executed multiple times, you can access its execution history by calling
`job.results()`. RQ stores up to the 10 latest execution results.
Calling `job.latest_result()` will return the latest `Result` object, which has the
following attributes:
* `type` - an enum of `SUCCESSFUL`, `FAILED` or `STOPPED`
* `created_at` - the time at which the result was created
* `return_value` - the job's return value, only present if the result type is `SUCCESSFUL`
* `exc_string` - the exception raised by the job, only present if the result type is `FAILED`
* `job_id`
```python
job = Job.fetch(id='my_id', connection=redis)
result = job.latest_result() # returns Result(id=uid, type=SUCCESSFUL)
if result.type == result.Type.SUCCESSFUL:
    print(result.return_value)
else:
    print(result.exc_string)
```
Alternatively, you can also use `job.return_value()` as a shortcut for accessing
the return value of the latest result. Note that `job.return_value()` will only
return a non-`None` value if the latest result is a successful execution.
```python
job = Job.fetch(id='my_id', connection=redis)
print(job.return_value()) # Shortcut for job.latest_result().return_value
```
To access multiple results, use `job.results()`.
```python
job = Job.fetch(id='my_id', connection=redis)
for result in job.results():
    print(result.created_at, result.type)
```
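Each entry in the history can be inspected further; for example, to list only the failed executions (a short sketch based on the attributes above):

```python
job = Job.fetch(id='my_id', connection=redis)
for result in job.results():
    if result.type == result.Type.FAILED:
        print(result.created_at, result.exc_string)
```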

@ -10,11 +10,13 @@ from collections.abc import Iterable
from datetime import datetime, timedelta, timezone
from enum import Enum
from functools import partial
from uuid import uuid4
from redis import WatchError
from typing import Any, List, Optional
from uuid import uuid4
if t.TYPE_CHECKING:
from rq.queue import Queue
from .results import Result
from .queue import Queue
from redis import Redis
from redis.client import Pipeline
@ -65,14 +67,14 @@ class Dependency:
UNEVALUATED = object()
def cancel_job(job_id: str, connection: t.Optional['Redis'] = None, serializer=None, enqueue_dependents: bool = False):
def cancel_job(job_id: str, connection: Optional['Redis'] = None, serializer=None, enqueue_dependents: bool = False):
"""Cancels the job with the given job ID, preventing execution. Discards
any job info (i.e. it can't be requeued later).
"""
Job.fetch(job_id, connection=connection, serializer=serializer).cancel(enqueue_dependents=enqueue_dependents)
def get_current_job(connection: t.Optional['Redis'] = None, job_class: t.Optional['Job'] = None):
def get_current_job(connection: Optional['Redis'] = None, job_class: Optional['Job'] = None):
"""Returns the Job instance that is currently being executed. If this
function is invoked from outside a job context, None is returned.
"""
@ -94,8 +96,8 @@ class Job:
# Job construction
@classmethod
def create(cls, func: t.Callable[..., t.Any], args=None, kwargs=None, connection: t.Optional['Redis'] = None,
result_ttl=None, ttl=None, status=None, description=None,
def create(cls, func: t.Callable[..., t.Any], args=None, kwargs=None, connection: Optional['Redis'] = None,
result_ttl=None, ttl=None, status: JobStatus = None, description=None,
depends_on=None, timeout=None, id=None, origin=None, meta=None,
failure_ttl=None, serializer=None, *, on_success=None, on_failure=None) -> 'Job':
"""Creates a new Job instance for the given function, arguments, and
@ -182,7 +184,7 @@ class Job:
return self._status
def set_status(self, status: str, pipeline: t.Optional['Pipeline'] = None):
def set_status(self, status: str, pipeline: Optional['Pipeline'] = None):
self._status = status
connection: 'Redis' = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'status', self._status)
@ -235,7 +237,7 @@ class Job:
return self._dependency_ids[0]
@property
def dependency(self) -> t.Optional['Job']:
def dependency(self) -> Optional['Job']:
"""Returns a job's first dependency. To avoid repeated Redis fetches, we cache
job.dependency as job._dependency.
"""
@ -363,13 +365,13 @@ class Job:
self._data = UNEVALUATED
@classmethod
def exists(cls, job_id: str, connection: t.Optional['Redis'] = None) -> int:
def exists(cls, job_id: str, connection: Optional['Redis'] = None) -> int:
"""Returns whether a job hash exists for the given job ID."""
conn = resolve_connection(connection)
return conn.exists(cls.key_for(job_id))
@classmethod
def fetch(cls, id: str, connection: t.Optional['Redis'] = None, serializer=None) -> 'Job':
def fetch(cls, id: str, connection: Optional['Redis'] = None, serializer=None) -> 'Job':
"""Fetches a persisted job from its corresponding Redis key and
instantiates it.
"""
@ -378,7 +380,7 @@ class Job:
return job
@classmethod
def fetch_many(cls, job_ids: t.List[str], connection: 'Redis', serializer=None):
def fetch_many(cls, job_ids: t.Iterable[str], connection: 'Redis', serializer=None):
"""
Bulk version of Job.fetch
@ -390,7 +392,7 @@ class Job:
pipeline.hgetall(cls.key_for(job_id))
results = pipeline.execute()
jobs: t.List[t.Optional['Job']] = []
jobs: t.List[Optional['Job']] = []
for i, job_id in enumerate(job_ids):
if results[i]:
job = cls(job_id, connection=connection, serializer=serializer)
@ -401,7 +403,7 @@ class Job:
return jobs
def __init__(self, id: str = None, connection: t.Optional['Redis'] = None, serializer=None):
def __init__(self, id: str = None, connection: Optional['Redis'] = None, serializer=None):
self.connection = resolve_connection(connection)
self._id = id
self.created_at = utcnow()
@ -416,26 +418,29 @@ class Job:
self._failure_callback = UNEVALUATED
self.description = None
self.origin = None
self.enqueued_at: t.Optional[datetime] = None
self.started_at: t.Optional[datetime] = None
self.ended_at: t.Optional[datetime] = None
self.enqueued_at: Optional[datetime] = None
self.started_at: Optional[datetime] = None
self.ended_at: Optional[datetime] = None
self._result = None
self.exc_info = None
self.timeout = None
self.result_ttl: t.Optional[int] = None
self.failure_ttl: t.Optional[int] = None
self.ttl: t.Optional[int] = None
self.worker_name: t.Optional[str] = None
self._exc_info = None
self.timeout: Optional[float] = None
self.result_ttl: Optional[int] = None
self.failure_ttl: Optional[int] = None
self.ttl: Optional[int] = None
self.worker_name: Optional[str] = None
self._status = None
self._dependency_ids: t.List[str] = []
self.meta = {}
self.serializer = resolve_serializer(serializer)
self.retries_left = None
self.retry_intervals: t.Optional[t.List[int]] = None
self.retry_intervals: Optional[t.List[int]] = None
self.redis_server_version = None
self.last_heartbeat: t.Optional[datetime] = None
self.allow_dependency_failures: t.Optional[bool] = None
self.enqueue_at_front: t.Optional[bool] = None
self.last_heartbeat: Optional[datetime] = None
self.allow_dependency_failures: Optional[bool] = None
self.enqueue_at_front: Optional[bool] = None
from .results import Result
self._cached_result: Optional[Result] = None
def __repr__(self): # noqa # pragma: no cover
return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__,
@ -468,7 +473,7 @@ class Job:
raise TypeError('id must be a string, not {0}'.format(type(value)))
self._id = value
def heartbeat(self, timestamp: datetime, ttl: int, pipeline: t.Optional['Pipeline'] = None, xx: bool = False):
def heartbeat(self, timestamp: datetime, ttl: int, pipeline: Optional['Pipeline'] = None, xx: bool = False):
self.last_heartbeat = timestamp
connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat))
@ -500,7 +505,7 @@ class Job:
def dependencies_key(self):
return '{0}:{1}:dependencies'.format(self.redis_job_namespace_prefix, self.id)
def fetch_dependencies(self, watch: bool = False, pipeline: t.Optional['Pipeline'] = None):
def fetch_dependencies(self, watch: bool = False, pipeline: Optional['Pipeline'] = None):
"""
Fetch all of a job's dependencies. If a pipeline is supplied, and
watch is true, then set WATCH on all the keys of all dependencies.
@ -522,7 +527,39 @@ class Job:
return jobs
@property
def result(self):
def exc_info(self) -> Optional[str]:
"""
Returns the exception information from the latest result if the latest result is a failure;
otherwise falls back to the value stored in the job hash.
"""
warnings.warn("job.exc_info is deprecated, use job.latest_result() instead.",
DeprecationWarning)
from .results import Result
if self.supports_redis_streams:
if not self._cached_result:
self._cached_result = self.latest_result()
if self._cached_result and self._cached_result.type == Result.Type.FAILED:
return self._cached_result.exc_string
return self._exc_info
def return_value(self, refresh=False) -> Any:
"""Returns the return value of the latest execution, if it was successful"""
from .results import Result
if refresh:
self._cached_result = None
if self.supports_redis_streams:
if not self._cached_result:
self._cached_result = self.latest_result()
if self._cached_result and self._cached_result.type == Result.Type.SUCCESSFUL:
return self._cached_result.return_value
@property
def result(self) -> Any:
"""Returns the return value of the job.
Initially, right after enqueueing a job, the return value will be
@ -538,6 +575,20 @@ class Job:
written back to Redis will expire after a given amount of time (500
seconds by default).
"""
warnings.warn("job.result is deprecated, use job.return_value instead.",
DeprecationWarning)
from .results import Result
if self.supports_redis_streams:
if not self._cached_result:
self._cached_result = self.latest_result()
if self._cached_result and self._cached_result.type == Result.Type.SUCCESSFUL:
return self._cached_result.return_value
# Fallback to old behavior of getting result from job hash
if self._result is None:
rv = self.connection.hget(self.key, 'result')
if rv is not None:
@ -545,8 +596,15 @@ class Job:
self._result = self.serializer.loads(rv)
return self._result
"""Backwards-compatibility accessor property `return_value`."""
return_value = result
def results(self) -> List['Result']:
"""Returns all Result objects"""
from .results import Result
return Result.all(self, serializer=self.serializer)
def latest_result(self) -> Optional['Result']:
"""Returns the latest Result object"""
from .results import Result
return Result.fetch_latest(self, serializer=self.serializer)
def restore(self, raw_data):
"""Overwrite properties with the provided values stored in Redis"""
@ -573,7 +631,7 @@ class Job:
result = obj.get('result')
if result:
try:
self._result = self.serializer.loads(obj.get('result'))
self._result = self.serializer.loads(result)
except Exception:
self._result = "Unserializable return value"
self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None
@ -591,7 +649,8 @@ class Job:
dep_id = obj.get('dependency_id') # for backwards compatibility
self._dependency_ids = (json.loads(dep_ids.decode()) if dep_ids
else [dep_id.decode()] if dep_id else [])
self.allow_dependency_failures = bool(int(obj.get('allow_dependency_failures'))) if obj.get('allow_dependency_failures') else None
allow_failures = obj.get('allow_dependency_failures')
self.allow_dependency_failures = bool(int(allow_failures)) if allow_failures else None
self.enqueue_at_front = bool(int(obj['enqueue_at_front'])) if 'enqueue_at_front' in obj else None
self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
self.meta = self.serializer.loads(obj.get('meta')) if obj.get('meta') else {}
@ -603,10 +662,10 @@ class Job:
raw_exc_info = obj.get('exc_info')
if raw_exc_info:
try:
self.exc_info = as_text(zlib.decompress(raw_exc_info))
self._exc_info = as_text(zlib.decompress(raw_exc_info))
except zlib.error:
# Fallback to uncompressed string
self.exc_info = as_text(raw_exc_info)
self._exc_info = as_text(raw_exc_info)
# Persistence
def refresh(self): # noqa
@ -620,7 +679,7 @@ class Job:
raise NoSuchJobError('No such job: {0}'.format(self.key))
self.restore(data)
def to_dict(self, include_meta: bool = True) -> dict:
def to_dict(self, include_meta: bool = True, include_result: bool = True) -> dict:
"""
Returns a serialization of the current job instance
@ -649,13 +708,13 @@ class Job:
if self.enqueued_at is not None:
obj['enqueued_at'] = utcformat(self.enqueued_at)
if self._result is not None:
if self._result is not None and include_result:
try:
obj['result'] = self.serializer.dumps(self._result)
except: # noqa
obj['result'] = "Unserializable return value"
if self.exc_info is not None:
obj['exc_info'] = zlib.compress(str(self.exc_info).encode('utf-8'))
if self._exc_info is not None and include_result:
obj['exc_info'] = zlib.compress(str(self._exc_info).encode('utf-8'))
if self.timeout is not None:
obj['timeout'] = self.timeout
if self.result_ttl is not None:
@ -681,7 +740,8 @@ class Job:
return obj
def save(self, pipeline: t.Optional['Pipeline'] = None, include_meta: bool = True):
def save(self, pipeline: Optional['Pipeline'] = None, include_meta: bool = True,
include_result=True):
"""
Dumps the current job instance to its corresponding Redis key.
@ -694,13 +754,18 @@ class Job:
key = self.key
connection = pipeline if pipeline is not None else self.connection
mapping = self.to_dict(include_meta=include_meta)
mapping = self.to_dict(include_meta=include_meta, include_result=include_result)
if self.get_redis_server_version() >= (4, 0, 0):
connection.hset(key, mapping=mapping)
else:
connection.hmset(key, mapping)
@property
def supports_redis_streams(self) -> bool:
"""Only supported by Redis server >= 5.0 is required."""
return self.get_redis_server_version() >= (5, 0, 0)
def get_redis_server_version(self):
"""Return Redis server version of connection"""
if not self.redis_server_version:
@ -713,7 +778,7 @@ class Job:
meta = self.serializer.dumps(self.meta)
self.connection.hset(self.key, 'meta', meta)
def cancel(self, pipeline: t.Optional['Pipeline'] = None, enqueue_dependents: bool = False):
def cancel(self, pipeline: Optional['Pipeline'] = None, enqueue_dependents: bool = False):
"""Cancels the given job, which will prevent the job from ever being
run (or inspected).
@ -773,12 +838,13 @@ class Job:
"""Requeues job."""
return self.failed_job_registry.requeue(self, at_front=at_front)
def _remove_from_registries(self, pipeline: t.Optional['Pipeline'] = None, remove_from_queue: bool = True):
def _remove_from_registries(self, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True):
from .registry import BaseRegistry
if remove_from_queue:
from .queue import Queue
q = Queue(name=self.origin, connection=self.connection, serializer=self.serializer)
q.remove(self, pipeline=pipeline)
registry: BaseRegistry
if self.is_finished:
from .registry import FinishedJobRegistry
registry = FinishedJobRegistry(self.origin,
@ -821,7 +887,7 @@ class Job:
serializer=self.serializer)
registry.remove(self, pipeline=pipeline)
def delete(self, pipeline: t.Optional['Pipeline'] = None, remove_from_queue: bool = True,
def delete(self, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True,
delete_dependents=False):
"""Cancels the job and deletes the job hash from Redis. Jobs depending
on this job can optionally be deleted as well."""
@ -835,7 +901,7 @@ class Job:
connection.delete(self.key, self.dependents_key, self.dependencies_key)
def delete_dependents(self, pipeline: t.Optional['Pipeline'] = None):
def delete_dependents(self, pipeline: Optional['Pipeline'] = None):
"""Delete jobs depending on this job."""
connection = pipeline if pipeline is not None else self.connection
for dependent_id in self.dependent_ids:
@ -866,9 +932,9 @@ class Job:
self.started_at = self.last_heartbeat
self._status = JobStatus.STARTED
mapping = {
'last_heartbeat': utcformat(self.last_heartbeat),
'last_heartbeat': utcformat(self.last_heartbeat), # type: ignore
'status': self._status,
'started_at': utcformat(self.started_at),
'started_at': utcformat(self.started_at), # type: ignore
'worker_name': worker_name
}
if self.get_redis_server_version() >= (4, 0, 0):
@ -884,14 +950,14 @@ class Job:
return coro_result
return result
def get_ttl(self, default_ttl: t.Optional[int] = None):
def get_ttl(self, default_ttl: Optional[int] = None):
"""Returns ttl for a job that determines how long a job will be
persisted. In the future, this method will also be responsible
for determining ttl for repeated jobs.
"""
return default_ttl if self.ttl is None else self.ttl
def get_result_ttl(self, default_ttl: t.Optional[int] = None):
def get_result_ttl(self, default_ttl: Optional[int] = None):
"""Returns ttl for a job that determines how long a jobs result will
be persisted. In the future, this method will also be responsible
for determining ttl for repeated jobs.
@ -905,7 +971,7 @@ class Job:
"""
return get_call_string(self.func_name, self.args, self.kwargs, max_length=75)
def cleanup(self, ttl: t.Optional[int] = None, pipeline: t.Optional['Pipeline'] = None,
def cleanup(self, ttl: Optional[int] = None, pipeline: Optional['Pipeline'] = None,
remove_from_queue: bool = True):
"""Prepare job for eventual deletion (if needed). This method is usually
called after successful execution. How long we persist the job and its
@ -961,7 +1027,7 @@ class Job:
else:
queue.enqueue_job(self, pipeline=pipeline)
def register_dependency(self, pipeline: t.Optional['Pipeline'] = None):
def register_dependency(self, pipeline: Optional['Pipeline'] = None):
"""Jobs may have dependencies. Jobs are enqueued only if the jobs they
depend on are successfully performed. We record this relation as
a reverse dependency (a Redis set), with a key that looks something
@ -993,8 +1059,8 @@ class Job:
return [Job.key_for(_id.decode())
for _id in dependencies]
def dependencies_are_met(self, parent_job: t.Optional['Job'] = None,
pipeline: t.Optional['Pipeline'] = None, exclude_job_id: str = None):
def dependencies_are_met(self, parent_job: Optional['Job'] = None,
pipeline: Optional['Pipeline'] = None, exclude_job_id: str = None):
"""Returns a boolean indicating if all of this job's dependencies are _FINISHED_
If a pipeline is passed, all dependencies are WATCHed.
@ -1015,7 +1081,7 @@ class Job:
if exclude_job_id:
dependencies_ids.discard(exclude_job_id)
if parent_job.id == exclude_job_id:
if parent_job and parent_job.id == exclude_job_id:
parent_job = None
if parent_job:

@ -15,7 +15,7 @@ from .compat import as_text, string_types, total_ordering
from .connections import resolve_connection
from .defaults import DEFAULT_RESULT_TTL
from .exceptions import DequeueTimeout, NoSuchJobError
from .job import Job, JobStatus, Dependency
from .job import Job, JobStatus
from .serializers import resolve_serializer
from .utils import backend_class, get_version, import_attribute, parse_timeout, utcnow

@ -105,7 +105,7 @@ class BaseRegistry:
job_instance.delete()
return result
def get_expired_job_ids(self, timestamp: t.Optional[datetime] = None):
def get_expired_job_ids(self, timestamp: t.Optional[float] = None):
"""Returns job ids whose score are less than current timestamp.
Returns ids for jobs with an expiry time earlier than timestamp,
@ -164,7 +164,7 @@ class BaseRegistry:
job_class=self.job_class, serializer=serializer)
job.started_at = None
job.ended_at = None
job.exc_info = ''
job._exc_info = ''
job.save()
job = queue.enqueue_job(job, pipeline=pipeline, at_front=at_front)
pipeline.execute()
@ -182,7 +182,7 @@ class StartedJobRegistry(BaseRegistry):
"""
key_template = 'rq:wip:{0}'
def cleanup(self, timestamp: t.Optional[datetime] = None):
def cleanup(self, timestamp: t.Optional[float] = None):
"""Remove expired jobs from registry and add them to FailedJobRegistry.
Removes jobs with an expiry time earlier than timestamp, specified as
@ -215,7 +215,7 @@ class StartedJobRegistry(BaseRegistry):
else:
job.set_status(JobStatus.FAILED)
job.exc_info = "Moved to FailedJobRegistry at %s" % datetime.now()
job._exc_info = "Moved to FailedJobRegistry at %s" % datetime.now()
job.save(pipeline=pipeline, include_meta=False)
job.cleanup(ttl=-1, pipeline=pipeline)
failed_job_registry.add(job, job.failure_ttl)
@ -233,7 +233,7 @@ class FinishedJobRegistry(BaseRegistry):
"""
key_template = 'rq:finished:{0}'
def cleanup(self, timestamp: t.Optional[datetime] = None):
def cleanup(self, timestamp: t.Optional[float] = None):
"""Remove expired jobs from registry.
Removes jobs with an expiry time earlier than timestamp, specified as
@ -250,7 +250,7 @@ class FailedJobRegistry(BaseRegistry):
"""
key_template = 'rq:failed:{0}'
def cleanup(self, timestamp: t.Optional[datetime] = None):
def cleanup(self, timestamp: t.Optional[float] = None):
"""Remove expired jobs from registry.
Removes jobs with an expiry time earlier than timestamp, specified as
@ -260,7 +260,8 @@ class FailedJobRegistry(BaseRegistry):
score = timestamp if timestamp is not None else current_timestamp()
self.connection.zremrangebyscore(self.key, 0, score)
def add(self, job: 'Job', ttl=None, exc_string: str = '', pipeline: t.Optional['Pipeline'] = None):
def add(self, job: 'Job', ttl=None, exc_string: str = '', pipeline: t.Optional['Pipeline'] = None,
_save_exc_to_job: bool = False):
"""
Adds a job to a registry with expiry time of now + ttl.
`ttl` defaults to DEFAULT_FAILURE_TTL if not specified.
@ -274,8 +275,8 @@ class FailedJobRegistry(BaseRegistry):
else:
p = self.connection.pipeline()
job.exc_info = exc_string
job.save(pipeline=p, include_meta=False)
job._exc_info = exc_string
job.save(pipeline=p, include_meta=False, include_result=_save_exc_to_job)
job.cleanup(ttl=ttl, pipeline=p)
p.zadd(self.key, {job.id: score})
@ -315,13 +316,7 @@ class ScheduledJobRegistry(BaseRegistry):
If datetime has no tzinfo, it will assume the local timezone.
"""
# If datetime has no timezone, assume server's local timezone
# if we're on Python 3. If we're on Python 2.7, raise an
# exception since Python < 3.2 has no builtin `timezone` class
if not scheduled_datetime.tzinfo:
try:
from datetime import timezone
except ImportError:
raise ValueError('datetime object with no timezone')
tz = timezone(timedelta(seconds=-(time.timezone if time.daylight == 0 else time.altzone)))
scheduled_datetime = scheduled_datetime.replace(tzinfo=tz)

@ -0,0 +1,173 @@
import json
from typing import Any, Optional
import zlib
from base64 import b64decode, b64encode
from datetime import datetime, timezone
from enum import Enum
from uuid import uuid4
from redis import Redis
from redis.client import Pipeline
from .compat import decode_redis_hash
from .job import Job
from .serializers import resolve_serializer
from .utils import now
def get_key(job_id):
return 'rq:results:%s' % job_id
class Result(object):
class Type(Enum):
SUCCESSFUL = 1
FAILED = 2
STOPPED = 3
def __init__(self, job_id: str, type: Type, connection: Redis, id: Optional[str] = None,
created_at: Optional[datetime] = None, return_value: Optional[Any] = None,
exc_string: Optional[str] = None, serializer=None):
self.return_value = return_value
self.exc_string = exc_string
self.type = type
self.created_at = created_at if created_at else now()
self.serializer = resolve_serializer(serializer)
self.connection = connection
self.job_id = job_id
self.id = id
def __repr__(self):
return f'Result(id={self.id}, type={self.Type(self.type).name})'
def __eq__(self, other):
try:
return self.id == other.id
except AttributeError:
return False
def __bool__(self):
return bool(self.id)
@classmethod
def create(cls, job, type, ttl, return_value=None, exc_string=None, pipeline=None):
result = cls(job_id=job.id, type=type, connection=job.connection,
return_value=return_value,
exc_string=exc_string, serializer=job.serializer)
result.save(ttl=ttl, pipeline=pipeline)
return result
@classmethod
def create_failure(cls, job, ttl, exc_string, pipeline=None):
result = cls(job_id=job.id, type=cls.Type.FAILED, connection=job.connection,
exc_string=exc_string, serializer=job.serializer)
result.save(ttl=ttl, pipeline=pipeline)
return result
@classmethod
def all(cls, job: Job, serializer=None):
"""Returns all results for job"""
response = job.connection.xrevrange(cls.get_key(job.id), '+', '-')
results = []
for (result_id, payload) in response:
results.append(
cls.restore(job.id, result_id.decode(), payload,
connection=job.connection, serializer=serializer)
)
return results
@classmethod
def count(cls, job: Job) -> int:
"""Returns the number of job results"""
return job.connection.xlen(cls.get_key(job.id))
@classmethod
def delete_all(cls, job: Job) -> None:
"""Delete all job results"""
job.connection.delete(cls.get_key(job.id))
@classmethod
def restore(cls, job_id: str, result_id: str, payload: dict, connection: Redis, serializer=None) -> 'Result':
"""Create a Result object from given Redis payload"""
created_at = datetime.fromtimestamp(
int(result_id.split('-')[0]) / 1000, tz=timezone.utc
)
payload = decode_redis_hash(payload)
serializer = resolve_serializer(serializer)
return_value = payload.get('return_value')
if return_value is not None:
return_value = serializer.loads(b64decode(return_value.decode()))
exc_string = payload.get('exc_string')
if exc_string:
exc_string = zlib.decompress(b64decode(exc_string)).decode()
return Result(job_id, Result.Type(int(payload['type'])), connection=connection,
id=result_id,
created_at=created_at,
return_value=return_value,
exc_string=exc_string)
@classmethod
def fetch(cls, job: Job, serializer=None) -> Optional['Result']:
"""Fetch a result that matches a given job ID. The current sorted set
based implementation does not allow us to fetch a given key by ID
so we need to iterate through results, deserialize the payload and
look for a matching ID.
Future Redis streams based implementation may make this more efficient
and scalable.
"""
return None
@classmethod
def fetch_latest(cls, job: Job, serializer=None) -> Optional['Result']:
"""Returns the latest result for given job instance or ID"""
response = job.connection.xrevrange(cls.get_key(job.id), '+', '-', count=1)
if not response:
return None
result_id, payload = response[0]
return cls.restore(job.id, result_id.decode(), payload,
connection=job.connection, serializer=serializer)
@classmethod
def get_key(cls, job_id):
return 'rq:results:%s' % job_id
def save(self, ttl, pipeline=None):
"""Save result data to Redis"""
key = self.get_key(self.job_id)
connection = pipeline if pipeline is not None else self.connection
result = connection.xadd(key, self.serialize(), maxlen=10)
# If xadd() is called in a pipeline, it returns a pipeline object instead of a stream ID
if pipeline is None:
self.id = result.decode()
if ttl is not None:
connection.expire(key, ttl)
return self.id
def serialize(self):
data = {'type': self.type.value}
if self.exc_string is not None:
data['exc_string'] = b64encode(zlib.compress(self.exc_string.encode())).decode()
serialized = self.serializer.dumps(self.return_value)
if self.return_value is not None:
data['return_value'] = b64encode(serialized).decode()
return data

@ -182,6 +182,11 @@ def utcnow():
return datetime.datetime.utcnow()
def now():
"""Return now in UTC"""
return datetime.datetime.now(datetime.timezone.utc)
_TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'

@ -18,6 +18,7 @@ from datetime import timedelta
from enum import Enum
from uuid import uuid4
from random import shuffle
from typing import Callable, List, Optional
try:
from signal import SIGKILL
@ -39,6 +40,7 @@ from .job import Job, JobStatus
from .logutils import setup_loghandlers
from .queue import Queue
from .registry import FailedJobRegistry, StartedJobRegistry, clean_registries
from .results import Result
from .scheduler import RQScheduler
from .suspension import is_suspended
from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
@ -198,7 +200,7 @@ class Worker:
self.queues = queues
self.validate_queues()
self._ordered_queues = self.queues[:]
self._exc_handlers = []
self._exc_handlers: List[Callable] = []
self.default_result_ttl = default_result_ttl
self.default_worker_ttl = default_worker_ttl
@ -225,8 +227,8 @@ class Worker:
self.disable_default_exception_handler = disable_default_exception_handler
if prepare_for_work:
self.hostname = socket.gethostname()
self.pid = os.getpid()
self.hostname: Optional[str] = socket.gethostname()
self.pid: Optional[int] = os.getpid()
try:
connection.client_setname(self.name)
except redis.exceptions.ResponseError:
@ -252,7 +254,7 @@ class Worker:
else:
self.hostname = None
self.pid = None
self.ip_address = None
self.ip_address = 'unknown'
if isinstance(exception_handlers, (list, tuple)):
for handler in exception_handlers:
@ -290,6 +292,11 @@ class Worker:
"""Returns the worker's Redis hash key."""
return PUBSUB_CHANNEL_TEMPLATE % self.name
@property
def supports_redis_streams(self) -> bool:
"""Only supported by Redis server >= 5.0 is required."""
return self.get_redis_server_version() >= (5, 0, 0)
@property
def horse_pid(self):
"""The horse's process ID. Only available in the worker. Will return
@ -823,7 +830,7 @@ class Worker:
self.set_current_job_working_time((utcnow() - job.started_at).total_seconds())
# Kill the job from this side if something is really wrong (interpreter lock/etc).
if job.timeout != -1 and self.current_job_working_time > (job.timeout + 60):
if job.timeout != -1 and self.current_job_working_time > (job.timeout + 60): # type: ignore
self.heartbeat(self.job_monitoring_interval + 60)
self.kill_horse()
self.wait_for_horse()
@ -956,11 +963,13 @@ class Worker:
def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None,
exc_string=''):
"""Handles the failure or an executing job by:
"""
Handles the failure of an executing job by:
1. Setting the job status to failed
2. Removing the job from StartedJobRegistry
3. Setting the worker's current job to None
4. Adding the job to FailedJobRegistry
`_save_exc_to_job` should only be used for testing purposes
"""
self.log.debug('Handling failed execution of job %s', job.id)
with self.connection.pipeline() as pipeline:
@ -991,8 +1000,14 @@ class Worker:
if not self.disable_default_exception_handler and not retry:
failed_job_registry = FailedJobRegistry(job.origin, job.connection,
job_class=self.job_class, serializer=job.serializer)
# Exception should be saved in job hash if server
# doesn't support Redis streams
_save_exc_to_job = not self.supports_redis_streams
failed_job_registry.add(job, ttl=job.failure_ttl,
exc_string=exc_string, pipeline=pipeline)
exc_string=exc_string, pipeline=pipeline,
_save_exc_to_job=_save_exc_to_job)
if self.supports_redis_streams:
Result.create_failure(job, job.failure_ttl, exc_string=exc_string, pipeline=pipeline)
with suppress(redis.exceptions.ConnectionError):
pipeline.execute()
@ -1018,7 +1033,7 @@ class Worker:
# even if Redis is down
pass
def handle_job_success(self, job: 'Job', queue: 'Queue', started_job_registry):
def handle_job_success(self, job: 'Job', queue: 'Queue', started_job_registry: StartedJobRegistry):
self.log.debug('Handling successful execution of job %s', job.id)
with self.connection.pipeline() as pipeline:
@ -1038,7 +1053,7 @@ class Worker:
self.set_current_job_id(None, pipeline=pipeline)
self.increment_successful_job_count(pipeline=pipeline)
self.increment_total_working_time(
job.ended_at - job.started_at, pipeline
job.ended_at - job.started_at, pipeline # type: ignore
)
result_ttl = job.get_result_ttl(self.default_result_ttl)
@ -1046,8 +1061,16 @@ class Worker:
self.log.debug('Setting job %s status to finished', job.id)
job.set_status(JobStatus.FINISHED, pipeline=pipeline)
job.worker_name = None
# Don't clobber the user's meta dictionary!
job.save(pipeline=pipeline, include_meta=False)
# Result should be saved in job hash only if server
# doesn't support Redis streams
include_result = not self.supports_redis_streams
# Don't clobber user's meta dictionary!
job.save(pipeline=pipeline, include_meta=False,
include_result=include_result)
if self.supports_redis_streams:
Result.create(job, Result.Type.SUCCESSFUL, return_value=job._result,
ttl=result_ttl, pipeline=pipeline)
finished_job_registry = queue.finished_job_registry
finished_job_registry.add(job, result_ttl, pipeline)

@ -50,6 +50,7 @@ class RQTestCase(unittest.TestCase):
# Store the connection (for sanity checking)
cls.testconn = testconn
cls.connection = testconn
# Shut up logging
logging.disable(logging.ERROR)

@ -298,7 +298,7 @@ class TestJob(RQTestCase):
exception_string = 'Some exception'
job = Job.create(func=fixtures.say_hello, args=('Lionel',))
job.exc_info = exception_string
job._exc_info = exception_string
job.save()
# exc_info is stored in compressed format

@ -397,7 +397,6 @@ class TestFailedJobRegistry(RQTestCase):
self.assertEqual(job.get_status(), JobStatus.QUEUED)
self.assertEqual(job.started_at, None)
self.assertEqual(job.ended_at, None)
self.assertEqual(job.exc_info, '')
worker.work(burst=True)
self.assertTrue(job in registry)

@ -0,0 +1,201 @@
import unittest
from datetime import timedelta
from unittest.mock import patch, PropertyMock
from redis import Redis
from tests import RQTestCase
from rq.job import Job
from rq.queue import Queue
from rq.registry import StartedJobRegistry
from rq.results import Result, get_key
from rq.utils import get_version, utcnow
from rq.worker import Worker
from .fixtures import say_hello
@unittest.skipIf(get_version(Redis()) < (5, 0, 0), 'Skip if Redis server < 5.0')
class TestResult(RQTestCase):
def test_save_and_get_result(self):
"""Ensure data is saved properly"""
queue = Queue(connection=self.connection)
job = queue.enqueue(say_hello)
result = Result.fetch_latest(job)
self.assertIsNone(result)
Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
result = Result.fetch_latest(job)
self.assertEqual(result.return_value, 1)
self.assertEqual(job.latest_result().return_value, 1)
# Check that ttl is properly set
key = get_key(job.id)
ttl = self.connection.pttl(key)
self.assertTrue(5000 < ttl <= 10000)
# Check job with None return value
Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=None)
result = Result.fetch_latest(job)
self.assertIsNone(result.return_value)
Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=2)
result = Result.fetch_latest(job)
self.assertEqual(result.return_value, 2)
def test_create_failure(self):
"""Ensure data is saved properly"""
queue = Queue(connection=self.connection)
job = queue.enqueue(say_hello)
Result.create_failure(job, ttl=10, exc_string='exception')
result = Result.fetch_latest(job)
self.assertEqual(result.exc_string, 'exception')
# Check that ttl is properly set
key = get_key(job.id)
ttl = self.connection.pttl(key)
self.assertTrue(5000 < ttl <= 10000)
def test_getting_results(self):
"""Check getting all execution results"""
queue = Queue(connection=self.connection)
job = queue.enqueue(say_hello)
# latest_result() returns None when there's no result
self.assertIsNone(job.latest_result())
result_1 = Result.create_failure(job, ttl=10, exc_string='exception')
result_2 = Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
result_3 = Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
# Result.fetch_latest() returns the latest result
result = Result.fetch_latest(job)
self.assertEqual(result, result_3)
self.assertEqual(job.latest_result(), result_3)
# Result.all() and job.results() return all results, newest first
results = Result.all(job)
self.assertEqual(results, [result_3, result_2, result_1])
self.assertEqual(job.results(), [result_3, result_2, result_1])
def test_count(self):
"""Result.count(job) returns number of results"""
queue = Queue(connection=self.connection)
job = queue.enqueue(say_hello)
self.assertEqual(Result.count(job), 0)
Result.create_failure(job, ttl=10, exc_string='exception')
self.assertEqual(Result.count(job), 1)
Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
self.assertEqual(Result.count(job), 2)
def test_delete_all(self):
"""Result.delete_all(job) deletes all results from Redis"""
queue = Queue(connection=self.connection)
job = queue.enqueue(say_hello)
Result.create_failure(job, ttl=10, exc_string='exception')
Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
Result.delete_all(job)
self.assertEqual(Result.count(job), 0)
def test_job_successful_result_fallback(self):
"""Changes to job.result handling should be backwards compatible."""
queue = Queue(connection=self.connection)
job = queue.enqueue(say_hello)
worker = Worker([queue])
worker.register_birth()
self.assertEqual(worker.failed_job_count, 0)
self.assertEqual(worker.successful_job_count, 0)
self.assertEqual(worker.total_working_time, 0)
# These should only run on workers that support Redis streams
registry = StartedJobRegistry(connection=self.connection)
job.started_at = utcnow()
job.ended_at = job.started_at + timedelta(seconds=0.75)
job._result = 'Success'
worker.handle_job_success(job, queue, registry)
payload = self.connection.hgetall(job.key)
self.assertFalse(b'result' in payload.keys())
self.assertEqual(job.result, 'Success')
with patch('rq.worker.Worker.supports_redis_streams', new_callable=PropertyMock) as mock:
mock.return_value = False
worker = Worker([queue])
worker.register_birth()
job = queue.enqueue(say_hello)
job._result = 'Success'
job.started_at = utcnow()
job.ended_at = job.started_at + timedelta(seconds=0.75)
# With streams support mocked off, the result is saved to the job
# hash, simulating older versions of RQ
worker.handle_job_success(job, queue, registry)
payload = self.connection.hgetall(job.key)
self.assertTrue(b'result' in payload.keys())
# Only the result stored in the job hash remains, simulating a job
# that was executed by an earlier RQ version
self.assertEqual(job.result, 'Success')
def test_job_failed_result_fallback(self):
"""Changes to job.result failure handling should be backwards compatible."""
queue = Queue(connection=self.connection)
job = queue.enqueue(say_hello)
worker = Worker([queue])
worker.register_birth()
self.assertEqual(worker.failed_job_count, 0)
self.assertEqual(worker.successful_job_count, 0)
self.assertEqual(worker.total_working_time, 0)
registry = StartedJobRegistry(connection=self.connection)
job.started_at = utcnow()
job.ended_at = job.started_at + timedelta(seconds=0.75)
worker.handle_job_failure(job, exc_string='Error', queue=queue,
started_job_registry=registry)
job = Job.fetch(job.id, connection=self.connection)
payload = self.connection.hgetall(job.key)
self.assertFalse(b'exc_info' in payload.keys())
self.assertEqual(job.exc_info, 'Error')
with patch('rq.worker.Worker.supports_redis_streams', new_callable=PropertyMock) as mock:
mock.return_value = False
worker = Worker([queue])
worker.register_birth()
job = queue.enqueue(say_hello)
job.started_at = utcnow()
job.ended_at = job.started_at + timedelta(seconds=0.75)
# With streams support mocked off, exc_info is saved to the job
# hash, simulating older versions of RQ
worker.handle_job_failure(job, exc_string='Error', queue=queue,
started_job_registry=registry)
payload = self.connection.hgetall(job.key)
self.assertTrue(b'exc_info' in payload.keys())
# Delete all new result objects so we only have result stored in job hash,
# this should simulate a job that was executed in an earlier RQ version
Result.delete_all(job)
job = Job.fetch(job.id, connection=self.connection)
self.assertEqual(job.exc_info, 'Error')
def test_job_return_value(self):
"""Test job.return_value"""
queue = Queue(connection=self.connection)
job = queue.enqueue(say_hello)
# Returns None when there's no result
self.assertIsNone(job.return_value())
Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
self.assertEqual(job.return_value(), 1)
# Returns None if latest result is a failure
Result.create_failure(job, ttl=10, exc_string='exception')
self.assertIsNone(job.return_value(refresh=True))

@ -31,6 +31,7 @@ from rq import Queue, SimpleWorker, Worker, get_current_connection
from rq.compat import as_text
from rq.job import Job, JobStatus, Retry
from rq.registry import StartedJobRegistry, FailedJobRegistry, FinishedJobRegistry
from rq.results import Result
from rq.suspension import resume, suspend
from rq.utils import utcnow
from rq.version import VERSION
@ -178,7 +179,11 @@ class TestWorker(RQTestCase):
w.work(burst=True), True,
'Expected at least some work done.'
)
self.assertEqual(job.result, 'Hi there, Frank!')
expected_result = 'Hi there, Frank!'
self.assertEqual(job.result, expected_result)
# Only run if Redis server supports streams
if job.supports_redis_streams:
self.assertEqual(Result.fetch_latest(job).return_value, expected_result)
self.assertIsNone(job.worker_name)
def test_job_times(self):
@ -380,6 +385,10 @@ class TestWorker(RQTestCase):
# to the failed queue
self.assertEqual(str(job.enqueued_at), enqueued_at_date)
self.assertTrue(job.exc_info) # should contain exc_info
if job.supports_redis_streams:
result = Result.fetch_latest(job)
self.assertEqual(result.exc_string, job.exc_info)
self.assertEqual(result.type, Result.Type.FAILED)
def test_horse_fails(self):
"""Tests that job status is set to FAILED even if horse unexpectedly fails"""
@ -414,7 +423,7 @@ class TestWorker(RQTestCase):
def test_statistics(self):
"""Successful and failed job counts are saved properly"""
queue = Queue()
queue = Queue(connection=self.connection)
job = queue.enqueue(div_by_zero)
worker = Worker([queue])
worker.register_birth()
