CHECKPOINT: Second part of the big refactoring.

Jobs are now stored in separate keys, and only job IDs are put on Redis
queues.  Much of the code has been hit by this change, but it is for the
good.

No really.
Branch: main
Author: Vincent Driessen, 13 years ago
Parent: 65105b44c3
Commit: b1650cb9b9
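
Concretely, after this change a job lives under its own Redis key: a hash whose
'data' field holds the pickled function call, next to metadata fields such as
'created_at', while the queue itself degenerates to a plain list of job IDs.
Below is a minimal sketch of that layout using redis-py directly; the
'rq:job:'/'rq:queue:' key prefixes and the exact hash fields are illustrative,
not a literal dump of what this commit writes:

from uuid import uuid4
from pickle import dumps
from redis import Redis

def say_hello(name):
    return 'Hi there, %s!' % name

conn = Redis()

# Each job gets a key of its own: a hash holding the pickled call
# ('data') alongside plain-text metadata fields.
job_id = str(uuid4())
conn.hset('rq:job:%s' % job_id, 'data', dumps((say_hello, ('World',), {})))

# The queue itself now carries nothing but job IDs.
conn.rpush('rq:queue:default', job_id)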

--- a/rq/job.py
+++ b/rq/job.py
@@ -5,6 +5,21 @@ from .proxy import conn
 from .exceptions import UnpickleError, NoSuchJobError

+def unpickle(pickled_string):
+    """Unpickles a string, but raises a unified UnpickleError in case anything
+    fails.
+
+    This is a helper method to not have to deal with the fact that `loads()`
+    potentially raises many types of exceptions (e.g. AttributeError,
+    IndexError, TypeError, KeyError, etc.)
+    """
+    try:
+        obj = loads(pickled_string)
+    except StandardError:
+        raise UnpickleError('Could not unpickle.', pickled_string)
+    return obj
+
+
 class Job(object):
     """A Job is just a convenient datastructure to pass around job (meta) data.
     """
@@ -105,7 +120,7 @@ class Job(object):
         if pickled_data is None:
             raise NoSuchJobError('No such job: %s' % (key,))

-        self.func, self.args, self.kwargs = loads(pickled_data)
+        self.func, self.args, self.kwargs = unpickle(pickled_data)
         self.created_at = times.to_universal(conn.hget(key, 'created_at'))
@@ -155,15 +170,3 @@ class Job(object):
         """
         return dumps(self)
-
-    @classmethod
-    def unpickle(cls, pickle_data):
-        """Constructs a Job instance form the given pickle'd job tuple data."""
-        try:
-            unpickled_obj = loads(pickle_data)
-            assert isinstance(unpickled_obj, Job)
-            return unpickled_obj
-        except (AssertionError, AttributeError, IndexError, TypeError, KeyError):
-            raise UnpickleError('Could not unpickle Job.', pickle_data)
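
The module-level unpickle() added above replaces the Job.unpickle() classmethod
removed below it: instead of enumerating every exception that loads() might
raise, it catches StandardError wholesale and re-raises one unified error type.
A quick illustration of the contract this gives callers (Python 2, matching the
StandardError catch in this commit; on Python 3 the equivalent catch-all would
be Exception):

from pickle import dumps
from rq.job import unpickle
from rq.exceptions import UnpickleError

# Well-formed data round-trips as usual.
assert unpickle(dumps(('func', (1, 2), {}))) == ('func', (1, 2), {})

# Garbage of any kind -- a non-pickle string, a wrong type, corrupt
# bytes -- surfaces as the single, unified UnpickleError.
for garbage in ('this is no pickle data', 13):
    try:
        unpickle(garbage)
    except UnpickleError:
        pass  # the only error a caller ever has to handle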

--- a/rq/queue.py
+++ b/rq/queue.py
@@ -1,8 +1,8 @@
-from datetime import datetime
+import times
 from functools import total_ordering

 from .proxy import conn
 from .job import Job
-from .exceptions import UnpickleError
+from .exceptions import NoSuchJobError, UnpickleError

 @total_ordering
@@ -58,21 +58,9 @@ class Queue(object):
         return conn.llen(self.key)

-    def _create_job(self, f, *args, **kwargs):
-        """Creates a Job instance for the given function call and attaches queue
-        meta data to it.
-        """
-        if f.__module__ == '__main__':
-            raise ValueError('Functions from the __main__ module cannot be processed by workers.')
-
-        job = Job.for_call(f, *args, **kwargs)
-        job.origin = self.name
-        return job
-
-    def enqueue_job(self, job):
-        """Enqueues a pickled_job on the corresponding Redis queue."""
-        job.save()
-        conn.rpush(self.key, job.id)
+    def push_job_id(self, job_id):
+        """Pushes a job ID on the corresponding Redis queue."""
+        conn.rpush(self.key, job_id)

     def enqueue(self, f, *args, **kwargs):
         """Enqueues a function call for delayed execution.
@@ -80,25 +68,55 @@ class Queue(object):
         Expects the function to call, along with the arguments and keyword
         arguments.
         """
-        job = self._create_job(f, *args, **kwargs)
-        job.enqueued_at = datetime.utcnow()
-        self.enqueue_job(job)
+        if f.__module__ == '__main__':
+            raise ValueError('Functions from the __main__ module cannot be processed by workers.')
+
+        job = Job.for_call(f, *args, **kwargs)
+        job.origin = self.name
+        job.enqueued_at = times.now()
+        job.save()
+        self.push_job_id(job.id)
         return Job(job.id)

     def requeue(self, job):
         """Requeues an existing (typically a failed job) onto the queue."""
         raise NotImplementedError('Implement this')

+    def pop_job_id(self):
+        """Pops a given job ID from this Redis queue."""
+        return conn.lpop(self.key)
+
+    @classmethod
+    def lpop(cls, queue_keys, blocking):
+        """Helper method.  Intermediate method to abstract away from some Redis
+        API details, where LPOP accepts only a single key, whereas BLPOP accepts
+        multiple.  So if we want the non-blocking LPOP, we need to iterate over
+        all queues, do individual LPOPs, and return the result.
+
+        Until Redis receives a specific method for this, we'll have to wrap it
+        this way.
+        """
+        if blocking:
+            queue_key, job_id = conn.blpop(queue_keys)
+            return queue_key, job_id
+        else:
+            for queue_key in queue_keys:
+                blob = conn.lpop(queue_key)
+                if blob is not None:
+                    return queue_key, blob
+            return None
     def dequeue(self):
-        """Dequeues the function call at the front of this Queue.
+        """Dequeues the front-most job from this queue.

         Returns a Job instance, which can be executed or inspected.
         """
-        blob = conn.lpop(self.key)
-        if blob is None:
+        job_id = self.pop_job_id()
+        if job_id is None:
             return None
         try:
-            job = Job.unpickle(blob)
+            job = Job.fetch(job_id)
+        except NoSuchJobError as e:
+            return None
         except UnpickleError as e:
             # Attach queue information on the exception for improved error
             # reporting
@@ -107,20 +125,6 @@ class Queue(object):
         job.origin = self
         return job

-    @classmethod
-    def _lpop_any(cls, queue_keys):
-        """Helper method.  You should not call this directly.
-
-        Redis' BLPOP command takes multiple queue arguments, but LPOP can only
-        take a single queue.  Therefore, we need to loop over all queues
-        manually, in order, and return None if no more work is available.
-        """
-        for queue_key in queue_keys:
-            blob = conn.lpop(queue_key)
-            if blob is not None:
-                return (queue_key, blob)
-        return None
-
     @classmethod
     def dequeue_any(cls, queues, blocking):
         """Class method returning the Job instance at the front of the given set
@@ -130,18 +134,17 @@ class Queue(object):
         either blocks execution of this function until new messages arrive on
         any of the queues, or returns None.
         """
-        queue_keys = map(lambda q: q.key, queues)
-        if blocking:
-            queue_key, blob = conn.blpop(queue_keys)
-        else:
-            redis_result = cls._lpop_any(queue_keys)
-            if redis_result is None:
-                return None
-            queue_key, blob = redis_result
+        queue_keys = [q.key for q in queues]
+        result = cls.lpop(queue_keys, blocking)
+        if result is None:
+            return None
+        queue_key, job_id = result
         queue = Queue.from_queue_key(queue_key)
         try:
-            job = Job.unpickle(blob)
+            job = Job.fetch(job_id)
+        except NoSuchJobError:
+            # Silently pass on jobs that don't exist (anymore)
+            return None
         except UnpickleError as e:
             # Attach queue information on the exception for improved error
             # reporting
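
The new Queue.lpop class method exists because of the Redis API asymmetry its
docstring describes: BLPOP waits server-side on many keys at once, while plain
LPOP takes exactly one key. These are roughly the two call patterns it
unifies, sketched against redis-py (the queue key names are illustrative):

from redis import Redis

conn = Redis()
queue_keys = ['rq:queue:high', 'rq:queue:default', 'rq:queue:low']

# Blocking: a single round trip; Redis itself waits across all keys and
# returns a (queue_key, job_id) pair as soon as any of them has an element.
queue_key, job_id = conn.blpop(queue_keys)

# Non-blocking: LPOP is single-key, so we try each queue in priority
# order ourselves, and report None if every queue is empty.
result = None
for key in queue_keys:
    job_id = conn.lpop(key)
    if job_id is not None:
        result = (key, job_id)
        break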

--- a/rq/worker.py
+++ b/rq/worker.py
@@ -1,9 +1,9 @@
 import sys
 import os
 import errno
-import datetime
 import random
 import time
+import times
 import procname
 import socket
 import signal
@@ -233,7 +233,7 @@ class Worker(object):
         Pops and performs all jobs on the current list of queues.  When all
         queues are empty, block and wait for new jobs to arrive on any of the
-        queues, unless `burst` is True.
+        queues, unless `burst` mode is enabled.

         The return value indicates whether any jobs were processed.
         """
@@ -249,7 +249,7 @@ class Worker(object):
                 break
             self.state = 'idle'
             qnames = self.queue_names()
-            self.procline('Listening on %s' % (','.join(qnames)))
+            self.procline('Listening on %s' % ','.join(qnames))
             self.log.info('*** Listening for work on %s...' % (', '.join(qnames)))
             wait_for_job = not burst
             try:
@@ -321,7 +321,7 @@ class Worker(object):
                 fq = self.failure_queue
                 self.log.warning('Moving job to %s queue.' % (fq.name,))

-                job.ended_at = datetime.datetime.utcnow()
+                job.ended_at = times.now()
                 job.exc_info = e
                 fq._push(job.pickle())
             else:
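
The datetime.utcnow() calls give way to the times library (another nvie
package) here and in job.py, which keeps every timestamp RQ stores explicitly
in UTC. A small sketch of the intended round trip, assuming the times package's
documented API; the exact string format this commit stores is not visible in
the diff:

import times

utc_now = times.now()                  # naive datetime in UTC, like datetime.utcnow()
stored = times.format(utc_now, 'UTC')  # ISO-8601 string, safe to put in a Redis hash
restored = times.to_universal(stored)  # parsed back into a naive UTC datetime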

--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -1,6 +1,6 @@
 from datetime import datetime
 from tests import RQTestCase
-from pickle import dumps, loads
+from pickle import loads
 from rq.job import Job
 from rq.exceptions import NoSuchJobError, UnpickleError
@@ -95,16 +95,26 @@ class TestJob(RQTestCase):
             Job.fetch('b4a44d44-da16-4620-90a6-798e8cd72ca0')

-    def test_unpickle_errors(self):
-        """Handling of unpickl'ing errors."""
-        with self.assertRaises(UnpickleError):
-            Job.unpickle('this is no pickle data')
+    def test_dequeue_unreadable_data(self):
+        """Dequeue fails on unreadable data."""
+        # Set up
+        job = Job.for_call(arbitrary_function, 3, 4, z=2)
+        job.save()
+
+        # Just replace the data hkey with some random noise
+        self.testconn.hset(job.key, 'data', 'this is no pickle string')
         with self.assertRaises(UnpickleError):
-            Job.unpickle(13)
+            job.refresh()

-        pickle_data = dumps(Job.for_call(arbitrary_function, 2, 3))
-        corrupt_data = pickle_data.replace('arbitrary', 'b0rken')
-        with self.assertRaises(UnpickleError):
-            Job.unpickle(corrupt_data)
+        # Set up (part B)
+        job = Job.for_call(arbitrary_function, 3, 4, z=2)
+        job.save()
+
+        # Now slightly modify the job to make it unpickl'able (this is
+        # equivalent to a worker not having the most up-to-date source code and
+        # unable to import the function)
+        data = self.testconn.hget(job.key, 'data')
+        unimportable_data = data.replace('arbitrary_function', 'broken')
+        self.testconn.hset(job.key, 'data', unimportable_data)
+        with self.assertRaises(UnpickleError):
+            job.refresh()

--- a/tests/test_queue.py
+++ b/tests/test_queue.py
@@ -39,7 +39,7 @@ class TestQueue(RQTestCase):
     def test_enqueue(self):
-        """Enqueueing writes job IDs to queues."""
+        """Enqueueing job onto queues."""
         q = Queue()
         self.assertEquals(q.is_empty(), True)
@@ -53,18 +53,51 @@ class TestQueue(RQTestCase):
         self.assertEquals(self.testconn.lrange(q_key, 0, -1)[0], job_id)

+    def test_pop_job_id(self):
+        """Popping job IDs from queues."""
+        # Set up
+        q = Queue()
+        uuid = '112188ae-4e9d-4a5b-a5b3-f26f2cb054da'
+        q.push_job_id(uuid)
+
+        # Pop it off the queue...
+        self.assertEquals(q.count, 1)
+        self.assertEquals(q.pop_job_id(), uuid)
+
+        # ...and assert the queue count went down
+        self.assertEquals(q.count, 0)
+
     def test_dequeue(self):
-        """Fetching work from specific queue."""
-        q = Queue('foo')
-        q.enqueue(testjob, 'Rick', foo='bar')
+        """Dequeueing jobs from queues."""
+        # Set up
+        q = Queue()
+        result = q.enqueue(testjob, 'Rick', foo='bar')

-        # Pull it off the queue (normally, a worker would do this)
+        # Dequeue a job (not a job ID) off the queue
+        self.assertEquals(q.count, 1)
         job = q.dequeue()
+        self.assertEquals(job.id, result.id)
         self.assertEquals(job.func, testjob)
         self.assertEquals(job.origin, q)
         self.assertEquals(job.args[0], 'Rick')
         self.assertEquals(job.kwargs['foo'], 'bar')

+        # ...and assert the queue count went down
+        self.assertEquals(q.count, 0)
+
+    def test_dequeue_ignores_nonexisting_jobs(self):
+        """Dequeuing silently ignores non-existing jobs."""
+        q = Queue()
+        uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8'
+        q.push_job_id(uuid)
+
+        # Dequeue simply ignores the missing job and returns None
+        self.assertEquals(q.count, 1)
+        self.assertEquals(q.dequeue(), None)
+        self.assertEquals(q.count, 0)
+
     def test_dequeue_any(self):
         """Fetching work from any given queue."""
         fooq = Queue('foo')
@@ -91,33 +124,15 @@ class TestQueue(RQTestCase):
         self.assertEquals(job.origin, barq)
         self.assertEquals(job.args[0], 'for Bar', 'Bar should be dequeued second.')

-    def test_dequeue_unpicklable_data(self):
-        """Error handling of invalid pickle data."""
-
-        # Push non-pickle data on the queue
-        q = Queue('foo')
-        blob = 'this is nothing like pickled data'
-        self.testconn.rpush(q._key, blob)
-
-        with self.assertRaises(UnpickleError):
-            q.dequeue()  # error occurs when perform()'ing
-
-        # Push value pickle data, but not representing a job tuple
-        q = Queue('foo')
-        blob = dumps('this is pickled, but not a job tuple')
-        self.testconn.rpush(q._key, blob)
-
-        with self.assertRaises(UnpickleError):
-            q.dequeue()  # error occurs when perform()'ing
-
-        # Push slightly incorrect pickled data onto the queue (simulate
-        # a function that can't be imported from the worker)
-        q = Queue('foo')
-        job_tuple = dumps((testjob, [], dict(name='Frank'), 'unused'))
-        blob = job_tuple.replace('testjob', 'fooobar')
-        self.testconn.rpush(q._key, blob)
-
-        with self.assertRaises(UnpickleError):
-            q.dequeue()  # error occurs when dequeue()'ing
+    def test_dequeue_any_ignores_nonexisting_jobs(self):
+        """Dequeuing (from any queue) silently ignores non-existing jobs."""
+        q = Queue('low')
+        uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8'
+        q.push_job_id(uuid)
+
+        # Dequeue simply ignores the missing job and returns None
+        self.assertEquals(q.count, 1)
+        self.assertEquals(Queue.dequeue_any([Queue(), Queue('low')], False), None)
+        self.assertEquals(q.count, 0)

--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -21,7 +21,7 @@ class TestWorker(RQTestCase):
         self.assertEquals(w.work(burst=True), True, 'Expected at least some work done.')

     def test_work_is_unreadable(self):
-        """Worker processes unreadable job."""
+        """Worker ignores unreadable job."""
         q = Queue()
         failure_q = Queue('failure')
@@ -32,13 +32,14 @@ class TestWorker(RQTestCase):
         # What we're simulating here is a call to a function that is not
         # importable from the worker process.
         job = Job.for_call(failing_job, 3)
-        pickled_job = job.pickle()
-        invalid_data = pickled_job.replace(
-            'failing_job', 'nonexisting_job')
+        job.save()
+        data = self.testconn.hget(job.key, 'data')
+        invalid_data = data.replace('failing_job', 'nonexisting_job')
+        self.testconn.hset(job.key, 'data', invalid_data)

         # We use the low-level internal function to enqueue any data (bypassing
         # validity checks)
-        q._push(invalid_data)
+        q.push_job_id(invalid_data)

         self.assertEquals(q.count, 1)
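
Read together, these tests pin down the new round trip: enqueue() writes the
job hash and pushes only the ID; dequeue() pops the ID and Job.fetch()es the
hash, turning a missing hash into None and unreadable data into an
UnpickleError. A condensed sketch of that flow using the public API from this
diff (connection setup omitted -- the tests rely on RQTestCase's managed test
connection -- and the enqueued function must live in an importable module,
since enqueue() rejects anything from __main__):

from rq.queue import Queue
from myapp.jobs import add   # hypothetical module; not from __main__

q = Queue()
job = q.enqueue(add, 2, 3)   # writes the job hash, pushes only job.id
assert q.count == 1

fetched = q.dequeue()        # pops the ID, then Job.fetch()es the hash
assert fetched.id == job.id
assert q.count == 0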
