diff --git a/.gitignore b/.gitignore index ea8eaca..ff9c616 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,6 @@ /.tox /dist /build +.tox +.vagrant +Vagrantfile \ No newline at end of file diff --git a/rq/job.py b/rq/job.py index bd91f13..6979180 100644 --- a/rq/job.py +++ b/rq/job.py @@ -2,6 +2,7 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) +import types import inspect import warnings from functools import partial @@ -92,7 +93,8 @@ class Job(object): # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, - result_ttl=None, status=None, description=None, depends_on=None, timeout=None): + result_ttl=None, status=None, description=None, depends_on=None, timeout=None, + id=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. """ @@ -107,6 +109,8 @@ class Job(object): raise TypeError('{0!r} is not a valid kwargs dict.'.format(kwargs)) job = cls(connection=connection) + if id is not None: + job.set_id(id) # Set the core job tuple properties job._instance = None @@ -326,6 +330,8 @@ class Job(object): def set_id(self, value): """Sets a job ID for the given job.""" + if not isinstance(value, string_types): + raise TypeError('id must be a string, not {0}.'.format(type(value))) self._id = value id = property(get_id, set_id) diff --git a/rq/queue.py b/rq/queue.py index 621942a..a87f709 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -164,7 +164,8 @@ class Queue(object): connection.rpush(self.key, job_id) def enqueue_call(self, func, args=None, kwargs=None, timeout=None, - result_ttl=None, description=None, depends_on=None): + result_ttl=None, description=None, depends_on=None, + job_id=None): """Creates a job to represent the delayed function call and enqueues it. @@ -177,7 +178,8 @@ class Queue(object): # TODO: job with dependency shouldn't have "queued" as status job = self.job_class.create(func, args, kwargs, connection=self.connection, result_ttl=result_ttl, status=Status.QUEUED, - description=description, depends_on=depends_on, timeout=timeout) + description=description, depends_on=depends_on, timeout=timeout, + id=job_id) # If job depends on an unfinished job, register itself on it's # parent's dependents instead of enqueueing it. @@ -223,6 +225,7 @@ class Queue(object): description = kwargs.pop('description', None) result_ttl = kwargs.pop('result_ttl', None) depends_on = kwargs.pop('depends_on', None) + job_id = kwargs.pop('job_id', None) if 'args' in kwargs or 'kwargs' in kwargs: assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa @@ -231,7 +234,8 @@ class Queue(object): return self.enqueue_call(func=f, args=args, kwargs=kwargs, timeout=timeout, result_ttl=result_ttl, - description=description, depends_on=depends_on) + description=description, depends_on=depends_on, + job_id=job_id) def enqueue_job(self, job, set_meta_data=True): """Enqueues a job for delayed execution. diff --git a/tests/test_job.py b/tests/test_job.py index 7e990c0..78e41e2 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -339,3 +339,12 @@ class TestJob(RQTestCase): self.assertFalse(self.testconn.exists(job.dependents_key)) self.assertNotIn(job.id, queue.get_job_ids()) + + def test_create_job_with_id(self): + """test creating jobs with a custom ID""" + queue = Queue(connection=self.testconn) + job = queue.enqueue(say_hello, job_id="1234") + self.assertEqual(job.id, "1234") + job.perform() + + self.assertRaises(TypeError, queue.enqueue, say_hello, job_id=1234)