|  |  | @ -8,9 +8,6 @@ class BaseRegistry(object): | 
			
		
	
		
		
			
				
					
					|  |  |  |     """ |  |  |  |     """ | 
			
		
	
		
		
			
				
					
					|  |  |  |     Base implementation of job registry, implemented in Redis sorted set. Each job |  |  |  |     Base implementation of job registry, implemented in Redis sorted set. Each job | 
			
		
	
		
		
			
				
					
					|  |  |  |     is stored as a key in the registry, scored by expiration time (unix timestamp). |  |  |  |     is stored as a key in the registry, scored by expiration time (unix timestamp). | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |     Jobs with scores are lower than current time is considered "expired" and |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |     should be cleaned up. |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |     """ |  |  |  |     """ | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |     def __init__(self, name='default', connection=None): |  |  |  |     def __init__(self, name='default', connection=None): | 
			
		
	
	
		
		
			
				
					|  |  | @ -27,9 +24,9 @@ class BaseRegistry(object): | 
			
		
	
		
		
			
				
					
					|  |  |  |         self.cleanup() |  |  |  |         self.cleanup() | 
			
		
	
		
		
			
				
					
					|  |  |  |         return self.connection.zcard(self.key) |  |  |  |         return self.connection.zcard(self.key) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |     def add(self, job, timeout, pipeline=None): |  |  |  |     def add(self, job, ttl, pipeline=None): | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |         """Adds a job to a registry with expiry time of now + timeout.""" |  |  |  |         """Adds a job to a registry with expiry time of now + ttl.""" | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |         score = timeout if timeout == -1 else current_timestamp() + timeout |  |  |  |         score = ttl if ttl < 0 else current_timestamp() + ttl | 
			
				
				
			
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					
					|  |  |  |         if pipeline is not None: |  |  |  |         if pipeline is not None: | 
			
		
	
		
		
			
				
					
					|  |  |  |             return pipeline.zadd(self.key, score, job.id) |  |  |  |             return pipeline.zadd(self.key, score, job.id) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
	
		
		
			
				
					|  |  | @ -39,10 +36,16 @@ class BaseRegistry(object): | 
			
		
	
		
		
			
				
					
					|  |  |  |         connection = pipeline if pipeline is not None else self.connection |  |  |  |         connection = pipeline if pipeline is not None else self.connection | 
			
		
	
		
		
			
				
					
					|  |  |  |         return connection.zrem(self.key, job.id) |  |  |  |         return connection.zrem(self.key, job.id) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |     def get_expired_job_ids(self): |  |  |  |     def get_expired_job_ids(self, timestamp=None): | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |         """Returns job ids whose score are less than current timestamp.""" |  |  |  |         """Returns job ids whose score are less than current timestamp. | 
			
				
				
			
		
	
		
		
	
		
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         Returns ids for jobs with an expiry time earlier than timestamp, | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         specified as seconds since the Unix epoch. timestamp defaults to call | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         time if unspecified. | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         """ | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         score = timestamp if timestamp is not None else current_timestamp() | 
			
		
	
		
		
			
				
					
					|  |  |  |         return [as_text(job_id) for job_id in |  |  |  |         return [as_text(job_id) for job_id in | 
			
		
	
		
		
			
				
					
					|  |  |  |                 self.connection.zrangebyscore(self.key, 0, current_timestamp())] |  |  |  |                 self.connection.zrangebyscore(self.key, 0, score)] | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |     def get_job_ids(self, start=0, end=-1): |  |  |  |     def get_job_ids(self, start=0, end=-1): | 
			
		
	
		
		
			
				
					
					|  |  |  |         """Returns list of all job ids.""" |  |  |  |         """Returns list of all job ids.""" | 
			
		
	
	
		
		
			
				
					|  |  | @ -59,24 +62,28 @@ class StartedJobRegistry(BaseRegistry): | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |     Jobs are added to registry right before they are executed and removed |  |  |  |     Jobs are added to registry right before they are executed and removed | 
			
		
	
		
		
			
				
					
					|  |  |  |     right after completion (success or failure). |  |  |  |     right after completion (success or failure). | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |     Jobs whose score are lower than current time is considered "expired". |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |     """ |  |  |  |     """ | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |     def __init__(self, name='default', connection=None): |  |  |  |     def __init__(self, name='default', connection=None): | 
			
		
	
		
		
			
				
					
					|  |  |  |         super(StartedJobRegistry, self).__init__(name, connection) |  |  |  |         super(StartedJobRegistry, self).__init__(name, connection) | 
			
		
	
		
		
			
				
					
					|  |  |  |         self.key = 'rq:wip:%s' % name |  |  |  |         self.key = 'rq:wip:%s' % name | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |     def cleanup(self): |  |  |  |     def cleanup(self, timestamp=None): | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |         """Remove expired jobs from registry and add them to FailedQueue.""" |  |  |  |         """Remove expired jobs from registry and add them to FailedQueue. | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |         job_ids = self.get_expired_job_ids() |  |  |  | 
 | 
			
				
				
			
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         Removes jobs with an expiry time earlier than timestamp, specified as | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         seconds since the Unix epoch. timestamp defaults to call time if | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         unspecified. Removed jobs are added to the global failed job queue. | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         """ | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         score = timestamp if timestamp is not None else current_timestamp() | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         job_ids = self.get_expired_job_ids(score) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |         if job_ids: |  |  |  |         if job_ids: | 
			
		
	
		
		
			
				
					
					|  |  |  |             failed_queue = FailedQueue(connection=self.connection) |  |  |  |             failed_queue = FailedQueue(connection=self.connection) | 
			
		
	
		
		
			
				
					
					|  |  |  |             with self.connection.pipeline() as pipeline: |  |  |  |             with self.connection.pipeline() as pipeline: | 
			
		
	
		
		
			
				
					
					|  |  |  |                 for job_id in job_ids: |  |  |  |                 for job_id in job_ids: | 
			
		
	
		
		
			
				
					
					|  |  |  |                     failed_queue.push_job_id(job_id, pipeline=pipeline) |  |  |  |                     failed_queue.push_job_id(job_id, pipeline=pipeline) | 
			
		
	
		
		
			
				
					
					|  |  |  |                 pipeline.zremrangebyscore(self.key, 0, current_timestamp()) |  |  |  |                 pipeline.zremrangebyscore(self.key, 0, score) | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |                 pipeline.execute() |  |  |  |                 pipeline.execute() | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |         return job_ids |  |  |  |         return job_ids | 
			
		
	
	
		
		
			
				
					|  |  | @ -92,6 +99,12 @@ class FinishedJobRegistry(BaseRegistry): | 
			
		
	
		
		
			
				
					
					|  |  |  |         super(FinishedJobRegistry, self).__init__(name, connection) |  |  |  |         super(FinishedJobRegistry, self).__init__(name, connection) | 
			
		
	
		
		
			
				
					
					|  |  |  |         self.key = 'rq:finished:%s' % name |  |  |  |         self.key = 'rq:finished:%s' % name | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |     def cleanup(self): |  |  |  |     def cleanup(self, timestamp=None): | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |         """Remove expired jobs from registry.""" |  |  |  |         """Remove expired jobs from registry. | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |         self.connection.zremrangebyscore(self.key, 0, current_timestamp()) |  |  |  | 
 | 
			
				
				
			
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         Removes jobs with an expiry time earlier than timestamp, specified as | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         seconds since the Unix epoch. timestamp defaults to call time if | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         unspecified. | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         """ | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         score = timestamp if timestamp is not None else current_timestamp() | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         self.connection.zremrangebyscore(self.key, 0, score) | 
			
		
	
	
		
		
			
				
					|  |  | 
 |