|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
from __future__ import (absolute_import, division, print_function,
|
|
|
|
unicode_literals)
|
|
|
|
|
|
|
|
import os
|
|
|
|
from time import sleep
|
|
|
|
|
|
|
|
from tests import RQTestCase, slow
|
|
|
|
from tests.fixtures import (create_file, create_file_after_timeout,
|
|
|
|
div_by_zero, do_nothing, say_hello, say_pid)
|
|
|
|
from tests.helpers import strip_microseconds
|
|
|
|
|
|
|
|
from rq import get_failed_queue, Queue, SimpleWorker, Worker
|
|
|
|
from rq.compat import as_text
|
|
|
|
from rq.exceptions import NoQueueError
|
|
|
|
from rq.job import Job, JobStatus
|
|
|
|
from rq.registry import StartedJobRegistry
|
|
|
|
from rq.suspension import resume, suspend
|
|
|
|
|
|
|
|
|
|
|
|
class CustomJob(Job):
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
class TestWorker(RQTestCase):
|
|
|
|
def test_create_worker(self):
|
|
|
|
"""Worker creation."""
|
|
|
|
fooq, barq = Queue('foo'), Queue('bar')
|
|
|
|
w = Worker([fooq, barq])
|
|
|
|
self.assertEquals(w.queues, [fooq, barq])
|
|
|
|
|
|
|
|
def test_create_worker_args_single_queue(self):
|
|
|
|
"""Test Worker creation with single queue instance arg"""
|
|
|
|
fooq = Queue('foo')
|
|
|
|
w = Worker(fooq)
|
|
|
|
self.assertEquals(w.queue_keys(), ['rq:queue:foo'])
|
|
|
|
|
|
|
|
def test_create_worker_args_single_string(self):
|
|
|
|
""" Test Worker creation with single string arg"""
|
|
|
|
w = Worker('foo')
|
|
|
|
self.assertEquals(w.queue_keys(),['rq:queue:foo'])
|
|
|
|
|
|
|
|
def test_create_worker_args_iterable_strings(self):
|
|
|
|
""" Test Worker creation with iterable of strings"""
|
|
|
|
w = Worker(['foo', 'bar'])
|
|
|
|
self.assertEquals(w.queue_keys(),['rq:queue:foo', 'rq:queue:bar'])
|
|
|
|
|
|
|
|
def test_create_worker_args_iterable_queues(self):
|
|
|
|
""" Test Worker test worker creation
|
|
|
|
with an iterable of queue instance args"""
|
|
|
|
w = Worker(map(Queue, ['foo', 'bar']))
|
|
|
|
self.assertEquals(w.queue_keys(),['rq:queue:foo', 'rq:queue:bar'])
|
|
|
|
|
|
|
|
def test_create_worker_args_list_map(self):
|
|
|
|
""" Test Worker test worker creation
|
|
|
|
with a list of queue from map"""
|
|
|
|
w = Worker(list(map(Queue, ['foo', 'bar'])))
|
|
|
|
self.assertEquals(w.queue_keys(),['rq:queue:foo', 'rq:queue:bar'])
|
|
|
|
|
|
|
|
def test_create_worker_raises_noqueue_error(self):
|
|
|
|
""" make sure raises noqueue error if a
|
|
|
|
a non string or queue is passed"""
|
|
|
|
with self.assertRaises(NoQueueError):
|
|
|
|
w = Worker([1])
|
|
|
|
|
|
|
|
def test_work_and_quit(self):
|
|
|
|
"""Worker processes work, then quits."""
|
|
|
|
fooq, barq = Queue('foo'), Queue('bar')
|
|
|
|
w = Worker([fooq, barq])
|
|
|
|
self.assertEquals(w.work(burst=True), False,
|
|
|
|
'Did not expect any work on the queue.')
|
|
|
|
|
|
|
|
fooq.enqueue(say_hello, name='Frank')
|
|
|
|
self.assertEquals(w.work(burst=True), True,
|
|
|
|
'Expected at least some work done.')
|
|
|
|
|
|
|
|
def test_worker_ttl(self):
|
|
|
|
"""Worker ttl."""
|
|
|
|
w = Worker([])
|
|
|
|
w.register_birth() # ugly: our test should only call public APIs
|
|
|
|
[worker_key] = self.testconn.smembers(Worker.redis_workers_keys)
|
|
|
|
self.assertIsNotNone(self.testconn.ttl(worker_key))
|
|
|
|
w.register_death()
|
|
|
|
|
|
|
|
def test_work_via_string_argument(self):
|
|
|
|
"""Worker processes work fed via string arguments."""
|
|
|
|
q = Queue('foo')
|
|
|
|
w = Worker([q])
|
|
|
|
job = q.enqueue('tests.fixtures.say_hello', name='Frank')
|
|
|
|
self.assertEquals(w.work(burst=True), True,
|
|
|
|
'Expected at least some work done.')
|
|
|
|
self.assertEquals(job.result, 'Hi there, Frank!')
|
|
|
|
|
|
|
|
def test_work_is_unreadable(self):
|
|
|
|
"""Unreadable jobs are put on the failed queue."""
|
|
|
|
q = Queue()
|
New connection management.
Connections can now be set explicitly on Queues, Workers, and Jobs.
Jobs that are implicitly created by Queue or Worker API calls now
inherit the connection of their creator's.
For all RQ object instances that are created now holds that the
"current" connection is used if none is passed in explicitly. The
"current" connection is thus hold on to at creation time and won't be
changed for the lifetime of the object.
Effectively, this means that, given a default Redis connection, say you
create a queue Q1, then push another Redis connection onto the
connection stack, then create Q2. In that case, Q1 means a queue on the
first connection and Q2 on the second connection.
This is way more clear than it used to be.
Also, I've removed the `use_redis()` call, which was named ugly.
Instead, some new alternatives for connection management now exist.
You can push/pop connections now:
>>> my_conn = Redis()
>>> push_connection(my_conn)
>>> q = Queue()
>>> q.connection == my_conn
True
>>> pop_connection() == my_conn
Also, you can stack them syntactically:
>>> conn1 = Redis()
>>> conn2 = Redis('example.org', 1234)
>>> with Connection(conn1):
... q = Queue()
... with Connection(conn2):
... q2 = Queue()
... q3 = Queue()
>>> q.connection == conn1
True
>>> q2.connection == conn2
True
>>> q3.connection == conn1
True
Or, if you only require a single connection to Redis (for most uses):
>>> use_connection(Redis())
13 years ago
|
|
|
failed_q = get_failed_queue()
|
|
|
|
|
|
|
|
self.assertEquals(failed_q.count, 0)
|
|
|
|
self.assertEquals(q.count, 0)
|
|
|
|
|
|
|
|
# NOTE: We have to fake this enqueueing for this test case.
|
|
|
|
# What we're simulating here is a call to a function that is not
|
|
|
|
# importable from the worker process.
|
|
|
|
job = Job.create(func=div_by_zero, args=(3,))
|
|
|
|
job.save()
|
|
|
|
data = self.testconn.hget(job.key, 'data')
|
|
|
|
invalid_data = data.replace(b'div_by_zero', b'nonexisting')
|
|
|
|
assert data != invalid_data
|
|
|
|
self.testconn.hset(job.key, 'data', invalid_data)
|
|
|
|
|
|
|
|
# We use the low-level internal function to enqueue any data (bypassing
|
|
|
|
# validity checks)
|
|
|
|
q.push_job_id(job.id)
|
|
|
|
|
|
|
|
self.assertEquals(q.count, 1)
|
|
|
|
|
|
|
|
# All set, we're going to process it
|
|
|
|
w = Worker([q])
|
|
|
|
w.work(burst=True) # should silently pass
|
|
|
|
self.assertEquals(q.count, 0)
|
|
|
|
self.assertEquals(failed_q.count, 1)
|
|
|
|
|
|
|
|
def test_work_fails(self):
|
|
|
|
"""Failing jobs are put on the failed queue."""
|
|
|
|
q = Queue()
|
New connection management.
Connections can now be set explicitly on Queues, Workers, and Jobs.
Jobs that are implicitly created by Queue or Worker API calls now
inherit the connection of their creator's.
For all RQ object instances that are created now holds that the
"current" connection is used if none is passed in explicitly. The
"current" connection is thus hold on to at creation time and won't be
changed for the lifetime of the object.
Effectively, this means that, given a default Redis connection, say you
create a queue Q1, then push another Redis connection onto the
connection stack, then create Q2. In that case, Q1 means a queue on the
first connection and Q2 on the second connection.
This is way more clear than it used to be.
Also, I've removed the `use_redis()` call, which was named ugly.
Instead, some new alternatives for connection management now exist.
You can push/pop connections now:
>>> my_conn = Redis()
>>> push_connection(my_conn)
>>> q = Queue()
>>> q.connection == my_conn
True
>>> pop_connection() == my_conn
Also, you can stack them syntactically:
>>> conn1 = Redis()
>>> conn2 = Redis('example.org', 1234)
>>> with Connection(conn1):
... q = Queue()
... with Connection(conn2):
... q2 = Queue()
... q3 = Queue()
>>> q.connection == conn1
True
>>> q2.connection == conn2
True
>>> q3.connection == conn1
True
Or, if you only require a single connection to Redis (for most uses):
>>> use_connection(Redis())
13 years ago
|
|
|
failed_q = get_failed_queue()
|
|
|
|
|
|
|
|
# Preconditions
|
|
|
|
self.assertEquals(failed_q.count, 0)
|
|
|
|
self.assertEquals(q.count, 0)
|
|
|
|
|
|
|
|
# Action
|
|
|
|
job = q.enqueue(div_by_zero)
|
|
|
|
self.assertEquals(q.count, 1)
|
|
|
|
|
|
|
|
# keep for later
|
|
|
|
enqueued_at_date = strip_microseconds(job.enqueued_at)
|
|
|
|
|
|
|
|
w = Worker([q])
|
|
|
|
w.work(burst=True) # should silently pass
|
|
|
|
|
|
|
|
# Postconditions
|
|
|
|
self.assertEquals(q.count, 0)
|
|
|
|
self.assertEquals(failed_q.count, 1)
|
|
|
|
|
|
|
|
# Check the job
|
|
|
|
job = Job.fetch(job.id)
|
|
|
|
self.assertEquals(job.origin, q.name)
|
|
|
|
|
|
|
|
# Should be the original enqueued_at date, not the date of enqueueing
|
|
|
|
# to the failed queue
|
|
|
|
self.assertEquals(job.enqueued_at, enqueued_at_date)
|
|
|
|
self.assertIsNotNone(job.exc_info) # should contain exc_info
|
|
|
|
|
|
|
|
def test_custom_exc_handling(self):
|
|
|
|
"""Custom exception handling."""
|
|
|
|
def black_hole(job, *exc_info):
|
|
|
|
# Don't fall through to default behaviour (moving to failed queue)
|
|
|
|
return False
|
|
|
|
|
|
|
|
q = Queue()
|
|
|
|
failed_q = get_failed_queue()
|
|
|
|
|
|
|
|
# Preconditions
|
|
|
|
self.assertEquals(failed_q.count, 0)
|
|
|
|
self.assertEquals(q.count, 0)
|
|
|
|
|
|
|
|
# Action
|
|
|
|
job = q.enqueue(div_by_zero)
|
|
|
|
self.assertEquals(q.count, 1)
|
|
|
|
|
|
|
|
w = Worker([q], exc_handler=black_hole)
|
|
|
|
w.work(burst=True) # should silently pass
|
|
|
|
|
|
|
|
# Postconditions
|
|
|
|
self.assertEquals(q.count, 0)
|
|
|
|
self.assertEquals(failed_q.count, 0)
|
|
|
|
|
|
|
|
# Check the job
|
|
|
|
job = Job.fetch(job.id)
|
|
|
|
self.assertEquals(job.is_failed, True)
|
|
|
|
|
|
|
|
def test_cancelled_jobs_arent_executed(self): # noqa
|
|
|
|
"""Cancelling jobs."""
|
|
|
|
|
|
|
|
SENTINEL_FILE = '/tmp/rq-tests.txt'
|
|
|
|
|
|
|
|
try:
|
|
|
|
# Remove the sentinel if it is leftover from a previous test run
|
|
|
|
os.remove(SENTINEL_FILE)
|
|
|
|
except OSError as e:
|
|
|
|
if e.errno != 2:
|
|
|
|
raise
|
|
|
|
|
|
|
|
q = Queue()
|
|
|
|
job = q.enqueue(create_file, SENTINEL_FILE)
|
|
|
|
|
|
|
|
# Here, we cancel the job, so the sentinel file may not be created
|
|
|
|
self.testconn.delete(job.key)
|
|
|
|
|
|
|
|
w = Worker([q])
|
|
|
|
w.work(burst=True)
|
|
|
|
assert q.count == 0
|
|
|
|
|
|
|
|
# Should not have created evidence of execution
|
|
|
|
self.assertEquals(os.path.exists(SENTINEL_FILE), False)
|
|
|
|
|
|
|
|
@slow # noqa
|
|
|
|
def test_timeouts(self):
|
|
|
|
"""Worker kills jobs after timeout."""
|
|
|
|
sentinel_file = '/tmp/.rq_sentinel'
|
|
|
|
|
|
|
|
q = Queue()
|
|
|
|
w = Worker([q])
|
|
|
|
|
|
|
|
# Put it on the queue with a timeout value
|
|
|
|
res = q.enqueue(create_file_after_timeout,
|
|
|
|
args=(sentinel_file, 4),
|
|
|
|
timeout=1)
|
|
|
|
|
|
|
|
try:
|
|
|
|
os.unlink(sentinel_file)
|
|
|
|
except OSError as e:
|
|
|
|
if e.errno == 2:
|
|
|
|
pass
|
|
|
|
|
|
|
|
self.assertEquals(os.path.exists(sentinel_file), False)
|
|
|
|
w.work(burst=True)
|
|
|
|
self.assertEquals(os.path.exists(sentinel_file), False)
|
|
|
|
|
|
|
|
# TODO: Having to do the manual refresh() here is really ugly!
|
|
|
|
res.refresh()
|
|
|
|
self.assertIn('JobTimeoutException', as_text(res.exc_info))
|
|
|
|
|
|
|
|
def test_worker_sets_result_ttl(self):
|
|
|
|
"""Ensure that Worker properly sets result_ttl for individual jobs."""
|
|
|
|
q = Queue()
|
|
|
|
job = q.enqueue(say_hello, args=('Frank',), result_ttl=10)
|
|
|
|
w = Worker([q])
|
|
|
|
w.work(burst=True)
|
|
|
|
self.assertNotEqual(self.testconn._ttl(job.key), 0)
|
|
|
|
|
|
|
|
# Job with -1 result_ttl don't expire
|
|
|
|
job = q.enqueue(say_hello, args=('Frank',), result_ttl=-1)
|
|
|
|
w = Worker([q])
|
|
|
|
w.work(burst=True)
|
|
|
|
self.assertEqual(self.testconn._ttl(job.key), -1)
|
|
|
|
|
|
|
|
# Job with result_ttl = 0 gets deleted immediately
|
|
|
|
job = q.enqueue(say_hello, args=('Frank',), result_ttl=0)
|
|
|
|
w = Worker([q])
|
|
|
|
w.work(burst=True)
|
|
|
|
self.assertEqual(self.testconn.get(job.key), None)
|
|
|
|
|
|
|
|
def test_worker_sets_job_status(self):
|
|
|
|
"""Ensure that worker correctly sets job status."""
|
|
|
|
q = Queue()
|
|
|
|
w = Worker([q])
|
|
|
|
|
|
|
|
job = q.enqueue(say_hello)
|
|
|
|
self.assertEqual(job.get_status(), JobStatus.QUEUED)
|
|
|
|
self.assertEqual(job.is_queued, True)
|
|
|
|
self.assertEqual(job.is_finished, False)
|
|
|
|
self.assertEqual(job.is_failed, False)
|
|
|
|
|
|
|
|
w.work(burst=True)
|
|
|
|
job = Job.fetch(job.id)
|
|
|
|
self.assertEqual(job.get_status(), JobStatus.FINISHED)
|
|
|
|
self.assertEqual(job.is_queued, False)
|
|
|
|
self.assertEqual(job.is_finished, True)
|
|
|
|
self.assertEqual(job.is_failed, False)
|
|
|
|
|
|
|
|
# Failed jobs should set status to "failed"
|
|
|
|
job = q.enqueue(div_by_zero, args=(1,))
|
|
|
|
w.work(burst=True)
|
|
|
|
job = Job.fetch(job.id)
|
|
|
|
self.assertEqual(job.get_status(), JobStatus.FAILED)
|
|
|
|
self.assertEqual(job.is_queued, False)
|
|
|
|
self.assertEqual(job.is_finished, False)
|
|
|
|
self.assertEqual(job.is_failed, True)
|
|
|
|
|
|
|
|
def test_job_dependency(self):
|
|
|
|
"""Enqueue dependent jobs only if their parents don't fail"""
|
|
|
|
q = Queue()
|
|
|
|
w = Worker([q])
|
|
|
|
parent_job = q.enqueue(say_hello)
|
|
|
|
job = q.enqueue_call(say_hello, depends_on=parent_job)
|
|
|
|
w.work(burst=True)
|
|
|
|
job = Job.fetch(job.id)
|
|
|
|
self.assertEqual(job.get_status(), JobStatus.FINISHED)
|
|
|
|
|
|
|
|
parent_job = q.enqueue(div_by_zero)
|
|
|
|
job = q.enqueue_call(say_hello, depends_on=parent_job)
|
|
|
|
w.work(burst=True)
|
|
|
|
job = Job.fetch(job.id)
|
|
|
|
self.assertNotEqual(job.get_status(), JobStatus.FINISHED)
|
|
|
|
|
|
|
|
def test_get_current_job(self):
|
|
|
|
"""Ensure worker.get_current_job() works properly"""
|
|
|
|
q = Queue()
|
|
|
|
worker = Worker([q])
|
|
|
|
job = q.enqueue_call(say_hello)
|
|
|
|
|
|
|
|
self.assertEqual(self.testconn.hget(worker.key, 'current_job'), None)
|
|
|
|
worker.set_current_job_id(job.id)
|
|
|
|
self.assertEqual(
|
|
|
|
worker.get_current_job_id(),
|
|
|
|
as_text(self.testconn.hget(worker.key, 'current_job'))
|
|
|
|
)
|
|
|
|
self.assertEqual(worker.get_current_job(), job)
|
|
|
|
|
|
|
|
def test_custom_job_class(self):
|
|
|
|
"""Ensure Worker accepts custom job class."""
|
|
|
|
q = Queue()
|
|
|
|
worker = Worker([q], job_class=CustomJob)
|
|
|
|
self.assertEqual(worker.job_class, CustomJob)
|
|
|
|
|
|
|
|
def test_work_via_simpleworker(self):
|
|
|
|
"""Worker processes work, with forking disabled,
|
|
|
|
then returns."""
|
|
|
|
fooq, barq = Queue('foo'), Queue('bar')
|
|
|
|
w = SimpleWorker([fooq, barq])
|
|
|
|
self.assertEquals(w.work(burst=True), False,
|
|
|
|
'Did not expect any work on the queue.')
|
|
|
|
|
|
|
|
job = fooq.enqueue(say_pid)
|
|
|
|
self.assertEquals(w.work(burst=True), True,
|
|
|
|
'Expected at least some work done.')
|
|
|
|
self.assertEquals(job.result, os.getpid(),
|
|
|
|
'PID mismatch, fork() is not supposed to happen here')
|
|
|
|
|
|
|
|
def test_prepare_job_execution(self):
|
|
|
|
"""Prepare job execution does the necessary bookkeeping."""
|
|
|
|
queue = Queue(connection=self.testconn)
|
|
|
|
job = queue.enqueue(say_hello)
|
|
|
|
worker = Worker([queue])
|
|
|
|
worker.prepare_job_execution(job)
|
|
|
|
|
|
|
|
# Updates working queue
|
|
|
|
registry = StartedJobRegistry(connection=self.testconn)
|
|
|
|
self.assertEqual(registry.get_job_ids(), [job.id])
|
|
|
|
|
|
|
|
# Updates worker statuses
|
|
|
|
self.assertEqual(worker.state, 'busy')
|
|
|
|
self.assertEqual(worker.get_current_job_id(), job.id)
|
|
|
|
|
|
|
|
def test_work_unicode_friendly(self):
|
|
|
|
"""Worker processes work with unicode description, then quits."""
|
|
|
|
q = Queue('foo')
|
|
|
|
w = Worker([q])
|
|
|
|
job = q.enqueue('tests.fixtures.say_hello', name='Adam',
|
|
|
|
description='你好 世界!')
|
|
|
|
self.assertEquals(w.work(burst=True), True,
|
|
|
|
'Expected at least some work done.')
|
|
|
|
self.assertEquals(job.result, 'Hi there, Adam!')
|
|
|
|
self.assertEquals(job.description, '你好 世界!')
|
|
|
|
|
|
|
|
def test_suspend_worker_execution(self):
|
|
|
|
"""Test Pause Worker Execution"""
|
|
|
|
|
|
|
|
SENTINEL_FILE = '/tmp/rq-tests.txt'
|
|
|
|
|
|
|
|
try:
|
|
|
|
# Remove the sentinel if it is leftover from a previous test run
|
|
|
|
os.remove(SENTINEL_FILE)
|
|
|
|
except OSError as e:
|
|
|
|
if e.errno != 2:
|
|
|
|
raise
|
|
|
|
|
|
|
|
q = Queue()
|
|
|
|
q.enqueue(create_file, SENTINEL_FILE)
|
|
|
|
|
|
|
|
w = Worker([q])
|
|
|
|
|
|
|
|
suspend(self.testconn)
|
|
|
|
|
|
|
|
w.work(burst=True)
|
|
|
|
assert q.count == 1
|
|
|
|
|
|
|
|
# Should not have created evidence of execution
|
|
|
|
self.assertEquals(os.path.exists(SENTINEL_FILE), False)
|
|
|
|
|
|
|
|
resume(self.testconn)
|
|
|
|
w.work(burst=True)
|
|
|
|
assert q.count == 0
|
|
|
|
self.assertEquals(os.path.exists(SENTINEL_FILE), True)
|
|
|
|
|
|
|
|
def test_suspend_with_duration(self):
|
|
|
|
q = Queue()
|
|
|
|
for _ in range(5):
|
|
|
|
q.enqueue(do_nothing)
|
|
|
|
|
|
|
|
w = Worker([q])
|
|
|
|
|
|
|
|
# This suspends workers for working for 2 second
|
|
|
|
suspend(self.testconn, 2)
|
|
|
|
|
|
|
|
# So when this burst of work happens the queue should remain at 5
|
|
|
|
w.work(burst=True)
|
|
|
|
assert q.count == 5
|
|
|
|
|
|
|
|
sleep(3)
|
|
|
|
|
|
|
|
# The suspension should be expired now, and a burst of work should now clear the queue
|
|
|
|
w.work(burst=True)
|
|
|
|
assert q.count == 0
|
|
|
|
|
|
|
|
def test_worker_hash_(self):
|
|
|
|
"""Workers are hashed by their .name attribute"""
|
|
|
|
q = Queue('foo')
|
|
|
|
w1 = Worker([q], name="worker1")
|
|
|
|
w2 = Worker([q], name="worker2")
|
|
|
|
w3 = Worker([q], name="worker1")
|
|
|
|
worker_set = set([w1, w2, w3])
|
|
|
|
self.assertEquals(len(worker_set), 2)
|
|
|
|
|
|
|
|
def test_worker_sets_birth(self):
|
|
|
|
"""Ensure worker correctly sets worker birth date."""
|
|
|
|
q = Queue()
|
|
|
|
w = Worker([q])
|
|
|
|
|
|
|
|
w.register_birth()
|
|
|
|
|
|
|
|
birth_date = w.birth_date
|
|
|
|
self.assertIsNotNone(birth_date)
|
|
|
|
self.assertEquals(type(birth_date).__name__, 'datetime')
|
|
|
|
|
|
|
|
def test_worker_sets_death(self):
|
|
|
|
"""Ensure worker correctly sets worker death date."""
|
|
|
|
q = Queue()
|
|
|
|
w = Worker([q])
|
|
|
|
|
|
|
|
w.register_death()
|
|
|
|
|
|
|
|
death_date = w.death_date
|
|
|
|
self.assertIsNotNone(death_date)
|
|
|
|
self.assertEquals(type(death_date).__name__, 'datetime')
|