Added job.worker_name (#1375)

* Added job.worker_name

* Fix compatibility with Redis server 3.x

* Document job.worker_name

* Removed some Python 2 compatibility stuff.

* Remove unused codes
main
Selwin Ong 4 years ago committed by GitHub
parent 3ead30a34e
commit f3e924cdd1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -122,6 +122,7 @@ Some interesting job attributes include:
* `job.ended_at`
* `job.exc_info` stores exception information if job doesn't finish successfully.
* `job.last_heartbeat` the latest timestamp that's periodically updated when the job is executing. Can be used to determine if the job is still active.
* `job.worker_name` returns the worker name currently executing this job.
If you want to efficiently fetch a large number of jobs, use `Job.fetch_many()`.

@ -45,12 +45,13 @@ except ImportError:
PY2 = sys.version_info[0] == 2
if not PY2:
# Python 3.x and up
text_type = str
string_types = (str,)
def as_text(v):
# Python 3.x and up
text_type = str
string_types = (str,)
def as_text(v):
if v is None:
return None
elif isinstance(v, bytes):
@ -60,47 +61,6 @@ if not PY2:
else:
raise ValueError('Unknown type %r' % type(v))
def decode_redis_hash(h):
return dict((as_text(k), h[k]) for k in h)
else:
# Python 2.x
def text_type(v):
try:
return unicode(v) # noqa
except Exception:
return unicode(v, "utf-8", errors="ignore") # noqa
string_types = (str, unicode) # noqa
def as_text(v):
if v is None:
return None
elif isinstance(v, str):
return v.decode('utf-8')
elif isinstance(v, unicode): # noqa
return v
else:
raise Exception("Input cannot be decoded into literal thing.")
def decode_redis_hash(h):
return h
try:
from datetime import timezone
utc = timezone.utc
except ImportError:
# Python 2.x workaround
from datetime import timedelta, tzinfo
class UTC(tzinfo):
def utcoffset(self, dt):
return timedelta(0)
def tzname(self, dt):
return "UTC"
def dst(self, dt):
return timedelta(0)
utc = UTC()
def decode_redis_hash(h):
return dict((as_text(k), h[k]) for k in h)

@ -343,6 +343,7 @@ class Job(object):
self.result_ttl = None
self.failure_ttl = None
self.ttl = None
self.worker_name = None
self._status = None
self._dependency_ids = []
self.meta = {}
@ -479,6 +480,7 @@ class Job(object):
self.created_at = str_to_date(obj.get('created_at'))
self.origin = as_text(obj.get('origin'))
self.worker_name = obj.get('worker_name').decode() if obj.get('worker_name') else None
self.description = as_text(obj.get('description'))
self.enqueued_at = str_to_date(obj.get('enqueued_at'))
self.started_at = str_to_date(obj.get('started_at'))
@ -538,6 +540,7 @@ class Job(object):
'started_at': utcformat(self.started_at) if self.started_at else '',
'ended_at': utcformat(self.ended_at) if self.ended_at else '',
'last_heartbeat': utcformat(self.last_heartbeat) if self.last_heartbeat else '',
'worker_name': self.worker_name or ''
}
if self.retries_left is not None:
@ -554,7 +557,7 @@ class Job(object):
if self._result is not None:
try:
obj['result'] = self.serializer.dumps(self._result)
except Exception as e:
except: # noqa
obj['result'] = "Unserializable return value"
if self.exc_info is not None:
obj['exc_info'] = zlib.compress(str(self.exc_info).encode('utf-8'))
@ -694,6 +697,23 @@ class Job(object):
assert self is _job_stack.pop()
return self._result
def prepare_for_execution(self, worker_name, pipeline):
"""Set job metadata before execution begins"""
self.worker_name = worker_name
self.last_heartbeat = utcnow()
self.started_at = self.last_heartbeat
self._status = JobStatus.STARTED
mapping = {
'last_heartbeat': utcformat(self.last_heartbeat),
'status': self._status,
'started_at': utcformat(self.started_at),
'worker_name': worker_name
}
if self.get_redis_server_version() >= StrictVersion("4.0.0"):
pipeline.hset(self.key, mapping=mapping)
else:
pipeline.hmset(self.key, mapping)
def _execute(self):
return self.func(*self.args, **self.kwargs)

