Implemented Job.fetch_many (#1072)

main
Selwin Ong 6 years ago committed by GitHub
parent 07317b62f3
commit 7021cedaf9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -33,7 +33,15 @@ Some interesting job attributes include:
* `job.ended_at`
* `job.exc_info`
## Accessing the "current" job
If you want to efficiently fetch a large number of jobs, use `Job.fetch_many()`.
```python
jobs = Job.fetch_many(['foo_id', 'bar_id'], connection=redis)
for job in jobs:
print('Job %s: %s' % (job.id, job.func_name))
```
## Accessing The "current" Job
Since job functions are regular Python functions, you have to ask RQ for the
current job ID, if any. To do this, you can use:

@ -60,6 +60,7 @@ In addition to `--burst`, `rq worker` also accepts these arguments:
* `--url` or `-u`: URL describing Redis connection details (e.g `rq worker --url redis://:secrets@example.com:1234/9`)
* `--path` or `-P`: multiple import paths are supported (e.g `rq worker --path foo --path bar`)
* `--config` or `-c`: path to module containing RQ settings.
* `--results-ttl`: job results will be kept for this number of seconds (defaults to 500).
* `--worker-class` or `-w`: RQ Worker class to use (e.g `rq worker --worker-class 'foo.bar.MyWorker'`)
* `--job-class` or `-j`: RQ Job class to use.
* `--queue-class`: RQ Queue class to use.

@ -13,7 +13,8 @@ from rq.compat import as_text, decode_redis_hash, string_types, text_type
from .connections import resolve_connection
from .exceptions import NoSuchJobError, UnpickleError
from .local import LocalStack
from .utils import enum, import_attribute, utcformat, utcnow, utcparse, parse_timeout
from .utils import (enum, import_attribute, parse_timeout, str_to_date,
utcformat, utcnow)
try:
import cPickle as pickle
@ -287,6 +288,25 @@ class Job(object):
job.refresh()
return job
@classmethod
def fetch_many(cls, job_ids, connection=None):
"""Bulk version of Job.fetch"""
with connection.pipeline() as pipeline:
for job_id in job_ids:
pipeline.hgetall(cls.key_for(job_id))
results = pipeline.execute()
jobs = []
for i, job_id in enumerate(job_ids):
if results[i]:
job = cls(job_id, connection=connection)
job.restore(results[i])
jobs.append(job)
else:
jobs.append(None)
return jobs
def __init__(self, id=None, connection=None):
self.connection = resolve_connection(connection)
self._id = id
@ -392,24 +412,9 @@ class Job(object):
"""Backwards-compatibility accessor property `return_value`."""
return_value = result
# Persistence
def refresh(self): # noqa
"""Overwrite the current instance's properties with the values in the
corresponding Redis key.
Will raise a NoSuchJobError if no corresponding Redis key exists.
"""
key = self.key
obj = decode_redis_hash(self.connection.hgetall(key))
if len(obj) == 0:
raise NoSuchJobError('No such job: {0}'.format(key))
def to_date(date_str):
if date_str is None:
return
else:
return utcparse(as_text(date_str))
def restore(self, raw_data):
"""Overwrite properties with the provided values stored in Redis"""
obj = decode_redis_hash(raw_data)
try:
raw_data = obj['data']
except KeyError:
@ -421,17 +426,17 @@ class Job(object):
# Fallback to uncompressed string
self.data = raw_data
self.created_at = to_date(as_text(obj.get('created_at')))
self.created_at = str_to_date(obj.get('created_at'))
self.origin = as_text(obj.get('origin'))
self.description = as_text(obj.get('description'))
self.enqueued_at = to_date(as_text(obj.get('enqueued_at')))
self.started_at = to_date(as_text(obj.get('started_at')))
self.ended_at = to_date(as_text(obj.get('ended_at')))
self.enqueued_at = str_to_date(obj.get('enqueued_at'))
self.started_at = str_to_date(obj.get('started_at'))
self.ended_at = str_to_date(obj.get('ended_at'))
self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa
self.timeout = parse_timeout(as_text(obj.get('timeout'))) if obj.get('timeout') else None
self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None # noqa
self._status = as_text(obj.get('status') if obj.get('status') else None)
self._status = obj.get('status') if obj.get('status') else None
self._dependency_id = as_text(obj.get('dependency_id', None))
self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}
@ -444,6 +449,18 @@ class Job(object):
# Fallback to uncompressed string
self.exc_info = as_text(raw_exc_info)
# Persistence
def refresh(self): # noqa
"""Overwrite the current instance's properties with the values in the
corresponding Redis key.
Will raise a NoSuchJobError if no corresponding Redis key exists.
"""
data = self.connection.hgetall(self.key)
if not data:
raise NoSuchJobError('No such job: {0}'.format(self.key))
self.restore(data)
def to_dict(self, include_meta=True):
"""
Returns a serialization of the current job instance

@ -252,6 +252,13 @@ def backend_class(holder, default_name, override=None):
return override
def str_to_date(date_str):
if date_str is None:
return
else:
return utcparse(as_text(date_str))
def parse_timeout(timeout):
"""Transfer all kinds of timeout format to an integer representing seconds"""
if not isinstance(timeout, numbers.Integral) and timeout is not None:

@ -183,6 +183,23 @@ class TestJob(RQTestCase):
self.assertEqual(job.kwargs, dict(z=2))
self.assertEqual(job.created_at, datetime(2012, 2, 7, 22, 13, 24, 123456))
def test_fetch_many(self):
"""Fetching many jobs at once."""
data = {
'func': fixtures.some_calculation,
'args': (3, 4),
'kwargs': dict(z=2),
'connection': self.testconn,
}
job = Job.create(**data)
job.save()
job2 = Job.create(**data)
job2.save()
jobs = Job.fetch_many([job.id, job2.id, 'invalid_id'], self.testconn)
self.assertEqual(jobs, [job, job2, None])
def test_persistence_of_empty_jobs(self): # noqa
"""Storing empty jobs."""
job = Job()

Loading…
Cancel
Save