fix tests

syntax: assertEquals -> assertEqual, assertNotEquals -> assertNotEqual
usage: status of worker and job now will use get/set method instead of property method
main
ahxxm 9 years ago
parent 91177b7317
commit b06f112cb0

@ -88,7 +88,7 @@ class StartedJobRegistry(BaseRegistry):
for job_id in job_ids:
try:
job = Job.fetch(job_id, connection=self.connection)
job.status = JobStatus.FAILED
job.set_status(JobStatus.FAILED)
job.save(pipeline=pipeline)
failed_queue.push_job_id(job_id, pipeline=pipeline)
except NoSuchJobError:

@ -86,13 +86,13 @@ class TestRQCli(RQTestCase):
# If exception handler is not given, failed job goes to FailedQueue
q.enqueue(div_by_zero)
runner.invoke(main, ['worker', '-u', self.redis_url, '-b'])
self.assertEquals(failed_q.count, 1)
self.assertEqual(failed_q.count, 1)
# Black hole exception handler doesn't add failed jobs to FailedQueue
q.enqueue(div_by_zero)
runner.invoke(main, ['worker', '-u', self.redis_url, '-b',
'--exception-handler', 'tests.fixtures.black_hole'])
self.assertEquals(failed_q.count, 1)
self.assertEqual(failed_q.count, 1)
def test_suspend_and_resume(self):
"""rq suspend -u <url>

@ -16,7 +16,7 @@ class TestConnectionInheritance(RQTestCase):
def test_connection_detection(self):
"""Automatic detection of the connection."""
q = Queue()
self.assertEquals(q.connection, self.testconn)
self.assertEqual(q.connection, self.testconn)
def test_connection_stacking(self):
"""Connection stacking."""
@ -27,7 +27,7 @@ class TestConnectionInheritance(RQTestCase):
q1 = Queue()
with Connection(conn2):
q2 = Queue()
self.assertNotEquals(q1.connection, q2.connection)
self.assertNotEqual(q1.connection, q2.connection)
def test_connection_pass_thru(self):
"""Connection passed through from queues to jobs."""
@ -36,5 +36,5 @@ class TestConnectionInheritance(RQTestCase):
q2 = Queue()
job1 = q1.enqueue(do_nothing)
job2 = q2.enqueue(do_nothing)
self.assertEquals(q1.connection, job1.connection)
self.assertEquals(q2.connection, job2.connection)
self.assertEqual(q1.connection, job1.connection)
self.assertEqual(q2.connection, job2.connection)

@ -38,7 +38,7 @@ class TestJob(RQTestCase):
# Python 2
expected_string = u"myfunc(12, u'\\u2603', null=None, snowman=u'\\u2603')".decode('utf-8')
self.assertEquals(
self.assertEqual(
job.description,
expected_string,
)
@ -78,9 +78,9 @@ class TestJob(RQTestCase):
self.assertIsNone(job.instance)
# Job data is set...
self.assertEquals(job.func, fixtures.some_calculation)
self.assertEquals(job.args, (3, 4))
self.assertEquals(job.kwargs, {'z': 2})
self.assertEqual(job.func, fixtures.some_calculation)
self.assertEqual(job.args, (3, 4))
self.assertEqual(job.kwargs, {'z': 2})
# ...but metadata is not
self.assertIsNone(job.origin)
@ -93,26 +93,26 @@ class TestJob(RQTestCase):
job = Job.create(func=n.div, args=(4,))
# Job data is set
self.assertEquals(job.func, n.div)
self.assertEquals(job.instance, n)
self.assertEquals(job.args, (4,))
self.assertEqual(job.func, n.div)
self.assertEqual(job.instance, n)
self.assertEqual(job.args, (4,))
def test_create_job_from_string_function(self):
"""Creation of jobs using string specifier."""
job = Job.create(func='tests.fixtures.say_hello', args=('World',))
# Job data is set
self.assertEquals(job.func, fixtures.say_hello)
self.assertEqual(job.func, fixtures.say_hello)
self.assertIsNone(job.instance)
self.assertEquals(job.args, ('World',))
self.assertEqual(job.args, ('World',))
def test_create_job_from_callable_class(self):
"""Creation of jobs using a callable class specifier."""
kallable = fixtures.CallableObject()
job = Job.create(func=kallable)
self.assertEquals(job.func, kallable.__call__)
self.assertEquals(job.instance, kallable)
self.assertEqual(job.func, kallable.__call__)
self.assertEqual(job.instance, kallable)
def test_job_properties_set_data_property(self):
"""Data property gets derived from the job tuple."""
@ -120,33 +120,33 @@ class TestJob(RQTestCase):
job.func_name = 'foo'
fname, instance, args, kwargs = loads(job.data)
self.assertEquals(fname, job.func_name)
self.assertEquals(instance, None)
self.assertEquals(args, ())
self.assertEquals(kwargs, {})
self.assertEqual(fname, job.func_name)
self.assertEqual(instance, None)
self.assertEqual(args, ())
self.assertEqual(kwargs, {})
def test_data_property_sets_job_properties(self):
"""Job tuple gets derived lazily from data property."""
job = Job()
job.data = dumps(('foo', None, (1, 2, 3), {'bar': 'qux'}))
self.assertEquals(job.func_name, 'foo')
self.assertEquals(job.instance, None)
self.assertEquals(job.args, (1, 2, 3))
self.assertEquals(job.kwargs, {'bar': 'qux'})
self.assertEqual(job.func_name, 'foo')
self.assertEqual(job.instance, None)
self.assertEqual(job.args, (1, 2, 3))
self.assertEqual(job.kwargs, {'bar': 'qux'})
def test_save(self): # noqa
"""Storing jobs."""
job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2))
# Saving creates a Redis hash
self.assertEquals(self.testconn.exists(job.key), False)
self.assertEqual(self.testconn.exists(job.key), False)
job.save()
self.assertEquals(self.testconn.type(job.key), b'hash')
self.assertEqual(self.testconn.type(job.key), b'hash')
# Saving writes pickled job data
unpickled_data = loads(self.testconn.hget(job.key, 'data'))
self.assertEquals(unpickled_data[0], 'tests.fixtures.some_calculation')
self.assertEqual(unpickled_data[0], 'tests.fixtures.some_calculation')
def test_fetch(self):
"""Fetching jobs."""
@ -158,12 +158,12 @@ class TestJob(RQTestCase):
# Fetch returns a job
job = Job.fetch('some_id')
self.assertEquals(job.id, 'some_id')
self.assertEquals(job.func_name, 'tests.fixtures.some_calculation')
self.assertEqual(job.id, 'some_id')
self.assertEqual(job.func_name, 'tests.fixtures.some_calculation')
self.assertIsNone(job.instance)
self.assertEquals(job.args, (3, 4))
self.assertEquals(job.kwargs, dict(z=2))
self.assertEquals(job.created_at, datetime(2012, 2, 7, 22, 13, 24))
self.assertEqual(job.args, (3, 4))
self.assertEqual(job.kwargs, dict(z=2))
self.assertEqual(job.created_at, datetime(2012, 2, 7, 22, 13, 24))
def test_persistence_of_empty_jobs(self): # noqa
"""Storing empty jobs."""
@ -178,7 +178,7 @@ class TestJob(RQTestCase):
expected_date = strip_microseconds(job.created_at)
stored_date = self.testconn.hget(job.key, 'created_at').decode('utf-8')
self.assertEquals(
self.assertEqual(
stored_date,
utcformat(expected_date))
@ -209,12 +209,12 @@ class TestJob(RQTestCase):
job.save()
job2 = Job.fetch(job.id)
self.assertEquals(job.func, job2.func)
self.assertEquals(job.args, job2.args)
self.assertEquals(job.kwargs, job2.kwargs)
self.assertEqual(job.func, job2.func)
self.assertEqual(job.args, job2.args)
self.assertEqual(job.kwargs, job2.kwargs)
# Mathematical equation
self.assertEquals(job, job2)
self.assertEqual(job, job2)
def test_fetching_can_fail(self):
"""Fetching fails for non-existing jobs."""

@ -21,12 +21,12 @@ class TestQueue(RQTestCase):
def test_create_queue(self):
"""Creating queues."""
q = Queue('my-queue')
self.assertEquals(q.name, 'my-queue')
self.assertEqual(q.name, 'my-queue')
def test_create_default_queue(self):
"""Instantiating the default queue."""
q = Queue()
self.assertEquals(q.name, 'default')
self.assertEqual(q.name, 'default')
def test_equality(self):
"""Mathematical equality of queues."""
@ -34,10 +34,10 @@ class TestQueue(RQTestCase):
q2 = Queue('foo')
q3 = Queue('bar')
self.assertEquals(q1, q2)
self.assertEquals(q2, q1)
self.assertNotEquals(q1, q3)
self.assertNotEquals(q2, q3)
self.assertEqual(q1, q2)
self.assertEqual(q2, q1)
self.assertNotEqual(q1, q3)
self.assertNotEqual(q2, q3)
def test_empty_queue(self):
"""Emptying queues."""
@ -45,11 +45,11 @@ class TestQueue(RQTestCase):
self.testconn.rpush('rq:queue:example', 'foo')
self.testconn.rpush('rq:queue:example', 'bar')
self.assertEquals(q.is_empty(), False)
self.assertEqual(q.is_empty(), False)
q.empty()
self.assertEquals(q.is_empty(), True)
self.assertEqual(q.is_empty(), True)
self.assertIsNone(self.testconn.lpop('rq:queue:example'))
def test_empty_removes_jobs(self):
@ -63,10 +63,10 @@ class TestQueue(RQTestCase):
def test_queue_is_empty(self):
"""Detecting empty queues."""
q = Queue('example')
self.assertEquals(q.is_empty(), True)
self.assertEqual(q.is_empty(), True)
self.testconn.rpush('rq:queue:example', 'sentinel message')
self.assertEquals(q.is_empty(), False)
self.assertEqual(q.is_empty(), False)
def test_remove(self):
"""Ensure queue.remove properly removes Job from queue."""
@ -113,7 +113,7 @@ class TestQueue(RQTestCase):
def test_enqueue(self):
"""Enqueueing job onto queues."""
q = Queue()
self.assertEquals(q.is_empty(), True)
self.assertEqual(q.is_empty(), True)
# say_hello spec holds which queue this is sent to
job = q.enqueue(say_hello, 'Nick', foo='bar')
@ -122,8 +122,8 @@ class TestQueue(RQTestCase):
# Inspect data inside Redis
q_key = 'rq:queue:default'
self.assertEquals(self.testconn.llen(q_key), 1)
self.assertEquals(
self.assertEqual(self.testconn.llen(q_key), 1)
self.assertEqual(
self.testconn.lrange(q_key, 0, -1)[0].decode('ascii'),
job_id)
@ -149,11 +149,11 @@ class TestQueue(RQTestCase):
q.push_job_id(uuid)
# Pop it off the queue...
self.assertEquals(q.count, 1)
self.assertEquals(q.pop_job_id(), uuid)
self.assertEqual(q.count, 1)
self.assertEqual(q.pop_job_id(), uuid)
# ...and assert the queue count when down
self.assertEquals(q.count, 0)
self.assertEqual(q.count, 0)
def test_dequeue(self):
"""Dequeueing jobs from queues."""
@ -162,16 +162,16 @@ class TestQueue(RQTestCase):
result = q.enqueue(say_hello, 'Rick', foo='bar')
# Dequeue a job (not a job ID) off the queue
self.assertEquals(q.count, 1)
self.assertEqual(q.count, 1)
job = q.dequeue()
self.assertEquals(job.id, result.id)
self.assertEquals(job.func, say_hello)
self.assertEquals(job.origin, q.name)
self.assertEquals(job.args[0], 'Rick')
self.assertEquals(job.kwargs['foo'], 'bar')
self.assertEqual(job.id, result.id)
self.assertEqual(job.func, say_hello)
self.assertEqual(job.origin, q.name)
self.assertEqual(job.args[0], 'Rick')
self.assertEqual(job.kwargs['foo'], 'bar')
# ...and assert the queue count when down
self.assertEquals(q.count, 0)
self.assertEqual(q.count, 0)
def test_dequeue_deleted_jobs(self):
"""Dequeueing deleted jobs from queues don't blow the stack."""
@ -191,9 +191,9 @@ class TestQueue(RQTestCase):
# The instance has been pickled and unpickled, so it is now a separate
# object. Test for equality using each object's __dict__ instead.
self.assertEquals(job.instance.__dict__, n.__dict__)
self.assertEquals(job.func.__name__, 'div')
self.assertEquals(job.args, (4,))
self.assertEqual(job.instance.__dict__, n.__dict__)
self.assertEqual(job.func.__name__, 'div')
self.assertEqual(job.args, (4,))
def test_dequeue_class_method(self):
"""Dequeueing class method jobs from queues."""
@ -202,9 +202,9 @@ class TestQueue(RQTestCase):
job = q.dequeue()
self.assertEquals(job.instance.__dict__, Number.__dict__)
self.assertEquals(job.func.__name__, 'divide')
self.assertEquals(job.args, (3, 4))
self.assertEqual(job.instance.__dict__, Number.__dict__)
self.assertEqual(job.func.__name__, 'divide')
self.assertEqual(job.args, (3, 4))
def test_dequeue_ignores_nonexisting_jobs(self):
"""Dequeuing silently ignores non-existing jobs."""
@ -217,40 +217,40 @@ class TestQueue(RQTestCase):
q.push_job_id(uuid)
# Dequeue simply ignores the missing job and returns None
self.assertEquals(q.count, 4)
self.assertEquals(q.dequeue().id, result.id)
self.assertEqual(q.count, 4)
self.assertEqual(q.dequeue().id, result.id)
self.assertIsNone(q.dequeue())
self.assertEquals(q.count, 0)
self.assertEqual(q.count, 0)
def test_dequeue_any(self):
"""Fetching work from any given queue."""
fooq = Queue('foo')
barq = Queue('bar')
self.assertEquals(Queue.dequeue_any([fooq, barq], None), None)
self.assertEqual(Queue.dequeue_any([fooq, barq], None), None)
# Enqueue a single item
barq.enqueue(say_hello)
job, queue = Queue.dequeue_any([fooq, barq], None)
self.assertEquals(job.func, say_hello)
self.assertEquals(queue, barq)
self.assertEqual(job.func, say_hello)
self.assertEqual(queue, barq)
# Enqueue items on both queues
barq.enqueue(say_hello, 'for Bar')
fooq.enqueue(say_hello, 'for Foo')
job, queue = Queue.dequeue_any([fooq, barq], None)
self.assertEquals(queue, fooq)
self.assertEquals(job.func, say_hello)
self.assertEquals(job.origin, fooq.name)
self.assertEquals(job.args[0], 'for Foo',
self.assertEqual(queue, fooq)
self.assertEqual(job.func, say_hello)
self.assertEqual(job.origin, fooq.name)
self.assertEqual(job.args[0], 'for Foo',
'Foo should be dequeued first.')
job, queue = Queue.dequeue_any([fooq, barq], None)
self.assertEquals(queue, barq)
self.assertEquals(job.func, say_hello)
self.assertEquals(job.origin, barq.name)
self.assertEquals(job.args[0], 'for Bar',
self.assertEqual(queue, barq)
self.assertEqual(job.func, say_hello)
self.assertEqual(job.origin, barq.name)
self.assertEqual(job.args[0], 'for Bar',
'Bar should be dequeued second.')
def test_dequeue_any_ignores_nonexisting_jobs(self):
@ -261,10 +261,10 @@ class TestQueue(RQTestCase):
q.push_job_id(uuid)
# Dequeue simply ignores the missing job and returns None
self.assertEquals(q.count, 1)
self.assertEquals(Queue.dequeue_any([Queue(), Queue('low')], None), # noqa
self.assertEqual(q.count, 1)
self.assertEqual(Queue.dequeue_any([Queue(), Queue('low')], None), # noqa
None)
self.assertEquals(q.count, 0)
self.assertEqual(q.count, 0)
def test_enqueue_sets_status(self):
"""Enqueueing a job sets its status to "queued"."""
@ -305,15 +305,15 @@ class TestQueue(RQTestCase):
q3 = Queue('third-queue')
# Ensure a queue is added only once a job is enqueued
self.assertEquals(len(Queue.all()), 0)
self.assertEqual(len(Queue.all()), 0)
q1.enqueue(say_hello)
self.assertEquals(len(Queue.all()), 1)
self.assertEqual(len(Queue.all()), 1)
# Ensure this holds true for multiple queues
q2.enqueue(say_hello)
q3.enqueue(say_hello)
names = [q.name for q in Queue.all()]
self.assertEquals(len(Queue.all()), 3)
self.assertEqual(len(Queue.all()), 3)
# Verify names
self.assertTrue('first-queue' in names)
@ -325,7 +325,7 @@ class TestQueue(RQTestCase):
w.work(burst=True)
# Queue.all() should still report the empty queues
self.assertEquals(len(Queue.all()), 3)
self.assertEqual(len(Queue.all()), 3)
def test_enqueue_dependents(self):
"""Enqueueing dependent jobs pushes all jobs in the depends set to the queue
@ -447,12 +447,12 @@ class TestFailedQueue(RQTestCase):
get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa
self.assertEqual(Queue.all(), [get_failed_queue()]) # noqa
self.assertEquals(get_failed_queue().count, 1)
self.assertEqual(get_failed_queue().count, 1)
get_failed_queue().requeue(job.id)
self.assertEquals(get_failed_queue().count, 0)
self.assertEquals(Queue('fake').count, 1)
self.assertEqual(get_failed_queue().count, 0)
self.assertEqual(Queue('fake').count, 1)
def test_requeue_nonfailed_job_fails(self):
"""Requeueing non-failed jobs raises error."""
@ -471,7 +471,7 @@ class TestFailedQueue(RQTestCase):
job.save()
get_failed_queue().quarantine(job, Exception('Some fake error'))
self.assertEquals(job.timeout, 200)
self.assertEqual(job.timeout, 200)
def test_requeueing_preserves_timeout(self):
"""Requeueing preserves job timeout."""
@ -483,7 +483,7 @@ class TestFailedQueue(RQTestCase):
get_failed_queue().requeue(job.id)
job = Job.fetch(job.id)
self.assertEquals(job.timeout, 200)
self.assertEqual(job.timeout, 200)
def test_requeue_sets_status_to_queued(self):
"""Requeueing a job should set its status back to QUEUED."""

@ -74,7 +74,7 @@ class TestRegistry(RQTestCase):
self.assertIn(job.id, failed_queue.job_ids)
self.assertEqual(self.testconn.zscore(self.registry.key, job.id), None)
job.refresh()
self.assertEqual(job.status, JobStatus.FAILED)
self.assertEqual(job.get_status(), JobStatus.FAILED)
def test_job_execution(self):
"""Job is removed from StartedJobRegistry after execution."""

@ -24,7 +24,7 @@ class TestSentry(RQTestCase):
# Action
q.enqueue('_non.importable.job')
self.assertEquals(q.count, 1)
self.assertEqual(q.count, 1)
w = Worker([q])
register_sentry(FakeSentry(), w)
@ -32,5 +32,5 @@ class TestSentry(RQTestCase):
w.work(burst=True)
# Postconditions
self.assertEquals(failed_q.count, 1)
self.assertEquals(q.count, 0)
self.assertEqual(failed_q.count, 1)
self.assertEqual(q.count, 0)

@ -32,41 +32,41 @@ class TestWorker(RQTestCase):
# With single string argument
w = Worker('foo')
self.assertEquals(w.queues[0].name, 'foo')
self.assertEqual(w.queues[0].name, 'foo')
# With list of strings
w = Worker(['foo', 'bar'])
self.assertEquals(w.queues[0].name, 'foo')
self.assertEquals(w.queues[1].name, 'bar')
self.assertEqual(w.queues[0].name, 'foo')
self.assertEqual(w.queues[1].name, 'bar')
# With iterable of strings
w = Worker(iter(['foo', 'bar']))
self.assertEquals(w.queues[0].name, 'foo')
self.assertEquals(w.queues[1].name, 'bar')
self.assertEqual(w.queues[0].name, 'foo')
self.assertEqual(w.queues[1].name, 'bar')
# With single Queue
w = Worker(Queue('foo'))
self.assertEquals(w.queues[0].name, 'foo')
self.assertEqual(w.queues[0].name, 'foo')
# With iterable of Queues
w = Worker(iter([Queue('foo'), Queue('bar')]))
self.assertEquals(w.queues[0].name, 'foo')
self.assertEquals(w.queues[1].name, 'bar')
self.assertEqual(w.queues[0].name, 'foo')
self.assertEqual(w.queues[1].name, 'bar')
# With list of Queues
w = Worker([Queue('foo'), Queue('bar')])
self.assertEquals(w.queues[0].name, 'foo')
self.assertEquals(w.queues[1].name, 'bar')
self.assertEqual(w.queues[0].name, 'foo')
self.assertEqual(w.queues[1].name, 'bar')
def test_work_and_quit(self):
"""Worker processes work, then quits."""
fooq, barq = Queue('foo'), Queue('bar')
w = Worker([fooq, barq])
self.assertEquals(w.work(burst=True), False,
self.assertEqual(w.work(burst=True), False,
'Did not expect any work on the queue.')
fooq.enqueue(say_hello, name='Frank')
self.assertEquals(w.work(burst=True), True,
self.assertEqual(w.work(burst=True), True,
'Expected at least some work done.')
def test_worker_ttl(self):
@ -82,17 +82,17 @@ class TestWorker(RQTestCase):
q = Queue('foo')
w = Worker([q])
job = q.enqueue('tests.fixtures.say_hello', name='Frank')
self.assertEquals(w.work(burst=True), True,
self.assertEqual(w.work(burst=True), True,
'Expected at least some work done.')
self.assertEquals(job.result, 'Hi there, Frank!')
self.assertEqual(job.result, 'Hi there, Frank!')
def test_work_is_unreadable(self):
"""Unreadable jobs are put on the failed queue."""
q = Queue()
failed_q = get_failed_queue()
self.assertEquals(failed_q.count, 0)
self.assertEquals(q.count, 0)
self.assertEqual(failed_q.count, 0)
self.assertEqual(q.count, 0)
# NOTE: We have to fake this enqueueing for this test case.
# What we're simulating here is a call to a function that is not
@ -108,13 +108,13 @@ class TestWorker(RQTestCase):
# validity checks)
q.push_job_id(job.id)
self.assertEquals(q.count, 1)
self.assertEqual(q.count, 1)
# All set, we're going to process it
w = Worker([q])
w.work(burst=True) # should silently pass
self.assertEquals(q.count, 0)
self.assertEquals(failed_q.count, 1)
self.assertEqual(q.count, 0)
self.assertEqual(failed_q.count, 1)
def test_work_fails(self):
"""Failing jobs are put on the failed queue."""
@ -122,12 +122,12 @@ class TestWorker(RQTestCase):
failed_q = get_failed_queue()
# Preconditions
self.assertEquals(failed_q.count, 0)
self.assertEquals(q.count, 0)
self.assertEqual(failed_q.count, 0)
self.assertEqual(q.count, 0)
# Action
job = q.enqueue(div_by_zero)
self.assertEquals(q.count, 1)
self.assertEqual(q.count, 1)
# keep for later
enqueued_at_date = strip_microseconds(job.enqueued_at)
@ -136,16 +136,16 @@ class TestWorker(RQTestCase):
w.work(burst=True) # should silently pass
# Postconditions
self.assertEquals(q.count, 0)
self.assertEquals(failed_q.count, 1)
self.assertEqual(q.count, 0)
self.assertEqual(failed_q.count, 1)
# Check the job
job = Job.fetch(job.id)
self.assertEquals(job.origin, q.name)
self.assertEqual(job.origin, q.name)
# Should be the original enqueued_at date, not the date of enqueueing
# to the failed queue
self.assertEquals(job.enqueued_at, enqueued_at_date)
self.assertEqual(job.enqueued_at, enqueued_at_date)
self.assertIsNotNone(job.exc_info) # should contain exc_info
def test_custom_exc_handling(self):
@ -158,23 +158,23 @@ class TestWorker(RQTestCase):
failed_q = get_failed_queue()
# Preconditions
self.assertEquals(failed_q.count, 0)
self.assertEquals(q.count, 0)
self.assertEqual(failed_q.count, 0)
self.assertEqual(q.count, 0)
# Action
job = q.enqueue(div_by_zero)
self.assertEquals(q.count, 1)
self.assertEqual(q.count, 1)
w = Worker([q], exception_handlers=black_hole)
w.work(burst=True) # should silently pass
# Postconditions
self.assertEquals(q.count, 0)
self.assertEquals(failed_q.count, 0)
self.assertEqual(q.count, 0)
self.assertEqual(failed_q.count, 0)
# Check the job
job = Job.fetch(job.id)
self.assertEquals(job.is_failed, True)
self.assertEqual(job.is_failed, True)
def test_cancelled_jobs_arent_executed(self): # noqa
"""Cancelling jobs."""
@ -199,7 +199,7 @@ class TestWorker(RQTestCase):
assert q.count == 0
# Should not have created evidence of execution
self.assertEquals(os.path.exists(SENTINEL_FILE), False)
self.assertEqual(os.path.exists(SENTINEL_FILE), False)
@slow # noqa
def test_timeouts(self):
@ -220,9 +220,9 @@ class TestWorker(RQTestCase):
if e.errno == 2:
pass
self.assertEquals(os.path.exists(sentinel_file), False)
self.assertEqual(os.path.exists(sentinel_file), False)
w.work(burst=True)
self.assertEquals(os.path.exists(sentinel_file), False)
self.assertEqual(os.path.exists(sentinel_file), False)
# TODO: Having to do the manual refresh() here is really ugly!
res.refresh()
@ -316,14 +316,14 @@ class TestWorker(RQTestCase):
then returns."""
fooq, barq = Queue('foo'), Queue('bar')
w = SimpleWorker([fooq, barq])
self.assertEquals(w.work(burst=True), False,
'Did not expect any work on the queue.')
self.assertEqual(w.work(burst=True), False,
'Did not expect any work on the queue.')
job = fooq.enqueue(say_pid)
self.assertEquals(w.work(burst=True), True,
'Expected at least some work done.')
self.assertEquals(job.result, os.getpid(),
'PID mismatch, fork() is not supposed to happen here')
self.assertEqual(w.work(burst=True), True,
'Expected at least some work done.')
self.assertEqual(job.result, os.getpid(),
'PID mismatch, fork() is not supposed to happen here')
def test_prepare_job_execution(self):
"""Prepare job execution does the necessary bookkeeping."""
@ -337,7 +337,7 @@ class TestWorker(RQTestCase):
self.assertEqual(registry.get_job_ids(), [job.id])
# Updates worker statuses
self.assertEqual(worker.state, 'busy')
self.assertEqual(worker.get_state(), 'busy')
self.assertEqual(worker.get_current_job_id(), job.id)
def test_work_unicode_friendly(self):
@ -346,10 +346,10 @@ class TestWorker(RQTestCase):
w = Worker([q])
job = q.enqueue('tests.fixtures.say_hello', name='Adam',
description='你好 世界!')
self.assertEquals(w.work(burst=True), True,
'Expected at least some work done.')
self.assertEquals(job.result, 'Hi there, Adam!')
self.assertEquals(job.description, '你好 世界!')
self.assertEqual(w.work(burst=True), True,
'Expected at least some work done.')
self.assertEqual(job.result, 'Hi there, Adam!')
self.assertEqual(job.description, '你好 世界!')
def test_suspend_worker_execution(self):
"""Test Pause Worker Execution"""
@ -374,12 +374,12 @@ class TestWorker(RQTestCase):
assert q.count == 1
# Should not have created evidence of execution
self.assertEquals(os.path.exists(SENTINEL_FILE), False)
self.assertEqual(os.path.exists(SENTINEL_FILE), False)
resume(self.testconn)
w.work(burst=True)
assert q.count == 0
self.assertEquals(os.path.exists(SENTINEL_FILE), True)
self.assertEqual(os.path.exists(SENTINEL_FILE), True)
def test_suspend_with_duration(self):
q = Queue()
@ -408,7 +408,7 @@ class TestWorker(RQTestCase):
w2 = Worker([q], name="worker2")
w3 = Worker([q], name="worker1")
worker_set = set([w1, w2, w3])
self.assertEquals(len(worker_set), 2)
self.assertEqual(len(worker_set), 2)
def test_worker_sets_birth(self):
"""Ensure worker correctly sets worker birth date."""
@ -419,7 +419,7 @@ class TestWorker(RQTestCase):
birth_date = w.birth_date
self.assertIsNotNone(birth_date)
self.assertEquals(type(birth_date).__name__, 'datetime')
self.assertEqual(type(birth_date).__name__, 'datetime')
def test_worker_sets_death(self):
"""Ensure worker correctly sets worker death date."""
@ -430,7 +430,7 @@ class TestWorker(RQTestCase):
death_date = w.death_date
self.assertIsNotNone(death_date)
self.assertEquals(type(death_date).__name__, 'datetime')
self.assertEqual(type(death_date).__name__, 'datetime')
def test_clean_queue_registries(self):
"""worker.clean_registries sets last_cleaned_at and cleans registries."""

Loading…
Cancel
Save