@ -4,12 +4,13 @@ from __future__ import (absolute_import, division, print_function,
import uuid
import warnings
from datetime import datetime
from datetime import datetime, timezone
from distutils.version import StrictVersion
from redis import WatchError
from .compat import as_text, string_types, total_ordering, utc
from .compat import as_text, string_types, total_ordering
from .connections import resolve_connection
from .defaults import DEFAULT_RESULT_TTL
from .exceptions import DequeueTimeout, NoSuchJobError
@ -451,7 +452,7 @@ nd
def enqueue_in(self, time_delta, func, *args, **kwargs):
"""Schedules a job to be executed in a given `timedelta` object"""
return self.enqueue_at(datetime.now(utc) + time_delta,
return self.enqueue_at(datetime.now(timezone.utc) + time_delta,
func, *args, **kwargs)
def enqueue_job(self, job, pipeline=None, at_front=False):

@ -1,8 +1,8 @@
import calendar
import time
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from .compat import as_text, utc
from .compat import as_text
from .connections import resolve_connection
from .defaults import DEFAULT_FAILURE_TTL
from .exceptions import InvalidJobOperation, NoSuchJobError
@ -303,7 +303,7 @@ class ScheduledJobRegistry(BaseRegistry):
if not score:
raise NoSuchJobError
return datetime.fromtimestamp(score, tz=utc)
return datetime.fromtimestamp(score, tz=timezone.utc)
def clean_registries(queue):

@ -26,7 +26,7 @@ from redis import WatchError
from . import worker_registration
from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE
from .compat import PY2, as_text, string_types, text_type
from .compat import as_text, string_types, text_type
from .connections import get_current_connection, push_connection, pop_connection
from .defaults import (DEFAULT_RESULT_TTL,
@ -859,9 +859,7 @@ class Worker(object):
registry = StartedJobRegistry(job.origin, self.connection,
job_class=self.job_class)
registry.add(job, timeout, pipeline=pipeline)
job.set_status(JobStatus.STARTED, pipeline=pipeline)
job.heartbeat(utcnow(), pipeline=pipeline)
pipeline.hset(job.key, 'started_at', utcformat(utcnow()))
job.prepare_for_execution(self.name, pipeline=pipeline)
pipeline.execute()
msg = 'Processing {0} from {1} since {2}'
@ -883,7 +881,7 @@ class Worker(object):
self.connection,
job_class=self.job_class
)
job.worker_name = None
# Requeue/reschedule if retry is configured
if job.retries_left and job.retries_left > 0:
retry = True
@ -943,6 +941,7 @@ class Worker(object):
result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0:
job.set_status(JobStatus.FINISHED, pipeline=pipeline)
job.worker_name = None
# Don't clobber the user's meta dictionary!
job.save(pipeline=pipeline, include_meta=False)
@ -985,9 +984,7 @@ class Worker(object):
except: # NOQA
job.ended_at = utcnow()
exc_info = sys.exc_info()
exc_string = self._get_safe_exception_string(
traceback.format_exception(*exc_info)
)
exc_string = ''.join(traceback.format_exception(*exc_info))
self.handle_job_failure(job=job, exc_string=exc_string, queue=queue,
started_job_registry=started_job_registry)
self.handle_exception(job, *exc_info)
@ -1014,9 +1011,7 @@ class Worker(object):
def handle_exception(self, job, *exc_info):
"""Walks the exception handler stack to delegate exception handling."""
exc_string = Worker._get_safe_exception_string(
traceback.format_exception(*exc_info)
)
exc_string = ''.join(traceback.format_exception(*exc_info))
self.log.error(exc_string, exc_info=True, extra={
'func': job.func_name,
'arguments': job.args,
@ -1037,16 +1032,6 @@ class Worker(object):
if not fallthrough:
break
@staticmethod
def _get_safe_exception_string(exc_strings):
"""Ensure list of exception strings is decoded on Python 2 and joined as one string safely."""
if sys.version_info[0] < 3:
try:
exc_strings = [exc.decode("utf-8") for exc in exc_strings]
except ValueError:
exc_strings = [exc.decode("latin-1") for exc in exc_strings]
return ''.join(exc_strings)
def push_exc_handler(self, handler_func):
"""Pushes an exception handler onto the exc handler stack."""
self._exc_handlers.append(handler_func)
@ -1101,8 +1086,6 @@ class Worker(object):
class SimpleWorker(Worker):
def main_work_horse(self, *args, **kwargs):
raise NotImplementedError("Test worker does not implement this method")
def execute_job(self, job, queue):
"""Execute job in same thread/process, do not fork()"""
@ -1124,10 +1107,6 @@ class HerokuWorker(Worker):
imminent_shutdown_delay = 6
frame_properties = ['f_code', 'f_lasti', 'f_lineno', 'f_locals', 'f_trace']
if PY2:
frame_properties.extend(
['f_exc_traceback', 'f_exc_type', 'f_exc_value', 'f_restricted']
)
def setup_work_horse_signals(self):
"""Modified to ignore SIGINT and SIGTERM and only handle SIGRTMIN"""

@ -14,7 +14,7 @@ import subprocess
from rq import Connection, get_current_job, get_current_connection, Queue
from rq.decorators import job
from rq.compat import PY2, text_type
from rq.compat import text_type
from rq.worker import HerokuWorker
@ -120,9 +120,6 @@ class CallableObject(object):
class UnicodeStringObject(object):
def __repr__(self):
if PY2:
return u'é'.encode('utf-8')
else:
return u'é'

@ -2,13 +2,12 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from datetime import datetime
from datetime import datetime, timezone
from click.testing import CliRunner
from redis import Redis
from rq import Queue
from rq.compat import utc
from rq.cli import main
from rq.cli.helpers import read_config_file, CliConfig
from rq.job import Job
@ -250,7 +249,7 @@ class TestRQCli(RQTestCase):
def test_worker_with_scheduler(self):
"""rq worker -u <url> --with-scheduler"""
queue = Queue(connection=self.connection)
queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello)
queue.enqueue_at(datetime(2019, 1, 1, tzinfo=timezone.utc), say_hello)
registry = ScheduledJobRegistry(queue=queue)
runner = CliRunner()

@ -10,7 +10,7 @@ from datetime import datetime, timedelta
from redis import WatchError
from rq.compat import PY2, as_text
from rq.compat import as_text
from rq.exceptions import NoSuchJobError
from rq.job import Job, JobStatus, cancel_job, get_current_job, Retry
from rq.queue import Queue
@ -32,18 +32,9 @@ class TestJob(RQTestCase):
args=[12, ""],
kwargs=dict(snowman="", null=None),
)
if not PY2:
# Python 3
expected_string = "myfunc(12, '', null=None, snowman='')"
else:
# Python 2
expected_string = u"myfunc(12, u'\\u2603', null=None, snowman=u'\\u2603')".decode(
'utf-8')
self.assertEqual(
job.description,
expected_string,
"myfunc(12, '', null=None, snowman='')",
)
def test_create_empty_job(self):
@ -222,7 +213,7 @@ class TestJob(RQTestCase):
# ... and no other keys are stored
self.assertEqual(
sorted(self.testconn.hkeys(job.key)),
[b'created_at', b'data', b'description', b'ended_at', b'last_heartbeat', b'started_at'])
[b'created_at', b'data', b'description', b'ended_at', b'last_heartbeat', b'started_at', b'worker_name'])
self.assertEqual(job.last_heartbeat, None)
self.assertEqual(job.last_heartbeat, None)
@ -439,11 +430,21 @@ class TestJob(RQTestCase):
job = Job.create(func=fixtures.say_hello, args=('Lionel',))
job.save()
Job.fetch(job.id, connection=self.testconn)
if PY2:
self.assertEqual(job.description, "tests.fixtures.say_hello(u'Lionel')")
else:
self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')")
def test_prepare_for_execution(self):
"""job.prepare_for_execution works properly"""
job = Job.create(func=fixtures.say_hello)
job.save()
with self.testconn.pipeline() as pipeline:
job.prepare_for_execution("worker_name", pipeline)
pipeline.execute()
job.refresh()
self.assertEqual(job.worker_name, "worker_name")
self.assertEqual(job.get_status(), JobStatus.STARTED)
self.assertIsNotNone(job.last_heartbeat)
self.assertIsNotNone(job.started_at)
def test_job_access_outside_job_fails(self):
"""The current job is accessible only within a job context."""
self.assertIsNone(get_current_job())

@ -3,13 +3,10 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals)
import json
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from mock.mock import patch
from rq import Retry, Queue
from rq.compat import utc
from rq.exceptions import NoSuchJobError
from rq.job import Job, JobStatus
from rq.registry import (DeferredJobRegistry, FailedJobRegistry,
FinishedJobRegistry, ScheduledJobRegistry,
@ -364,7 +361,7 @@ class TestQueue(RQTestCase):
)
# Explicit args and kwargs should also work with enqueue_at
time = datetime.now(utc) + timedelta(seconds=10)
time = datetime.now(timezone.utc) + timedelta(seconds=10)
job = q.enqueue_at(time, echo, job_timeout=2, result_ttl=2, args=[1], kwargs=kwargs)
self.assertEqual(job.timeout, 2)
self.assertEqual(job.result_ttl, 2)
@ -671,7 +668,7 @@ class TestJobScheduling(RQTestCase):
def test_enqueue_at(self):
"""enqueue_at() creates a job in ScheduledJobRegistry"""
queue = Queue(connection=self.testconn)
scheduled_time = datetime.now(utc) + timedelta(seconds=10)
scheduled_time = datetime.now(timezone.utc) + timedelta(seconds=10)
job = queue.enqueue_at(scheduled_time, say_hello)
registry = ScheduledJobRegistry(queue=queue)
self.assertIn(job, registry)

