Take a different tack, and enable the exclusion of meta from saves

main
Benjamin Root 8 years ago
parent 30a7ab4899
commit efb30b45a3

@@ -422,8 +422,14 @@ class Job(object):
self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}
def to_dict(self):
"""Returns a serialization of the current job instance"""
def to_dict(self, include_meta=True):
"""
Returns a serialization of the current job instance
You can exclude serializing the `meta` dictionary by setting
`include_meta=False`.
"""
obj = {}
obj['created_at'] = utcformat(self.created_at or utcnow())
obj['data'] = self.data
@@ -450,19 +456,26 @@ class Job(object):
obj['status'] = self._status
if self._dependency_id is not None:
obj['dependency_id'] = self._dependency_id
if self.meta:
if self.meta and include_meta:
obj['meta'] = dumps(self.meta)
if self.ttl:
obj['ttl'] = self.ttl
return obj
def save(self, pipeline=None):
"""Persists the current job instance to its corresponding Redis key."""
def save(self, pipeline=None, include_meta=True):
"""
Persists the current job instance to its corresponding Redis key.
Exclude persisting the `meta` dictionary by setting
`include_meta=False`. This is useful to prevent clobbering
user metadata without an expensive `refresh()` call first.
"""
key = self.key
connection = pipeline if pipeline is not None else self.connection
connection.hmset(key, self.to_dict())
connection.hmset(key, self.to_dict(include_meta=include_meta))
self.cleanup(self.ttl, pipeline=connection)
def cancel(self):

@@ -229,7 +229,7 @@ class Queue(object):
if not self._async:
job.perform()
job.set_status(JobStatus.FINISHED)
job.save()
job.save(include_meta=False)
job.cleanup(DEFAULT_RESULT_TTL)
return job
@@ -474,7 +474,7 @@ class FailedQueue(Queue):
job.ended_at = utcnow()
job.exc_info = exc_info
job.save(pipeline=pipeline)
job.save(pipeline=pipeline, include_meta=False)
self.push_job_id(job.id, pipeline=pipeline)
pipeline.execute()

@@ -89,7 +89,7 @@ class StartedJobRegistry(BaseRegistry):
try:
job = Job.fetch(job_id, connection=self.connection)
job.set_status(JobStatus.FAILED)
job.save(pipeline=pipeline)
job.save(pipeline=pipeline, include_meta=False)
failed_queue.push_job_id(job_id, pipeline=pipeline)
except NoSuchJobError:
pass

@@ -677,7 +677,8 @@ class Worker(object):
result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0:
job.set_status(JobStatus.FINISHED, pipeline=pipeline)
job.save(pipeline=pipeline)
# Don't clobber the user's meta dictionary!
job.save(pipeline=pipeline, include_meta=False)
finished_job_registry = FinishedJobRegistry(job.origin,
self.connection)
@@ -705,7 +706,6 @@ class Worker(object):
try:
with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
rv = job.perform()
job.refresh()
job.ended_at = utcnow()
@@ -719,7 +719,6 @@
started_job_registry=started_job_registry
)
except Exception:
job.refresh()
self.handle_job_failure(
job=job,
started_job_registry=started_job_registry

Loading…
Cancel
Save