Source code for carbontube.storage.pyredis

# -*- coding: utf-8 -*-
# <carbontube - distributed pipeline framework>
#
# Copyright (C) <2018>  Gabriel Falcão <gabriel@nacaolivre.org>
# (C) Author: Gabriel Falcão <gabriel@nacaolivre.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program.  If not, see <http://www.gnu.org/licenses/>.
import time
import json
import redis
import hashlib
import random
import dj_redis_url
from datetime import datetime
from carbontube.models import Job, Worker
from carbontube.util import sanitize_name, read_internal_file
from carbontube.storage.base import BaseStorageBackend


class JobNotFound(Exception):
    pass


def dt_to_string(dt):
    return dt.strftime("%Y-%m-%d %H:%M:%S")


def utc_now_string():
    return dt_to_string(datetime.utcnow())


def read_lua_script(name):
    """reads a lua script from "carbontube/storage/redis-lua-scripts/<name>"""
    path = 'storage/redis-lua-scripts/{0}'.format(name)
    return read_internal_file(path).strip()


def create_new_pool_from_uri(redis_uri):
    parts = dj_redis_url.parse(redis_uri)
    if not parts:
        raise RuntimeError('invalid uri: {0}'.format(repr(redis_uri)))

    params = {k.lower(): v for k, v in parts.items()}

    return redis.ConnectionPool(**params)
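
# Note: dj_redis_url.parse returns the connection parameters with upper-case
# keys (illustratively, {'HOST': ..., 'PORT': ..., 'DB': ..., 'PASSWORD': ...});
# create_new_pool_from_uri lower-cases them above so they match
# redis.ConnectionPool's keyword arguments (host, port, db, password).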


class KeyManager(object):
    PREFIX = 'carbontube'

    JOB_STATES = {
        "enqueued",
        "running",
        "success",
        "failed",
    }

    def __init__(self, name):
        self.name = sanitize_name(name)

    def prefix(self, *parts):
        items = [self.PREFIX]
        items.extend(parts)
        return ":".join(items)

    def join(self, *parts):
        return self.prefix(self.name, *parts)

    def mainstream(self):
        return self.join('mainstream')

    def sink(self):
        return self.join('sink')

    def available_worker(self, worker_id):
        return self.join('worker', 'available', worker_id)

    def worker_of_job_type(self, job_type):
        return self.join('worker', 'job_type', job_type)

    def job_type_queue(self, job_type, state):
        return self.join('queue', state, 'job_type', job_type)

    def jobs_hash(self):
        return self.join('persisted_jobs')

    def every_other_state_queue(self, job_type, state):
        other_states = self.JOB_STATES.difference({state})
        return [self.job_type_queue(job_type, s) for s in other_states]

    def known_pipelines(self):
        return self.prefix('known_pipelines')

    def known_job_types(self):
        return self.prefix('known_job_types')
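
# Key layout sketch (illustrative; assumes sanitize_name leaves "example" unchanged):
#
#   KeyManager('example').mainstream()
#       -> 'carbontube:example:mainstream'
#   KeyManager('example').job_type_queue('video', 'enqueued')
#       -> 'carbontube:example:queue:enqueued:job_type:video'
#   KeyManager('example').available_worker('worker-1')
#       -> 'carbontube:example:worker:available:worker-1'
#   KeyManager('example').known_pipelines()
#       -> 'carbontube:known_pipelines'  # pipeline-agnostic: no name segment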


class RedisStorageBackend(BaseStorageBackend):
    """Redis Storage Backend"""

    def initialize(self, redis_uri='redis://', worker_availability_timeout=300):
        self.keys = KeyManager(self.pipeline_name)
        self.pool = None
        self.redis_uri = redis_uri
        self.worker_availability_timeout = worker_availability_timeout

    def serialize(self, data):
        if isinstance(data, dict):
            data = dict(data)

        return json.dumps(data)

    def deserialize(self, data):
        return json.loads(data)

    @property
    def redis(self):
        if not self.pool:
            raise RuntimeError('the RedisStorageBackend can only be used after .connect() was called')

        return redis.StrictRedis(connection_pool=self.pool)

    def connect(self):
        self.pool = create_new_pool_from_uri(self.redis_uri)
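
    # Availability note: register_worker writes the serialized worker with
    # SETEX and a TTL of worker_availability_timeout seconds, so a worker that
    # stops re-registering silently drops out of the available set; its entry
    # in the per-job-type set is only removed by unregister_worker.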

    def register_worker(self, worker):
        worker['checkin'] = time.time()
        available_key = self.keys.available_worker(worker.id)
        type_key = self.keys.worker_of_job_type(worker.job_type)
        pipe = self.redis.pipeline()
        pipe = pipe.setex(available_key, self.worker_availability_timeout, self.serialize(worker))
        pipe = pipe.sadd(type_key, worker.id)
        return pipe.execute()[0]

    def unregister_worker(self, worker):
        available_key = self.keys.available_worker(worker.id)
        type_key = self.keys.worker_of_job_type(worker.job_type)
        pipe = self.redis.pipeline()
        pipe = pipe.delete(available_key)
        pipe = pipe.srem(type_key, worker.id)
        return pipe.execute()
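
    # Illustrative effect of enqueue_job (hypothetical job id "j1" of type
    # "video" in a pipeline named "example"); the pipeline() calls below use
    # redis-py's default transactional MULTI/EXEC mode:
    #
    #   enqueue_job(job, 'enqueued') issues:
    #     RPUSH carbontube:example:mainstream j1
    #     RPUSH carbontube:example:queue:enqueued:job_type:video j1
    #     HSET  carbontube:example:persisted_jobs j1 <serialized job>
    #
    #   Any other state RPUSHes only onto its per-state queue and updates the hash.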

    def enqueue_job(self, job, state):
        if state not in self.keys.JOB_STATES:
            raise RuntimeError('invalid state: {0} ({1})'.format(state, self.keys.JOB_STATES))

        job_id = job['id']
        job_type = job['job_type']
        pipe = self.redis.pipeline()
        job['state'] = state
        if state == 'running':
            job['started_at'] = time.time()

        if state == 'enqueued':
            pipe.rpush(self.keys.mainstream(), job_id)
            pipe = pipe.rpush(self.keys.job_type_queue(job_type, state), job_id)
            job['enqueued_at'] = time.time()
        else:
            pipe = pipe.rpush(self.keys.job_type_queue(job_type, state), job_id)

        serialized_job = self.serialize(dict(job))
        pipe = pipe.hset(self.keys.jobs_hash(), job_id, serialized_job)
        return pipe.execute()

    def consume_job(self):
        key = self.keys.mainstream()
        job_id = self.redis.lpop(key)
        if job_id:
            data = self.redis.hget(self.keys.jobs_hash(), job_id)
            job = Job(self.deserialize(data))
            self.enqueue_job(job, 'running')
            return job

    def register_pipeline(self, name):
        self.redis.sadd(self.keys.known_pipelines(), name)

    def list_pipelines(self):
        return list(self.redis.smembers(self.keys.known_pipelines()))

    def register_job_type(self, name):
        self.redis.sadd(self.keys.known_job_types(), name)

    def list_job_types(self):
        return list(self.redis.smembers(self.keys.known_job_types()))

    def list_available_workers_keys(self):
        keys = self.redis.keys(self.keys.available_worker('*'))
        return keys

    def list_keys_for_workers_of_type(self, job_type):
        type_key = self.keys.worker_of_job_type(job_type)
        return self.redis.smembers(type_key)

    def retrieve_all_workers_by_keys(self, keys):
        pipeline = self.redis.pipeline()
        for key in keys:
            pipeline = pipeline.get(key)

        return [Worker(self.deserialize(item))
                for item in pipeline.execute() if item]
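
    # Selection note: candidates are sorted by their "checkin" timestamp in
    # descending order and the *last* element is returned, i.e. the worker with
    # the oldest check-in wins (least-recently-seen dispatch).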

    def get_next_available_worker_for_type(self, job_type):
        keys = [self.keys.available_worker(worker_id)
                for worker_id in self.list_keys_for_workers_of_type(job_type)]
        found = sorted(self.retrieve_all_workers_by_keys(keys),
                       key=lambda x: x['checkin'], reverse=True)
        if found:
            return found[-1]

    def list_all_available_workers(self):
        items = self.retrieve_all_workers_by_keys(self.list_available_workers_keys())
        return self.sort_by_id(items)

    def retrieve_jobs(self):
        job_types = self.list_job_types()
        job_states = list(self.keys.JOB_STATES)
        result = {}
        for state_name in job_states:
            by_type = {}
            for type_name in job_types:
                key = self.keys.job_type_queue(type_name, state_name)
                items = self.sort_by_id([
                    self.deserialize(self.get_job_by_id(job_id))
                    for job_id in self.redis.lrange(key, 0, -1)
                ])
                by_type[type_name] = items

            result[state_name] = by_type

        key = self.keys.mainstream()
        result['mainstream'] = [
            self.deserialize(self.get_job_by_id(job_id))
            for job_id in self.redis.lrange(key, 0, -1)
        ]
        result['sink'] = self.get_finished_jobs()
        return result

    def get_job_by_id(self, job_id):
        key = self.keys.jobs_hash()
        return self.redis.hget(key, job_id)

    def sort_by_id(self, items):
        return sorted(items, key=lambda x: x['id'])

    def report_job_running(self, data):
        job_id = data['id']
        job_type = data['job_type']
        running_key = self.keys.job_type_queue(job_type, 'running')
        self.redis.lrem(running_key, 0, job_id)

    def report_job_completion(self, data):
        job_id = data['id']
        job_type = data['job_type']
        success = data.get('success', False)
        state = 'success' if success else 'failed'
        state_key = self.keys.job_type_queue(job_type, state)
        self.redis.lrem(state_key, 0, job_id)

    def report_pipeline_completion(self, data):
        pipe = self.redis.pipeline()
        pipe.lpush(self.keys.sink(), json.dumps(data))
        return pipe.execute()

    def get_finished_jobs(self):
        return [json.loads(item)
                for item in self.redis.lrange(self.keys.sink(), 0, -1) if item]
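

# Server-side scripting bridge: the helpers below wrap redis-py's
# register_script(), which compiles each Lua source into a Script callable that
# redis-py executes via EVALSHA (falling back to EVAL when the script is not
# yet cached), so enqueue/consume run atomically inside Redis. The script
# bodies live under carbontube/storage/redis-lua-scripts/ and are not
# reproduced here.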


def sha1(data):
    return hashlib.sha1(data).hexdigest()


class LuaBridge(object):
    scripts = {
        'enqueue': read_lua_script('enqueue.lua'),
        'consume': read_lua_script('consume.lua'),
        'consume_by_type': read_lua_script('consume-by-type.lua'),
        'list_pending': read_lua_script('list-pending-jobs.lua'),
        'list_pending_by_type': read_lua_script('list-pending-jobs-by-type.lua'),
    }

    def __init__(self, parent):
        self.hashes = {n: sha1(s) for n, s in self.scripts.items()}
        self.actions = {}
        self.parent = parent

    def register_script(self, name, script):
        self.actions[name] = self.redis.register_script(script)

    def register_all_scripts(self):
        for name, script in self.scripts.items():
            self.register_script(name, script)

    @property
    def redis(self):
        return self.parent.redis

    def execute(self, name, *args):
        action = self.actions.get(name)
        if not action:
            raise RuntimeError('unregistered action: {0}'.format(name))

        return action(keys=[], args=args, client=self.redis)


class RedisJobStorage(RedisStorageBackend):

    def initialize(self, redis_uri='redis://', worker_availability_timeout=300):
        self.keys = KeyManager(self.pipeline_name)
        self.pool = None
        self.redis_uri = redis_uri
        self.worker_availability_timeout = worker_availability_timeout
        self.lua = LuaBridge(self)

    def connect(self):
        self.pool = create_new_pool_from_uri(self.redis_uri)
        self.lua.register_all_scripts()

    def enqueue(self, pipeline_name, job):
        args = [
            job['id'],
            job['job_type'],
            pipeline_name,
            json.dumps(dict(job)),
        ]
        return self.lua.execute('enqueue', *args)

    def list_pending_jobs_of_type(self, pipeline_name, job_type):
        args = [
            pipeline_name,
            job_type,
        ]
        result = self.lua.execute('list_pending_by_type', *args)
        return [json.loads(item) for item in result if item]

    def list_pending_jobs(self, pipeline_name):
        args = [
            pipeline_name,
        ]
        result = self.lua.execute('list_pending', *args)
        return [json.loads(item) for item in result if item]

    def consume(self, pipeline_name):
        args = [
            pipeline_name,
        ]
        result = self.lua.execute('consume', *args)
        return json.loads(result)

    def consume_by_type(self, pipeline_name, job_type):
        args = [
            pipeline_name,
            job_type,
        ]
        result = self.lua.execute('consume_by_type', *args)
        return json.loads(result)
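

# Minimal usage sketch (illustrative and unverified: it assumes the
# BaseStorageBackend constructor takes the pipeline name and forwards keyword
# arguments to initialize(), and that a local redis-server is running; the job
# fields are hypothetical):
#
#   storage = RedisJobStorage('example', redis_uri='redis://localhost:6379/0')
#   storage.connect()
#   storage.enqueue('example', {'id': 'j1', 'job_type': 'video'})
#   job = storage.consume('example')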