port string handling to py3

Redis returns byte values for everything, while we handle queue names and job
IDs as unicode strings, so we need to convert every value we read back from Redis.
main
Alex Morega 12 years ago
parent 670a4e2a4e
commit 8d61d3bf26

@ -47,6 +47,25 @@ if PY2:
string_types = (str, unicode) string_types = (str, unicode)
text_type = unicode text_type = unicode
def as_text(v):
return v
def decode_redis_hash(h):
return h
else: else:
string_types = (str,) string_types = (str,)
text_type = str text_type = str
def as_text(v):
if v is None:
return None
elif isinstance(v, bytes):
return v.decode('ascii')
elif isinstance(v, str):
return v
else:
raise ValueError('Unknown type %r' % type(v))
def decode_redis_hash(h):
return dict((as_text(k), h[k]) for k in h)

@ -9,7 +9,7 @@ except ImportError: # noqa
from .local import LocalStack from .local import LocalStack
from .connections import resolve_connection from .connections import resolve_connection
from .exceptions import UnpickleError, NoSuchJobError from .exceptions import UnpickleError, NoSuchJobError
from rq.compat import text_type from rq.compat import text_type, decode_redis_hash, as_text
def enum(name, *sequential, **named): def enum(name, *sequential, **named):
@ -98,7 +98,7 @@ class Job(object):
return self._func_name return self._func_name
def _get_status(self): def _get_status(self):
self._status = self.connection.hget(self.key, 'status') self._status = as_text(self.connection.hget(self.key, 'status'))
return self._status return self._status
def _set_status(self, status): def _set_status(self, status):
@ -210,7 +210,7 @@ class Job(object):
@classmethod @classmethod
def key_for(cls, job_id): def key_for(cls, job_id):
"""The Redis key that is used to store job hash under.""" """The Redis key that is used to store job hash under."""
return 'rq:job:%s' % (job_id,) return b'rq:job:' + job_id.encode('ascii')
@property @property
def key(self): def key(self):
@ -259,7 +259,7 @@ class Job(object):
Will raise a NoSuchJobError if no corresponding Redis key exists. Will raise a NoSuchJobError if no corresponding Redis key exists.
""" """
key = self.key key = self.key
obj = self.connection.hgetall(key) obj = decode_redis_hash(self.connection.hgetall(key))
if len(obj) == 0: if len(obj) == 0:
raise NoSuchJobError('No such job: %s' % (key,)) raise NoSuchJobError('No such job: %s' % (key,))
@ -267,7 +267,7 @@ class Job(object):
if date_str is None: if date_str is None:
return None return None
else: else:
return times.to_universal(date_str) return times.to_universal(as_text(date_str))
try: try:
self.data = obj['data'] self.data = obj['data']
@ -279,16 +279,16 @@ class Job(object):
except UnpickleError: except UnpickleError:
if not safe: if not safe:
raise raise
self.created_at = to_date(obj.get('created_at')) self.created_at = to_date(as_text(obj.get('created_at')))
self.origin = obj.get('origin') self.origin = as_text(obj.get('origin'))
self.description = obj.get('description') self.description = as_text(obj.get('description'))
self.enqueued_at = to_date(obj.get('enqueued_at')) self.enqueued_at = to_date(as_text(obj.get('enqueued_at')))
self.ended_at = to_date(obj.get('ended_at')) self.ended_at = to_date(as_text(obj.get('ended_at')))
self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa
self.exc_info = obj.get('exc_info') self.exc_info = obj.get('exc_info')
self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self._status = obj.get('status') if obj.get('status') else None self._status = as_text(obj.get('status') if obj.get('status') else None)
self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}
def save(self, pipeline=None): def save(self, pipeline=None):

