merging master

main
Jonathan Tushman 10 years ago
commit 1ae5a12a81

@ -83,7 +83,7 @@ class Job(object):
# Job construction # Job construction
@classmethod @classmethod
def create(cls, func, args=None, kwargs=None, connection=None, def create(cls, func, args=None, kwargs=None, connection=None,
result_ttl=None, status=None, description=None, depends_on=None, timeout=None, result_ttl=None, ttl=None, status=None, description=None, depends_on=None, timeout=None,
id=None): id=None):
"""Creates a new Job instance for the given function, arguments, and """Creates a new Job instance for the given function, arguments, and
keyword arguments. keyword arguments.
@ -122,6 +122,7 @@ class Job(object):
# Extra meta data # Extra meta data
job.description = description or job.get_call_string() job.description = description or job.get_call_string()
job.result_ttl = result_ttl job.result_ttl = result_ttl
job.ttl = ttl
job.timeout = timeout job.timeout = timeout
job._status = status job._status = status
@ -302,6 +303,7 @@ class Job(object):
self.exc_info = None self.exc_info = None
self.timeout = None self.timeout = None
self.result_ttl = None self.result_ttl = None
self.ttl = None
self._status = None self._status = None
self._dependency_id = None self._dependency_id = None
self.meta = {} self.meta = {}
@ -446,6 +448,7 @@ class Job(object):
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
connection.hmset(key, self.to_dict()) connection.hmset(key, self.to_dict())
self.cleanup(self.ttl)
def cancel(self): def cancel(self):
"""Cancels the given job, which will prevent the job from ever being """Cancels the given job, which will prevent the job from ever being
@ -482,8 +485,15 @@ class Job(object):
return self._result return self._result
def get_ttl(self, default_ttl=None): def get_ttl(self, default_ttl=None):
"""Returns ttl for a job that determines how long a job and its result """Returns ttl for a job that determines how long a job will be
will be persisted. In the future, this method will also be responsible persisted. In the future, this method will also be responsible
for determining ttl for repeated jobs.
"""
return default_ttl if self.ttl is None else self.ttl
def get_result_ttl(self, default_ttl=None):
"""Returns ttl for a job that determines how long a jobs result will
be persisted. In the future, this method will also be responsible
for determining ttl for repeated jobs. for determining ttl for repeated jobs.
""" """
return default_ttl if self.result_ttl is None else self.result_ttl return default_ttl if self.result_ttl is None else self.result_ttl
@ -504,14 +514,16 @@ class Job(object):
def cleanup(self, ttl=None, pipeline=None): def cleanup(self, ttl=None, pipeline=None):
"""Prepare job for eventual deletion (if needed). This method is usually """Prepare job for eventual deletion (if needed). This method is usually
called after successful execution. How long we persist the job and its called after successful execution. How long we persist the job and its
result depends on the value of result_ttl: result depends on the value of ttl:
- If result_ttl is 0, cleanup the job immediately. - If ttl is 0, cleanup the job immediately.
- If it's a positive number, set the job to expire in X seconds. - If it's a positive number, set the job to expire in X seconds.
- If result_ttl is negative, don't set an expiry to it (persist - If ttl is negative, don't set an expiry to it (persist
forever) forever)
""" """
if ttl == 0: if ttl == 0:
self.cancel() self.cancel()
elif not ttl:
return
elif ttl > 0: elif ttl > 0:
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
connection.expire(self.key, ttl) connection.expire(self.key, ttl)

