Python 3 compatibility with worker.get_current_job().

main
Selwin Ong 11 years ago
parent 802ecb5ccb
commit 2fe5d9e25e

@ -244,7 +244,7 @@ class Worker(object):
def get_current_job_id(self, pipeline=None): def get_current_job_id(self, pipeline=None):
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
return connection.hget(self.key, 'current_job') return as_text(connection.hget(self.key, 'current_job'))
def get_current_job(self): def get_current_job(self):
"""Returns the job id of the currently executing job.""" """Returns the job id of the currently executing job."""

@ -4,6 +4,7 @@ from tests.fixtures import say_hello, div_by_zero, create_file, \
create_file_after_timeout create_file_after_timeout
from tests.helpers import strip_microseconds from tests.helpers import strip_microseconds
from rq import Queue, Worker, get_failed_queue from rq import Queue, Worker, get_failed_queue
from rq.compat import as_text
from rq.job import Job, Status from rq.job import Job, Status
@ -185,7 +186,7 @@ class TestWorker(RQTestCase):
# TODO: Having to do the manual refresh() here is really ugly! # TODO: Having to do the manual refresh() here is really ugly!
res.refresh() res.refresh()
self.assertIn('JobTimeoutException', res.exc_info) self.assertIn('JobTimeoutException', as_text(res.exc_info))
def test_worker_sets_result_ttl(self): def test_worker_sets_result_ttl(self):
"""Ensure that Worker properly sets result_ttl for individual jobs.""" """Ensure that Worker properly sets result_ttl for individual jobs."""
@ -260,6 +261,6 @@ class TestWorker(RQTestCase):
worker.set_current_job_id(job.id) worker.set_current_job_id(job.id)
self.assertEqual( self.assertEqual(
worker.get_current_job_id(), worker.get_current_job_id(),
self.testconn.hget(worker.key, 'current_job') as_text(self.testconn.hget(worker.key, 'current_job'))
) )
self.assertEqual(worker.get_current_job(), job) self.assertEqual(worker.get_current_job(), job)

Loading…
Cancel
Save