From 0691b4d46eb721b5fb5395a03fd770db7e1651f4 Mon Sep 17 00:00:00 2001
From: Selwin Ong
Date: Sat, 19 Nov 2022 15:17:35 +0700
Subject: [PATCH] Multiple results using Redis Streams (#1725)

* WIP job results

* Result can now be saved

* Successfully saved and restored result

* result.save() should accept pipeline

* Successful results are saved

* Failures are now saved properly too.

* Added test for Result.get_latest()

* Checkpoint

* Got Result.all() to work

* Added Result.count(), Result.delete()

* Backward compatibility for job.result and job.exc_info

* Added some typing

* More typing stuff

* Fixed typing in job.py

* More typing updates

* Only keep the last 10 results

* Documented job.results()

* Got results test to pass

* Don't run test_results.py on Redis server < 5.0

* Fixed mock import on some Python versions

* Remove Redis 3 from test matrix

* Jobs should never use the new Result implementation if server is < 5.0

* Results should only be created if Redis stream is supported.

* Added back Redis 3 to test matrix

* Fixed job.supports_redis_streams

* Fixed worker test

* Updated docs.
---
 docs/docs/index.md     |   6 +-
 docs/docs/results.md   |  49 +++++++++-
 rq/job.py              | 176 +++++++++++++++++++++++++-----
 rq/queue.py            |   2 +-
 rq/registry.py         |  25 ++---
 rq/results.py          | 173 +++++++++++++++++++++++++++++++++++
 rq/utils.py            |   5 +
 rq/worker.py           |  45 ++++++---
 tests/__init__.py      |   1 +
 tests/test_job.py      |   2 +-
 tests/test_registry.py |   1 -
 tests/test_results.py  | 201 +++++++++++++++++++++++++++++++++++++++++
 tests/test_worker.py   |  13 ++-
 13 files changed, 606 insertions(+), 93 deletions(-)
 create mode 100644 rq/results.py
 create mode 100644 tests/test_results.py

diff --git a/docs/docs/index.md b/docs/docs/index.md
index fb2a68b..bbf7841 100644
--- a/docs/docs/index.md
+++ b/docs/docs/index.md
@@ -39,11 +39,11 @@ q = Queue(connection=redis_conn)  # no args implies the default queue
 
 # Delay execution of count_words_at_url('http://nvie.com')
 job = q.enqueue(count_words_at_url, 'http://nvie.com')
-print(job.result)   # => None
+print(job.result)   # => None  # Changed to job.return_value() in RQ >= 1.12.0
 
 # Now, wait a while, until the worker is finished
 time.sleep(2)
-print(job.result)   # => 889
+print(job.result)   # => 889  # Changed to job.return_value() in RQ >= 1.12.0
 ```
 
 If you want to put the work on a specific queue, simply specify its name:
@@ -377,7 +377,7 @@ def add(x, y):
 
 job = add.delay(3, 4)
 time.sleep(1)
-print(job.result)
+print(job.result)  # Changed to job.return_value() in RQ >= 1.12.0
 ```
 
 
diff --git a/docs/docs/results.md b/docs/docs/results.md
index 12059c3..af0f96f 100644
--- a/docs/docs/results.md
+++ b/docs/docs/results.md
@@ -11,19 +11,19 @@ solving a problem, but are getting back a few in return.
 
 Python functions may have return values, so jobs can have them, too.  If a
 job returns a non-`None` return value, the worker will write that return value back
-to the job's Redis hash under the `result` key. The job's Redis hash itself
+to the job's Redis hash under the `result` key. The job's Redis hash itself
 will expire after 500 seconds by default after the job is finished.
 
 The party that enqueued the job gets back a `Job` instance as a result of the
-enqueueing itself. Such a `Job` object is a proxy object that is tied to the
+enqueueing itself. Such a `Job` object is a proxy object that is tied to the
 job's ID, to be able to poll for results.
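+
+For example, polling for a result might look like this (a minimal sketch; it
+assumes a running worker and reuses `count_words_at_url` from the quickstart):
+
+```python
+from redis import Redis
+from rq import Queue
+
+q = Queue(connection=Redis())
+job = q.enqueue(count_words_at_url, 'http://nvie.com')
+
+# The Job object holds no result itself; it looks the result up in Redis
+# by job ID each time you ask for it.
+print(job.get_status())  # 'queued', then 'started', then 'finished'
+```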
-**On the return value's TTL**
+### Return Value TTL
 Return values are written back to Redis with a limited lifetime (via a
 Redis expiring key), which is merely to avoid ever-growing Redis databases.
 
-From RQ >= 0.3.1, The TTL value of the job result can be specified using the
+The TTL value of the job result can be specified using the
 `result_ttl` keyword argument to `enqueue()` and `enqueue_call()` calls.  It
 can also be used to disable the expiry altogether.  You then are responsible
 for cleaning up jobs yourself, though, so be careful to use that.
@@ -113,3 +113,44 @@ low.enqueue(really_really_slow, job_timeout=3600)  # 1 hr
 
 Individual jobs can still specify an alternative timeout, as workers will
 respect these.
+
+
+## Job Results
+_New in version 1.12.0._
+
+If a job is executed multiple times, you can access its execution history by calling
+`job.results()`. RQ stores up to 10 of the latest execution results.
+
+Calling `job.latest_result()` will return the latest `Result` object, which has the
+following attributes:
+* `type` - an enum of `SUCCESSFUL`, `FAILED` or `STOPPED`
+* `created_at` - the time at which the result was created
+* `return_value` - the job's return value, only present if the result type is `SUCCESSFUL`
+* `exc_string` - the exception raised by the job, only present if the result type is `FAILED`
+* `job_id`
+
+```python
+job = Job.fetch(id='my_id', connection=redis)
+result = job.latest_result()  #  returns Result(id=uid, type=SUCCESSFUL)
+if result.type == result.Type.SUCCESSFUL:
+    print(result.return_value)
+else:
+    print(result.exc_string)
+```
+
+Alternatively, you can also use `job.return_value()` as a shortcut for accessing
+the return value of the latest result. Note that `job.return_value()` will only
+return a non-`None` object if the latest result is a successful execution.
+
+```python
+job = Job.fetch(id='my_id', connection=redis)
+print(job.return_value())  # Shortcut for job.latest_result().return_value
+```
+
+To access multiple results, use `job.results()`.
+
+```python
+job = Job.fetch(id='my_id', connection=redis)
+for result in job.results():
+    print(result.created_at, result.type)
+```
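+
+Under the hood, each job's results are appended to a Redis stream
+(`rq:results:<job_id>`) that is capped at the 10 most recent entries and
+expires together with the result TTL. The `rq.results.Result` class exposes a
+few helpers around this history. A minimal sketch (this is internal API and
+may change between releases):
+
+```python
+from rq.job import Job
+from rq.results import Result
+
+job = Job.fetch(id='my_id', connection=redis)
+Result.count(job)       # number of results stored for this job
+Result.all(job)         # every Result object, newest first
+Result.delete_all(job)  # drop the job's entire result history
+```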
""" @@ -94,8 +96,8 @@ class Job: # Job construction @classmethod - def create(cls, func: t.Callable[..., t.Any], args=None, kwargs=None, connection: t.Optional['Redis'] = None, - result_ttl=None, ttl=None, status=None, description=None, + def create(cls, func: t.Callable[..., t.Any], args=None, kwargs=None, connection: Optional['Redis'] = None, + result_ttl=None, ttl=None, status: JobStatus = None, description=None, depends_on=None, timeout=None, id=None, origin=None, meta=None, failure_ttl=None, serializer=None, *, on_success=None, on_failure=None) -> 'Job': """Creates a new Job instance for the given function, arguments, and @@ -182,7 +184,7 @@ class Job: return self._status - def set_status(self, status: str, pipeline: t.Optional['Pipeline'] = None): + def set_status(self, status: str, pipeline: Optional['Pipeline'] = None): self._status = status connection: 'Redis' = pipeline if pipeline is not None else self.connection connection.hset(self.key, 'status', self._status) @@ -235,7 +237,7 @@ class Job: return self._dependency_ids[0] @property - def dependency(self) -> t.Optional['Job']: + def dependency(self) -> Optional['Job']: """Returns a job's first dependency. To avoid repeated Redis fetches, we cache job.dependency as job._dependency. """ @@ -363,13 +365,13 @@ class Job: self._data = UNEVALUATED @classmethod - def exists(cls, job_id: str, connection: t.Optional['Redis'] = None) -> int: + def exists(cls, job_id: str, connection: Optional['Redis'] = None) -> int: """Returns whether a job hash exists for the given job ID.""" conn = resolve_connection(connection) return conn.exists(cls.key_for(job_id)) @classmethod - def fetch(cls, id: str, connection: t.Optional['Redis'] = None, serializer=None) -> 'Job': + def fetch(cls, id: str, connection: Optional['Redis'] = None, serializer=None) -> 'Job': """Fetches a persisted job from its corresponding Redis key and instantiates it. 
""" @@ -378,7 +380,7 @@ class Job: return job @classmethod - def fetch_many(cls, job_ids: t.List[str], connection: 'Redis', serializer=None): + def fetch_many(cls, job_ids: t.Iterable[str], connection: 'Redis', serializer=None): """ Bulk version of Job.fetch @@ -390,7 +392,7 @@ class Job: pipeline.hgetall(cls.key_for(job_id)) results = pipeline.execute() - jobs: t.List[t.Optional['Job']] = [] + jobs: t.List[Optional['Job']] = [] for i, job_id in enumerate(job_ids): if results[i]: job = cls(job_id, connection=connection, serializer=serializer) @@ -401,7 +403,7 @@ class Job: return jobs - def __init__(self, id: str = None, connection: t.Optional['Redis'] = None, serializer=None): + def __init__(self, id: str = None, connection: Optional['Redis'] = None, serializer=None): self.connection = resolve_connection(connection) self._id = id self.created_at = utcnow() @@ -416,26 +418,29 @@ class Job: self._failure_callback = UNEVALUATED self.description = None self.origin = None - self.enqueued_at: t.Optional[datetime] = None - self.started_at: t.Optional[datetime] = None - self.ended_at: t.Optional[datetime] = None + self.enqueued_at: Optional[datetime] = None + self.started_at: Optional[datetime] = None + self.ended_at: Optional[datetime] = None self._result = None - self.exc_info = None - self.timeout = None - self.result_ttl: t.Optional[int] = None - self.failure_ttl: t.Optional[int] = None - self.ttl: t.Optional[int] = None - self.worker_name: t.Optional[str] = None + self._exc_info = None + self.timeout: Optional[float] = None + self.result_ttl: Optional[int] = None + self.failure_ttl: Optional[int] = None + self.ttl: Optional[int] = None + self.worker_name: Optional[str] = None self._status = None self._dependency_ids: t.List[str] = [] self.meta = {} self.serializer = resolve_serializer(serializer) self.retries_left = None - self.retry_intervals: t.Optional[t.List[int]] = None + self.retry_intervals: Optional[t.List[int]] = None self.redis_server_version = None - self.last_heartbeat: t.Optional[datetime] = None - self.allow_dependency_failures: t.Optional[bool] = None - self.enqueue_at_front: t.Optional[bool] = None + self.last_heartbeat: Optional[datetime] = None + self.allow_dependency_failures: Optional[bool] = None + self.enqueue_at_front: Optional[bool] = None + + from .results import Result + self._cached_result: Optional[Result] = None def __repr__(self): # noqa # pragma: no cover return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__, @@ -468,7 +473,7 @@ class Job: raise TypeError('id must be a string, not {0}'.format(type(value))) self._id = value - def heartbeat(self, timestamp: datetime, ttl: int, pipeline: t.Optional['Pipeline'] = None, xx: bool = False): + def heartbeat(self, timestamp: datetime, ttl: int, pipeline: Optional['Pipeline'] = None, xx: bool = False): self.last_heartbeat = timestamp connection = pipeline if pipeline is not None else self.connection connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat)) @@ -500,7 +505,7 @@ class Job: def dependencies_key(self): return '{0}:{1}:dependencies'.format(self.redis_job_namespace_prefix, self.id) - def fetch_dependencies(self, watch: bool = False, pipeline: t.Optional['Pipeline'] = None): + def fetch_dependencies(self, watch: bool = False, pipeline: Optional['Pipeline'] = None): """ Fetch all of a job's dependencies. If a pipeline is supplied, and watch is true, then set WATCH on all the keys of all dependencies. 
@@ -522,7 +527,39 @@ class Job:
         return jobs
 
     @property
-    def result(self):
+    def exc_info(self) -> Optional[str]:
+        """
+        Returns the `exc_info` of the latest result if that result is a
+        failure, falling back to the exception stored on the job hash.
+        """
+        warnings.warn("job.exc_info is deprecated, use job.latest_result() instead.",
+                      DeprecationWarning)
+
+        from .results import Result
+
+        if self.supports_redis_streams:
+            if not self._cached_result:
+                self._cached_result = self.latest_result()
+
+            if self._cached_result and self._cached_result.type == Result.Type.FAILED:
+                return self._cached_result.exc_string
+
+        return self._exc_info
+
+    def return_value(self, refresh=False) -> Any:
+        """Returns the return value of the latest execution, if it was successful"""
+        from .results import Result
+        if refresh:
+            self._cached_result = None
+
+        if self.supports_redis_streams:
+            if not self._cached_result:
+                self._cached_result = self.latest_result()
+
+            if self._cached_result and self._cached_result.type == Result.Type.SUCCESSFUL:
+                return self._cached_result.return_value
+
+    @property
+    def result(self) -> Any:
         """Returns the return value of the job.
 
         Initially, right after enqueueing a job, the return value will be
@@ -538,6 +575,20 @@ class Job:
         written back to Redis will expire after a given amount of time (500
         seconds by default).
         """
+
+        warnings.warn("job.result is deprecated, use job.return_value() instead.",
+                      DeprecationWarning)
+
+        from .results import Result
+
+        if self.supports_redis_streams:
+            if not self._cached_result:
+                self._cached_result = self.latest_result()
+
+            if self._cached_result and self._cached_result.type == Result.Type.SUCCESSFUL:
+                return self._cached_result.return_value
+
+        # Fallback to old behavior of getting result from job hash
         if self._result is None:
             rv = self.connection.hget(self.key, 'result')
             if rv is not None:
@@ -545,8 +596,15 @@ class Job:
                 self._result = self.serializer.loads(rv)
         return self._result
 
-    """Backwards-compatibility accessor property `return_value`."""
-    return_value = result
+    def results(self) -> List['Result']:
+        """Returns all Result objects"""
+        from .results import Result
+        return Result.all(self, serializer=self.serializer)
+
+    def latest_result(self) -> Optional['Result']:
+        """Returns the latest Result object"""
+        from .results import Result
+        return Result.fetch_latest(self, serializer=self.serializer)
 
     def restore(self, raw_data):
         """Overwrite properties with the provided values stored in Redis"""
@@ -573,7 +631,7 @@ class Job:
         result = obj.get('result')
         if result:
             try:
-                self._result = self.serializer.loads(obj.get('result'))
+                self._result = self.serializer.loads(result)
             except Exception:
                 self._result = "Unserializable return value"
         self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None
@@ -591,7 +649,8 @@ class Job:
         dep_id = obj.get('dependency_id')  # for backwards compatibility
         self._dependency_ids = (json.loads(dep_ids.decode()) if dep_ids
                                 else [dep_id.decode()] if dep_id else [])
-        self.allow_dependency_failures = bool(int(obj.get('allow_dependency_failures'))) if obj.get('allow_dependency_failures') else None
+        allow_failures = obj.get('allow_dependency_failures')
+        self.allow_dependency_failures = bool(int(allow_failures)) if allow_failures else None
         self.enqueue_at_front = bool(int(obj['enqueue_at_front'])) if 'enqueue_at_front' in obj else None
         self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
         self.meta = self.serializer.loads(obj.get('meta')) if obj.get('meta') else {}
@@ -603,10 +662,10 @@ class Job:
         raw_exc_info = obj.get('exc_info')
         if raw_exc_info:
             try:
-                self.exc_info = as_text(zlib.decompress(raw_exc_info))
+                self._exc_info = as_text(zlib.decompress(raw_exc_info))
             except zlib.error:
                 # Fallback to uncompressed string
-                self.exc_info = as_text(raw_exc_info)
+                self._exc_info = as_text(raw_exc_info)
 
     # Persistence
     def refresh(self):  # noqa
@@ -620,7 +679,7 @@ class Job:
             raise NoSuchJobError('No such job: {0}'.format(self.key))
         self.restore(data)
 
-    def to_dict(self, include_meta: bool = True) -> dict:
+    def to_dict(self, include_meta: bool = True, include_result: bool = True) -> dict:
         """
         Returns a serialization of the current job instance
 
@@ -649,13 +708,13 @@ class Job:
         if self.enqueued_at is not None:
             obj['enqueued_at'] = utcformat(self.enqueued_at)
 
-        if self._result is not None:
+        if self._result is not None and include_result:
             try:
                 obj['result'] = self.serializer.dumps(self._result)
             except:  # noqa
                 obj['result'] = "Unserializable return value"
-        if self.exc_info is not None:
-            obj['exc_info'] = zlib.compress(str(self.exc_info).encode('utf-8'))
+        if self._exc_info is not None and include_result:
+            obj['exc_info'] = zlib.compress(str(self._exc_info).encode('utf-8'))
         if self.timeout is not None:
             obj['timeout'] = self.timeout
         if self.result_ttl is not None:
@@ -681,7 +740,8 @@ class Job:
 
         return obj
 
-    def save(self, pipeline: t.Optional['Pipeline'] = None, include_meta: bool = True):
+    def save(self, pipeline: Optional['Pipeline'] = None, include_meta: bool = True,
+             include_result=True):
         """
         Dumps the current job instance to its corresponding Redis key.
 
@@ -694,13 +754,18 @@ class Job:
         key = self.key
         connection = pipeline if pipeline is not None else self.connection
 
-        mapping = self.to_dict(include_meta=include_meta)
+        mapping = self.to_dict(include_meta=include_meta, include_result=include_result)
 
         if self.get_redis_server_version() >= (4, 0, 0):
             connection.hset(key, mapping=mapping)
         else:
             connection.hmset(key, mapping)
 
+    @property
+    def supports_redis_streams(self) -> bool:
+        """Whether Redis streams are supported (requires Redis server >= 5.0)."""
+        return self.get_redis_server_version() >= (5, 0, 0)
+
     def get_redis_server_version(self):
         """Return Redis server version of connection"""
         if not self.redis_server_version:
@@ -713,7 +778,7 @@ class Job:
         meta = self.serializer.dumps(self.meta)
         self.connection.hset(self.key, 'meta', meta)
 
-    def cancel(self, pipeline: t.Optional['Pipeline'] = None, enqueue_dependents: bool = False):
+    def cancel(self, pipeline: Optional['Pipeline'] = None, enqueue_dependents: bool = False):
         """Cancels the given job, which will prevent the job from
         ever being ran (or inspected).
@@ -773,12 +838,13 @@ class Job: """Requeues job.""" return self.failed_job_registry.requeue(self, at_front=at_front) - def _remove_from_registries(self, pipeline: t.Optional['Pipeline'] = None, remove_from_queue: bool = True): + def _remove_from_registries(self, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True): + from .registry import BaseRegistry if remove_from_queue: from .queue import Queue q = Queue(name=self.origin, connection=self.connection, serializer=self.serializer) q.remove(self, pipeline=pipeline) - + registry: BaseRegistry if self.is_finished: from .registry import FinishedJobRegistry registry = FinishedJobRegistry(self.origin, @@ -821,7 +887,7 @@ class Job: serializer=self.serializer) registry.remove(self, pipeline=pipeline) - def delete(self, pipeline: t.Optional['Pipeline'] = None, remove_from_queue: bool = True, + def delete(self, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True, delete_dependents=False): """Cancels the job and deletes the job hash from Redis. Jobs depending on this job can optionally be deleted as well.""" @@ -835,7 +901,7 @@ class Job: connection.delete(self.key, self.dependents_key, self.dependencies_key) - def delete_dependents(self, pipeline: t.Optional['Pipeline'] = None): + def delete_dependents(self, pipeline: Optional['Pipeline'] = None): """Delete jobs depending on this job.""" connection = pipeline if pipeline is not None else self.connection for dependent_id in self.dependent_ids: @@ -866,9 +932,9 @@ class Job: self.started_at = self.last_heartbeat self._status = JobStatus.STARTED mapping = { - 'last_heartbeat': utcformat(self.last_heartbeat), + 'last_heartbeat': utcformat(self.last_heartbeat), # type: ignore 'status': self._status, - 'started_at': utcformat(self.started_at), + 'started_at': utcformat(self.started_at), # type: ignore 'worker_name': worker_name } if self.get_redis_server_version() >= (4, 0, 0): @@ -884,14 +950,14 @@ class Job: return coro_result return result - def get_ttl(self, default_ttl: t.Optional[int] = None): + def get_ttl(self, default_ttl: Optional[int] = None): """Returns ttl for a job that determines how long a job will be persisted. In the future, this method will also be responsible for determining ttl for repeated jobs. """ return default_ttl if self.ttl is None else self.ttl - def get_result_ttl(self, default_ttl: t.Optional[int] = None): + def get_result_ttl(self, default_ttl: Optional[int] = None): """Returns ttl for a job that determines how long a jobs result will be persisted. In the future, this method will also be responsible for determining ttl for repeated jobs. @@ -905,7 +971,7 @@ class Job: """ return get_call_string(self.func_name, self.args, self.kwargs, max_length=75) - def cleanup(self, ttl: t.Optional[int] = None, pipeline: t.Optional['Pipeline'] = None, + def cleanup(self, ttl: Optional[int] = None, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True): """Prepare job for eventual deletion (if needed). This method is usually called after successful execution. How long we persist the job and its @@ -961,7 +1027,7 @@ class Job: else: queue.enqueue_job(self, pipeline=pipeline) - def register_dependency(self, pipeline: t.Optional['Pipeline'] = None): + def register_dependency(self, pipeline: Optional['Pipeline'] = None): """Jobs may have dependencies. Jobs are enqueued only if the jobs they depend on are successfully performed. 
We record this relation as a reverse dependency (a Redis set), with a key that looks something @@ -993,8 +1059,8 @@ class Job: return [Job.key_for(_id.decode()) for _id in dependencies] - def dependencies_are_met(self, parent_job: t.Optional['Job'] = None, - pipeline: t.Optional['Pipeline'] = None, exclude_job_id: str = None): + def dependencies_are_met(self, parent_job: Optional['Job'] = None, + pipeline: Optional['Pipeline'] = None, exclude_job_id: str = None): """Returns a boolean indicating if all of this job's dependencies are _FINISHED_ If a pipeline is passed, all dependencies are WATCHed. @@ -1015,7 +1081,7 @@ class Job: if exclude_job_id: dependencies_ids.discard(exclude_job_id) - if parent_job.id == exclude_job_id: + if parent_job and parent_job.id == exclude_job_id: parent_job = None if parent_job: diff --git a/rq/queue.py b/rq/queue.py index d21d097..6628565 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -15,7 +15,7 @@ from .compat import as_text, string_types, total_ordering from .connections import resolve_connection from .defaults import DEFAULT_RESULT_TTL from .exceptions import DequeueTimeout, NoSuchJobError -from .job import Job, JobStatus, Dependency +from .job import Job, JobStatus from .serializers import resolve_serializer from .utils import backend_class, get_version, import_attribute, parse_timeout, utcnow diff --git a/rq/registry.py b/rq/registry.py index 4b8089c..2d484ec 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -105,7 +105,7 @@ class BaseRegistry: job_instance.delete() return result - def get_expired_job_ids(self, timestamp: t.Optional[datetime] = None): + def get_expired_job_ids(self, timestamp: t.Optional[float] = None): """Returns job ids whose score are less than current timestamp. Returns ids for jobs with an expiry time earlier than timestamp, @@ -164,7 +164,7 @@ class BaseRegistry: job_class=self.job_class, serializer=serializer) job.started_at = None job.ended_at = None - job.exc_info = '' + job._exc_info = '' job.save() job = queue.enqueue_job(job, pipeline=pipeline, at_front=at_front) pipeline.execute() @@ -182,7 +182,7 @@ class StartedJobRegistry(BaseRegistry): """ key_template = 'rq:wip:{0}' - def cleanup(self, timestamp: t.Optional[datetime] = None): + def cleanup(self, timestamp: t.Optional[float] = None): """Remove expired jobs from registry and add them to FailedJobRegistry. Removes jobs with an expiry time earlier than timestamp, specified as @@ -215,7 +215,7 @@ class StartedJobRegistry(BaseRegistry): else: job.set_status(JobStatus.FAILED) - job.exc_info = "Moved to FailedJobRegistry at %s" % datetime.now() + job._exc_info = "Moved to FailedJobRegistry at %s" % datetime.now() job.save(pipeline=pipeline, include_meta=False) job.cleanup(ttl=-1, pipeline=pipeline) failed_job_registry.add(job, job.failure_ttl) @@ -233,7 +233,7 @@ class FinishedJobRegistry(BaseRegistry): """ key_template = 'rq:finished:{0}' - def cleanup(self, timestamp: t.Optional[datetime] = None): + def cleanup(self, timestamp: t.Optional[float] = None): """Remove expired jobs from registry. Removes jobs with an expiry time earlier than timestamp, specified as @@ -250,7 +250,7 @@ class FailedJobRegistry(BaseRegistry): """ key_template = 'rq:failed:{0}' - def cleanup(self, timestamp: t.Optional[datetime] = None): + def cleanup(self, timestamp: t.Optional[float] = None): """Remove expired jobs from registry. 
Removes jobs with an expiry time earlier than timestamp, specified as @@ -260,7 +260,8 @@ class FailedJobRegistry(BaseRegistry): score = timestamp if timestamp is not None else current_timestamp() self.connection.zremrangebyscore(self.key, 0, score) - def add(self, job: 'Job', ttl=None, exc_string: str = '', pipeline: t.Optional['Pipeline'] = None): + def add(self, job: 'Job', ttl=None, exc_string: str = '', pipeline: t.Optional['Pipeline'] = None, + _save_exc_to_job: bool = False): """ Adds a job to a registry with expiry time of now + ttl. `ttl` defaults to DEFAULT_FAILURE_TTL if not specified. @@ -274,8 +275,8 @@ class FailedJobRegistry(BaseRegistry): else: p = self.connection.pipeline() - job.exc_info = exc_string - job.save(pipeline=p, include_meta=False) + job._exc_info = exc_string + job.save(pipeline=p, include_meta=False, include_result=_save_exc_to_job) job.cleanup(ttl=ttl, pipeline=p) p.zadd(self.key, {job.id: score}) @@ -315,13 +316,7 @@ class ScheduledJobRegistry(BaseRegistry): If datetime has no tzinfo, it will assume localtimezone. """ # If datetime has no timezone, assume server's local timezone - # if we're on Python 3. If we're on Python 2.7, raise an - # exception since Python < 3.2 has no builtin `timezone` class if not scheduled_datetime.tzinfo: - try: - from datetime import timezone - except ImportError: - raise ValueError('datetime object with no timezone') tz = timezone(timedelta(seconds=-(time.timezone if time.daylight == 0 else time.altzone))) scheduled_datetime = scheduled_datetime.replace(tzinfo=tz) diff --git a/rq/results.py b/rq/results.py new file mode 100644 index 0000000..931907c --- /dev/null +++ b/rq/results.py @@ -0,0 +1,173 @@ +import json +from typing import Any, Optional +import zlib + +from base64 import b64decode, b64encode +from datetime import datetime, timezone +from enum import Enum +from uuid import uuid4 + +from redis import Redis +from redis.client import Pipeline + +from .compat import decode_redis_hash +from .job import Job +from .serializers import resolve_serializer +from .utils import now + + +def get_key(job_id): + return 'rq:results:%s' % job_id + + +class Result(object): + + class Type(Enum): + SUCCESSFUL = 1 + FAILED = 2 + STOPPED = 3 + + def __init__(self, job_id: str, type: Type, connection: Redis, id: Optional[str] = None, + created_at: Optional[datetime] = None, return_value: Optional[Any] = None, + exc_string: Optional[str] = None, serializer=None): + self.return_value = return_value + self.exc_string = exc_string + self.type = type + self.created_at = created_at if created_at else now() + self.serializer = resolve_serializer(serializer) + self.connection = connection + self.job_id = job_id + self.id = id + + def __repr__(self): + return f'Result(id={self.id}, type={self.Type(self.type).name})' + + def __eq__(self, other): + try: + return self.id == other.id + except AttributeError: + return False + + def __bool__(self): + return bool(self.id) + + @classmethod + def create(cls, job, type, ttl, return_value=None, exc_string=None, pipeline=None): + result = cls(job_id=job.id, type=type, connection=job.connection, + return_value=return_value, + exc_string=exc_string, serializer=job.serializer) + result.save(ttl=ttl, pipeline=pipeline) + return result + + @classmethod + def create_failure(cls, job, ttl, exc_string, pipeline=None): + result = cls(job_id=job.id, type=cls.Type.FAILED, connection=job.connection, + exc_string=exc_string, serializer=job.serializer) + result.save(ttl=ttl, pipeline=pipeline) + return result + + @classmethod 
+    def all(cls, job: Job, serializer=None):
+        """Returns all results for job, newest first"""
+        response = job.connection.xrevrange(cls.get_key(job.id), '+', '-')
+        results = []
+        for (result_id, payload) in response:
+            results.append(
+                cls.restore(job.id, result_id.decode(), payload,
+                            connection=job.connection, serializer=serializer)
+            )
+
+        return results
+
+    @classmethod
+    def count(cls, job: Job) -> int:
+        """Returns the number of job results"""
+        return job.connection.xlen(cls.get_key(job.id))
+
+    @classmethod
+    def delete_all(cls, job: Job) -> None:
+        """Delete all job results"""
+        job.connection.delete(cls.get_key(job.id))
+
+    @classmethod
+    def restore(cls, job_id: str, result_id: str, payload: dict, connection: Redis, serializer=None) -> 'Result':
+        """Create a Result object from given Redis payload"""
+        created_at = datetime.fromtimestamp(
+            int(result_id.split('-')[0]) / 1000, tz=timezone.utc
+        )
+        payload = decode_redis_hash(payload)
+
+        serializer = resolve_serializer(serializer)
+        return_value = payload.get('return_value')
+        if return_value is not None:
+            return_value = serializer.loads(b64decode(return_value.decode()))
+
+        exc_string = payload.get('exc_string')
+        if exc_string:
+            exc_string = zlib.decompress(b64decode(exc_string)).decode()
+
+        return Result(job_id, Result.Type(int(payload['type'])), connection=connection,
+                      id=result_id,
+                      created_at=created_at,
+                      return_value=return_value,
+                      exc_string=exc_string)
+
+    @classmethod
+    def fetch(cls, job: Job, serializer=None) -> Optional['Result']:
+        """Fetch a single result by its result ID.
+
+        The stream based implementation does not support looking an entry up
+        by result ID, so this is currently a stub that always returns None;
+        use `fetch_latest()` or `all()` instead.
+        """
+        return None
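+
+    # NOTE: each job's results live in a single Redis stream ('rq:results:<job_id>'),
+    # capped at the 10 most recent entries via XADD's maxlen=10 in save(). Stream
+    # entry IDs start with a millisecond timestamp, which is why restore() can
+    # derive created_at from the ID and why XREVRANGE yields results newest first.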
+
+    @classmethod
+    def fetch_latest(cls, job: Job, serializer=None) -> Optional['Result']:
+        """Returns the latest result for the given job instance or ID"""
+        response = job.connection.xrevrange(cls.get_key(job.id), '+', '-', count=1)
+        if not response:
+            return None
+
+        result_id, payload = response[0]
+        return cls.restore(job.id, result_id.decode(), payload,
+                           connection=job.connection, serializer=serializer)
+
+    @classmethod
+    def get_key(cls, job_id):
+        return 'rq:results:%s' % job_id
+
+    def save(self, ttl, pipeline=None):
+        """Save result data to Redis"""
+        key = self.get_key(self.job_id)
+
+        connection = pipeline if pipeline is not None else self.connection
+        result = connection.xadd(key, self.serialize(), maxlen=10)
+        # If xadd() is called in a pipeline, it returns a pipeline object instead of stream ID
+        if pipeline is None:
+            self.id = result.decode()
+        if ttl is not None:
+            connection.expire(key, ttl)
+        return self.id
+
+    def serialize(self):
+        data = {'type': self.type.value}
+
+        if self.exc_string is not None:
+            data['exc_string'] = b64encode(zlib.compress(self.exc_string.encode())).decode()
+
+        if self.return_value is not None:
+            serialized = self.serializer.dumps(self.return_value)
+            data['return_value'] = b64encode(serialized).decode()
+
+        return data
diff --git a/rq/utils.py b/rq/utils.py
index abfb24b..7528c10 100644
--- a/rq/utils.py
+++ b/rq/utils.py
@@ -182,6 +182,11 @@ def utcnow():
     return datetime.datetime.utcnow()
 
 
+def now():
+    """Return now in UTC"""
+    return datetime.datetime.now(datetime.timezone.utc)
+
+
 _TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
 
 
diff --git a/rq/worker.py b/rq/worker.py
index 9e940fe..dff9be0 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -18,6 +18,7 @@ from datetime import timedelta
 from enum import Enum
 from uuid import uuid4
 from random import shuffle
+from typing import Callable, List, Optional
 
 try:
     from signal import SIGKILL
@@ -39,6 +40,7 @@ from .job import Job, JobStatus
 from .logutils import setup_loghandlers
 from .queue import Queue
 from .registry import FailedJobRegistry, StartedJobRegistry, clean_registries
+from .results import Result
 from .scheduler import RQScheduler
 from .suspension import is_suspended
 from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
@@ -198,7 +200,7 @@ class Worker:
         self.queues = queues
         self.validate_queues()
         self._ordered_queues = self.queues[:]
-        self._exc_handlers = []
+        self._exc_handlers: List[Callable] = []
 
         self.default_result_ttl = default_result_ttl
         self.default_worker_ttl = default_worker_ttl
@@ -225,8 +227,8 @@ class Worker:
         self.disable_default_exception_handler = disable_default_exception_handler
 
         if prepare_for_work:
-            self.hostname = socket.gethostname()
-            self.pid = os.getpid()
+            self.hostname: Optional[str] = socket.gethostname()
+            self.pid: Optional[int] = os.getpid()
             try:
                 connection.client_setname(self.name)
             except redis.exceptions.ResponseError:
@@ -252,7 +254,7 @@ class Worker:
         else:
             self.hostname = None
             self.pid = None
-            self.ip_address = None
+            self.ip_address = 'unknown'
 
         if isinstance(exception_handlers, (list, tuple)):
             for handler in exception_handlers:
@@ -290,6 +292,11 @@ class Worker:
         """Returns the worker's Redis hash key."""
         return PUBSUB_CHANNEL_TEMPLATE % self.name
 
+    @property
+    def supports_redis_streams(self) -> bool:
+        """Whether Redis streams are supported (requires Redis server >= 5.0)."""
+        return self.get_redis_server_version() >= (5, 0, 0)
+
     @property
     def horse_pid(self):
         """The horse's process ID.  Only available in the worker.  Will return
@@ -823,7 +830,7 @@ class Worker:
             self.set_current_job_working_time((utcnow() - job.started_at).total_seconds())
 
             # Kill the job from this side if something is really wrong (interpreter lock/etc).
-            if job.timeout != -1 and self.current_job_working_time > (job.timeout + 60):
+            if job.timeout != -1 and self.current_job_working_time > (job.timeout + 60):  # type: ignore
                 self.heartbeat(self.job_monitoring_interval + 60)
                 self.kill_horse()
                 self.wait_for_horse()
@@ -956,11 +963,13 @@ class Worker:
 
     def handle_job_failure(self, job: 'Job', queue: 'Queue',
                            started_job_registry=None, exc_string=''):
-        """Handles the failure or an executing job by:
+        """
+        Handles the failure of an executing job by:
             1. Setting the job status to failed
             2. Removing the job from StartedJobRegistry
             3. Setting the workers current job to None
             4. Add the job to FailedJobRegistry
+        `_save_exc_to_job` should only be used for testing purposes
         """
         self.log.debug('Handling failed execution of job %s', job.id)
         with self.connection.pipeline() as pipeline:
@@ -991,8 +1000,14 @@ class Worker:
             if not self.disable_default_exception_handler and not retry:
                 failed_job_registry = FailedJobRegistry(job.origin, job.connection,
                                                         job_class=self.job_class, serializer=job.serializer)
+                # Exception should be saved in job hash if server
+                # doesn't support Redis streams
+                _save_exc_to_job = not self.supports_redis_streams
                 failed_job_registry.add(job, ttl=job.failure_ttl,
-                                        exc_string=exc_string, pipeline=pipeline)
+                                        exc_string=exc_string, pipeline=pipeline,
+                                        _save_exc_to_job=_save_exc_to_job)
+                if self.supports_redis_streams:
+                    Result.create_failure(job, job.failure_ttl, exc_string=exc_string, pipeline=pipeline)
                 with suppress(redis.exceptions.ConnectionError):
                     pipeline.execute()
 
@@ -1018,7 +1033,7 @@ class Worker:
             # even if Redis is down
             pass
 
-    def handle_job_success(self, job: 'Job', queue: 'Queue', started_job_registry):
+    def handle_job_success(self, job: 'Job', queue: 'Queue', started_job_registry: StartedJobRegistry):
         self.log.debug('Handling successful execution of job %s', job.id)
 
         with self.connection.pipeline() as pipeline:
@@ -1038,7 +1053,7 @@ class Worker:
                     self.set_current_job_id(None, pipeline=pipeline)
                     self.increment_successful_job_count(pipeline=pipeline)
                     self.increment_total_working_time(
                        job.ended_at - job.started_at, pipeline  # type: ignore
                     )
 
                 result_ttl = job.get_result_ttl(self.default_result_ttl)
                 if result_ttl != 0:
                     self.log.debug('Setting job %s status to finished', job.id)
                     job.set_status(JobStatus.FINISHED, pipeline=pipeline)
                     job.worker_name = None
-                    # Don't clobber the user's meta dictionary!
-                    job.save(pipeline=pipeline, include_meta=False)
+
+                    # Result should be saved in job hash only if server
+                    # doesn't support Redis streams
+                    include_result = not self.supports_redis_streams
+                    # Don't clobber user's meta dictionary!
+                    job.save(pipeline=pipeline, include_meta=False,
+                             include_result=include_result)
+                    if self.supports_redis_streams:
+                        Result.create(job, Result.Type.SUCCESSFUL, return_value=job._result,
+                                      ttl=result_ttl, pipeline=pipeline)
 
                 finished_job_registry = queue.finished_job_registry
                 finished_job_registry.add(job, result_ttl, pipeline)
diff --git a/tests/__init__.py b/tests/__init__.py
index cb60399..1f1cc26 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -50,6 +50,7 @@ class RQTestCase(unittest.TestCase):
 
         # Store the connection (for sanity checking)
         cls.testconn = testconn
+        cls.connection = testconn
 
         # Shut up logging
         logging.disable(logging.ERROR)
diff --git a/tests/test_job.py b/tests/test_job.py
index 317e2f9..237c5ef 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -298,7 +298,7 @@ class TestJob(RQTestCase):
         exception_string = 'Some exception'
 
         job = Job.create(func=fixtures.say_hello, args=('Lionel',))
-        job.exc_info = exception_string
+        job._exc_info = exception_string
         job.save()
 
         # exc_info is stored in compressed format
diff --git a/tests/test_registry.py b/tests/test_registry.py
index 9a63430..d7b607f 100644
--- a/tests/test_registry.py
+++ b/tests/test_registry.py
@@ -397,7 +397,6 @@ class TestFailedJobRegistry(RQTestCase):
         self.assertEqual(job.get_status(), JobStatus.QUEUED)
         self.assertEqual(job.started_at, None)
         self.assertEqual(job.ended_at, None)
-        self.assertEqual(job.exc_info, '')
 
         worker.work(burst=True)
         self.assertTrue(job in registry)
diff --git a/tests/test_results.py b/tests/test_results.py
new file mode 100644
index 0000000..526fd66
--- /dev/null
+++ b/tests/test_results.py
@@ -0,0 +1,201 @@
+import unittest
+
+from datetime import timedelta
+from unittest.mock import patch, PropertyMock
+
+from redis import Redis
+
+from tests import RQTestCase
+
+from rq.job import Job
+from rq.queue import Queue
+from rq.registry import StartedJobRegistry
+from rq.results import Result, get_key
+from rq.utils import get_version, utcnow
+from rq.worker import Worker
+
+from .fixtures import say_hello
+
+
+@unittest.skipIf(get_version(Redis()) < (5, 0, 0), 'Skip if Redis server < 5.0')
+class TestResult(RQTestCase):
+
+    def test_save_and_get_result(self):
+        """Ensure data is saved properly"""
+        queue = Queue(connection=self.connection)
+        job = queue.enqueue(say_hello)
+
+        result = Result.fetch_latest(job)
+        self.assertIsNone(result)
+
+        Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
+        result = Result.fetch_latest(job)
+        self.assertEqual(result.return_value, 1)
+        self.assertEqual(job.latest_result().return_value, 1)
+
+        # Check that ttl is properly set
+        key = get_key(job.id)
+        ttl = self.connection.pttl(key)
+        self.assertTrue(5000 < ttl <= 10000)
+
+        # Check job with None return value
+        Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=None)
+        result = Result.fetch_latest(job)
+        self.assertIsNone(result.return_value)
+        Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=2)
+        result = Result.fetch_latest(job)
+        self.assertEqual(result.return_value, 2)
+
+    def test_create_failure(self):
+        """Ensure failures are saved properly"""
+        queue = Queue(connection=self.connection)
+        job = queue.enqueue(say_hello)
+        Result.create_failure(job, ttl=10, exc_string='exception')
+        result = Result.fetch_latest(job)
+        self.assertEqual(result.exc_string, 'exception')
+
+        # Check that ttl is properly set
+        key = get_key(job.id)
+        ttl = self.connection.pttl(key)
+        self.assertTrue(5000 < ttl <= 10000)
+
+    def test_getting_results(self):
"""Check getting all execution results""" + queue = Queue(connection=self.connection) + job = queue.enqueue(say_hello) + + # latest_result() returns None when there's no result + self.assertIsNone(job.latest_result()) + + result_1 = Result.create_failure(job, ttl=10, exc_string='exception') + result_2 = Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1) + result_3 = Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1) + + # Result.fetch_latest() returns the latest result + result = Result.fetch_latest(job) + self.assertEqual(result, result_3) + self.assertEqual(job.latest_result(), result_3) + + # Result.all() and job.results() returns all results, newest first + results = Result.all(job) + self.assertEqual(results, [result_3, result_2, result_1]) + self.assertEqual(job.results(), [result_3, result_2, result_1]) + + def test_count(self): + """Result.count(job) returns number of results""" + queue = Queue(connection=self.connection) + job = queue.enqueue(say_hello) + self.assertEqual(Result.count(job), 0) + Result.create_failure(job, ttl=10, exc_string='exception') + self.assertEqual(Result.count(job), 1) + Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1) + self.assertEqual(Result.count(job), 2) + + def test_delete_all(self): + """Result.delete_all(job) deletes all results from Redis""" + queue = Queue(connection=self.connection) + job = queue.enqueue(say_hello) + Result.create_failure(job, ttl=10, exc_string='exception') + Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1) + Result.delete_all(job) + self.assertEqual(Result.count(job), 0) + + def test_job_successful_result_fallback(self): + """Changes to job.result handling should be backwards compatible.""" + queue = Queue(connection=self.connection) + job = queue.enqueue(say_hello) + worker = Worker([queue]) + worker.register_birth() + + self.assertEqual(worker.failed_job_count, 0) + self.assertEqual(worker.successful_job_count, 0) + self.assertEqual(worker.total_working_time, 0) + + # These should only run on workers that supports Redis streams + registry = StartedJobRegistry(connection=self.connection) + job.started_at = utcnow() + job.ended_at = job.started_at + timedelta(seconds=0.75) + job._result = 'Success' + worker.handle_job_success(job, queue, registry) + + payload = self.connection.hgetall(job.key) + self.assertFalse(b'result' in payload.keys()) + self.assertEqual(job.result, 'Success') + + with patch('rq.worker.Worker.supports_redis_streams', new_callable=PropertyMock) as mock: + mock.return_value = False + worker = Worker([queue]) + worker.register_birth() + job = queue.enqueue(say_hello) + job._result = 'Success' + job.started_at = utcnow() + job.ended_at = job.started_at + timedelta(seconds=0.75) + + # If `save_result_to_job` = True, result will be saved to job + # hash, simulating older versions of RQ + + worker.handle_job_success(job, queue, registry) + payload = self.connection.hgetall(job.key) + self.assertTrue(b'result' in payload.keys()) + # Delete all new result objects so we only have result stored in job hash, + # this should simulate a job that was executed in an earlier RQ version + self.assertEqual(job.result, 'Success') + + def test_job_failed_result_fallback(self): + """Changes to job.result failure handling should be backwards compatible.""" + queue = Queue(connection=self.connection) + job = queue.enqueue(say_hello) + worker = Worker([queue]) + worker.register_birth() + + self.assertEqual(worker.failed_job_count, 0) + 
+        self.assertEqual(worker.successful_job_count, 0)
+        self.assertEqual(worker.total_working_time, 0)
+
+        registry = StartedJobRegistry(connection=self.connection)
+        job.started_at = utcnow()
+        job.ended_at = job.started_at + timedelta(seconds=0.75)
+        worker.handle_job_failure(job, exc_string='Error', queue=queue,
+                                  started_job_registry=registry)
+
+        job = Job.fetch(job.id, connection=self.connection)
+        payload = self.connection.hgetall(job.key)
+        self.assertFalse(b'exc_info' in payload.keys())
+        self.assertEqual(job.exc_info, 'Error')
+
+        with patch('rq.worker.Worker.supports_redis_streams', new_callable=PropertyMock) as mock:
+            mock.return_value = False
+            worker = Worker([queue])
+            worker.register_birth()
+
+            job = queue.enqueue(say_hello)
+            job.started_at = utcnow()
+            job.ended_at = job.started_at + timedelta(seconds=0.75)
+
+            # With `supports_redis_streams` mocked to False, the exception is
+            # saved to the job hash, simulating older versions of RQ
+            worker.handle_job_failure(job, exc_string='Error', queue=queue,
+                                      started_job_registry=registry)
+            payload = self.connection.hgetall(job.key)
+            self.assertTrue(b'exc_info' in payload.keys())
+            # Delete all new result objects so only the exception stored in the
+            # job hash remains, simulating a job executed by an earlier RQ version
+            Result.delete_all(job)
+            job = Job.fetch(job.id, connection=self.connection)
+            self.assertEqual(job.exc_info, 'Error')
+
+    def test_job_return_value(self):
+        """Test job.return_value"""
+        queue = Queue(connection=self.connection)
+        job = queue.enqueue(say_hello)
+
+        # Returns None when there's no result
+        self.assertIsNone(job.return_value())
+
+        Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
+        self.assertEqual(job.return_value(), 1)
+
+        # Returns None if latest result is a failure
+        Result.create_failure(job, ttl=10, exc_string='exception')
+        self.assertIsNone(job.return_value(refresh=True))
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 5aeffd7..5d4ad34 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -31,6 +31,7 @@ from rq import Queue, SimpleWorker, Worker, get_current_connection
 from rq.compat import as_text
 from rq.job import Job, JobStatus, Retry
 from rq.registry import StartedJobRegistry, FailedJobRegistry, FinishedJobRegistry
+from rq.results import Result
 from rq.suspension import resume, suspend
 from rq.utils import utcnow
 from rq.version import VERSION
@@ -178,7 +179,11 @@ class TestWorker(RQTestCase):
             w.work(burst=True), True,
             'Expected at least some work done.'
         )
-        self.assertEqual(job.result, 'Hi there, Frank!')
+        expected_result = 'Hi there, Frank!'
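+        # job.result reads from the new result stream when the server supports
+        # it, and falls back to the result stored in the job hash otherwise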
+ self.assertEqual(job.result, expected_result) + # Only run if Redis server supports streams + if job.supports_redis_streams: + self.assertEqual(Result.fetch_latest(job).return_value, expected_result) self.assertIsNone(job.worker_name) def test_job_times(self): @@ -380,6 +385,10 @@ class TestWorker(RQTestCase): # to the failed queue self.assertEqual(str(job.enqueued_at), enqueued_at_date) self.assertTrue(job.exc_info) # should contain exc_info + if job.supports_redis_streams: + result = Result.fetch_latest(job) + self.assertEqual(result.exc_string, job.exc_info) + self.assertEqual(result.type, Result.Type.FAILED) def test_horse_fails(self): """Tests that job status is set to FAILED even if horse unexpectedly fails""" @@ -414,7 +423,7 @@ class TestWorker(RQTestCase): def test_statistics(self): """Successful and failed job counts are saved properly""" - queue = Queue() + queue = Queue(connection=self.connection) job = queue.enqueue(div_by_zero) worker = Worker([queue]) worker.register_birth()