@ -3,7 +3,7 @@ from .connections import resolve_connection
from .job import Job, Status from .job import Job, Status
from .exceptions import (NoSuchJobError, UnpickleError, from .exceptions import (NoSuchJobError, UnpickleError,
InvalidJobOperationError, DequeueTimeout) InvalidJobOperationError, DequeueTimeout)
from .compat import total_ordering, string_types from .compat import total_ordering, string_types, as_text
def get_failed_queue(connection=None): def get_failed_queue(connection=None):
@ -27,8 +27,9 @@ class Queue(object):
connection = resolve_connection(connection) connection = resolve_connection(connection)
def to_queue(queue_key): def to_queue(queue_key):
return cls.from_queue_key(queue_key, connection=connection) return cls.from_queue_key(as_text(queue_key),
return map(to_queue, connection.keys('%s*' % prefix)) connection=connection)
return list(map(to_queue, connection.keys('%s*' % prefix)))
@classmethod @classmethod
def from_queue_key(cls, queue_key, connection=None): def from_queue_key(cls, queue_key, connection=None):
@ -81,7 +82,8 @@ class Queue(object):
end = offset + (length - 1) end = offset + (length - 1)
else: else:
end = length end = length
return self.connection.lrange(self.key, start, end) return [as_text(job_id) for job_id in
self.connection.lrange(self.key, start, end)]
def get_jobs(self, offset=0, length=-1): def get_jobs(self, offset=0, length=-1):
"""Returns a slice of jobs in the queue.""" """Returns a slice of jobs in the queue."""
@ -116,7 +118,7 @@ class Queue(object):
self.connection.rename(self.key, COMPACT_QUEUE) self.connection.rename(self.key, COMPACT_QUEUE)
while True: while True:
job_id = self.connection.lpop(COMPACT_QUEUE) job_id = as_text(self.connection.lpop(COMPACT_QUEUE))
if job_id is None: if job_id is None:
break break
if Job.exists(job_id, self.connection): if Job.exists(job_id, self.connection):
@ -204,7 +206,7 @@ class Queue(object):
def pop_job_id(self): def pop_job_id(self):
"""Pops a given job ID from this Redis queue.""" """Pops a given job ID from this Redis queue."""
return self.connection.lpop(self.key) return as_text(self.connection.lpop(self.key))
@classmethod @classmethod
def lpop(cls, queue_keys, timeout, connection=None): def lpop(cls, queue_keys, timeout, connection=None):
@ -274,7 +276,7 @@ class Queue(object):
result = cls.lpop(queue_keys, timeout, connection=connection) result = cls.lpop(queue_keys, timeout, connection=connection)
if result is None: if result is None:
return None return None
queue_key, job_id = result queue_key, job_id = map(as_text, result)
queue = cls.from_queue_key(queue_key, connection=connection) queue = cls.from_queue_key(queue_key, connection=connection)
try: try:
job = Job.fetch(job_id, connection=connection) job = Job.fetch(job_id, connection=connection)

@ -21,6 +21,7 @@ from .logutils import setup_loghandlers
from .exceptions import NoQueueError, UnpickleError, DequeueTimeout from .exceptions import NoQueueError, UnpickleError, DequeueTimeout
from .timeouts import death_penalty_after from .timeouts import death_penalty_after
from .version import VERSION from .version import VERSION
from rq.compat import text_type
green = make_colorizer('darkgreen') green = make_colorizer('darkgreen')
yellow = make_colorizer('darkyellow') yellow = make_colorizer('darkyellow')
@ -431,7 +432,7 @@ class Worker(object):
if rv is None: if rv is None:
self.log.info('Job OK') self.log.info('Job OK')
else: else:
self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),)) self.log.info('Job OK, result = %s' % (yellow(text_type(rv)),))
if result_ttl == 0: if result_ttl == 0:
self.log.info('Result discarded immediately.') self.log.info('Result discarded immediately.')

