diff --git a/.gitignore b/.gitignore
index 25a046e..f95cc7b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,6 +10,3 @@
 .tox
 .vagrant
 Vagrantfile
-
-# PyCharm
-.idea
diff --git a/requirements.txt b/requirements.txt
index 539b9a4..8da152d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,2 @@
-redis
-click
+redis==2.7.0
+click>=3.0.0
diff --git a/rq/cli/cli.py b/rq/cli/cli.py
index 18979dc..afb32ea 100755
--- a/rq/cli/cli.py
+++ b/rq/cli/cli.py
@@ -16,6 +16,7 @@ from rq import Connection, get_failed_queue, Queue
 from rq.contrib.legacy import cleanup_ghosts
 from rq.exceptions import InvalidJobOperationError
 from rq.utils import import_attribute
+from rq.suspension import suspend as connection_suspend, resume as connection_resume, is_suspended
 
 from .helpers import (read_config_file, refresh, setup_loghandlers_from_args,
                       show_both, show_queues, show_workers)
@@ -24,8 +25,12 @@ from .helpers import (read_config_file, refresh, setup_loghandlers_from_args,
 url_option = click.option('--url', '-u', envvar='RQ_REDIS_URL',
                           help='URL describing Redis connection details.')
 
+config_option = click.option('--config', '-c', help='Module containing RQ settings.')
 
-def connect(url):
+
+def connect(url, config=None):
+    settings = read_config_file(config) if config else {}
+    url = url or settings.get('REDIS_URL')
     return StrictRedis.from_url(url or 'redis://localhost:6379/0')
 
 
@@ -120,7 +125,7 @@ def info(url, path, interval, raw, only_queues, only_workers, by_queue, queues):
 
 @main.command()
 @url_option
-@click.option('--config', '-c', help='Module containing RQ settings.')
+@config_option
 @click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
 @click.option('--name', '-n', help='Specify a different name')
 @click.option('--worker-class', '-w', default='rq.Worker', help='RQ Worker class to use')
@@ -158,7 +163,12 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
     worker_class = import_attribute(worker_class)
     queue_class = import_attribute(queue_class)
 
+    if is_suspended(conn):
+        click.secho("The worker has been paused, run reset_paused", fg='red')
+        sys.exit(1)
+
     try:
+
         queues = [queue_class(queue, connection=conn) for queue in queues]
         w = worker_class(queues,
                          name=name,
@@ -178,3 +188,34 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
     except ConnectionError as e:
         print(e)
         sys.exit(1)
+
+
+@main.command()
+@url_option
+@config_option
+@click.option('--duration', help='Seconds you want the workers to be suspended.  Default is forever.', type=int)
+def suspend(url, config, duration):
+    """Suspends all workers, to resume run `rq resume`"""
+    if duration is not None and duration < 1:
+        click.echo("Duration must be an integer greater than 1")
+        sys.exit(1)
+
+    connection = connect(url, config)
+    connection_suspend(connection, duration)
+
+    if duration:
+        msg = """Suspending workers for {0} seconds.  No new jobs will be started during that time, but then will
+        automatically resume""".format(duration)
+        click.echo(msg)
+    else:
+        click.echo("Suspending workers.  No new jobs will be started.  But current jobs will be completed")
+
+
+@main.command()
+@url_option
+@config_option
+def resume(url, config):
+    """Resumes processing of queues, that where suspended with `rq suspend`"""
+    connection = connect(url, config)
+    connection_resume(connection)
+    click.echo("Resuming workers.")
diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py
index f25cc81..9ea4b58 100644
--- a/rq/cli/helpers.py
+++ b/rq/cli/helpers.py
@@ -8,7 +8,9 @@ from functools import partial
 
 import click
 from rq import Queue, Worker
+from rq.worker import WorkerStatus
 from rq.logutils import setup_loghandlers
+from rq.suspension import is_suspended
 
 red = partial(click.style, fg='red')
 green = partial(click.style, fg='green')
@@ -39,8 +41,9 @@ def get_scale(x):
 
 def state_symbol(state):
     symbols = {
-        'busy': red('busy'),
-        'idle': green('idle'),
+        WorkerStatus.BUSY: red('busy'),
+        WorkerStatus.IDLE: green('idle'),
+        WorkerStatus.SUSPENDED: yellow('suspended'),
     }
     try:
         return symbols[state]
diff --git a/rq/job.py b/rq/job.py
index e8080f8..7b17492 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -12,7 +12,7 @@ from rq.compat import as_text, decode_redis_hash, string_types, text_type
 from .connections import resolve_connection
 from .exceptions import NoSuchJobError, UnpickleError
 from .local import LocalStack
-from .utils import import_attribute, utcformat, utcnow, utcparse
+from .utils import import_attribute, utcformat, utcnow, utcparse, enum
 
 try:
     import cPickle as pickle
@@ -25,16 +25,7 @@ dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
 loads = pickle.loads
 
 
-def enum(name, *sequential, **named):
-    values = dict(zip(sequential, range(len(sequential))), **named)
-
-    # NOTE: Yes, we *really* want to cast using str() here.
-    # On Python 2 type() requires a byte string (which is str() on Python 2).
-    # On Python 3 it does not matter, so we'll use str(), which acts as
-    # a no-op.
-    return type(str(name), (), values)
-
-Status = enum('Status',
+JobStatus = enum('JobStatus',
               QUEUED='queued', FINISHED='finished', FAILED='failed',
               STARTED='started')
 
@@ -166,19 +157,19 @@ class Job(object):
 
     @property
     def is_finished(self):
-        return self.get_status() == Status.FINISHED
+        return self.get_status() == JobStatus.FINISHED
 
     @property
     def is_queued(self):
-        return self.get_status() == Status.QUEUED
+        return self.get_status() == JobStatus.QUEUED
 
     @property
     def is_failed(self):
-        return self.get_status() == Status.FAILED
+        return self.get_status() == JobStatus.FAILED
 
     @property
     def is_started(self):
-        return self.get_status() == Status.STARTED
+        return self.get_status() == JobStatus.STARTED
 
     @property
     def dependency(self):
diff --git a/rq/queue.py b/rq/queue.py
index ff5f860..61f5575 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -5,7 +5,7 @@ from __future__ import (absolute_import, division, print_function,
 import uuid
 
 from .connections import resolve_connection
-from .job import Job, Status
+from .job import Job, JobStatus
 from .utils import import_attribute, utcnow
 
 from .exceptions import (DequeueTimeout, InvalidJobOperationError,
@@ -180,7 +180,7 @@ class Queue(object):
 
         # TODO: job with dependency shouldn't have "queued" as status
         job = self.job_class.create(func, args, kwargs, connection=self.connection,
-                                    result_ttl=result_ttl, status=Status.QUEUED,
+                                    result_ttl=result_ttl, status=JobStatus.QUEUED,
                                     description=description, depends_on=depends_on, timeout=timeout,
                                     id=job_id)
 
@@ -195,7 +195,7 @@ class Queue(object):
                 while True:
                     try:
                         pipe.watch(depends_on.key)
-                        if depends_on.get_status() != Status.FINISHED:
+                        if depends_on.get_status() != JobStatus.FINISHED:
                             job.register_dependency(pipeline=pipe)
                             job.save(pipeline=pipe)
                             pipe.execute()
@@ -390,7 +390,7 @@ class Queue(object):
 
 class FailedQueue(Queue):
     def __init__(self, connection=None):
-        super(FailedQueue, self).__init__(Status.FAILED, connection=connection)
+        super(FailedQueue, self).__init__(JobStatus.FAILED, connection=connection)
 
     def quarantine(self, job, exc_info):
         """Puts the given Job in quarantine (i.e. put it on the failed
@@ -417,7 +417,7 @@ class FailedQueue(Queue):
         if self.remove(job) == 0:
             raise InvalidJobOperationError('Cannot requeue non-failed jobs.')
 
-        job.set_status(Status.QUEUED)
+        job.set_status(JobStatus.QUEUED)
         job.exc_info = None
         q = Queue(job.origin, connection=self.connection)
         q.enqueue_job(job)
diff --git a/rq/suspension.py b/rq/suspension.py
new file mode 100644
index 0000000..b734acd
--- /dev/null
+++ b/rq/suspension.py
@@ -0,0 +1,18 @@
+WORKERS_SUSPENDED = 'rq:suspended'
+
+
+def is_suspended(connection):
+    return connection.exists(WORKERS_SUSPENDED)
+
+
+def suspend(connection, ttl=None):
+    """ttl = time to live in seconds.  Default is no expiration
+       Note:  If you pass in 0 it will invalidate right away
+    """
+    connection.set(WORKERS_SUSPENDED, 1)
+    if ttl is not None:
+        connection.expire(WORKERS_SUSPENDED, ttl)
+
+
+def resume(connection):
+    return connection.delete(WORKERS_SUSPENDED)
\ No newline at end of file
diff --git a/rq/utils.py b/rq/utils.py
index 8233ac6..db56020 100644
--- a/rq/utils.py
+++ b/rq/utils.py
@@ -208,3 +208,13 @@ def first(iterable, default=None, key=None):
 def current_timestamp():
     """Returns current UTC timestamp"""
     return calendar.timegm(datetime.datetime.utcnow().utctimetuple())
+
+
+def enum(name, *sequential, **named):
+    values = dict(zip(sequential, range(len(sequential))), **named)
+
+    # NOTE: Yes, we *really* want to cast using str() here.
+    # On Python 2 type() requires a byte string (which is str() on Python 2).
+    # On Python 3 it does not matter, so we'll use str(), which acts as
+    # a no-op.
+    return type(str(name), (), values)
\ No newline at end of file
diff --git a/rq/worker.py b/rq/worker.py
index bf40a65..72f1aa4 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -12,18 +12,20 @@ import sys
 import time
 import traceback
 import warnings
+from datetime import datetime
 
 from rq.compat import as_text, string_types, text_type
 
 from .connections import get_current_connection
 from .exceptions import DequeueTimeout, NoQueueError
-from .job import Job, Status
+from .job import Job, JobStatus
 from .logutils import setup_loghandlers
 from .queue import get_failed_queue, Queue
 from .timeouts import UnixSignalDeathPenalty
-from .utils import import_attribute, make_colorizer, utcformat, utcnow
+from .utils import import_attribute, make_colorizer, utcformat, utcnow, enum
 from .version import VERSION
 from .registry import FinishedJobRegistry, StartedJobRegistry
+from .suspension import is_suspended
 
 try:
     from procname import setprocname
@@ -52,8 +54,8 @@ def compact(l):
     return [x for x in l if x is not None]
 
 _signames = dict((getattr(signal, signame), signame)
-                 for signame in dir(signal)
-                 if signame.startswith('SIG') and '_' not in signame)
+    for signame in dir(signal)
+    if signame.startswith('SIG') and '_' not in signame)
 
 
 def signal_name(signum):
@@ -65,6 +67,12 @@ def signal_name(signum):
         return 'SIG_UNKNOWN'
 
 
+WorkerStatus = enum('WorkerStatus',
+                    STARTED='started', SUSPENDED='suspended', BUSY='busy',
+                    IDLE='idle'
+                    )
+
+
 class Worker(object):
     redis_worker_namespace_prefix = 'rq:worker:'
     redis_workers_keys = 'rq:workers'
@@ -333,6 +341,30 @@ class Worker(object):
         signal.signal(signal.SIGINT, request_stop)
         signal.signal(signal.SIGTERM, request_stop)
 
+    def check_for_suspension(self, burst):
+        """Check to see if the workers have been suspended by something like `rq suspend`"""
+
+        before_state = None
+        notified = False
+
+        while not self.stopped and is_suspended(self.connection):
+
+            if burst:
+                self.log.info('Suspended in burst mode -- exiting.')
+                self.log.info('Note:  There could still be unperformed jobs on the queue')
+                raise StopRequested
+
+            if not notified:
+                self.log.info('Worker suspended, use "rq resume" command to resume')
+                before_state = self.get_state()
+                self.set_state(WorkerStatus.SUSPENDED)
+                notified = True
+            time.sleep(1)
+
+        if before_state:
+            self.set_state(before_state)
+
+
     def work(self, burst=False):
         """Starts the work loop.
 
@@ -348,15 +380,19 @@ class Worker(object):
         did_perform_work = False
         self.register_birth()
         self.log.info('RQ worker started, version %s' % VERSION)
-        self.set_state('starting')
+        self.set_state(WorkerStatus.STARTED)
+
         try:
             while True:
-                if self.stopped:
-                    self.log.info('Stopping on request.')
-                    break
-
-                timeout = None if burst else max(1, self.default_worker_ttl - 60)
                 try:
+                    self.check_for_suspension(burst)
+
+                    if self.stopped:
+                        self.log.info('Stopping on request.')
+                        break
+
+                    timeout = None if burst else max(1, self.default_worker_ttl - 60)
+
                     result = self.dequeue_job_and_maintain_ttl(timeout)
                     if result is None:
                         break
@@ -367,20 +403,22 @@ class Worker(object):
                 self.execute_job(job)
                 self.heartbeat()
 
-                if job.get_status() == Status.FINISHED:
+                if job.get_status() == JobStatus.FINISHED:
                     queue.enqueue_dependents(job)
 
                 did_perform_work = True
+
         finally:
             if not self.is_horse:
                 self.register_death()
         return did_perform_work
 
+
     def dequeue_job_and_maintain_ttl(self, timeout):
         result = None
         qnames = self.queue_names()
 
-        self.set_state('idle')
+        self.set_state(WorkerStatus.IDLE)
         self.procline('Listening on %s' % ','.join(qnames))
         self.log.info('')
         self.log.info('*** Listening on %s...' %
@@ -395,7 +433,7 @@ class Worker(object):
                 if result is not None:
                     job, queue = result
                     self.log.info('%s: %s (%s)' % (green(queue.name),
-                                  blue(job.description), job.id))
+                                                   blue(job.description), job.id))
 
                 break
             except DequeueTimeout:
@@ -477,12 +515,12 @@ class Worker(object):
         timeout = (job.timeout or 180) + 60
 
         with self.connection._pipeline() as pipeline:
-            self.set_state('busy', pipeline=pipeline)
+            self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
             self.set_current_job_id(job.id, pipeline=pipeline)
             self.heartbeat(timeout, pipeline=pipeline)
             registry = StartedJobRegistry(job.origin, self.connection)
             registry.add(job, timeout, pipeline=pipeline)
-            job.set_status(Status.STARTED, pipeline=pipeline)
+            job.set_status(JobStatus.STARTED, pipeline=pipeline)
             pipeline.execute()
 
         self.procline('Processing %s from %s since %s' % (
@@ -511,7 +549,7 @@ class Worker(object):
                 result_ttl = job.get_ttl(self.default_result_ttl)
                 if result_ttl != 0:
                     job.ended_at = utcnow()
-                    job._status = Status.FINISHED
+                    job._status = JobStatus.FINISHED
                     job.save(pipeline=pipeline)
 
                     finished_job_registry = FinishedJobRegistry(job.origin, self.connection)
@@ -523,7 +561,7 @@ class Worker(object):
                 pipeline.execute()
 
             except Exception:
-                job.set_status(Status.FAILED, pipeline=pipeline)
+                job.set_status(JobStatus.FAILED, pipeline=pipeline)
                 started_job_registry.remove(job, pipeline=pipeline)
                 pipeline.execute()
                 self.handle_exception(job, *sys.exc_info())
@@ -552,7 +590,7 @@ class Worker(object):
             'arguments': job.args,
             'kwargs': job.kwargs,
             'queue': job.origin,
-        })
+            })
 
         for handler in reversed(self._exc_handlers):
             self.log.debug('Invoking exception handler %s' % (handler,))
diff --git a/tests/test_cli.py b/tests/test_cli.py
index a92fb34..3977006 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -26,6 +26,17 @@ class TestCommandLine(TestCase):
 
 
 class TestRQCli(RQTestCase):
+
+    def assert_normal_execution(self, result):
+        if result.exit_code == 0:
+            return True
+        else:
+            print("Non normal execution")
+            print("Exit Code: {}".format(result.exit_code))
+            print("Output: {}".format(result.output))
+            print("Exception: {}".format(result.exception))
+            self.assertEqual(result.exit_code, 0)
+
     """Test rq_cli script"""
     def setUp(self):
         super(TestRQCli, self).setUp()
@@ -41,25 +52,58 @@ class TestRQCli(RQTestCase):
         """rq empty -u <url> failed"""
         runner = CliRunner()
         result = runner.invoke(main, ['empty', '-u', self.redis_url, 'failed'])
-        self.assertEqual(result.exit_code, 0)
+        self.assert_normal_execution(result)
         self.assertEqual(result.output.strip(), '1 jobs removed from failed queue')
 
     def test_requeue(self):
         """rq requeue -u <url> --all"""
         runner = CliRunner()
         result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--all'])
-        self.assertEqual(result.exit_code, 0)
+        self.assert_normal_execution(result)
         self.assertEqual(result.output.strip(), 'Requeueing 1 jobs from failed queue')
 
     def test_info(self):
         """rq info -u <url>"""
         runner = CliRunner()
         result = runner.invoke(main, ['info', '-u', self.redis_url])
-        self.assertEqual(result.exit_code, 0)
+        self.assert_normal_execution(result)
         self.assertIn('1 queues, 1 jobs total', result.output)
 
     def test_worker(self):
         """rq worker -u <url> -b"""
         runner = CliRunner()
         result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b'])
-        self.assertEqual(result.exit_code, 0)
+        self.assert_normal_execution(result)
+
+    def test_suspend_and_resume(self):
+        """rq suspend -u <url>
+           rq resume -u <url>
+        """
+        runner = CliRunner()
+        result = runner.invoke(main, ['suspend', '-u', self.redis_url])
+        self.assert_normal_execution(result)
+
+        result = runner.invoke(main, ['resume', '-u', self.redis_url])
+        self.assert_normal_execution(result)
+
+    def test_suspend_with_ttl(self):
+        """rq suspend -u <url> --duration=2
+        """
+        runner = CliRunner()
+        result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 1])
+        self.assert_normal_execution(result)
+
+    def test_suspend_with_invalid_ttl(self):
+        """rq suspend -u <url> --duration=0
+        """
+        runner = CliRunner()
+        result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 0])
+
+        self.assertEqual(result.exit_code, 1)
+        self.assertIn("Duration must be an integer greater than 1", result.output)
+
+
+
+
+
+
diff --git a/tests/test_queue.py b/tests/test_queue.py
index 8990e35..c905f13 100644
--- a/tests/test_queue.py
+++ b/tests/test_queue.py
@@ -4,7 +4,7 @@ from __future__ import (absolute_import, division, print_function,
 
 from rq import get_failed_queue, Queue
 from rq.exceptions import InvalidJobOperationError
-from rq.job import Job, Status
+from rq.job import Job, JobStatus
 from rq.worker import Worker
 
 from tests import RQTestCase
@@ -262,7 +262,7 @@ class TestQueue(RQTestCase):
         """Enqueueing a job sets its status to "queued"."""
         q = Queue()
         job = q.enqueue(say_hello)
-        self.assertEqual(job.get_status(), Status.QUEUED)
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)
 
     def test_enqueue_explicit_args(self):
         """enqueue() works for both implicit/explicit args."""
