From 8d61d3bf26e84bfa5c1c6f4c212b3eb944e9b792 Mon Sep 17 00:00:00 2001 From: Alex Morega Date: Mon, 5 Aug 2013 15:14:07 +0300 Subject: [PATCH] port string handling to py3 Redis uses byte values for everything. We save queue names and job IDs as unicode. So we need to convert every time we get data from redis. --- rq/compat/__init__.py | 19 +++++++++++++++++++ rq/job.py | 22 +++++++++++----------- rq/queue.py | 16 +++++++++------- rq/worker.py | 3 ++- tests/test_job.py | 18 +++++++++--------- tests/test_queue.py | 6 ++++-- tests/test_worker.py | 2 +- 7 files changed, 55 insertions(+), 31 deletions(-) diff --git a/rq/compat/__init__.py b/rq/compat/__init__.py index e08e4c0..9ce6e4e 100644 --- a/rq/compat/__init__.py +++ b/rq/compat/__init__.py @@ -47,6 +47,25 @@ if PY2: string_types = (str, unicode) text_type = unicode + def as_text(v): + return v + + def decode_redis_hash(h): + return h + else: string_types = (str,) text_type = str + + def as_text(v): + if v is None: + return None + elif isinstance(v, bytes): + return v.decode('ascii') + elif isinstance(v, str): + return v + else: + raise ValueError('Unknown type %r' % type(v)) + + def decode_redis_hash(h): + return dict((as_text(k), h[k]) for k in h) diff --git a/rq/job.py b/rq/job.py index 6871419..6bf9189 100644 --- a/rq/job.py +++ b/rq/job.py @@ -9,7 +9,7 @@ except ImportError: # noqa from .local import LocalStack from .connections import resolve_connection from .exceptions import UnpickleError, NoSuchJobError -from rq.compat import text_type +from rq.compat import text_type, decode_redis_hash, as_text def enum(name, *sequential, **named): @@ -98,7 +98,7 @@ class Job(object): return self._func_name def _get_status(self): - self._status = self.connection.hget(self.key, 'status') + self._status = as_text(self.connection.hget(self.key, 'status')) return self._status def _set_status(self, status): @@ -210,7 +210,7 @@ class Job(object): @classmethod def key_for(cls, job_id): """The Redis key that is used to store job hash under.""" - return 'rq:job:%s' % (job_id,) + return b'rq:job:' + job_id.encode('ascii') @property def key(self): @@ -259,7 +259,7 @@ class Job(object): Will raise a NoSuchJobError if no corresponding Redis key exists. """ key = self.key - obj = self.connection.hgetall(key) + obj = decode_redis_hash(self.connection.hgetall(key)) if len(obj) == 0: raise NoSuchJobError('No such job: %s' % (key,)) @@ -267,7 +267,7 @@ class Job(object): if date_str is None: return None else: - return times.to_universal(date_str) + return times.to_universal(as_text(date_str)) try: self.data = obj['data'] @@ -279,16 +279,16 @@ class Job(object): except UnpickleError: if not safe: raise - self.created_at = to_date(obj.get('created_at')) - self.origin = obj.get('origin') - self.description = obj.get('description') - self.enqueued_at = to_date(obj.get('enqueued_at')) - self.ended_at = to_date(obj.get('ended_at')) + self.created_at = to_date(as_text(obj.get('created_at'))) + self.origin = as_text(obj.get('origin')) + self.description = as_text(obj.get('description')) + self.enqueued_at = to_date(as_text(obj.get('enqueued_at'))) + self.ended_at = to_date(as_text(obj.get('ended_at'))) self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa self.exc_info = obj.get('exc_info') self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa - self._status = obj.get('status') if obj.get('status') else None + self._status = as_text(obj.get('status') if obj.get('status') else None) self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} def save(self, pipeline=None): diff --git a/rq/queue.py b/rq/queue.py index f3b5f92..3f9cd96 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -3,7 +3,7 @@ from .connections import resolve_connection from .job import Job, Status from .exceptions import (NoSuchJobError, UnpickleError, InvalidJobOperationError, DequeueTimeout) -from .compat import total_ordering, string_types +from .compat import total_ordering, string_types, as_text def get_failed_queue(connection=None): @@ -27,8 +27,9 @@ class Queue(object): connection = resolve_connection(connection) def to_queue(queue_key): - return cls.from_queue_key(queue_key, connection=connection) - return map(to_queue, connection.keys('%s*' % prefix)) + return cls.from_queue_key(as_text(queue_key), + connection=connection) + return list(map(to_queue, connection.keys('%s*' % prefix))) @classmethod def from_queue_key(cls, queue_key, connection=None): @@ -81,7 +82,8 @@ class Queue(object): end = offset + (length - 1) else: end = length - return self.connection.lrange(self.key, start, end) + return [as_text(job_id) for job_id in + self.connection.lrange(self.key, start, end)] def get_jobs(self, offset=0, length=-1): """Returns a slice of jobs in the queue.""" @@ -116,7 +118,7 @@ class Queue(object): self.connection.rename(self.key, COMPACT_QUEUE) while True: - job_id = self.connection.lpop(COMPACT_QUEUE) + job_id = as_text(self.connection.lpop(COMPACT_QUEUE)) if job_id is None: break if Job.exists(job_id, self.connection): @@ -204,7 +206,7 @@ class Queue(object): def pop_job_id(self): """Pops a given job ID from this Redis queue.""" - return self.connection.lpop(self.key) + return as_text(self.connection.lpop(self.key)) @classmethod def lpop(cls, queue_keys, timeout, connection=None): @@ -274,7 +276,7 @@ class Queue(object): result = cls.lpop(queue_keys, timeout, connection=connection) if result is None: return None - queue_key, job_id = result + queue_key, job_id = map(as_text, result) queue = cls.from_queue_key(queue_key, connection=connection) try: job = Job.fetch(job_id, connection=connection) diff --git a/rq/worker.py b/rq/worker.py index 8f1da27..4717c38 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -21,6 +21,7 @@ from .logutils import setup_loghandlers from .exceptions import NoQueueError, UnpickleError, DequeueTimeout from .timeouts import death_penalty_after from .version import VERSION +from rq.compat import text_type green = make_colorizer('darkgreen') yellow = make_colorizer('darkyellow') @@ -431,7 +432,7 @@ class Worker(object): if rv is None: self.log.info('Job OK') else: - self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),)) + self.log.info('Job OK, result = %s' % (yellow(text_type(rv)),)) if result_ttl == 0: self.log.info('Result discarded immediately.') diff --git a/tests/test_job.py b/tests/test_job.py index 3135178..522f1a6 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -78,7 +78,7 @@ class TestJob(RQTestCase): # Saving creates a Redis hash self.assertEquals(self.testconn.exists(job.key), False) job.save() - self.assertEquals(self.testconn.type(job.key), 'hash') + self.assertEquals(self.testconn.type(job.key), b'hash') # Saving writes pickled job data unpickled_data = loads(self.testconn.hget(job.key, 'data')) @@ -108,15 +108,15 @@ class TestJob(RQTestCase): job.save() expected_date = strip_milliseconds(job.created_at) - stored_date = self.testconn.hget(job.key, 'created_at') + stored_date = self.testconn.hget(job.key, 'created_at').decode('ascii') self.assertEquals( times.to_universal(stored_date), expected_date) # ... and no other keys are stored - self.assertItemsEqual( + self.assertEqual( self.testconn.hkeys(job.key), - ['created_at']) + [b'created_at']) def test_persistence_of_typical_jobs(self): """Storing typical jobs.""" @@ -124,15 +124,15 @@ class TestJob(RQTestCase): job.save() expected_date = strip_milliseconds(job.created_at) - stored_date = self.testconn.hget(job.key, 'created_at') + stored_date = self.testconn.hget(job.key, 'created_at').decode('ascii') self.assertEquals( times.to_universal(stored_date), expected_date) # ... and no other keys are stored - self.assertItemsEqual( - self.testconn.hkeys(job.key), - ['created_at', 'data', 'description']) + self.assertEqual( + sorted(self.testconn.hkeys(job.key)), + [b'created_at', b'data', b'description']) def test_store_then_fetch(self): """Store, then fetch.""" @@ -172,7 +172,7 @@ class TestJob(RQTestCase): # equivalent to a worker not having the most up-to-date source code # and unable to import the function) data = self.testconn.hget(job.key, 'data') - unimportable_data = data.replace('say_hello', 'shut_up') + unimportable_data = data.replace(b'say_hello', b'shut_up') self.testconn.hset(job.key, 'data', unimportable_data) job.refresh() diff --git a/tests/test_queue.py b/tests/test_queue.py index 0385e0a..a6dabb2 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -107,7 +107,9 @@ class TestQueue(RQTestCase): # Inspect data inside Redis q_key = 'rq:queue:default' self.assertEquals(self.testconn.llen(q_key), 1) - self.assertEquals(self.testconn.lrange(q_key, 0, -1)[0], job_id) + self.assertEquals( + self.testconn.lrange(q_key, 0, -1)[0].decode('ascii'), + job_id) def test_enqueue_sets_metadata(self): """Enqueueing job onto queues modifies meta data.""" @@ -258,7 +260,7 @@ class TestFailedQueue(RQTestCase): job.save() get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa - self.assertItemsEqual(Queue.all(), [get_failed_queue()]) # noqa + self.assertEqual(Queue.all(), [get_failed_queue()]) # noqa self.assertEquals(get_failed_queue().count, 1) get_failed_queue().requeue(job.id) diff --git a/tests/test_worker.py b/tests/test_worker.py index d9e2fe5..d1b2632 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -57,7 +57,7 @@ class TestWorker(RQTestCase): job = Job.create(func=div_by_zero, args=(3,)) job.save() data = self.testconn.hget(job.key, 'data') - invalid_data = data.replace('div_by_zero', 'nonexisting_job') + invalid_data = data.replace(b'div_by_zero', b'nonexisting_job') assert data != invalid_data self.testconn.hset(job.key, 'data', invalid_data)