diff --git a/docs/docs/jobs.md b/docs/docs/jobs.md
index bbea32a..f15689d 100644
--- a/docs/docs/jobs.md
+++ b/docs/docs/jobs.md
@@ -33,7 +33,15 @@ Some interesting job attributes include:
 * `job.ended_at`
 * `job.exc_info`
 
-## Accessing the "current" job
+If you want to efficiently fetch a large number of jobs, use `Job.fetch_many()`.
+
+```python
+jobs = Job.fetch_many(['foo_id', 'bar_id'], connection=redis)
+for job in jobs:
+    print('Job %s: %s' % (job.id, job.func_name))
+```
+
+## Accessing The "current" Job
 
 Since job functions are regular Python functions, you have to ask RQ for the
 current job ID, if any. To do this, you can use:
diff --git a/docs/docs/workers.md b/docs/docs/workers.md
index 4e3b3e7..739ba6b 100644
--- a/docs/docs/workers.md
+++ b/docs/docs/workers.md
@@ -60,6 +60,7 @@ In addition to `--burst`, `rq worker` also accepts these arguments:
 * `--url` or `-u`: URL describing Redis connection details (e.g `rq worker --url redis://:secrets@example.com:1234/9`)
 * `--path` or `-P`: multiple import paths are supported (e.g `rq worker --path foo --path bar`)
 * `--config` or `-c`: path to module containing RQ settings.
+* `--results-ttl`: job results will be kept for this number of seconds (defaults to 500).
 * `--worker-class` or `-w`: RQ Worker class to use (e.g `rq worker --worker-class 'foo.bar.MyWorker'`)
 * `--job-class` or `-j`: RQ Job class to use.
 * `--queue-class`: RQ Queue class to use.
diff --git a/rq/job.py b/rq/job.py
index c411fb2..7c97e20 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -13,7 +13,8 @@ from rq.compat import as_text, decode_redis_hash, string_types, text_type
 from .connections import resolve_connection
 from .exceptions import NoSuchJobError, UnpickleError
 from .local import LocalStack
-from .utils import enum, import_attribute, utcformat, utcnow, utcparse, parse_timeout
+from .utils import (enum, import_attribute, parse_timeout, str_to_date,
+                    utcformat, utcnow)
 
 try:
     import cPickle as pickle
@@ -287,6 +288,25 @@ class Job(object):
         job.refresh()
         return job
 
+    @classmethod
+    def fetch_many(cls, job_ids, connection):
+        """Bulk version of Job.fetch: IDs without a matching job yield None"""
+        with connection.pipeline() as pipeline:
+            for job_id in job_ids:
+                pipeline.hgetall(cls.key_for(job_id))
+            results = pipeline.execute()
+
+        jobs = []
+        for i, job_id in enumerate(job_ids):
+            if results[i]:
+                job = cls(job_id, connection=connection)
+                job.restore(results[i])
+                jobs.append(job)
+            else:
+                jobs.append(None)
+
+        return jobs
+
     def __init__(self, id=None, connection=None):
         self.connection = resolve_connection(connection)
         self._id = id
@@ -392,24 +412,9 @@ class Job(object):
         """Backwards-compatibility accessor property `return_value`."""
         return_value = result
 
-    # Persistence
-    def refresh(self):  # noqa
-        """Overwrite the current instance's properties with the values in the
-        corresponding Redis key.
-
-        Will raise a NoSuchJobError if no corresponding Redis key exists.
- """ - key = self.key - obj = decode_redis_hash(self.connection.hgetall(key)) - if len(obj) == 0: - raise NoSuchJobError('No such job: {0}'.format(key)) - - def to_date(date_str): - if date_str is None: - return - else: - return utcparse(as_text(date_str)) - + def restore(self, raw_data): + """Overwrite properties with the provided values stored in Redis""" + obj = decode_redis_hash(raw_data) try: raw_data = obj['data'] except KeyError: @@ -421,17 +426,17 @@ class Job(object): # Fallback to uncompressed string self.data = raw_data - self.created_at = to_date(as_text(obj.get('created_at'))) + self.created_at = str_to_date(obj.get('created_at')) self.origin = as_text(obj.get('origin')) self.description = as_text(obj.get('description')) - self.enqueued_at = to_date(as_text(obj.get('enqueued_at'))) - self.started_at = to_date(as_text(obj.get('started_at'))) - self.ended_at = to_date(as_text(obj.get('ended_at'))) + self.enqueued_at = str_to_date(obj.get('enqueued_at')) + self.started_at = str_to_date(obj.get('started_at')) + self.ended_at = str_to_date(obj.get('ended_at')) self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa - self.timeout = parse_timeout(as_text(obj.get('timeout'))) if obj.get('timeout') else None + self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None # noqa - self._status = as_text(obj.get('status') if obj.get('status') else None) + self._status = obj.get('status') if obj.get('status') else None self._dependency_id = as_text(obj.get('dependency_id', None)) self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} @@ -444,6 +449,18 @@ class Job(object): # Fallback to uncompressed string self.exc_info = as_text(raw_exc_info) + # Persistence + def refresh(self): # noqa + """Overwrite the current instance's properties with the values in the + corresponding Redis key. + + Will raise a NoSuchJobError if no corresponding Redis key exists. 
+ """ + data = self.connection.hgetall(self.key) + if not data: + raise NoSuchJobError('No such job: {0}'.format(self.key)) + self.restore(data) + def to_dict(self, include_meta=True): """ Returns a serialization of the current job instance diff --git a/rq/utils.py b/rq/utils.py index fb3faf6..b479ec7 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -252,6 +252,13 @@ def backend_class(holder, default_name, override=None): return override +def str_to_date(date_str): + if date_str is None: + return + else: + return utcparse(as_text(date_str)) + + def parse_timeout(timeout): """Transfer all kinds of timeout format to an integer representing seconds""" if not isinstance(timeout, numbers.Integral) and timeout is not None: diff --git a/tests/test_job.py b/tests/test_job.py index 4e34574..a990dd7 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -183,6 +183,23 @@ class TestJob(RQTestCase): self.assertEqual(job.kwargs, dict(z=2)) self.assertEqual(job.created_at, datetime(2012, 2, 7, 22, 13, 24, 123456)) + def test_fetch_many(self): + """Fetching many jobs at once.""" + data = { + 'func': fixtures.some_calculation, + 'args': (3, 4), + 'kwargs': dict(z=2), + 'connection': self.testconn, + } + job = Job.create(**data) + job.save() + + job2 = Job.create(**data) + job2.save() + + jobs = Job.fetch_many([job.id, job2.id, 'invalid_id'], self.testconn) + self.assertEqual(jobs, [job, job2, None]) + def test_persistence_of_empty_jobs(self): # noqa """Storing empty jobs.""" job = Job()