@@ -346,7 +346,7 @@ class TestQueue(RQTestCase):
         self.assertEqual(q.job_ids, [])
 
         # Jobs dependent on finished jobs are immediately enqueued
-        parent_job.set_status(Status.FINISHED)
+        parent_job.set_status(JobStatus.FINISHED)
         parent_job.save()
         job = q.enqueue_call(say_hello, depends_on=parent_job)
         self.assertEqual(q.job_ids, [job.id])
@@ -363,7 +363,7 @@ class TestQueue(RQTestCase):
         self.assertEqual(q.job_ids, [])
 
         # Jobs dependent on finished jobs are immediately enqueued
-        parent_job.set_status(Status.FINISHED)
+        parent_job.set_status(JobStatus.FINISHED)
         parent_job.save()
         job = q.enqueue_call(say_hello, depends_on=parent_job.id)
         self.assertEqual(q.job_ids, [job.id])
@@ -379,7 +379,7 @@ class TestQueue(RQTestCase):
         self.assertEqual(job.timeout, 123)
 
         # Jobs dependent on finished jobs are immediately enqueued
-        parent_job.set_status(Status.FINISHED)
+        parent_job.set_status(JobStatus.FINISHED)
         parent_job.save()
         job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123)
         self.assertEqual(q.job_ids, [job.id])
