diff --git a/rq/exceptions.py b/rq/exceptions.py index 0b885f7..813bcde 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -1,3 +1,6 @@ +class NoSuchJobError(Exception): + pass + class NoQueueError(Exception): pass diff --git a/rq/job.py b/rq/job.py index 0ae5bb1..a17e490 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1,51 +1,131 @@ -from datetime import datetime +import times from uuid import uuid4 from pickle import loads, dumps -from .exceptions import UnpickleError +from .proxy import conn +from .exceptions import UnpickleError, NoSuchJobError class Job(object): """A Job is just a convenient datastructure to pass around job (meta) data. """ + + # Job construction @classmethod - def unpickle(cls, pickle_data): - """Constructs a Job instance form the given pickle'd job tuple data.""" - try: - unpickled_obj = loads(pickle_data) - assert isinstance(unpickled_obj, Job) - return unpickled_obj - except (AssertionError, AttributeError, IndexError, TypeError, KeyError): - raise UnpickleError('Could not unpickle Job.', pickle_data) + def for_call(cls, func, *args, **kwargs): + """Creates a new Job instance for the given function, arguments, and + keyword arguments. + """ + job = Job() + job.func = func + job.args = args + job.kwargs = kwargs + return job - def __init__(self, func, *args, **kwargs): - self._id = unicode(uuid4()) - self.func = func - self.args = args - self.kwargs = kwargs + @classmethod + def fetch(cls, id): + """Fetches a persisted job from its corresponding Redis key and + instantiates it. + """ + job = Job(id) + job.refresh() + return job + + def __init__(self, id=None): + self._id = id + self.func = None + self.args = None + self.kwargs = None self.origin = None - self.created_at = datetime.utcnow() + self.created_at = times.now() self.enqueued_at = None + self.result = None self.exc_info = None - def pickle(self): - """Returns the pickle'd string represenation of a Job. Suitable for writing to Redis.""" - return dumps(self) + + # Data access + def get_id(self): + """The job ID for this job instance. Generates an ID lazily the + first time the ID is requested. + """ + if self._id is None: + self._id = unicode(uuid4()) + return self._id + + def set_id(self, value): + """Sets a job ID for the given job.""" + self._id = value + + id = property(get_id, set_id) @property - def rv_key(self): - """Returns the Redis key under which the Job's result will be stored, if applicable.""" - return 'rq:result:%s' % (self._id,) + def key(self): + """The Redis key that is used to store job data under.""" + return 'rq:job:%s' % (self.id,) + @property - def id(self): - """Returns the Job's internal ID.""" - return self._id + def job_tuple(self): + """Returns the job tuple that encodes the actual function call that this job represents.""" + return (self.func, self.args, self.kwargs) + + @property + def return_value(self): + """Returns the return value of the job. + + Initially, right after enqueueing a job, the return value will be None. + But when the job has been executed, and had a return value or exception, + this will return that value or exception. + + Note that, when the job has no return value (i.e. returns None), the + ReadOnlyJob object is useless, as the result won't be written back to + Redis. + + Also note that you cannot draw the conclusion that a job has _not_ been + executed when its return value is None, since return values written back + to Redis will expire after a given amount of time (500 seconds by + default). + """ + if self._cached_result is None: + rv = conn.hget(self.key, 'result') + if rv is not None: + # cache the result + self._cached_result = loads(rv) + return self._cached_result + + # Persistence + def refresh(self): + """Overwrite the current instance's properties with the values in the + corresponding Redis key. + + Will raise a NoSuchJobError if no corresponding Redis key exists. + """ + key = self.key + pickled_data = conn.hget(key, 'data') + if pickled_data is None: + raise NoSuchJobError('No such job: %s' % (key,)) + + self.func, self.args, self.kwargs = loads(pickled_data) + + self.created_at = times.to_universal(conn.hget(key, 'created_at')) + + def save(self): + """Persists the current job instance to its corresponding Redis key.""" + pickled_data = dumps(self.job_tuple) + + key = self.key + conn.hset(key, 'data', pickled_data) + conn.hset(key, 'created_at', times.format(self.created_at, 'UTC')) + + + # Job execution def perform(self): """Invokes the job function with the job arguments. """ return self.func(*self.args, **self.kwargs) + + # Representation @property def call_string(self): """Returns a string representation of the call, formatted as a regular @@ -59,5 +139,31 @@ class Job(object): def __str__(self): return '' % (self.id, self.call_string) + + # Job equality def __eq__(self, other): - return cmp(self.id, other.id) + return self.id == other.id + + def __hash__(self): + return hash(self.id) + + + # TODO: TO REFACTOR / REMOVE + def pickle(self): + """Returns the pickle'd string represenation of a Job. Suitable for + writing to Redis. + """ + return dumps(self) + + @classmethod + def unpickle(cls, pickle_data): + """Constructs a Job instance form the given pickle'd job tuple data.""" + try: + unpickled_obj = loads(pickle_data) + assert isinstance(unpickled_obj, Job) + return unpickled_obj + except (AssertionError, AttributeError, IndexError, TypeError, KeyError): + + raise UnpickleError('Could not unpickle Job.', pickle_data) + + diff --git a/rq/queue.py b/rq/queue.py index e7f86bd..43d0db0 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,44 +1,10 @@ from datetime import datetime from functools import total_ordering -from pickle import loads from .proxy import conn from .job import Job from .exceptions import UnpickleError -class DelayedResult(object): - """Proxy object that is returned as a result of `Queue.enqueue()` calls. - Instances of DelayedResult can be polled for their return values. - """ - def __init__(self, key): - self.key = key - self._rv = None - - @property - def return_value(self): - """Returns the return value of the job. - - Initially, right after enqueueing a job, the return value will be None. - But when the job has been executed, and had a return value or exception, - this will return that value or exception. - - Note that, when the job has no return value (i.e. returns None), the - DelayedResult object is useless, as the result won't be written back to - Redis. - - Also note that you cannot draw the conclusion that a job has _not_ been - executed when its return value is None, since return values written back - to Redis will expire after a given amount of time (500 seconds by - default). - """ - if self._rv is None: - rv = conn.get(self.key) - if rv is not None: - # cache the result - self._rv = loads(rv) - return self._rv - - @total_ordering class Queue(object): redis_queue_namespace_prefix = 'rq:queue:' @@ -99,13 +65,14 @@ class Queue(object): if f.__module__ == '__main__': raise ValueError('Functions from the __main__ module cannot be processed by workers.') - job = Job(f, *args, **kwargs) + job = Job.for_call(f, *args, **kwargs) job.origin = self.name return job - def _push(self, pickled_job): + def enqueue_job(self, job): """Enqueues a pickled_job on the corresponding Redis queue.""" - conn.rpush(self.key, pickled_job) + job.save() + conn.rpush(self.key, job.id) def enqueue(self, f, *args, **kwargs): """Enqueues a function call for delayed execution. @@ -115,8 +82,8 @@ class Queue(object): """ job = self._create_job(f, *args, **kwargs) job.enqueued_at = datetime.utcnow() - self._push(job.pickle()) - return DelayedResult(job.rv_key) + self.enqueue_job(job) + return Job(job.id) def requeue(self, job): """Requeues an existing (typically a failed job) onto the queue.""" diff --git a/setup.py b/setup.py index 8dcee49..5263a04 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,7 @@ def get_version(): raise RuntimeError('No version info found.') def get_dependencies(): - deps = ['redis', 'procname'] + deps = ['redis', 'procname', 'times'] deps += ['logbook'] # should be soft dependency? if sys.version_info < (2, 7) or \ sys.version_info >= (3, 0) and sys.version_info < (3, 2): diff --git a/tests/test_job.py b/tests/test_job.py index de6c78e..45bdf97 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1,8 +1,8 @@ +from datetime import datetime from tests import RQTestCase from pickle import dumps, loads from rq.job import Job -#from rq import Queue, Worker -from rq.exceptions import UnpickleError +from rq.exceptions import NoSuchJobError, UnpickleError def arbitrary_function(x, y, z=1): @@ -10,25 +10,91 @@ def arbitrary_function(x, y, z=1): class TestJob(RQTestCase): - def test_create_job(self): - """Creation of jobs.""" - job = Job(arbitrary_function, 3, 4, z=2) + def test_create_empty_job(self): + """Creation of new empty jobs.""" + job = Job() + + # Jobs have a random UUID + self.assertIsNotNone(job.id) + + # Jobs have no data yet... + self.assertEquals(job.func, None) + self.assertEquals(job.args, None) + self.assertEquals(job.kwargs, None) + self.assertEquals(job.origin, None) + self.assertEquals(job.enqueued_at, None) + self.assertEquals(job.result, None) + self.assertEquals(job.exc_info, None) + + # ...except for a created_at property + self.assertIsNotNone(job.created_at) + + def test_create_normal_job(self): + """Creation of jobs for function calls.""" + job = Job.for_call(arbitrary_function, 3, 4, z=2) + + # Jobs have a random UUID + self.assertIsNotNone(job.id) + self.assertIsNotNone(job.created_at) + + # Job data is set... self.assertEquals(job.func, arbitrary_function) self.assertEquals(job.args, (3, 4)) self.assertEquals(job.kwargs, {'z': 2}) + + # ...but metadata is not self.assertIsNone(job.origin) - self.assertIsNotNone(job.created_at) self.assertIsNone(job.enqueued_at) - self.assertIsNotNone(job.rv_key) + self.assertIsNone(job.result) + + + def test_save(self): + """Storing jobs.""" + job = Job.for_call(arbitrary_function, 3, 4, z=2) - def test_pickle_job(self): - """Pickling of jobs.""" - job = Job(arbitrary_function, 3, 4, z=2) - job2 = loads(dumps(job)) + # Saving creates a Redis hash + self.assertEquals(self.testconn.exists(job.key), False) + job.save() + self.assertEquals(self.testconn.type(job.key), 'hash') + + # Saving writes pickled job data + unpickled_data = loads(self.testconn.hget(job.key, 'data')) + self.assertEquals(unpickled_data[0], arbitrary_function) + + def test_fetch(self): + """Fetching jobs.""" + # Prepare test + self.testconn.hset('rq:job:some_id', 'data', "(ctest_job\narbitrary_function\np0\n(I3\nI4\ntp1\n(dp2\nS'z'\np3\nI2\nstp4\n.") + self.testconn.hset('rq:job:some_id', 'created_at', "2012-02-07 22:13:24+0000") + + # Fetch returns a job + job = Job.fetch('some_id') + self.assertEquals(job.id, 'some_id') + self.assertEquals(job.func, arbitrary_function) + self.assertEquals(job.args, (3, 4)) + self.assertEquals(job.kwargs, dict(z=2)) + self.assertEquals(job.created_at, datetime(2012, 2, 7, 22, 13, 24)) + + + def test_persistence_of_jobs(self): + """Storing and fetching of jobs.""" + job = Job.for_call(arbitrary_function, 3, 4, z=2) + job.save() + + job2 = Job.fetch(job.id) self.assertEquals(job.func, job2.func) self.assertEquals(job.args, job2.args) self.assertEquals(job.kwargs, job2.kwargs) + # Mathematical equation + self.assertEquals(job, job2) + + def test_fetching_can_fail(self): + """Fetching fails for non-existing jobs.""" + with self.assertRaises(NoSuchJobError): + Job.fetch('b4a44d44-da16-4620-90a6-798e8cd72ca0') + + def test_unpickle_errors(self): """Handling of unpickl'ing errors.""" with self.assertRaises(UnpickleError): @@ -37,7 +103,7 @@ class TestJob(RQTestCase): with self.assertRaises(UnpickleError): Job.unpickle(13) - pickle_data = dumps(Job(arbitrary_function, 2, 3)) + pickle_data = dumps(Job.for_call(arbitrary_function, 2, 3)) corrupt_data = pickle_data.replace('arbitrary', 'b0rken') with self.assertRaises(UnpickleError): Job.unpickle(corrupt_data) diff --git a/tests/test_queue.py b/tests/test_queue.py index 071891f..80e1acc 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -31,26 +31,26 @@ class TestQueue(RQTestCase): def test_queue_empty(self): """Detecting empty queues.""" - q = Queue('my-queue') + q = Queue('example') self.assertEquals(q.is_empty(), True) - self.testconn.rpush('rq:queue:my-queue', 'some val') + self.testconn.rpush('rq:queue:example', 'sentinel message') self.assertEquals(q.is_empty(), False) def test_enqueue(self): - """Putting work on queues.""" - q = Queue('my-queue') + """Enqueueing writes job IDs to queues.""" + q = Queue() self.assertEquals(q.is_empty(), True) # testjob spec holds which queue this is sent to - q.enqueue(testjob, 'Nick', foo='bar') - self.assertEquals(q.is_empty(), False) - for job in q.jobs: - if job.func == testjob: - break - else: - self.fail('Job not found on queue.') + job = q.enqueue(testjob, 'Nick', foo='bar') + job_id = job.id + + # Inspect data inside Redis + q_key = 'rq:queue:default' + self.assertEquals(self.testconn.llen(q_key), 1) + self.assertEquals(self.testconn.lrange(q_key, 0, -1)[0], job_id) def test_dequeue(self): diff --git a/tests/test_worker.py b/tests/test_worker.py index b4cd0aa..06242f6 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -31,7 +31,7 @@ class TestWorker(RQTestCase): # NOTE: We have to fake this enqueueing for this test case. # What we're simulating here is a call to a function that is not # importable from the worker process. - job = Job(failing_job, 3) + job = Job.for_call(failing_job, 3) pickled_job = job.pickle() invalid_data = pickled_job.replace( 'failing_job', 'nonexisting_job')