diff --git a/docs/docs/jobs.md b/docs/docs/jobs.md index b067f3c..e4defe1 100644 --- a/docs/docs/jobs.md +++ b/docs/docs/jobs.md @@ -12,7 +12,7 @@ jobs. ### Job Creation -When you enqueue a function, a job will be returned. You may then access the +When you enqueue a function, a job will be returned. You may then access the id property, which can later be used to retrieve the job. ```python @@ -50,8 +50,8 @@ job = Job.create(count_words_at url, 'http://nvie.com', id='my_job_id') The keyword arguments accepted by `create()` are: * `timeout` specifies the maximum runtime of the job before it's interrupted - and marked as `failed`. Its default unit is seconds and it can be an integer - or a string representing an integer(e.g. `2`, `'2'`). Furthermore, it can + and marked as `failed`. Its default unit is seconds and it can be an integer + or a string representing an integer(e.g. `2`, `'2'`). Furthermore, it can be a string with specify unit including hour, minute, second (e.g. `'1h'`, `'3m'`, `'5s'`). * `result_ttl` specifies how long (in seconds) successful jobs and their @@ -70,7 +70,7 @@ The keyword arguments accepted by `create()` are: * `args` and `kwargs`: use these to explicitly pass arguments and keyword to the underlying job function. This is useful if your function happens to have conflicting argument names with RQ, for example `description` or `ttl`. 
- + In the last case, if you want to pass `description` and `ttl` keyword arguments to your job and not to RQ's enqueue function, this is what you do: @@ -99,7 +99,7 @@ print('Status: %s' % job.get_status()) ``` Some interesting job attributes include: -* `job.get_status()` Possible values are `queued`, `started`, `deferred`, `finished`, and `failed` +* `job.get_status()` Possible values are `queued`, `started`, `deferred`, `scheduled`, `finished`, `stopped`, and `failed` * `job.origin` queue name of this job * `job.func_name` * `job.args` arguments passed to the underlying job function @@ -136,6 +136,8 @@ redis = Redis() send_stop_job_command(redis, job_id) ``` +Unlike failed jobs, stopped jobs will *not* be automatically retried if retry is configured. Subclasses of `Worker` which override `handle_job_failure()` should likewise take care to handle jobs with a `stopped` status appropriately. + ## Job / Queue Creation with Custom Serializer When creating a job or queue, you can pass in a custom serializer that will be used for serializing / de-serializing job arguments. @@ -152,8 +154,8 @@ queue = Queue(connection=connection, serializer=JSONSerializer) ``` ## Accessing The "current" Job from within the job function - -Since job functions are regular Python functions, you must retrieve the + +Since job functions are regular Python functions, you must retrieve the job in order to inspect or update the job's attributes. To do this from within the function, you can use: @@ -193,19 +195,19 @@ def add(x, y): ## Time to live for job in queue -A job has two TTLs, one for the job result, `result_ttl`, and one for the job itself, `ttl`. +A job has two TTLs, one for the job result, `result_ttl`, and one for the job itself, `ttl`. The latter is used if you have a job that shouldn't be executed after a certain amount of time. 
```python # When creating the job: -job = Job.create(func=say_hello, +job = Job.create(func=say_hello, result_ttl=600, # how long (in seconds) to keep the job (if successful) and its results ttl=43, # maximum queued time (in seconds) of the job before it's discarded. ) # or when queueing a new job: -job = q.enqueue(count_words_at_url, - 'http://nvie.com', +job = q.enqueue(count_words_at_url, + 'http://nvie.com', result_ttl=600, # how long to keep the job (if successful) and its results ttl=43 # maximum queued time ) @@ -306,4 +308,4 @@ rq requeue --queue myqueue -u redis://localhost:6379 foo_job_id bar_job_id # This command will requeue all jobs in myqueue's failed job registry rq requeue --queue myqueue -u redis://localhost:6379 --all -``` \ No newline at end of file +``` diff --git a/rq/command.py b/rq/command.py index 94eed96..e8ebcc5 100644 --- a/rq/command.py +++ b/rq/command.py @@ -72,6 +72,9 @@ def handle_stop_job_command(worker, payload): job_id = payload.get('job_id') worker.log.debug('Received command to stop job %s', job_id) if job_id and worker.get_current_job_id() == job_id: + # Sets the '_stopped_job_id' so that the job failure handler knows it + # was intentional. + worker._stopped_job_id = job_id worker.kill_horse() else: worker.log.info('Not working on job %s, command ignored.', job_id) diff --git a/rq/job.py b/rq/job.py index 126a863..a7a9fe4 100644 --- a/rq/job.py +++ b/rq/job.py @@ -35,6 +35,7 @@ JobStatus = enum( STARTED='started', DEFERRED='deferred', SCHEDULED='scheduled', + STOPPED='stopped', ) # Sentinel value to mark that some of our lazily evaluated properties have not @@ -175,6 +176,10 @@ class Job(object): def is_scheduled(self): return self.get_status() == JobStatus.SCHEDULED + @property + def is_stopped(self): + return self.get_status() == JobStatus.STOPPED + @property def _dependency_id(self): """Returns the first item in self._dependency_ids. 
Present to diff --git a/rq/worker.py b/rq/worker.py index c5762fd..d2b400f 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -208,6 +208,8 @@ class Worker(object): self._is_horse = False self._horse_pid = 0 self._stop_requested = False + self._stopped_job_id = None + self.log = logger self.log_job_description = log_job_description self.last_cleaned_at = None @@ -659,7 +661,7 @@ class Worker(object): except DequeueTimeout: pass except redis.exceptions.ConnectionError as conn_err: - self.log.error('Could not connect to Redis instance: %s Retrying in %d seconds...', + self.log.error('Could not connect to Redis instance: %s Retrying in %d seconds...', conn_err, connection_wait_time) time.sleep(connection_wait_time) connection_wait_time *= self.exponential_backoff_factor @@ -797,8 +799,14 @@ class Worker(object): job_status = job.get_status() if job_status is None: # Job completed and its ttl has expired return - if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]: - + elif job_status == JobStatus.STOPPED: + # Work-horse killed deliberately + self.log.warning('Job stopped by user, moving job to FailedJobRegistry') + self.handle_job_failure( + job, queue=queue, + exc_string="Job stopped by user, work-horse terminated." 
+ ) + elif job_status not in [JobStatus.FINISHED, JobStatus.FAILED]: if not job.ended_at: job.ended_at = utcnow() @@ -811,7 +819,7 @@ class Worker(object): self.handle_job_failure( job, queue=queue, exc_string="Work-horse was terminated unexpectedly " - "(waitpid returned %s)" % ret_val + "(waitpid returned %s)" % ret_val ) def execute_job(self, job, queue): @@ -895,14 +903,19 @@ class Worker(object): job_class=self.job_class ) job.worker_name = None - # Requeue/reschedule if retry is configured - if job.retries_left and job.retries_left > 0: - retry = True - retry_interval = job.get_retry_interval() - job.retries_left = job.retries_left - 1 + + # check whether a job was stopped intentionally and set the job + # status appropriately if it was this job. + job_is_stopped = self._stopped_job_id == job.id + retry = job.retries_left and job.retries_left > 0 and not job_is_stopped + + if job_is_stopped: + job.set_status(JobStatus.STOPPED, pipeline=pipeline) + self._stopped_job_id = None else: - retry = False - job.set_status(JobStatus.FAILED, pipeline=pipeline) + # Requeue/reschedule if retry is configured, otherwise mark the job as failed + if not retry: + job.set_status(JobStatus.FAILED, pipeline=pipeline) started_job_registry.remove(job, pipeline=pipeline) @@ -920,6 +933,8 @@ class Worker(object): ) if retry: + retry_interval = job.get_retry_interval() + job.retries_left = job.retries_left - 1 if retry_interval: scheduled_datetime = datetime.now(timezone.utc) + timedelta(seconds=retry_interval) job.set_status(JobStatus.SCHEDULED) diff --git a/tests/test_commands.py b/tests/test_commands.py index a7a0dd6..f402daa 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -71,7 +71,7 @@ class TestCommands(RQTestCase): with self.assertRaises(InvalidJobOperation): send_stop_job_command(connection, job_id=job.id) - # An exception is raised if job ID is invalid + # An exception is raised if job ID is invalid with self.assertRaises(NoSuchJobError): send_stop_job_command(connection, 
job_id='1') @@ -92,6 +92,10 @@ class TestCommands(RQTestCase): send_stop_job_command(connection, job_id=job.id) time.sleep(0.25) - # Worker has stopped working + + # Job status is set appropriately + self.assertTrue(job.is_stopped) + + # Worker has stopped working worker.refresh() self.assertEqual(worker.get_state(), WorkerStatus.IDLE)