@@ -441,7 +441,7 @@ class TestFailedQueue(RQTestCase):
         get_failed_queue().requeue(job.id)
 
         job = Job.fetch(job.id)
-        self.assertEqual(job.get_status(), Status.QUEUED)
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)
 
     def test_enqueue_preserves_result_ttl(self):
         """Enqueueing persists result_ttl."""
diff --git a/tests/test_worker.py b/tests/test_worker.py
index c6d85ff..75f30aa 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -6,8 +6,9 @@ import os
 
 from rq import get_failed_queue, Queue, Worker, SimpleWorker
 from rq.compat import as_text
-from rq.job import Job, Status
+from rq.job import Job, JobStatus
 from rq.registry import StartedJobRegistry
+from rq.suspension import suspend, resume
 
 from tests import RQTestCase, slow
 from tests.fixtures import (create_file, create_file_after_timeout,
@@ -222,14 +223,14 @@ class TestWorker(RQTestCase):
         w = Worker([q])
 
         job = q.enqueue(say_hello)
-        self.assertEqual(job.get_status(), Status.QUEUED)
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)
         self.assertEqual(job.is_queued, True)
         self.assertEqual(job.is_finished, False)
         self.assertEqual(job.is_failed, False)
 
         w.work(burst=True)
         job = Job.fetch(job.id)
