|  |  |  | @ -1,17 +1,44 @@ | 
		
	
		
			
				|  |  |  |  | from rq.serializers import JSONSerializer | 
		
	
		
			
				|  |  |  |  | import time | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | from multiprocessing import Process | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | from redis import Redis | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | from tests import RQTestCase | 
		
	
		
			
				|  |  |  |  | from tests.fixtures import long_running_job | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | from rq import Queue, Worker | 
		
	
		
			
				|  |  |  |  | from rq.command import send_command, send_kill_horse_command, send_shutdown_command, send_stop_job_command | 
		
	
		
			
				|  |  |  |  | from rq.exceptions import InvalidJobOperation, NoSuchJobError | 
		
	
		
			
				|  |  |  |  | from rq.serializers import JSONSerializer | 
		
	
		
			
				|  |  |  |  | from rq.worker import WorkerStatus | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def _send_shutdown_command(worker_name, connection_kwargs): | 
		
	
		
			
				|  |  |  |  |     time.sleep(0.25) | 
		
	
		
			
				|  |  |  |  |     send_shutdown_command(Redis(**connection_kwargs), worker_name) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def _send_kill_horse_command(worker_name, connection_kwargs): | 
		
	
		
			
				|  |  |  |  |     """Waits 0.25 seconds before sending kill-horse command""" | 
		
	
		
			
				|  |  |  |  |     time.sleep(0.25) | 
		
	
		
			
				|  |  |  |  |     send_kill_horse_command(Redis(**connection_kwargs), worker_name) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def start_work(queue_name, worker_name, connection_kwargs): | 
		
	
		
			
				|  |  |  |  |     worker = Worker(queue_name, name=worker_name, | 
		
	
		
			
				|  |  |  |  |                     connection=Redis(**connection_kwargs)) | 
		
	
		
			
				|  |  |  |  |     worker.work() | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def start_work_burst(queue_name, worker_name, connection_kwargs): | 
		
	
		
			
				|  |  |  |  |     worker = Worker(queue_name, name=worker_name, | 
		
	
		
			
				|  |  |  |  |                     connection=Redis(**connection_kwargs), | 
		
	
		
			
				|  |  |  |  |                     serializer=JSONSerializer) | 
		
	
		
			
				|  |  |  |  |     worker.work(burst=True) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | class TestCommands(RQTestCase): | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     def test_shutdown_command(self): | 
		
	
	
		
			
				
					|  |  |  | @ -19,11 +46,9 @@ class TestCommands(RQTestCase): | 
		
	
		
			
				|  |  |  |  |         connection = self.testconn | 
		
	
		
			
				|  |  |  |  |         worker = Worker('foo', connection=connection) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |         def _send_shutdown_command(): | 
		
	
		
			
				|  |  |  |  |             time.sleep(0.25) | 
		
	
		
			
				|  |  |  |  |             send_shutdown_command(connection, worker.name) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |         p = Process(target=_send_shutdown_command) | 
		
	
		
			
				|  |  |  |  |         p = Process(target=_send_shutdown_command, | 
		
	
		
			
				|  |  |  |  |                     args=(worker.name, | 
		
	
		
			
				|  |  |  |  |                           connection.connection_pool.connection_kwargs.copy())) | 
		
	
		
			
				|  |  |  |  |         p.start() | 
		
	
		
			
				|  |  |  |  |         worker.work() | 
		
	
		
			
				|  |  |  |  |         p.join(1) | 
		
	
	
		
			
				
					|  |  |  | @ -35,22 +60,18 @@ class TestCommands(RQTestCase): | 
		
	
		
			
				|  |  |  |  |         job = queue.enqueue(long_running_job, 4) | 
		
	
		
			
				|  |  |  |  |         worker = Worker('foo', connection=connection) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |         def _send_kill_horse_command(): | 
		
	
		
			
				|  |  |  |  |             """Waits 0.25 seconds before sending kill-horse command""" | 
		
	
		
			
				|  |  |  |  |             time.sleep(0.25) | 
		
	
		
			
				|  |  |  |  |             send_kill_horse_command(connection, worker.name) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |         p = Process(target=_send_kill_horse_command) | 
		
	
		
			
				|  |  |  |  |         p = Process(target=_send_kill_horse_command, | 
		
	
		
			
				|  |  |  |  |                     args=(worker.name, | 
		
	
		
			
				|  |  |  |  |                           connection.connection_pool.connection_kwargs.copy())) | 
		
	
		
			
				|  |  |  |  |         p.start() | 
		
	
		
			
				|  |  |  |  |         worker.work(burst=True) | 
		
	
		
			
				|  |  |  |  |         p.join(1) | 
		
	
		
			
				|  |  |  |  |         job.refresh() | 
		
	
		
			
				|  |  |  |  |         self.assertTrue(job.id in queue.failed_job_registry) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |         def start_work(): | 
		
	
		
			
				|  |  |  |  |             worker.work() | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |         p = Process(target=start_work) | 
		
	
		
			
				|  |  |  |  |         p = Process(target=start_work, | 
		
	
		
			
				|  |  |  |  |                     args=('foo', worker.name, | 
		
	
		
			
				|  |  |  |  |                           connection.connection_pool.connection_kwargs.copy())) | 
		
	
		
			
				|  |  |  |  |         p.start() | 
		
	
		
			
				|  |  |  |  |         p.join(2) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
	
		
			
				
					|  |  |  | @ -76,10 +97,9 @@ class TestCommands(RQTestCase): | 
		
	
		
			
				|  |  |  |  |         with self.assertRaises(NoSuchJobError): | 
		
	
		
			
				|  |  |  |  |             send_stop_job_command(connection, job_id='1', serializer=JSONSerializer) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |         def start_work(): | 
		
	
		
			
				|  |  |  |  |             worker.work(burst=True) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |         p = Process(target=start_work) | 
		
	
		
			
				|  |  |  |  |         p = Process(target=start_work_burst, | 
		
	
		
			
				|  |  |  |  |                     args=('foo', worker.name, | 
		
	
		
			
				|  |  |  |  |                           connection.connection_pool.connection_kwargs.copy())) | 
		
	
		
			
				|  |  |  |  |         p.start() | 
		
	
		
			
				|  |  |  |  |         p.join(1) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
	
		
			
				
					|  |  |  | 
 |