From 6bb8b26114e6ab23a310124e49d254218f280850 Mon Sep 17 00:00:00 2001 From: foxx Date: Fri, 5 Sep 2014 11:19:18 +0100 Subject: [PATCH] Allow job ID to be set on enqueue/enqueue_call() - fixes #412 --- rq/job.py | 12 ++++++++++-- rq/queue.py | 10 +++++++--- tests/test_job.py | 9 +++++++++ 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/rq/job.py b/rq/job.py index ef7a266..292d0ed 100644 --- a/rq/job.py +++ b/rq/job.py @@ -6,6 +6,7 @@ import inspect import warnings from functools import partial from uuid import uuid4 +from uuid import UUID from rq.compat import as_text, decode_redis_hash, string_types, text_type @@ -92,7 +93,8 @@ class Job(object): # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, - result_ttl=None, status=None, description=None, depends_on=None, timeout=None): + result_ttl=None, status=None, description=None, depends_on=None, timeout=None, + job_id=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. """ @@ -107,6 +109,8 @@ class Job(object): raise TypeError('{0!r} is not a valid kwargs dict.'.format(kwargs)) job = cls(connection=connection) + if job_id is not None: + job.set_id(job_id) # Set the core job tuple properties job._instance = None @@ -325,7 +329,11 @@ class Job(object): def set_id(self, value): """Sets a job ID for the given job.""" - self._id = value + try: + self.key_for(text_type(value)) + except: + raise ValueError("Job ID invalid, failed to encode to string") + self._id = text_type(value) id = property(get_id, set_id) diff --git a/rq/queue.py b/rq/queue.py index 22aa160..608f609 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -163,7 +163,8 @@ class Queue(object): self.connection.rpush(self.key, job_id) def enqueue_call(self, func, args=None, kwargs=None, timeout=None, - result_ttl=None, description=None, depends_on=None): + result_ttl=None, description=None, depends_on=None, + job_id=None): """Creates a job to represent the delayed function call and enqueues it. @@ -176,7 +177,8 @@ class Queue(object): # TODO: job with dependency shouldn't have "queued" as status job = self.job_class.create(func, args, kwargs, connection=self.connection, result_ttl=result_ttl, status=Status.QUEUED, - description=description, depends_on=depends_on, timeout=timeout) + description=description, depends_on=depends_on, timeout=timeout, + job_id=job_id) # If job depends on an unfinished job, register itself on it's # parent's dependents instead of enqueueing it. @@ -222,6 +224,7 @@ class Queue(object): description = kwargs.pop('description', None) result_ttl = kwargs.pop('result_ttl', None) depends_on = kwargs.pop('depends_on', None) + job_id = kwargs.pop('job_id', None) if 'args' in kwargs or 'kwargs' in kwargs: assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa @@ -230,7 +233,8 @@ class Queue(object): return self.enqueue_call(func=f, args=args, kwargs=kwargs, timeout=timeout, result_ttl=result_ttl, - description=description, depends_on=depends_on) + description=description, depends_on=depends_on, + job_id=job_id) def enqueue_job(self, job, set_meta_data=True): """Enqueues a job for delayed execution. diff --git a/tests/test_job.py b/tests/test_job.py index 7e990c0..1184b8d 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -3,6 +3,7 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) from datetime import datetime +from uuid import uuid4 from rq.compat import as_text, PY2 from rq.exceptions import NoSuchJobError, UnpickleError @@ -339,3 +340,11 @@ class TestJob(RQTestCase): self.assertFalse(self.testconn.exists(job.dependents_key)) self.assertNotIn(job.id, queue.get_job_ids()) + + def test_create_job_with_id(self): + # try a bunch of different ID types + queue = Queue(connection=self.testconn) + ids = [1234, uuid4(), "somejobid"] + for job_id in ids: + job = queue.enqueue(say_hello, job_id=job_id) + job.perform()