@ -167,8 +167,8 @@ class Queue(object):
connection.rpush(self.key, job_id) connection.rpush(self.key, job_id)
def enqueue_call(self, func, args=None, kwargs=None, timeout=None, def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, description=None, depends_on=None, result_ttl=None, ttl=None, description=None,
job_id=None): depends_on=None, job_id=None):
"""Creates a job to represent the delayed function call and enqueues """Creates a job to represent the delayed function call and enqueues
it. it.
@ -190,7 +190,7 @@ class Queue(object):
# modifying the dependency. In this case we simply retry # modifying the dependency. In this case we simply retry
if depends_on is not None: if depends_on is not None:
if not isinstance(depends_on, self.job_class): if not isinstance(depends_on, self.job_class):
depends_on = Job.fetch(id=depends_on, connection=self.connection) depends_on = Job(id=depends_on, connection=self.connection)
with self.connection.pipeline() as pipe: with self.connection.pipeline() as pipe:
while True: while True:
try: try:
@ -229,6 +229,7 @@ class Queue(object):
timeout = kwargs.pop('timeout', None) timeout = kwargs.pop('timeout', None)
description = kwargs.pop('description', None) description = kwargs.pop('description', None)
result_ttl = kwargs.pop('result_ttl', None) result_ttl = kwargs.pop('result_ttl', None)
ttl = kwargs.pop('ttl', None)
depends_on = kwargs.pop('depends_on', None) depends_on = kwargs.pop('depends_on', None)
job_id = kwargs.pop('job_id', None) job_id = kwargs.pop('job_id', None)
@ -238,7 +239,7 @@ class Queue(object):
kwargs = kwargs.pop('kwargs', None) kwargs = kwargs.pop('kwargs', None)
return self.enqueue_call(func=f, args=args, kwargs=kwargs, return self.enqueue_call(func=f, args=args, kwargs=kwargs,
timeout=timeout, result_ttl=result_ttl, timeout=timeout, result_ttl=result_ttl, ttl=ttl,
description=description, depends_on=depends_on, description=description, depends_on=depends_on,
job_id=job_id) job_id=job_id)

@ -8,9 +8,6 @@ class BaseRegistry(object):
""" """
Base implementation of job registry, implemented in Redis sorted set. Each job Base implementation of job registry, implemented in Redis sorted set. Each job
is stored as a key in the registry, scored by expiration time (unix timestamp). is stored as a key in the registry, scored by expiration time (unix timestamp).
Jobs with scores are lower than current time is considered "expired" and
should be cleaned up.
""" """
def __init__(self, name='default', connection=None): def __init__(self, name='default', connection=None):
@ -27,9 +24,9 @@ class BaseRegistry(object):
self.cleanup() self.cleanup()
return self.connection.zcard(self.key) return self.connection.zcard(self.key)
def add(self, job, timeout, pipeline=None): def add(self, job, ttl, pipeline=None):
"""Adds a job to StartedJobRegistry with expiry time of now + timeout.""" """Adds a job to a registry with expiry time of now + ttl."""
score = current_timestamp() + timeout score = ttl if ttl < 0 else current_timestamp() + ttl
if pipeline is not None: if pipeline is not None:
return pipeline.zadd(self.key, score, job.id) return pipeline.zadd(self.key, score, job.id)
@ -39,10 +36,16 @@ class BaseRegistry(object):
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
return connection.zrem(self.key, job.id) return connection.zrem(self.key, job.id)
def get_expired_job_ids(self): def get_expired_job_ids(self, timestamp=None):
"""Returns job ids whose score are less than current timestamp.""" """Returns job ids whose score are less than current timestamp.
Returns ids for jobs with an expiry time earlier than timestamp,
specified as seconds since the Unix epoch. timestamp defaults to call
time if unspecified.
"""
score = timestamp if timestamp is not None else current_timestamp()
return [as_text(job_id) for job_id in return [as_text(job_id) for job_id in
self.connection.zrangebyscore(self.key, 0, current_timestamp())] self.connection.zrangebyscore(self.key, 0, score)]
def get_job_ids(self, start=0, end=-1): def get_job_ids(self, start=0, end=-1):
"""Returns list of all job ids.""" """Returns list of all job ids."""
@ -59,24 +62,28 @@ class StartedJobRegistry(BaseRegistry):
Jobs are added to registry right before they are executed and removed Jobs are added to registry right before they are executed and removed
right after completion (success or failure). right after completion (success or failure).
Jobs whose score are lower than current time is considered "expired".
""" """
def __init__(self, name='default', connection=None): def __init__(self, name='default', connection=None):
super(StartedJobRegistry, self).__init__(name, connection) super(StartedJobRegistry, self).__init__(name, connection)
self.key = 'rq:wip:%s' % name self.key = 'rq:wip:%s' % name
def cleanup(self): def cleanup(self, timestamp=None):
"""Remove expired jobs from registry and add them to FailedQueue.""" """Remove expired jobs from registry and add them to FailedQueue.
job_ids = self.get_expired_job_ids()
Removes jobs with an expiry time earlier than timestamp, specified as
seconds since the Unix epoch. timestamp defaults to call time if
unspecified. Removed jobs are added to the global failed job queue.
"""
score = timestamp if timestamp is not None else current_timestamp()
job_ids = self.get_expired_job_ids(score)
if job_ids: if job_ids:
failed_queue = FailedQueue(connection=self.connection) failed_queue = FailedQueue(connection=self.connection)
with self.connection.pipeline() as pipeline: with self.connection.pipeline() as pipeline:
for job_id in job_ids: for job_id in job_ids:
failed_queue.push_job_id(job_id, pipeline=pipeline) failed_queue.push_job_id(job_id, pipeline=pipeline)
pipeline.zremrangebyscore(self.key, 0, current_timestamp()) pipeline.zremrangebyscore(self.key, 0, score)
pipeline.execute() pipeline.execute()
return job_ids return job_ids
@ -92,6 +99,12 @@ class FinishedJobRegistry(BaseRegistry):
super(FinishedJobRegistry, self).__init__(name, connection) super(FinishedJobRegistry, self).__init__(name, connection)
self.key = 'rq:finished:%s' % name self.key = 'rq:finished:%s' % name
def cleanup(self): def cleanup(self, timestamp=None):
"""Remove expired jobs from registry.""" """Remove expired jobs from registry.
self.connection.zremrangebyscore(self.key, 0, current_timestamp())
Removes jobs with an expiry time earlier than timestamp, specified as
seconds since the Unix epoch. timestamp defaults to call time if
unspecified.
"""
score = timestamp if timestamp is not None else current_timestamp()
self.connection.zremrangebyscore(self.key, 0, score)

