Merge pull request #415 from foxx/feature/custom-job-ids

Allow job ID to be set on enqueue/enqueue_call() - fixes #412
main
Selwin Ong 10 years ago
commit 202be75b21

3
.gitignore vendored

@ -7,3 +7,6 @@
/.tox /.tox
/dist /dist
/build /build
.tox
.vagrant
Vagrantfile

@ -2,6 +2,7 @@
from __future__ import (absolute_import, division, print_function, from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
import types
import inspect import inspect
import warnings import warnings
from functools import partial from functools import partial
@ -92,7 +93,8 @@ class Job(object):
# Job construction # Job construction
@classmethod @classmethod
def create(cls, func, args=None, kwargs=None, connection=None, def create(cls, func, args=None, kwargs=None, connection=None,
result_ttl=None, status=None, description=None, depends_on=None, timeout=None): result_ttl=None, status=None, description=None, depends_on=None, timeout=None,
id=None):
"""Creates a new Job instance for the given function, arguments, and """Creates a new Job instance for the given function, arguments, and
keyword arguments. keyword arguments.
""" """
@ -107,6 +109,8 @@ class Job(object):
raise TypeError('{0!r} is not a valid kwargs dict.'.format(kwargs)) raise TypeError('{0!r} is not a valid kwargs dict.'.format(kwargs))
job = cls(connection=connection) job = cls(connection=connection)
if id is not None:
job.set_id(id)
# Set the core job tuple properties # Set the core job tuple properties
job._instance = None job._instance = None
@ -326,6 +330,8 @@ class Job(object):
def set_id(self, value): def set_id(self, value):
"""Sets a job ID for the given job.""" """Sets a job ID for the given job."""
if not isinstance(value, string_types):
raise TypeError('id must be a string, not {0}.'.format(type(value)))
self._id = value self._id = value
id = property(get_id, set_id) id = property(get_id, set_id)

@ -164,7 +164,8 @@ class Queue(object):
connection.rpush(self.key, job_id) connection.rpush(self.key, job_id)
def enqueue_call(self, func, args=None, kwargs=None, timeout=None, def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, description=None, depends_on=None): result_ttl=None, description=None, depends_on=None,
job_id=None):
"""Creates a job to represent the delayed function call and enqueues """Creates a job to represent the delayed function call and enqueues
it. it.
@ -177,7 +178,8 @@ class Queue(object):
# TODO: job with dependency shouldn't have "queued" as status # TODO: job with dependency shouldn't have "queued" as status
job = self.job_class.create(func, args, kwargs, connection=self.connection, job = self.job_class.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=Status.QUEUED, result_ttl=result_ttl, status=Status.QUEUED,
description=description, depends_on=depends_on, timeout=timeout) description=description, depends_on=depends_on, timeout=timeout,
id=job_id)
# If job depends on an unfinished job, register itself on it's # If job depends on an unfinished job, register itself on it's
# parent's dependents instead of enqueueing it. # parent's dependents instead of enqueueing it.
@ -223,6 +225,7 @@ class Queue(object):
description = kwargs.pop('description', None) description = kwargs.pop('description', None)
result_ttl = kwargs.pop('result_ttl', None) result_ttl = kwargs.pop('result_ttl', None)
depends_on = kwargs.pop('depends_on', None) depends_on = kwargs.pop('depends_on', None)
job_id = kwargs.pop('job_id', None)
if 'args' in kwargs or 'kwargs' in kwargs: if 'args' in kwargs or 'kwargs' in kwargs:
assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa
@ -231,7 +234,8 @@ class Queue(object):
return self.enqueue_call(func=f, args=args, kwargs=kwargs, return self.enqueue_call(func=f, args=args, kwargs=kwargs,
timeout=timeout, result_ttl=result_ttl, timeout=timeout, result_ttl=result_ttl,
description=description, depends_on=depends_on) description=description, depends_on=depends_on,
job_id=job_id)
def enqueue_job(self, job, set_meta_data=True): def enqueue_job(self, job, set_meta_data=True):
"""Enqueues a job for delayed execution. """Enqueues a job for delayed execution.

@ -339,3 +339,12 @@ class TestJob(RQTestCase):
self.assertFalse(self.testconn.exists(job.dependents_key)) self.assertFalse(self.testconn.exists(job.dependents_key))
self.assertNotIn(job.id, queue.get_job_ids()) self.assertNotIn(job.id, queue.get_job_ids())
def test_create_job_with_id(self):
"""test creating jobs with a custom ID"""
queue = Queue(connection=self.testconn)
job = queue.enqueue(say_hello, job_id="1234")
self.assertEqual(job.id, "1234")
job.perform()
self.assertRaises(TypeError, queue.enqueue, say_hello, job_id=1234)

Loading…
Cancel
Save