job: add get_meta() function (#1536)

* job: add get_meta() function

The newly introduced function returns meta data stored for the job. This
is required since job.meta stays an empty dict until the job is
finished or failed.

With the new function it's possible to store arbiatraty states/stages of
the job and allow the user to track progress. A long running job may
return custom stages like `downloading_data`, `unpacking_data`,
`processing_data`, etc.

This may allow better interfaces since users can track progress.

Signed-off-by: Paul Spooren <mail@aparcar.org>

* docs: add missing `refresh` arg to get_status()

This was previously missing.

Signed-off-by: Paul Spooren <mail@aparcar.org>
main
Paul Spooren 3 years ago committed by GitHub
parent d41f60b906
commit 63abea1522
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -99,7 +99,11 @@ print('Status: %s' % job.get_status())
``` ```
Some interesting job attributes include: Some interesting job attributes include:
* `job.get_status()` Possible values are `queued`, `started`, `deferred`, `finished`, `stopped`, `scheduled` and `failed` * `job.get_status(refresh=True)` Possible values are `queued`, `started`,
`deferred`, `finished`, `stopped`, `scheduled` and `failed`. If `refresh` is
`True` fresh values are fetched from Redis.
* `job.get_meta(refresh=True)` Returns custom `job.meta` dict containing user
stored data. If `refresh` is `True` fresh values are fetched from Redis.
* `job.origin` queue name of this job * `job.origin` queue name of this job
* `job.func_name` * `job.func_name`
* `job.args` arguments passed to the underlying job function * `job.args` arguments passed to the underlying job function

@ -158,6 +158,13 @@ class Job:
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'status', self._status) connection.hset(self.key, 'status', self._status)
def get_meta(self, refresh=True):
if refresh:
meta = self.connection.hget(self.key, 'meta')
self.meta = self.serializer.loads(meta) if meta else {}
return self.meta
@property @property
def is_finished(self): def is_finished(self):
return self.get_status() == JobStatus.FINISHED return self.get_status() == JobStatus.FINISHED

@ -347,6 +347,22 @@ class TestJob(RQTestCase):
job2 = Job.fetch(job.id) job2 = Job.fetch(job.id)
self.assertEqual(job2.meta['foo'], 'bar') self.assertEqual(job2.meta['foo'], 'bar')
def test_get_meta(self):
"""Test get_meta() function"""
job = Job.create(func=fixtures.say_hello, args=('Lionel',))
job.meta['foo'] = 'bar'
job.save()
self.assertEqual(job.get_meta()['foo'], 'bar')
# manually write different data in meta
self.testconn.hset(job.key, 'meta', dumps({'fee': 'boo'}))
# check if refresh=False keeps old data
self.assertEqual(job.get_meta(False)['foo'], 'bar')
# check if meta is updated
self.assertEqual(job.get_meta()['fee'], 'boo')
def test_custom_meta_is_rewriten_by_save_meta(self): def test_custom_meta_is_rewriten_by_save_meta(self):
"""New meta data can be stored by save_meta.""" """New meta data can be stored by save_meta."""
job = Job.create(func=fixtures.say_hello, args=('Lionel',)) job = Job.create(func=fixtures.say_hello, args=('Lionel',))

Loading…
Cancel
Save