diff --git a/rq/compat/__init__.py b/rq/compat/__init__.py
index e08e4c0..9ce6e4e 100644
--- a/rq/compat/__init__.py
+++ b/rq/compat/__init__.py
@@ -47,6 +47,25 @@ if PY2:
     string_types = (str, unicode)
     text_type = unicode
 
+    def as_text(v):
+        return v
+
+    def decode_redis_hash(h):
+        return h
+
 else:
     string_types = (str,)
     text_type = str
+
+    def as_text(v):
+        if v is None:
+            return None
+        elif isinstance(v, bytes):
+            return v.decode('ascii')
+        elif isinstance(v, str):
+            return v
+        else:
+            raise ValueError('Unknown type %r' % type(v))
+
+    def decode_redis_hash(h):
+        return dict((as_text(k), h[k]) for k in h)
diff --git a/rq/job.py b/rq/job.py
index 6871419..6bf9189 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -9,7 +9,7 @@ except ImportError:  # noqa
 from .local import LocalStack
 from .connections import resolve_connection
 from .exceptions import UnpickleError, NoSuchJobError
-from rq.compat import text_type
+from rq.compat import text_type, decode_redis_hash, as_text
 
 
 def enum(name, *sequential, **named):
@@ -98,7 +98,7 @@ class Job(object):
         return self._func_name
 
     def _get_status(self):
-        self._status = self.connection.hget(self.key, 'status')
+        self._status = as_text(self.connection.hget(self.key, 'status'))
         return self._status
 
     def _set_status(self, status):
@@ -210,7 +210,7 @@ class Job(object):
     @classmethod
     def key_for(cls, job_id):
         """The Redis key that is used to store job hash under."""
-        return 'rq:job:%s' % (job_id,)
+        return b'rq:job:' + job_id.encode('ascii')
 
     @property
     def key(self):
