Merge pull request #781 from WeatherGod/dont_clobber_jobinfo

Trigger a refresh after job execution
main
Selwin Ong 8 years ago committed by GitHub
commit e531cd05fe

@ -422,8 +422,14 @@ class Job(object):
self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}
def to_dict(self): def to_dict(self, include_meta=True):
"""Returns a serialization of the current job instance""" """
Returns a serialization of the current job instance
You can exclude serializing the `meta` dictionary by setting
`include_meta=False`.
"""
obj = {} obj = {}
obj['created_at'] = utcformat(self.created_at or utcnow()) obj['created_at'] = utcformat(self.created_at or utcnow())
obj['data'] = self.data obj['data'] = self.data
@ -450,19 +456,26 @@ class Job(object):
obj['status'] = self._status obj['status'] = self._status
if self._dependency_id is not None: if self._dependency_id is not None:
obj['dependency_id'] = self._dependency_id obj['dependency_id'] = self._dependency_id
if self.meta: if self.meta and include_meta:
obj['meta'] = dumps(self.meta) obj['meta'] = dumps(self.meta)
if self.ttl: if self.ttl:
obj['ttl'] = self.ttl obj['ttl'] = self.ttl
return obj return obj
def save(self, pipeline=None): def save(self, pipeline=None, include_meta=True):
"""Persists the current job instance to its corresponding Redis key.""" """
Persists the current job instance to its corresponding Redis key.
Exclude persisting the `meta` dictionary by setting
`include_meta=False`. This is useful to prevent clobbering
user metadata without an expensive `refresh()` call first.
"""
key = self.key key = self.key
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
connection.hmset(key, self.to_dict()) connection.hmset(key, self.to_dict(include_meta=include_meta))
self.cleanup(self.ttl, pipeline=connection) self.cleanup(self.ttl, pipeline=connection)
def cancel(self): def cancel(self):

@ -229,7 +229,7 @@ class Queue(object):
if not self._async: if not self._async:
job.perform() job.perform()
job.set_status(JobStatus.FINISHED) job.set_status(JobStatus.FINISHED)
job.save() job.save(include_meta=False)
job.cleanup(DEFAULT_RESULT_TTL) job.cleanup(DEFAULT_RESULT_TTL)
return job return job
@ -474,7 +474,7 @@ class FailedQueue(Queue):
job.ended_at = utcnow() job.ended_at = utcnow()
job.exc_info = exc_info job.exc_info = exc_info
job.save(pipeline=pipeline) job.save(pipeline=pipeline, include_meta=False)
self.push_job_id(job.id, pipeline=pipeline) self.push_job_id(job.id, pipeline=pipeline)
pipeline.execute() pipeline.execute()

@ -89,7 +89,7 @@ class StartedJobRegistry(BaseRegistry):
try: try:
job = Job.fetch(job_id, connection=self.connection) job = Job.fetch(job_id, connection=self.connection)
job.set_status(JobStatus.FAILED) job.set_status(JobStatus.FAILED)
job.save(pipeline=pipeline) job.save(pipeline=pipeline, include_meta=False)
failed_queue.push_job_id(job_id, pipeline=pipeline) failed_queue.push_job_id(job_id, pipeline=pipeline)
except NoSuchJobError: except NoSuchJobError:
pass pass

@ -677,7 +677,8 @@ class Worker(object):
result_ttl = job.get_result_ttl(self.default_result_ttl) result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0: if result_ttl != 0:
job.set_status(JobStatus.FINISHED, pipeline=pipeline) job.set_status(JobStatus.FINISHED, pipeline=pipeline)
job.save(pipeline=pipeline) # Don't clobber the user's meta dictionary!
job.save(pipeline=pipeline, include_meta=False)
finished_job_registry = FinishedJobRegistry(job.origin, finished_job_registry = FinishedJobRegistry(job.origin,
self.connection) self.connection)

@ -61,6 +61,19 @@ def access_self():
assert get_current_job() is not None assert get_current_job() is not None
def modify_self(meta):
j = get_current_job()
j.meta.update(meta)
j.save()
def modify_self_and_error(meta):
j = get_current_job()
j.meta.update(meta)
j.save()
return 1 / 0
def echo(*args, **kwargs): def echo(*args, **kwargs):
return (args, kwargs) return (args, kwargs)

@ -19,7 +19,8 @@ import mock
from tests import RQTestCase, slow from tests import RQTestCase, slow
from tests.fixtures import (create_file, create_file_after_timeout, from tests.fixtures import (create_file, create_file_after_timeout,
div_by_zero, do_nothing, say_hello, say_pid, div_by_zero, do_nothing, say_hello, say_pid,
run_dummy_heroku_worker, access_self) run_dummy_heroku_worker, access_self,
modify_self, modify_self_and_error)
from tests.helpers import strip_microseconds from tests.helpers import strip_microseconds
from rq import (get_failed_queue, Queue, SimpleWorker, Worker, from rq import (get_failed_queue, Queue, SimpleWorker, Worker,
@ -606,6 +607,52 @@ class TestWorker(RQTestCase):
# So before that fix the call count was 4 instead of 3 # So before that fix the call count was 4 instead of 3
self.assertEqual(mocked.call_count, 3) self.assertEqual(mocked.call_count, 3)
def test_self_modification_persistence(self):
"""Make sure that any meta modification done by
the job itself persists completely through the
queue/worker/job stack."""
q = Queue()
# Also make sure that previously existing metadata
# persists properly
job = q.enqueue(modify_self, meta={'foo': 'bar', 'baz': 42},
args=[{'baz': 10, 'newinfo': 'waka'}])
w = Worker([q])
w.work(burst=True)
job_check = Job.fetch(job.id)
self.assertEqual(set(job_check.meta.keys()),
set(['foo', 'baz', 'newinfo']))
self.assertEqual(job_check.meta['foo'], 'bar')
self.assertEqual(job_check.meta['baz'], 10)
self.assertEqual(job_check.meta['newinfo'], 'waka')
def test_self_modification_persistence_with_error(self):
"""Make sure that any meta modification done by
the job itself persists completely through the
queue/worker/job stack -- even if the job errored"""
q = Queue()
failed_q = get_failed_queue()
# Also make sure that previously existing metadata
# persists properly
job = q.enqueue(modify_self_and_error, meta={'foo': 'bar', 'baz': 42},
args=[{'baz': 10, 'newinfo': 'waka'}])
w = Worker([q])
w.work(burst=True)
# Postconditions
self.assertEqual(q.count, 0)
self.assertEqual(failed_q.count, 1)
self.assertEqual(w.get_current_job_id(), None)
job_check = Job.fetch(job.id)
self.assertEqual(set(job_check.meta.keys()),
set(['foo', 'baz', 'newinfo']))
self.assertEqual(job_check.meta['foo'], 'bar')
self.assertEqual(job_check.meta['baz'], 10)
self.assertEqual(job_check.meta['newinfo'], 'waka')
def kill_worker(pid, double_kill): def kill_worker(pid, double_kill):
# wait for the worker to be started over on the main process # wait for the worker to be started over on the main process

Loading…
Cancel
Save