@ -546,7 +546,7 @@ class Worker(object):
self.set_current_job_id(None, pipeline=pipeline) self.set_current_job_id(None, pipeline=pipeline)
result_ttl = job.get_ttl(self.default_result_ttl) result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0: if result_ttl != 0:
job.ended_at = utcnow() job.ended_at = utcnow()
job._status = JobStatus.FINISHED job._status = JobStatus.FINISHED
@ -618,6 +618,16 @@ class Worker(object):
"""Pops the latest exception handler off of the exc handler stack.""" """Pops the latest exception handler off of the exc handler stack."""
return self._exc_handlers.pop() return self._exc_handlers.pop()
def __eq__(self, other):
"""Equality does not take the database/connection into account"""
if not isinstance(other, self.__class__):
raise TypeError('Cannot compare workers to other types (of workers)')
return self.name == other.name
def __hash__(self):
"""The hash does not take the database/connection into account"""
return hash(self.name)
class SimpleWorker(Worker): class SimpleWorker(Worker):
def _install_signal_handlers(self, *args, **kwargs): def _install_signal_handlers(self, *args, **kwargs):

@ -290,17 +290,27 @@ class TestJob(RQTestCase):
self.assertEqual(job.id, id) self.assertEqual(job.id, id)
self.assertEqual(job.func, access_self) self.assertEqual(job.func, access_self)
def test_get_ttl(self): def test_get_result_ttl(self):
"""Getting job TTL.""" """Getting job result TTL."""
job_ttl = 1 job_result_ttl = 1
default_ttl = 2 default_ttl = 2
job = Job.create(func=say_hello, result_ttl=job_ttl) job = Job.create(func=say_hello, result_ttl=job_result_ttl)
job.save()
self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), job_result_ttl)
self.assertEqual(job.get_result_ttl(), job_result_ttl)
job = Job.create(func=say_hello)
job.save()
self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), default_ttl)
self.assertEqual(job.get_result_ttl(), None)
def test_get_job_ttl(self):
"""Getting job TTL."""
ttl = 1
job = Job.create(func=say_hello, ttl=ttl)
job.save() job.save()
self.assertEqual(job.get_ttl(default_ttl=default_ttl), job_ttl) self.assertEqual(job.get_ttl(), ttl)
self.assertEqual(job.get_ttl(), job_ttl)
job = Job.create(func=say_hello) job = Job.create(func=say_hello)
job.save() job.save()
self.assertEqual(job.get_ttl(default_ttl=default_ttl), default_ttl)
self.assertEqual(job.get_ttl(), None) self.assertEqual(job.get_ttl(), None)
def test_cleanup(self): def test_cleanup(self):