@ -1,11 +1,11 @@
import os
import time
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from multiprocessing import Process
from rq import Queue
from rq.compat import utc, PY2
from rq.compat import PY2
from rq.exceptions import NoSuchJobError
from rq.job import Job, Retry
from rq.registry import FinishedJobRegistry, ScheduledJobRegistry
@ -57,8 +57,8 @@ class TestScheduledJobRegistry(RQTestCase):
job = Job.create('myfunc', connection=self.testconn)
job.save()
dt = datetime(2019, 1, 1, tzinfo=utc)
registry.schedule(job, datetime(2019, 1, 1, tzinfo=utc))
dt = datetime(2019, 1, 1, tzinfo=timezone.utc)
registry.schedule(job, datetime(2019, 1, 1, tzinfo=timezone.utc))
self.assertEqual(registry.get_scheduled_time(job), dt)
# get_scheduled_time() should also work with job ID
self.assertEqual(registry.get_scheduled_time(job.id), dt)
@ -74,13 +74,6 @@ class TestScheduledJobRegistry(RQTestCase):
job.save()
registry = ScheduledJobRegistry(queue=queue)
if PY2:
# On Python 2, datetime needs to have timezone
self.assertRaises(ValueError, registry.schedule, job, datetime(2019, 1, 1))
registry.schedule(job, datetime(2019, 1, 1, tzinfo=utc))
self.assertEqual(self.testconn.zscore(registry.key, job.id),
1546300800) # 2019-01-01 UTC in Unix timestamp
else:
from datetime import timezone
# If we pass in a datetime with no timezone, `schedule()`
# assumes local timezone so depending on your local timezone,
@ -227,7 +220,7 @@ class TestScheduler(RQTestCase):
registry = ScheduledJobRegistry(queue=queue)
job = Job.create('myfunc', connection=self.testconn)
job.save()
registry.schedule(job, datetime(2019, 1, 1, tzinfo=utc))
registry.schedule(job, datetime(2019, 1, 1, tzinfo=timezone.utc))
scheduler = RQScheduler([queue], connection=self.testconn)
scheduler.acquire_locks()
scheduler.enqueue_scheduled_jobs()
@ -237,7 +230,7 @@ class TestScheduler(RQTestCase):
self.assertEqual(len(registry), 0)
# Jobs scheduled in the far future should not be affected
registry.schedule(job, datetime(2100, 1, 1, tzinfo=utc))
registry.schedule(job, datetime(2100, 1, 1, tzinfo=timezone.utc))
scheduler.enqueue_scheduled_jobs()
self.assertEqual(len(queue), 1)
@ -294,7 +287,7 @@ class TestWorker(RQTestCase):
p = Process(target=kill_worker, args=(os.getpid(), False, 5))
p.start()
queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello)
queue.enqueue_at(datetime(2019, 1, 1, tzinfo=timezone.utc), say_hello)
worker.work(burst=False, with_scheduler=True)
p.join(1)
self.assertIsNotNone(worker.scheduler)
@ -311,7 +304,7 @@ class TestQueue(RQTestCase):
scheduler = RQScheduler([queue], connection=self.testconn)
scheduler.acquire_locks()
# Jobs created using enqueue_at is put in the ScheduledJobRegistry
job = queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello)
job = queue.enqueue_at(datetime(2019, 1, 1, tzinfo=timezone.utc), say_hello)
self.assertEqual(len(queue), 0)
self.assertEqual(len(registry), 1)
@ -330,7 +323,7 @@ class TestQueue(RQTestCase):
registry = ScheduledJobRegistry(queue=queue)
job = queue.enqueue_in(timedelta(seconds=30), say_hello)
now = datetime.now(utc)
now = datetime.now(timezone.utc)
scheduled_time = registry.get_scheduled_time(job)
# Ensure that job is scheduled roughly 30 seconds from now
self.assertTrue(

@ -181,6 +181,7 @@ class TestWorker(RQTestCase):
'Expected at least some work done.'
)
self.assertEqual(job.result, 'Hi there, Frank!')
self.assertIsNone(job.worker_name)
def test_job_times(self):
"""job times are set correctly."""
@ -296,7 +297,7 @@ class TestWorker(RQTestCase):
enqueued_at_date = str(job.enqueued_at)
w = Worker([q])
w.work(burst=True) # should silently pass
w.work(burst=True)
# Postconditions
self.assertEqual(q.count, 0)
@ -307,6 +308,7 @@ class TestWorker(RQTestCase):
# Check the job
job = Job.fetch(job.id)
self.assertEqual(job.origin, q.name)
self.assertIsNone(job.worker_name) # Worker name is cleared after failures
# Should be the original enqueued_at date, not the date of enqueueing
# to the failed queue
@ -586,26 +588,26 @@ class TestWorker(RQTestCase):
q = Queue()
job = q.enqueue(say_hello, args=('Frank',), result_ttl=10)
w = Worker([q])
self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
self.assertIn(job.get_id().encode(), self.testconn.lrange(q.key, 0, -1))
w.work(burst=True)
self.assertNotEqual(self.testconn.ttl(job.key), 0)
self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
self.assertNotIn(job.get_id().encode(), self.testconn.lrange(q.key, 0, -1))
# Job with -1 result_ttl don't expire
job = q.enqueue(say_hello, args=('Frank',), result_ttl=-1)
w = Worker([q])
self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
self.assertIn(job.get_id().encode(), self.testconn.lrange(q.key, 0, -1))
w.work(burst=True)
self.assertEqual(self.testconn.ttl(job.key), -1)
self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
self.assertNotIn(job.get_id().encode(), self.testconn.lrange(q.key, 0, -1))
# Job with result_ttl = 0 gets deleted immediately
job = q.enqueue(say_hello, args=('Frank',), result_ttl=0)
w = Worker([q])
self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
self.assertIn(job.get_id().encode(), self.testconn.lrange(q.key, 0, -1))
w.work(burst=True)
self.assertEqual(self.testconn.get(job.key), None)
self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
self.assertNotIn(job.get_id().encode(), self.testconn.lrange(q.key, 0, -1))
def test_worker_sets_job_status(self):
"""Ensure that worker correctly sets job status."""
@ -736,6 +738,10 @@ class TestWorker(RQTestCase):
self.assertEqual(worker.get_state(), 'busy')
self.assertEqual(worker.get_current_job_id(), job.id)
# job status is also updated
self.assertEqual(job._status, JobStatus.STARTED)
self.assertEqual(job.worker_name, worker.name)
def test_prepare_job_execution_inf_timeout(self):
"""Prepare job execution handles infinite job timeout"""
queue = Queue(connection=self.testconn)

Loading…
Cancel
Save