diff --git a/rq/job.py b/rq/job.py index 42653e0..fdc4b57 100644 --- a/rq/job.py +++ b/rq/job.py @@ -68,7 +68,7 @@ class Job(object): # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, - result_ttl=None, status=None): + result_ttl=None, status=None, description=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. """ @@ -88,7 +88,7 @@ class Job(object): job._func_name = func job._args = args job._kwargs = kwargs - job.description = job.get_call_string() + job.description = description or job.get_call_string() job.result_ttl = result_ttl job._status = status return job diff --git a/rq/queue.py b/rq/queue.py index 3f9cd96..2404eea 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -129,7 +129,8 @@ class Queue(object): """Pushes a job ID on the corresponding Redis queue.""" self.connection.rpush(self.key, job_id) - def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None): # noqa + def enqueue_call(self, func, args=None, kwargs=None, timeout=None, + result_ttl=None, description=None): """Creates a job to represent the delayed function call and enqueues it. @@ -138,7 +139,7 @@ class Queue(object): contain options for RQ itself. """ timeout = timeout or self._default_timeout - job = Job.create(func, args, kwargs, connection=self.connection, + job = Job.create(func, args, kwargs, description=description, connection=self.connection, result_ttl=result_ttl, status=Status.QUEUED) return self.enqueue_job(job, timeout=timeout) @@ -157,22 +158,23 @@ class Queue(object): meaningful to the import context of the workers) """ if not isinstance(f, string_types) and f.__module__ == '__main__': - raise ValueError( - 'Functions from the __main__ module cannot be processed ' - 'by workers.') + raise ValueError('Functions from the __main__ module cannot be processed ' + 'by workers.') # Detect explicit invocations, i.e. of the form: # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30) timeout = None + description = None result_ttl = None if 'args' in kwargs or 'kwargs' in kwargs: assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa timeout = kwargs.pop('timeout', None) + description = kwargs.pop('description', None) args = kwargs.pop('args', None) result_ttl = kwargs.pop('result_ttl', None) kwargs = kwargs.pop('kwargs', None) - return self.enqueue_call(func=f, args=args, kwargs=kwargs, + return self.enqueue_call(func=f, args=args, kwargs=kwargs, description=description, timeout=timeout, result_ttl=result_ttl) def enqueue_job(self, job, timeout=None, set_meta_data=True): diff --git a/tests/test_job.py b/tests/test_job.py index 0f376ec..d25f60f 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -88,9 +88,9 @@ class TestJob(RQTestCase): """Fetching jobs.""" # Prepare test self.testconn.hset('rq:job:some_id', 'data', - "(S'tests.fixtures.some_calculation'\nN(I3\nI4\nt(dp1\nS'z'\nI2\nstp2\n.") # noqa + "(S'tests.fixtures.some_calculation'\nN(I3\nI4\nt(dp1\nS'z'\nI2\nstp2\n.") self.testconn.hset('rq:job:some_id', 'created_at', - "2012-02-07 22:13:24+0000") + '2012-02-07 22:13:24+0000') # Fetch returns a job job = Job.fetch('some_id') @@ -110,13 +110,13 @@ class TestJob(RQTestCase): expected_date = strip_milliseconds(job.created_at) stored_date = self.testconn.hget(job.key, 'created_at').decode('utf-8') self.assertEquals( - times.to_universal(stored_date), - expected_date) + times.to_universal(stored_date), + expected_date) # ... and no other keys are stored self.assertEqual( - self.testconn.hkeys(job.key), - [b'created_at']) + self.testconn.hkeys(job.key), + [b'created_at']) def test_persistence_of_typical_jobs(self): """Storing typical jobs.""" @@ -126,13 +126,13 @@ class TestJob(RQTestCase): expected_date = strip_milliseconds(job.created_at) stored_date = self.testconn.hget(job.key, 'created_at').decode('utf-8') self.assertEquals( - times.to_universal(stored_date), - expected_date) + times.to_universal(stored_date), + expected_date) # ... and no other keys are stored self.assertEqual( - sorted(self.testconn.hkeys(job.key)), - [b'created_at', b'data', b'description']) + sorted(self.testconn.hkeys(job.key)), + [b'created_at', b'data', b'description']) def test_store_then_fetch(self): """Store, then fetch.""" @@ -195,14 +195,27 @@ class TestJob(RQTestCase): """Ensure that job's result_ttl is set properly""" job = Job.create(func=say_hello, args=('Lionel',), result_ttl=10) job.save() - job_from_queue = Job.fetch(job.id, connection=self.testconn) + Job.fetch(job.id, connection=self.testconn) self.assertEqual(job.result_ttl, 10) job = Job.create(func=say_hello, args=('Lionel',)) job.save() - job_from_queue = Job.fetch(job.id, connection=self.testconn) + Job.fetch(job.id, connection=self.testconn) self.assertEqual(job.result_ttl, None) + def test_description_is_persisted(self): + """Ensure that job's custom description is set properly""" + job = Job.create(func=say_hello, args=('Lionel',), description=u'Say hello!') + job.save() + Job.fetch(job.id, connection=self.testconn) + self.assertEqual(job.description, u'Say hello!') + + # Ensure job description is constructed from function call string + job = Job.create(func=say_hello, args=('Lionel',)) + job.save() + Job.fetch(job.id, connection=self.testconn) + self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')") + def test_job_access_within_job_function(self): """The current job is accessible within the job function.""" # Executing the job function from outside of RQ throws an exception @@ -240,12 +253,12 @@ class TestJob(RQTestCase): """Test that jobs and results are expired properly.""" job = Job.create(func=say_hello) job.save() - + # Jobs with negative TTLs don't expire job.cleanup(ttl=-1) self.assertEqual(self.testconn.ttl(job.key), -1) - # Jobs with positive TTLs are eventually deleted + # Jobs with positive TTLs are eventually deleted job.cleanup(ttl=100) self.assertEqual(self.testconn.ttl(job.key), 100)