@ -355,8 +355,6 @@ class TestQueue(RQTestCase):
def test_enqueue_job_with_dependency_by_id(self): def test_enqueue_job_with_dependency_by_id(self):
"""Enqueueing jobs should work as expected by id as well as job-objects.""" """Enqueueing jobs should work as expected by id as well as job-objects."""
parent_job = Job.create(func=say_hello) parent_job = Job.create(func=say_hello)
# We need to save the job for the ID to exist in redis
parent_job.save()
q = Queue() q = Queue()
q.enqueue_call(say_hello, depends_on=parent_job.id) q.enqueue_call(say_hello, depends_on=parent_job.id)

@ -27,6 +27,10 @@ class TestRegistry(RQTestCase):
self.assertLess(self.testconn.zscore(self.registry.key, job.id), self.assertLess(self.testconn.zscore(self.registry.key, job.id),
timestamp + 1002) timestamp + 1002)
# Ensure that a timeout of -1 results in a score of -1
self.registry.add(job, -1)
self.assertEqual(self.testconn.zscore(self.registry.key, job.id), -1)
# Ensure that job is properly removed from sorted set # Ensure that job is properly removed from sorted set
self.registry.remove(job) self.registry.remove(job)
self.assertIsNone(self.testconn.zscore(self.registry.key, job.id)) self.assertIsNone(self.testconn.zscore(self.registry.key, job.id))
@ -44,14 +48,22 @@ class TestRegistry(RQTestCase):
self.testconn.zadd(self.registry.key, 1, 'foo') self.testconn.zadd(self.registry.key, 1, 'foo')
self.testconn.zadd(self.registry.key, timestamp + 10, 'bar') self.testconn.zadd(self.registry.key, timestamp + 10, 'bar')
self.testconn.zadd(self.registry.key, timestamp + 30, 'baz')
self.assertEqual(self.registry.get_expired_job_ids(), ['foo']) self.assertEqual(self.registry.get_expired_job_ids(), ['foo'])
self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20),
['foo', 'bar'])
def test_cleanup(self): def test_cleanup(self):
"""Moving expired jobs to FailedQueue.""" """Moving expired jobs to FailedQueue."""
failed_queue = FailedQueue(connection=self.testconn) failed_queue = FailedQueue(connection=self.testconn)
self.assertTrue(failed_queue.is_empty()) self.assertTrue(failed_queue.is_empty())
self.testconn.zadd(self.registry.key, 1, 'foo') self.testconn.zadd(self.registry.key, 2, 'foo')
self.registry.cleanup(1)
self.assertNotIn('foo', failed_queue.job_ids)
self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), 2)
self.registry.cleanup() self.registry.cleanup()
self.assertIn('foo', failed_queue.job_ids) self.assertIn('foo', failed_queue.job_ids)
self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), None) self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), None)
@ -99,9 +111,14 @@ class TestFinishedJobRegistry(RQTestCase):
timestamp = current_timestamp() timestamp = current_timestamp()
self.testconn.zadd(self.registry.key, 1, 'foo') self.testconn.zadd(self.registry.key, 1, 'foo')
self.testconn.zadd(self.registry.key, timestamp + 10, 'bar') self.testconn.zadd(self.registry.key, timestamp + 10, 'bar')
self.testconn.zadd(self.registry.key, timestamp + 30, 'baz')
self.registry.cleanup() self.registry.cleanup()
self.assertEqual(self.registry.get_job_ids(), ['bar']) self.assertEqual(self.registry.get_job_ids(), ['bar', 'baz'])
self.registry.cleanup(timestamp + 20)
self.assertEqual(self.registry.get_job_ids(), ['baz'])
def test_jobs_are_put_in_registry(self): def test_jobs_are_put_in_registry(self):
"""Completed jobs are added to FinishedJobRegistry.""" """Completed jobs are added to FinishedJobRegistry."""

@ -370,3 +370,12 @@ class TestWorker(RQTestCase):
# The suspension should be expired now, and a burst of work should now clear the queue # The suspension should be expired now, and a burst of work should now clear the queue
w.work(burst=True) w.work(burst=True)
assert q.count == 0 assert q.count == 0
def test_worker_hash_(self):
"""Workers are hashed by their .name attribute"""
q = Queue('foo')
w1 = Worker([q], name="worker1")
w2 = Worker([q], name="worker2")
w3 = Worker([q], name="worker1")
worker_set = set([w1, w2, w3])
self.assertEquals(len(worker_set), 2)

Loading…
Cancel
Save