-        self.assertEqual(job.get_status(), Status.FINISHED)
+        self.assertEqual(job.get_status(), JobStatus.FINISHED)
         self.assertEqual(job.is_queued, False)
         self.assertEqual(job.is_finished, True)
         self.assertEqual(job.is_failed, False)
@@ -238,7 +239,7 @@ class TestWorker(RQTestCase):
         job = q.enqueue(div_by_zero, args=(1,))
         w.work(burst=True)
         job = Job.fetch(job.id)
-        self.assertEqual(job.get_status(), Status.FAILED)
+        self.assertEqual(job.get_status(), JobStatus.FAILED)
         self.assertEqual(job.is_queued, False)
         self.assertEqual(job.is_finished, False)
         self.assertEqual(job.is_failed, True)
@@ -251,13 +252,13 @@ class TestWorker(RQTestCase):
         job = q.enqueue_call(say_hello, depends_on=parent_job)
         w.work(burst=True)
         job = Job.fetch(job.id)
-        self.assertEqual(job.get_status(), Status.FINISHED)
+        self.assertEqual(job.get_status(), JobStatus.FINISHED)
 
         parent_job = q.enqueue(div_by_zero)
         job = q.enqueue_call(say_hello, depends_on=parent_job)
         w.work(burst=True)
         job = Job.fetch(job.id)
