Source code for carbontube.util

# -*- coding: utf-8 -*-
# <carbontube - distributed pipeline framework>
#
# Copyright (C) <2018>  Gabriel Falcão <gabriel@nacaolivre.org>
# (C) Author: Gabriel Falcão <gabriel@nacaolivre.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import io
import re
import zlib
import pickle
from plant import Node
from agentzero.serializers import BaseSerializer

internal_node = Node(__file__).dir


[docs]class CompressedPickle(BaseSerializer): """Serializes to and from zlib compressed pickle"""
[docs] def pack(self, item): return zlib.compress(pickle.dumps(item))
[docs] def unpack(self, item): return pickle.loads(zlib.decompress(item))
[docs]def sanitize_name(name): """ensures that a job type or pipeline name are safe for storage and handling. :param name: the string :returns: a safe string """ return re.sub(r'[^\w_-]+', '_', name).strip('_')
[docs]def parse_port(address): """parses the port from a zmq tcp address :param address: the string of address :returns: an ``int`` or ``None`` """ found = re.search(r':(\d+)', address) if found: return int(found.group(1))
[docs]def read_internal_file(path): """reads an internal file, mostly used for loading lua scripts""" target = internal_node.join(path) return io.open(target, 'rb').read()