@ -8,13 +8,16 @@ from time import sleep
import signal
import time
from multiprocessing import Process
import subprocess
from tests import RQTestCase , slow
from tests . fixtures import ( create_file , create_file_after_timeout ,
div_by_zero , do_nothing , say_hello , say_pid )
div_by_zero , do_nothing , say_hello , say_pid ,
access_self )
from tests . helpers import strip_microseconds
from rq import get_failed_queue , Queue , SimpleWorker , Worker
from rq import ( get_failed_queue , Queue , SimpleWorker , Worker ,
get_current_connection )
from rq . compat import as_text , PY2
from rq . job import Job , JobStatus
from rq . registry import StartedJobRegistry
@ -82,12 +85,16 @@ class TestWorker(RQTestCase):
""" Worker processes work, then quits. """
fooq , barq = Queue ( ' foo ' ) , Queue ( ' bar ' )
w = Worker ( [ fooq , barq ] )
self . assertEqual ( w . work ( burst = True ) , False ,
' Did not expect any work on the queue. ' )
self . assertEqual (
w . work ( burst = True ) , False ,
' Did not expect any work on the queue. '
)
fooq . enqueue ( say_hello , name = ' Frank ' )
self . assertEqual ( w . work ( burst = True ) , True ,
' Expected at least some work done. ' )
self . assertEqual (
w . work ( burst = True ) , True ,
' Expected at least some work done. '
)
def test_worker_ttl ( self ) :
""" Worker ttl. """
@ -102,8 +109,10 @@ class TestWorker(RQTestCase):
q = Queue ( ' foo ' )
w = Worker ( [ q ] )
job = q . enqueue ( ' tests.fixtures.say_hello ' , name = ' Frank ' )
self . assertEqual ( w . work ( burst = True ) , True ,
' Expected at least some work done. ' )
self . assertEqual (
w . work ( burst = True ) , True ,
' Expected at least some work done. '
)
self . assertEqual ( job . result , ' Hi there, Frank! ' )
def test_job_times ( self ) :
@ -116,14 +125,25 @@ class TestWorker(RQTestCase):
self . assertIsNotNone ( job . enqueued_at )
self . assertIsNone ( job . started_at )
self . assertIsNone ( job . ended_at )
self . assertEqual ( w . work ( burst = True ) , True ,
' Expected at least some work done. ' )
self . assertEqual (
w . work ( burst = True ) , True ,
' Expected at least some work done. '
)
self . assertEqual ( job . result , ' Hi there, Stranger! ' )
after = utcnow ( )
job . refresh ( )
self . assertTrue ( before < = job . enqueued_at < = after , ' Not %s <= %s <= %s ' % ( before , job . enqueued_at , after ) )
self . assertTrue ( before < = job . started_at < = after , ' Not %s <= %s <= %s ' % ( before , job . started_at , after ) )
self . assertTrue ( before < = job . ended_at < = after , ' Not %s <= %s <= %s ' % ( before , job . ended_at , after ) )
self . assertTrue (
before < = job . enqueued_at < = after ,
' Not %s <= %s <= %s ' % ( before , job . enqueued_at , after )
)
self . assertTrue (
before < = job . started_at < = after ,
' Not %s <= %s <= %s ' % ( before , job . started_at , after )
)
self . assertTrue (
before < = job . ended_at < = after ,
' Not %s <= %s <= %s ' % ( before , job . ended_at , after )
)
def test_work_is_unreadable ( self ) :
""" Unreadable jobs are put on the failed queue. """
@ -557,8 +577,8 @@ def kill_worker(pid, double_kill):
class TestWorkerShutdown ( RQTestCase ) :
def setUp ( self ) :
# we want tests to fail if signal are ignored and the work remain running,
# so set a signal to kill them after 5 seconds
# we want tests to fail if signal are ignored and the work remain
# running, so set a signal to kill them after 5 seconds
signal . signal ( signal . SIGALRM , self . _timeout )
signal . alarm ( 5 )
@ -621,3 +641,35 @@ class TestWorkerShutdown(RQTestCase):
shutdown_requested_date = w . shutdown_requested_date
self . assertIsNotNone ( shutdown_requested_date )
self . assertEqual ( type ( shutdown_requested_date ) . __name__ , ' datetime ' )
def schedule_access_self ( ) :
q = Queue ( ' default ' , connection = get_current_connection ( ) )
q . enqueue ( access_self )
class TestWorkerSubprocess ( RQTestCase ) :
def setUp ( self ) :
super ( TestWorkerSubprocess , self ) . setUp ( )
db_num = self . testconn . connection_pool . connection_kwargs [ ' db ' ]
self . redis_url = ' redis://127.0.0.1:6379/ %d ' % db_num
def test_run_empty_queue ( self ) :
""" Run the worker in its own process with an empty queue """
subprocess . check_call ( [ ' rqworker ' , ' -u ' , self . redis_url , ' -b ' ] )
def test_run_access_self ( self ) :
""" Schedule a job, then run the worker as subprocess """
q = Queue ( )
q . enqueue ( access_self )
subprocess . check_call ( [ ' rqworker ' , ' -u ' , self . redis_url , ' -b ' ] )
assert get_failed_queue ( ) . count == 0
assert q . count == 0
def test_run_scheduled_access_self ( self ) :
""" Schedule a job that schedules a job, then run the worker as subprocess """
q = Queue ( )
q . enqueue ( schedule_access_self )
subprocess . check_call ( [ ' rqworker ' , ' -u ' , self . redis_url , ' -b ' ] )
assert get_failed_queue ( ) . count == 0
assert q . count == 0