From ce3e501a6292811d36edc7ebb33898210c4c19f4 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Sun, 2 Sep 2012 11:32:06 +0200 Subject: [PATCH 1/6] Add change to changelog. --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index 461ccaa..ab70df7 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,7 @@ ### 0.3.2 (not released yet) +- Access the current job from within the job function (`rq.current_job()`). ### 0.3.1 From 372de4b45a080f9e4a58dfb6d1f0512d92d736b6 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Sun, 2 Sep 2012 22:34:11 +0200 Subject: [PATCH 2/6] Implement the get_current_job() function. This fixes #125. --- rq/__init__.py | 3 ++- rq/job.py | 20 +++++++++++++++++--- tests/fixtures.py | 6 ++++++ tests/test_job.py | 13 +++++++++++-- 4 files changed, 36 insertions(+), 6 deletions(-) diff --git a/rq/__init__.py b/rq/__init__.py index 40ea226..94e1dd1 100644 --- a/rq/__init__.py +++ b/rq/__init__.py @@ -3,6 +3,7 @@ from .connections import use_connection, push_connection, pop_connection from .connections import Connection from .queue import Queue, get_failed_queue from .job import cancel_job, requeue_job +from .job import get_current_job from .worker import Worker from .version import VERSION @@ -11,5 +12,5 @@ __all__ = [ 'use_connection', 'get_current_connection', 'push_connection', 'pop_connection', 'Connection', 'Queue', 'get_failed_queue', 'Worker', - 'cancel_job', 'requeue_job'] + 'cancel_job', 'requeue_job', 'get_current_job'] __version__ = VERSION diff --git a/rq/job.py b/rq/job.py index a8c7937..380783f 100644 --- a/rq/job.py +++ b/rq/job.py @@ -4,6 +4,7 @@ import times from collections import namedtuple from uuid import uuid4 from cPickle import loads, dumps, UnpicklingError +from .local import LocalStack from .connections import get_current_connection from .exceptions import UnpickleError, NoSuchJobError @@ -52,6 +53,13 @@ def requeue_job(job_id, connection=None): fq.requeue(job_id) +def get_current_job(): + """Returns the Job instance that is currently being executed. If this + function is invoked from outside a job context, None is returned. + """ + return _job_stack.top + + class Job(object): """A Job is just a convenient datastructure to pass around job (meta) data. """ @@ -323,9 +331,12 @@ class Job(object): # Job execution def perform(self): # noqa - """Invokes the job function with the job arguments. - """ - self._result = self.func(*self.args, **self.kwargs) + """Invokes the job function with the job arguments.""" + _job_stack.push(self) + try: + self._result = self.func(*self.args, **self.kwargs) + finally: + assert self == _job_stack.pop() return self._result @@ -352,3 +363,6 @@ class Job(object): def __hash__(self): return hash(self.id) + + +_job_stack = LocalStack() diff --git a/tests/fixtures.py b/tests/fixtures.py index da3c13e..77774d9 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -5,6 +5,7 @@ fixtures has a slighty different characteristics. import time from rq import Connection from rq.decorators import job +from rq import get_current_job def say_hello(name=None): @@ -43,6 +44,11 @@ def create_file_after_timeout(path, timeout): create_file(path) +def access_self(): + job = get_current_job() + return job.id + + class Calculator(object): """Test instance methods.""" def __init__(self, denominator): diff --git a/tests/test_job.py b/tests/test_job.py index 5c914b9..0d67a17 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1,10 +1,10 @@ import times from datetime import datetime from tests import RQTestCase -from tests.fixtures import Calculator, some_calculation, say_hello +from tests.fixtures import Calculator, some_calculation, say_hello, access_self from tests.helpers import strip_milliseconds from cPickle import loads -from rq.job import Job +from rq.job import Job, get_current_job from rq.exceptions import NoSuchJobError, UnpickleError @@ -201,3 +201,12 @@ class TestJob(RQTestCase): job.save() job_from_queue = Job.fetch(job.id, connection=self.testconn) self.assertEqual(job.result_ttl, None) + + def test_job_access_within_job_function(self): + """The current job is accessible within the job function.""" + # Executing the job function from outside of RQ throws an exception + self.assertIsNone(get_current_job()) + + # Executing the job function from outside of RQ throws an exception + job = Job.create(func=access_self) + self.assertEqual(job.perform(), job.id) From 95d3aed98ee2b5cba6a0e92550c29efcbf245788 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Sun, 2 Sep 2012 23:00:12 +0200 Subject: [PATCH 3/6] Store the job ID on the internal stack. It does so instead of the instance itself. Still returns the job---the interface hasn't changed. --- rq/job.py | 9 ++++++--- tests/test_job.py | 5 ++++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/rq/job.py b/rq/job.py index 380783f..6faefa9 100644 --- a/rq/job.py +++ b/rq/job.py @@ -57,7 +57,10 @@ def get_current_job(): """Returns the Job instance that is currently being executed. If this function is invoked from outside a job context, None is returned. """ - return _job_stack.top + job_id = _job_stack.top + if job_id is None: + return None + return Job.fetch(job_id) class Job(object): @@ -332,11 +335,11 @@ class Job(object): # Job execution def perform(self): # noqa """Invokes the job function with the job arguments.""" - _job_stack.push(self) + _job_stack.push(self.id) try: self._result = self.func(*self.args, **self.kwargs) finally: - assert self == _job_stack.pop() + assert self.id == _job_stack.pop() return self._result diff --git a/tests/test_job.py b/tests/test_job.py index 0d67a17..f75d878 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -209,4 +209,7 @@ class TestJob(RQTestCase): # Executing the job function from outside of RQ throws an exception job = Job.create(func=access_self) - self.assertEqual(job.perform(), job.id) + job.save() + id = job.perform() + self.assertEqual(job.id, id) + self.assertEqual(job.func, access_self) From 5e80aa27ebb3ad5770266209cfe115294db137f7 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Sun, 2 Sep 2012 23:10:35 +0200 Subject: [PATCH 4/6] Fix comment. --- tests/test_job.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_job.py b/tests/test_job.py index f75d878..7216f22 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -207,7 +207,8 @@ class TestJob(RQTestCase): # Executing the job function from outside of RQ throws an exception self.assertIsNone(get_current_job()) - # Executing the job function from outside of RQ throws an exception + # Executing the job function from within the job works (and in + # this case leads to the job ID being returned) job = Job.create(func=access_self) job.save() id = job.perform() From fcb5453fb9797137f43d9dccffe3b5c3d0fe0bdd Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 3 Sep 2012 08:16:48 +0200 Subject: [PATCH 5/6] Fix changelog text. --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index ab70df7..e56c71f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,7 +1,7 @@ ### 0.3.2 (not released yet) -- Access the current job from within the job function (`rq.current_job()`). +- Access the current job from within the job function (`rq.get_current_job()`). ### 0.3.1 From 67ca942cb5f475b41acb47d45efcbd0b172ea066 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 3 Sep 2012 13:54:39 +0200 Subject: [PATCH 6/6] Update changelog. --- CHANGES.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index d8b6da3..cb83f75 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,7 +1,11 @@ ### 0.3.3 (not released yet) -- Access the current job from within the job function (`rq.get_current_job()`). +- Jobs can now access the current `Job` instance from within. Relevant + documentation [here](http://python-rq.org/docs/jobs/). + +- Custom properties can be set by modifying the `job.meta` dict. Relevant + documentation [here](http://python-rq.org/docs/jobs/). ### 0.3.2