This change could use far better test coverage, but I'm not sure how to
test it without refactoring more of the code than I think is reasonable
in the scope of this work.
The 'blocking' parameter was replaced with a 'timeout' parameter.
The timeout parameter is interpreted thus:
0 - no timeout (block forever, equivalent to blocking=True)
None - non-blocking (return value or None immediately, equivalent to
blocking=False)
<integer> - maximum seconds to block
Upon timing out, a dequeue operation will raise DequeueTimeout.
Basically, I don't want to enforce users to configure _any_ logging
stack when writing a basic worker, like this:
from rq import Worker, Queue, Connection
with Connection():
q = Queue()
w = Worker([q])
w.work(burst=True)
In case you want to disable logging altogether, you can configure your
logging stack to do so.
When a pickled job string can't be unpickled because some required
module isn't loadable, this leads to an `UnpickleError` in the worker
(not in the horse).
Currently we just assume "garbage" in the job's data field, and silently
ignore it.
This is bad.
Really bad.
Because it avoids the normal exception handling mechanism that RQ has.
Historically, this "feature" was introduced to ignore any invalid pickle
data ("bad strings") on queues, and go on. However, we must assume data
inside `job.data` to be valid pickle data.
While an invalid _format_ of pickle data (e.g. the string "blablah"
isn't valid) leads to unpickle errors, unpickling errors will also occur
when the job can't be validly constructed in memory for other reasons,
like being unable to load a specific class.
Django is a good example of this: try submitting jobs that use
`django.conf.settings` while the `DJANGO_SETTINGS_MODULE` env var isn't
set. Currently, RQ workers will drop these jobs and dismiss them like
any non-valid pickle data. You won't be notified.
This patch changes RQ's behaviour to never ignore invalid string data on
any queue and _always_ handle these errors explicitly (but without
bringing the main loop down, of course).
This reverts commit 1ab8c19696 and
reintroduces all changes made by @dstufft.
Still, it needs more patches to reeanble the default log-to-console
behaviour. See #121.
Connections can now be set explicitly on Queues, Workers, and Jobs.
Jobs that are implicitly created by Queue or Worker API calls now
inherit the connection of their creator's.
For all RQ object instances that are created now holds that the
"current" connection is used if none is passed in explicitly. The
"current" connection is thus hold on to at creation time and won't be
changed for the lifetime of the object.
Effectively, this means that, given a default Redis connection, say you
create a queue Q1, then push another Redis connection onto the
connection stack, then create Q2. In that case, Q1 means a queue on the
first connection and Q2 on the second connection.
This is way more clear than it used to be.
Also, I've removed the `use_redis()` call, which was named ugly.
Instead, some new alternatives for connection management now exist.
You can push/pop connections now:
>>> my_conn = Redis()
>>> push_connection(my_conn)
>>> q = Queue()
>>> q.connection == my_conn
True
>>> pop_connection() == my_conn
Also, you can stack them syntactically:
>>> conn1 = Redis()
>>> conn2 = Redis('example.org', 1234)
>>> with Connection(conn1):
... q = Queue()
... with Connection(conn2):
... q2 = Queue()
... q3 = Queue()
>>> q.connection == conn1
True
>>> q2.connection == conn2
True
>>> q3.connection == conn1
True
Or, if you only require a single connection to Redis (for most uses):
>>> use_connection(Redis())
This aids unpacking in the case of a function that isn't importable from
the worker's runtime. The unpickling will now (almost) always succeed,
and throw an ImportError later on, when the function is actually
accessed (thus imported implicitly).
The end result is a job on the failed queue, with exc_info describing
the import error, which is tremendously useful.
This case protects against JobTimeoutExceptions being raised immediately
after the job body has been (successfully) executed. Still,
JobTimeoutExceptions pass through naturally, like any other exception,
to be handled by the default exception handler that writes failed jobs
to the failed queue.
Timeouts therefore are reported like any other exception.
Jobs are now stored in separate keys, and only job IDs are put on Redis
queues. Much of the code has been hit by this change, but it is for the
good.
No really.
When SIGINT (``Ctrl+C``) is received when inside a blocking
os.waitpid(), OSError is thrown, effectively cancelling the wait.
However, to facilitate a "warm shutdown", as we intend, Ctrl+C is
perfectly allowed and we want to keep waiting for the child. Therefore,
we perform a trick here, catching OSError, checking whether its cause
was SIGINT (errno == EINTR), and only in that case, loop to os.waitpid()
again.
The currently running task will be waited for, so it can gracefully
be finished. Further execution will be stopped.
If, during this waiting phase, Ctrl+C is hit again, the worker and the
horse will be terminated forcefully (this means work could be lost or
partially finished).
I merely refactored the internal calls. No external API changes have been made in this commit. In order to make the dequeueing methods consistent, each dequeue method now returns a Job instance, which is just a nice lightweight wrapper around the job tuple.
The Job class makes it easier to pass the method call info around, along with some possible meta information, like the queue the job originated from.
This fixes#7.
Redis' BLPOP command takes multiple queue arguments, but LPOP can only
take a single queue. Therefore, we need to loop over all queues
manually, in order, and raise an exception is no more work is available.