import os from tests import RQTestCase from tests.fixtures import say_hello, div_by_zero, do_nothing, create_file from tests.helpers import strip_milliseconds from rq import Queue, Worker, Job class TestWorker(RQTestCase): def test_create_worker(self): """Worker creation.""" fooq, barq = Queue('foo'), Queue('bar') w = Worker([fooq, barq]) self.assertEquals(w.queues, [fooq, barq]) def test_work_and_quit(self): """Worker processes work, then quits.""" fooq, barq = Queue('foo'), Queue('bar') w = Worker([fooq, barq]) self.assertEquals(w.work(burst=True), False, 'Did not expect any work on the queue.') fooq.enqueue(say_hello, name='Frank') self.assertEquals(w.work(burst=True), True, 'Expected at least some work done.') def test_work_is_unreadable(self): """Unreadable jobs are put on the failed queue.""" q = Queue() failed_q = Queue('failed') self.assertEquals(failed_q.count, 0) self.assertEquals(q.count, 0) # NOTE: We have to fake this enqueueing for this test case. # What we're simulating here is a call to a function that is not # importable from the worker process. job = Job.create(div_by_zero, 3) job.save() data = self.testconn.hget(job.key, 'data') invalid_data = data.replace('failing_job', 'nonexisting_job') self.testconn.hset(job.key, 'data', invalid_data) # We use the low-level internal function to enqueue any data (bypassing # validity checks) q.push_job_id(job.id) self.assertEquals(q.count, 1) # All set, we're going to process it w = Worker([q]) w.work(burst=True) # should silently pass self.assertEquals(q.count, 0) self.assertEquals(failed_q.count, 1) def test_work_fails(self): """Failing jobs are put on the failed queue.""" q = Queue() failed_q = Queue('failed') # Preconditions self.assertEquals(failed_q.count, 0) self.assertEquals(q.count, 0) # Action job = q.enqueue(div_by_zero) self.assertEquals(q.count, 1) # keep for later enqueued_at_date = strip_milliseconds(job.enqueued_at) w = Worker([q]) w.work(burst=True) # should silently pass # Postconditions self.assertEquals(q.count, 0) self.assertEquals(failed_q.count, 1) # Check the job job = Job.fetch(job.id) self.assertEquals(job.origin, q.name) # should be the original enqueued_at date, not the date of enqueueing to # the failed queue self.assertEquals(job.enqueued_at, enqueued_at_date) self.assertIsNotNone(job.exc_info) # should contain exc_info def test_cancelled_jobs_arent_executed(self): """Cancelling jobs.""" SENTINEL_FILE = '/tmp/rq-tests.txt' try: # Remove the sentinel if it is leftover from a previous test run os.remove(SENTINEL_FILE) except OSError as e: if e.errno != 2: raise q = Queue() result = q.enqueue(create_file, SENTINEL_FILE) # Here, we cancel the job, so the sentinel file may not be created assert q.count == 1 result.cancel() assert q.count == 1 w = Worker([q]) w.work(burst=True) assert q.count == 0 # Should not have created evidence of execution self.assertEquals(os.path.exists(SENTINEL_FILE), False) def test_cleaning_up_of_jobs(self): """Jobs get cleaned up after successful execution.""" q = Queue() job_with_rv = q.enqueue(say_hello, 'Franklin') job_without_rv = q.enqueue(do_nothing) # Job hashes exists self.assertEquals(self.testconn.type(job_with_rv.key), 'hash') self.assertEquals(self.testconn.type(job_without_rv.key), 'hash') # Execute the job w = Worker([q]) w.work(burst=True) # Jobs with results expire after a certain TTL, while jobs without # results are immediately removed assert self.testconn.ttl(job_with_rv.key) > 0 assert self.testconn.exists(job_without_rv.key) == False