@ -78,7 +78,7 @@ class TestJob(RQTestCase):
# Saving creates a Redis hash # Saving creates a Redis hash
self.assertEquals(self.testconn.exists(job.key), False) self.assertEquals(self.testconn.exists(job.key), False)
job.save() job.save()
self.assertEquals(self.testconn.type(job.key), 'hash') self.assertEquals(self.testconn.type(job.key), b'hash')
# Saving writes pickled job data # Saving writes pickled job data
unpickled_data = loads(self.testconn.hget(job.key, 'data')) unpickled_data = loads(self.testconn.hget(job.key, 'data'))
@ -108,15 +108,15 @@ class TestJob(RQTestCase):
job.save() job.save()
expected_date = strip_milliseconds(job.created_at) expected_date = strip_milliseconds(job.created_at)
stored_date = self.testconn.hget(job.key, 'created_at') stored_date = self.testconn.hget(job.key, 'created_at').decode('ascii')
self.assertEquals( self.assertEquals(
times.to_universal(stored_date), times.to_universal(stored_date),
expected_date) expected_date)
# ... and no other keys are stored # ... and no other keys are stored
self.assertItemsEqual( self.assertEqual(
self.testconn.hkeys(job.key), self.testconn.hkeys(job.key),
['created_at']) [b'created_at'])
def test_persistence_of_typical_jobs(self): def test_persistence_of_typical_jobs(self):
"""Storing typical jobs.""" """Storing typical jobs."""
@ -124,15 +124,15 @@ class TestJob(RQTestCase):
job.save() job.save()
expected_date = strip_milliseconds(job.created_at) expected_date = strip_milliseconds(job.created_at)
stored_date = self.testconn.hget(job.key, 'created_at') stored_date = self.testconn.hget(job.key, 'created_at').decode('ascii')
self.assertEquals( self.assertEquals(
times.to_universal(stored_date), times.to_universal(stored_date),
expected_date) expected_date)
# ... and no other keys are stored # ... and no other keys are stored
self.assertItemsEqual( self.assertEqual(
self.testconn.hkeys(job.key), sorted(self.testconn.hkeys(job.key)),
['created_at', 'data', 'description']) [b'created_at', b'data', b'description'])
def test_store_then_fetch(self): def test_store_then_fetch(self):
"""Store, then fetch.""" """Store, then fetch."""
@ -172,7 +172,7 @@ class TestJob(RQTestCase):
# equivalent to a worker not having the most up-to-date source code # equivalent to a worker not having the most up-to-date source code
# and unable to import the function) # and unable to import the function)
data = self.testconn.hget(job.key, 'data') data = self.testconn.hget(job.key, 'data')
unimportable_data = data.replace('say_hello', 'shut_up') unimportable_data = data.replace(b'say_hello', b'shut_up')
self.testconn.hset(job.key, 'data', unimportable_data) self.testconn.hset(job.key, 'data', unimportable_data)
job.refresh() job.refresh()

@ -107,7 +107,9 @@ class TestQueue(RQTestCase):
# Inspect data inside Redis # Inspect data inside Redis
q_key = 'rq:queue:default' q_key = 'rq:queue:default'
self.assertEquals(self.testconn.llen(q_key), 1) self.assertEquals(self.testconn.llen(q_key), 1)
self.assertEquals(self.testconn.lrange(q_key, 0, -1)[0], job_id) self.assertEquals(
self.testconn.lrange(q_key, 0, -1)[0].decode('ascii'),
job_id)
def test_enqueue_sets_metadata(self): def test_enqueue_sets_metadata(self):
"""Enqueueing job onto queues modifies meta data.""" """Enqueueing job onto queues modifies meta data."""
@ -258,7 +260,7 @@ class TestFailedQueue(RQTestCase):
job.save() job.save()
get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa
self.assertItemsEqual(Queue.all(), [get_failed_queue()]) # noqa self.assertEqual(Queue.all(), [get_failed_queue()]) # noqa
self.assertEquals(get_failed_queue().count, 1) self.assertEquals(get_failed_queue().count, 1)
get_failed_queue().requeue(job.id) get_failed_queue().requeue(job.id)

@ -57,7 +57,7 @@ class TestWorker(RQTestCase):
job = Job.create(func=div_by_zero, args=(3,)) job = Job.create(func=div_by_zero, args=(3,))
job.save() job.save()
data = self.testconn.hget(job.key, 'data') data = self.testconn.hget(job.key, 'data')
invalid_data = data.replace('div_by_zero', 'nonexisting_job') invalid_data = data.replace(b'div_by_zero', b'nonexisting_job')
assert data != invalid_data assert data != invalid_data
self.testconn.hset(job.key, 'data', invalid_data) self.testconn.hset(job.key, 'data', invalid_data)

Loading…
Cancel
Save