Allow job ID to be set on enqueue/enqueue_call() - fixes #412

main
foxx 10 years ago
parent 21620e98ca
commit 6bb8b26114

@ -6,6 +6,7 @@ import inspect
import warnings import warnings
from functools import partial from functools import partial
from uuid import uuid4 from uuid import uuid4
from uuid import UUID
from rq.compat import as_text, decode_redis_hash, string_types, text_type from rq.compat import as_text, decode_redis_hash, string_types, text_type
@ -92,7 +93,8 @@ class Job(object):
# Job construction # Job construction
@classmethod @classmethod
def create(cls, func, args=None, kwargs=None, connection=None, def create(cls, func, args=None, kwargs=None, connection=None,
result_ttl=None, status=None, description=None, depends_on=None, timeout=None): result_ttl=None, status=None, description=None, depends_on=None, timeout=None,
job_id=None):
"""Creates a new Job instance for the given function, arguments, and """Creates a new Job instance for the given function, arguments, and
keyword arguments. keyword arguments.
""" """
@ -107,6 +109,8 @@ class Job(object):
raise TypeError('{0!r} is not a valid kwargs dict.'.format(kwargs)) raise TypeError('{0!r} is not a valid kwargs dict.'.format(kwargs))
job = cls(connection=connection) job = cls(connection=connection)
if job_id is not None:
job.set_id(job_id)
# Set the core job tuple properties # Set the core job tuple properties
job._instance = None job._instance = None
@ -325,7 +329,11 @@ class Job(object):
def set_id(self, value): def set_id(self, value):
"""Sets a job ID for the given job.""" """Sets a job ID for the given job."""
self._id = value try:
self.key_for(text_type(value))
except:
raise ValueError("Job ID invalid, failed to encode to string")
self._id = text_type(value)
id = property(get_id, set_id) id = property(get_id, set_id)

@ -163,7 +163,8 @@ class Queue(object):
self.connection.rpush(self.key, job_id) self.connection.rpush(self.key, job_id)
def enqueue_call(self, func, args=None, kwargs=None, timeout=None, def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, description=None, depends_on=None): result_ttl=None, description=None, depends_on=None,
job_id=None):
"""Creates a job to represent the delayed function call and enqueues """Creates a job to represent the delayed function call and enqueues
it. it.
@ -176,7 +177,8 @@ class Queue(object):
# TODO: job with dependency shouldn't have "queued" as status # TODO: job with dependency shouldn't have "queued" as status
job = self.job_class.create(func, args, kwargs, connection=self.connection, job = self.job_class.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=Status.QUEUED, result_ttl=result_ttl, status=Status.QUEUED,
description=description, depends_on=depends_on, timeout=timeout) description=description, depends_on=depends_on, timeout=timeout,
job_id=job_id)
# If job depends on an unfinished job, register itself on it's # If job depends on an unfinished job, register itself on it's
# parent's dependents instead of enqueueing it. # parent's dependents instead of enqueueing it.
@ -222,6 +224,7 @@ class Queue(object):
description = kwargs.pop('description', None) description = kwargs.pop('description', None)
result_ttl = kwargs.pop('result_ttl', None) result_ttl = kwargs.pop('result_ttl', None)
depends_on = kwargs.pop('depends_on', None) depends_on = kwargs.pop('depends_on', None)
job_id = kwargs.pop('job_id', None)
if 'args' in kwargs or 'kwargs' in kwargs: if 'args' in kwargs or 'kwargs' in kwargs:
assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa
@ -230,7 +233,8 @@ class Queue(object):
return self.enqueue_call(func=f, args=args, kwargs=kwargs, return self.enqueue_call(func=f, args=args, kwargs=kwargs,
timeout=timeout, result_ttl=result_ttl, timeout=timeout, result_ttl=result_ttl,
description=description, depends_on=depends_on) description=description, depends_on=depends_on,
job_id=job_id)
def enqueue_job(self, job, set_meta_data=True): def enqueue_job(self, job, set_meta_data=True):
"""Enqueues a job for delayed execution. """Enqueues a job for delayed execution.

@ -3,6 +3,7 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
from datetime import datetime from datetime import datetime
from uuid import uuid4
from rq.compat import as_text, PY2 from rq.compat import as_text, PY2
from rq.exceptions import NoSuchJobError, UnpickleError from rq.exceptions import NoSuchJobError, UnpickleError
@ -339,3 +340,11 @@ class TestJob(RQTestCase):
self.assertFalse(self.testconn.exists(job.dependents_key)) self.assertFalse(self.testconn.exists(job.dependents_key))
self.assertNotIn(job.id, queue.get_job_ids()) self.assertNotIn(job.id, queue.get_job_ids())
def test_create_job_with_id(self):
# try a bunch of different ID types
queue = Queue(connection=self.testconn)
ids = [1234, uuid4(), "somejobid"]
for job_id in ids:
job = queue.enqueue(say_hello, job_id=job_id)
job.perform()

Loading…
Cancel
Save