Merge branch 'master' of git://github.com/nvie/rq into job_dependency

Conflicts:
	rq/queue.py
main
Selwin Ong 12 years ago
commit e7e8579888

@ -1,12 +1,17 @@
### 0.3.8
(not yet released)
- `rqworker` and `rqinfo` have a `--url` argument to connect to a Redis url.
- `rqworker` and `rqinfo` have a `--socket` option to connect to a Redis server
through a Unix socket.
- `rqworker` reads `SENTRY_DSN` from the environment, unless specifically
provided on the command line.
- `Queue` has a new API that supports paging `get_jobs(3, 7)`, which will
return at most 7 jobs, starting from the 3rd.
### 0.3.7
(February 26th, 2013)

@ -66,25 +66,38 @@ class Queue(object):
"""Returns whether the current queue is empty."""
return self.count == 0
def safe_fetch_job(self, job_id):
try:
job = Job.safe_fetch(job_id, connection=self.connection)
except NoSuchJobError:
self.remove(job_id)
return None
except UnpickleError:
return None
return job
def get_job_ids(self, start=0, limit=-1):
"""Returns a slice of job IDs in the queue."""
if limit >= 0:
end = start + limit
else:
end = limit
return self.connection.lrange(self.key, start, end)
def get_jobs(self, start=0, limit=-1):
"""Returns a slice of jobs in the queue."""
job_ids = self.get_job_ids(start, limit)
return compact([self.safe_fetch_job(job_id) for job_id in job_ids])
@property
def job_ids(self):
"""Returns a list of all job IDS in the queue."""
return self.connection.lrange(self.key, 0, -1)
return self.get_job_ids()
@property
def jobs(self):
"""Returns a list of all (valid) jobs in the queue."""
def safe_fetch(job_id):
try:
job = Job.safe_fetch(job_id, connection=self.connection)
except NoSuchJobError:
self.remove(job_id)
return None
except UnpickleError:
return None
return job
return compact([safe_fetch(job_id) for job_id in self.job_ids])
return self.get_jobs()
@property
def count(self):
@ -115,6 +128,7 @@ class Queue(object):
"""Pushes a job ID on the corresponding Redis queue."""
self.connection.rpush(self.key, job_id)
def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, after=None):
"""Creates a job to represent the delayed function call and enqueues

@ -7,7 +7,8 @@ def add_standard_arguments(parser):
parser.add_argument('--config', '-c', default=None,
help='Module containing RQ settings.')
parser.add_argument('--url', '-u', default=None,
help='URL describing Redis connection details')
help='URL describing Redis connection details. '
'Overrides other connection arguments if supplied.')
parser.add_argument('--host', '-H', default=None,
help='The Redis hostname (default: localhost)')
parser.add_argument('--port', '-p', default=None,
@ -47,13 +48,10 @@ def setup_default_arguments(args, settings):
if args.password is None:
args.password = settings.get('REDIS_PASSWORD', None)
if not args.queues:
args.queues = settings.get('QUEUES', ['default'])
def setup_redis(args):
if args.url is not None:
redis_conn = redis.StrictRedis.from_url(args.url, db=args.db)
redis_conn = redis.StrictRedis.from_url(args.url)
else:
redis_conn = redis.StrictRedis(host=args.host, port=args.port, db=args.db,
password=args.password, unix_socket_path=args.socket)

@ -21,6 +21,7 @@ def pad(s, pad_to_length):
"""Pads the given string to the given length."""
return ('%-' + '%ds' % pad_to_length) % (s,)
def get_scale(x):
"""Finds the lowest scale where x <= scale."""
scales = [20, 50, 100, 200, 400, 600, 800, 1000]
@ -29,6 +30,7 @@ def get_scale(x):
return scale
return x
def state_symbol(state):
symbols = {
'busy': red('busy'),
@ -186,4 +188,3 @@ def main():
except KeyboardInterrupt:
print
sys.exit(0)

@ -32,6 +32,19 @@ def parse_args():
return parser.parse_args()
def setup_loghandlers_from_args(args):
if args.verbose and args.quiet:
raise RuntimeError("Flags --verbose and --quiet are mutually exclusive.")
if args.verbose:
level = 'DEBUG'
elif args.quiet:
level = 'WARNING'
else:
level = 'INFO'
setup_loghandlers(level)
def main():
args = parse_args()
@ -44,21 +57,15 @@ def main():
setup_default_arguments(args, settings)
# Other default arguments
# Worker specific default arguments
if not args.queues:
args.queues = settings.get('QUEUES', ['default'])
if args.sentry_dsn is None:
args.sentry_dsn = settings.get('SENTRY_DSN',
os.environ.get('SENTRY_DSN', None))
if args.verbose and args.quiet:
raise RuntimeError("Flags --verbose and --quiet are mutually exclusive.")
if args.verbose:
level = 'DEBUG'
elif args.quiet:
level = 'WARNING'
else:
level = 'INFO'
setup_loghandlers(level)
setup_loghandlers_from_args(args)
setup_redis(args)
cleanup_ghosts()

@ -49,13 +49,16 @@ def access_self():
return job.id
class Calculator(object):
"""Test instance methods."""
def __init__(self, denominator):
self.denominator = denominator
class Number(object):
def __init__(self, value):
self.value = value
def calculate(self, x, y):
return x * y / self.denominator
@classmethod
def divide(cls, x, y):
return x * y
def div(self, y):
return self.value / y
with Connection():

@ -1,7 +1,7 @@
import times
from datetime import datetime
from tests import RQTestCase
from tests.fixtures import Calculator, some_calculation, say_hello, access_self
from tests.fixtures import Number, some_calculation, say_hello, access_self
from tests.helpers import strip_milliseconds
from cPickle import loads
from rq.job import Job, get_current_job
@ -51,13 +51,13 @@ class TestJob(RQTestCase):
def test_create_instance_method_job(self):
"""Creation of jobs for instance methods."""
c = Calculator(2)
job = Job.create(func=c.calculate, args=(3, 4))
n = Number(2)
job = Job.create(func=n.div, args=(4,))
# Job data is set
self.assertEquals(job.func, c.calculate)
self.assertEquals(job.instance, c)
self.assertEquals(job.args, (3, 4))
self.assertEquals(job.func, n.div)
self.assertEquals(job.instance, n)
self.assertEquals(job.args, (4,))
def test_create_job_from_string_function(self):
"""Creation of jobs using string specifier."""

@ -1,5 +1,5 @@
from tests import RQTestCase
from tests.fixtures import Calculator, div_by_zero, say_hello, some_calculation
from tests.fixtures import Number, div_by_zero, say_hello, some_calculation
from rq import Queue, get_failed_queue
from rq.job import Job, Status
from rq.exceptions import InvalidJobOperationError
@ -161,14 +161,26 @@ class TestQueue(RQTestCase):
def test_dequeue_instance_method(self):
"""Dequeueing instance method jobs from queues."""
q = Queue()
c = Calculator(2)
result = q.enqueue(c.calculate, 3, 4)
n = Number(2)
q.enqueue(n.div, 4)
job = q.dequeue()
# The instance has been pickled and unpickled, so it is now a separate
# object. Test for equality using each object's __dict__ instead.
self.assertEquals(job.instance.__dict__, c.__dict__)
self.assertEquals(job.func.__name__, 'calculate')
self.assertEquals(job.instance.__dict__, n.__dict__)
self.assertEquals(job.func.__name__, 'div')
self.assertEquals(job.args, (4,))
def test_dequeue_class_method(self):
"""Dequeueing class method jobs from queues."""
q = Queue()
q.enqueue(Number.divide, 3, 4)
job = q.dequeue()
self.assertEquals(job.instance.__dict__, Number.__dict__)
self.assertEquals(job.func.__name__, 'divide')
self.assertEquals(job.args, (3, 4))
def test_dequeue_ignores_nonexisting_jobs(self):

Loading…
Cancel
Save