@@ -259,7 +259,7 @@ class Job(object):
         Will raise a NoSuchJobError if no corresponding Redis key exists.
         """
         key = self.key
-        obj = self.connection.hgetall(key)
+        obj = decode_redis_hash(self.connection.hgetall(key))
         if len(obj) == 0:
             raise NoSuchJobError('No such job: %s' % (key,))
 
@@ -267,7 +267,7 @@ class Job(object):
             if date_str is None:
                 return None
             else:
-                return times.to_universal(date_str)
+                return times.to_universal(as_text(date_str))
 
         try:
             self.data = obj['data']
@@ -279,16 +279,16 @@ class Job(object):
         except UnpickleError:
             if not safe:
                 raise
-        self.created_at = to_date(obj.get('created_at'))
-        self.origin = obj.get('origin')
-        self.description = obj.get('description')
-        self.enqueued_at = to_date(obj.get('enqueued_at'))
-        self.ended_at = to_date(obj.get('ended_at'))
+        self.created_at = to_date(as_text(obj.get('created_at')))
+        self.origin = as_text(obj.get('origin'))
+        self.description = as_text(obj.get('description'))
+        self.enqueued_at = to_date(as_text(obj.get('enqueued_at')))
+        self.ended_at = to_date(as_text(obj.get('ended_at')))
         self._result = unpickle(obj.get('result')) if obj.get('result') else None  # noqa
         self.exc_info = obj.get('exc_info')
         self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None
         self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None  # noqa
-        self._status = obj.get('status') if obj.get('status') else None
+        self._status = as_text(obj.get('status') if obj.get('status') else None)
         self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}
 
     def save(self, pipeline=None):
diff --git a/rq/queue.py b/rq/queue.py
index f3b5f92..3f9cd96 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -3,7 +3,7 @@ from .connections import resolve_connection
 from .job import Job, Status
 from .exceptions import (NoSuchJobError, UnpickleError,
                          InvalidJobOperationError, DequeueTimeout)
-from .compat import total_ordering, string_types
+from .compat import total_ordering, string_types, as_text
 
 
 def get_failed_queue(connection=None):
@@ -27,8 +27,9 @@ class Queue(object):
         connection = resolve_connection(connection)
 
         def to_queue(queue_key):
-            return cls.from_queue_key(queue_key, connection=connection)
-        return map(to_queue, connection.keys('%s*' % prefix))
+            return cls.from_queue_key(as_text(queue_key),
+                                      connection=connection)
+        return list(map(to_queue, connection.keys('%s*' % prefix)))
 
     @classmethod
     def from_queue_key(cls, queue_key, connection=None):
@@ -81,7 +82,8 @@ class Queue(object):
             end = offset + (length - 1)
         else:
             end = length
-        return self.connection.lrange(self.key, start, end)
+        return [as_text(job_id) for job_id in
+                self.connection.lrange(self.key, start, end)]
 
     def get_jobs(self, offset=0, length=-1):
         """Returns a slice of jobs in the queue."""
@@ -116,7 +118,7 @@ class Queue(object):
         self.connection.rename(self.key, COMPACT_QUEUE)
 
         while True:
-            job_id = self.connection.lpop(COMPACT_QUEUE)
+            job_id = as_text(self.connection.lpop(COMPACT_QUEUE))
             if job_id is None:
                 break
             if Job.exists(job_id, self.connection):
@@ -204,7 +206,7 @@ class Queue(object):
 
     def pop_job_id(self):
         """Pops a given job ID from this Redis queue."""
-        return self.connection.lpop(self.key)
+        return as_text(self.connection.lpop(self.key))
 
     @classmethod
     def lpop(cls, queue_keys, timeout, connection=None):
@@ -274,7 +276,7 @@ class Queue(object):
             result = cls.lpop(queue_keys, timeout, connection=connection)
             if result is None:
                 return None
-            queue_key, job_id = result
+            queue_key, job_id = map(as_text, result)
             queue = cls.from_queue_key(queue_key, connection=connection)
             try:
                 job = Job.fetch(job_id, connection=connection)
diff --git a/rq/worker.py b/rq/worker.py
index 8f1da27..4717c38 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -21,6 +21,7 @@ from .logutils import setup_loghandlers
 from .exceptions import NoQueueError, UnpickleError, DequeueTimeout
 from .timeouts import death_penalty_after
 from .version import VERSION
+from rq.compat import text_type
 
 green = make_colorizer('darkgreen')
 yellow = make_colorizer('darkyellow')
@@ -431,7 +432,7 @@ class Worker(object):
         if rv is None:
             self.log.info('Job OK')
         else:
-            self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),))
+            self.log.info('Job OK, result = %s' % (yellow(text_type(rv)),))
 
         if result_ttl == 0:
             self.log.info('Result discarded immediately.')
diff --git a/tests/test_job.py b/tests/test_job.py
index 3135178..522f1a6 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -78,7 +78,7 @@ class TestJob(RQTestCase):
         # Saving creates a Redis hash
         self.assertEquals(self.testconn.exists(job.key), False)
         job.save()
-        self.assertEquals(self.testconn.type(job.key), 'hash')
+        self.assertEquals(self.testconn.type(job.key), b'hash')
 
         # Saving writes pickled job data
         unpickled_data = loads(self.testconn.hget(job.key, 'data'))
@@ -108,15 +108,15 @@ class TestJob(RQTestCase):
         job.save()
 
         expected_date = strip_milliseconds(job.created_at)
-        stored_date = self.testconn.hget(job.key, 'created_at')
+        stored_date = self.testconn.hget(job.key, 'created_at').decode('ascii')
         self.assertEquals(
             times.to_universal(stored_date),
             expected_date)
 
         # ... and no other keys are stored
-        self.assertItemsEqual(
+        self.assertEqual(
             self.testconn.hkeys(job.key),
-            ['created_at'])
+            [b'created_at'])
 
     def test_persistence_of_typical_jobs(self):
         """Storing typical jobs."""
@@ -124,15 +124,15 @@ class TestJob(RQTestCase):
         job.save()
 
         expected_date = strip_milliseconds(job.created_at)
-        stored_date = self.testconn.hget(job.key, 'created_at')
+        stored_date = self.testconn.hget(job.key, 'created_at').decode('ascii')
         self.assertEquals(
             times.to_universal(stored_date),
             expected_date)
 
         # ... and no other keys are stored
-        self.assertItemsEqual(
-            self.testconn.hkeys(job.key),
-            ['created_at', 'data', 'description'])
+        self.assertEqual(
+            sorted(self.testconn.hkeys(job.key)),
+            [b'created_at', b'data', b'description'])
 
     def test_store_then_fetch(self):
         """Store, then fetch."""
@@ -172,7 +172,7 @@ class TestJob(RQTestCase):
         # equivalent to a worker not having the most up-to-date source code
         # and unable to import the function)
         data = self.testconn.hget(job.key, 'data')
-        unimportable_data = data.replace('say_hello', 'shut_up')
+        unimportable_data = data.replace(b'say_hello', b'shut_up')
         self.testconn.hset(job.key, 'data', unimportable_data)
 
         job.refresh()
diff --git a/tests/test_queue.py b/tests/test_queue.py
index 0385e0a..a6dabb2 100644
--- a/tests/test_queue.py
+++ b/tests/test_queue.py
@@ -107,7 +107,9 @@ class TestQueue(RQTestCase):
         # Inspect data inside Redis
         q_key = 'rq:queue:default'
         self.assertEquals(self.testconn.llen(q_key), 1)
-        self.assertEquals(self.testconn.lrange(q_key, 0, -1)[0], job_id)
+        self.assertEquals(
+            self.testconn.lrange(q_key, 0, -1)[0].decode('ascii'),
+            job_id)
 
     def test_enqueue_sets_metadata(self):
         """Enqueueing job onto queues modifies meta data."""
@@ -258,7 +260,7 @@ class TestFailedQueue(RQTestCase):
         job.save()
         get_failed_queue().quarantine(job, Exception('Some fake error'))  # noqa
 
-        self.assertItemsEqual(Queue.all(), [get_failed_queue()])  # noqa
+        self.assertEqual(Queue.all(), [get_failed_queue()])  # noqa
         self.assertEquals(get_failed_queue().count, 1)
 
         get_failed_queue().requeue(job.id)
diff --git a/tests/test_worker.py b/tests/test_worker.py
index d9e2fe5..d1b2632 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -57,7 +57,7 @@ class TestWorker(RQTestCase):
         job = Job.create(func=div_by_zero, args=(3,))
         job.save()
         data = self.testconn.hget(job.key, 'data')
-        invalid_data = data.replace('div_by_zero', 'nonexisting_job')
+        invalid_data = data.replace(b'div_by_zero', b'nonexisting_job')
         assert data != invalid_data
         self.testconn.hset(job.key, 'data', invalid_data)