-        self.assertNotEqual(job.get_status(), Status.FINISHED)
+        self.assertNotEqual(job.get_status(), JobStatus.FINISHED)
 
     def test_get_current_job(self):
         """Ensure worker.get_current_job() works properly"""
@@ -318,3 +319,33 @@ class TestWorker(RQTestCase):
                           'Expected at least some work done.')
         self.assertEquals(job.result, 'Hi there, Adam!')
         self.assertEquals(job.description, '你好 世界!')
+
+    def test_pause_worker_execution(self):
+        """Test Pause Worker Execution"""
+
+        SENTINEL_FILE = '/tmp/rq-tests.txt'
+
+        try:
+            # Remove the sentinel if it is leftover from a previous test run
+            os.remove(SENTINEL_FILE)
+        except OSError as e:
+            if e.errno != 2:
+                raise
+
+        q = Queue()
+        job = q.enqueue(create_file, SENTINEL_FILE)
+
+        w = Worker([q])
+
+        suspend(self.testconn)
+
+        w.work(burst=True)
+        assert q.count == 1
+
+        # Should not have created evidence of execution
+        self.assertEquals(os.path.exists(SENTINEL_FILE), False)
+
+        resume(self.testconn)
+        w.work(burst=True)
+        assert q.count == 0
+        self.assertEquals(os.path.exists(SENTINEL_FILE), True)