Basic Usage¶
Installation¶
pip install carbontube
Defining Phases¶
import os
import uuid
import hashlib
from carbontube import Phase, Pipeline
from carbontube.storage import RedisStorageBackend
class GenerateFile(Phase):
    """Phase that writes a disposable file of random UUIDs under ``/tmp``.

    Registered for jobs of type ``generate-file``.
    """

    job_type = 'generate-file'

    def execute(self, instructions):
        """Create a file holding ``size`` newline-separated UUID strings.

        :param instructions: mapping of job instructions; ``size`` is the
            number of UUID lines to generate.
        :returns: ``{'file_path': path}`` for the file that was written, or
            ``None`` when ``size`` is missing or falsy.
        """
        size = instructions.get('size')
        if not size:
            return

        path = '/tmp/example-{0}.disposable'.format(uuid.uuid4())
        data = '\n'.join(str(uuid.uuid4()) for _ in range(size))

        # The file is opened in binary mode, so write bytes explicitly, and
        # use a context manager so the handle is closed (and flushed) before
        # the path is handed to the next phase.
        with open(path, 'wb') as fd:
            fd.write(data.encode('utf-8'))

        return {'file_path': path}
class HashFile(Phase):
    """Phase that computes the SHA-1 hex digest of a file on disk.

    Registered for jobs of type ``calculate-hash``.
    """

    job_type = 'calculate-hash'

    def execute(self, instructions):
        """Hash the file named by ``instructions['file_path']``.

        :param instructions: mapping of job instructions; ``file_path`` is
            the path of the file to hash.
        :returns: ``{'hash': <sha1 hexdigest>, 'file_path': <path>}``, or
            ``None`` when ``file_path`` is not present in ``instructions``.
        :raises RuntimeError: when the file does not exist; the same message
            is logged as a warning first.
        """
        if 'file_path' not in instructions:
            return

        file_path = instructions['file_path']
        if not os.path.exists(file_path):
            msg = "Failed to hash file {0}: does not exist".format(file_path)
            # assumes ``self.logger`` is provided by the Phase base class
            self.logger.warning(msg)
            raise RuntimeError(msg)

        # Read through a context manager so the handle is closed promptly
        # instead of being left to the garbage collector.
        with open(file_path, 'rb') as fd:
            data = fd.read()

        return {'hash': hashlib.sha1(data).hexdigest(), 'file_path': file_path}
class RemoveFile(Phase):
    """Phase that deletes the file referenced by a job.

    Registered for jobs of type ``delete-file``.
    """

    job_type = 'delete-file'

    def execute(self, instructions):
        """Unlink ``instructions['file_path']`` from the filesystem.

        :param instructions: mapping of job instructions; ``file_path`` is
            the path to remove.
        :returns: ``{'deleted_path': path}`` once the file has been removed.
        :raises RuntimeError: when ``file_path`` is missing/empty or the
            path does not exist.
        """
        target = instructions.get('file_path')

        # Guard clause: nothing to delete when the path is unset or absent.
        if not target or not os.path.exists(target):
            raise RuntimeError('file already deleted: {0}'.format(target))

        os.unlink(target)
        return {'deleted_path': target}
class Example1(Pipeline):
    """Example pipeline named ``example-one``.

    Its phases are ``GenerateFile`` followed by ``RemoveFile``; ``HashFile``
    is not part of this list.
    """

    name = 'example-one'
    phases = [GenerateFile, RemoveFile]

    def initialize(self):
        """Create the Redis storage backend and store it on ``self.backend``.

        The backend is constructed with the pipeline name and the URI of a
        Redis server on localhost.
        """
        redis_uri = 'redis://127.0.0.1:6379'
        self.backend = RedisStorageBackend(self.name, redis_uri=redis_uri)
Running the servers¶
# Run the pipeline.
carbontube pipeline examples/simple.py example-one \
--sub-bind=tcp://127.0.0.1:6000 \
--job-pull=tcp://127.0.0.1:5050
# Then run each phase separately. The phases will bind to random
# local TCP ports and announce their addresses to the pipeline
# subscriber.
carbontube phase examples/simple.py generate-file \
--sub-connect=tcp://127.0.0.1:6000
carbontube phase examples/simple.py calculate-hash \
--sub-connect=tcp://127.0.0.1:6000
carbontube phase examples/simple.py delete-file \
--sub-connect=tcp://127.0.0.1:6000
Feeding the pipeline with jobs¶
In the console¶
# Send a job to the pipeline's job-pull address (tcp://127.0.0.1:5050).
# NOTE(review): the pipeline is declared and launched as "example-one";
# confirm that "example1" is the intended name here.
carbontube enqueue tcp://127.0.0.1:5050 example1 "{\"size\": 10}"
In Python¶
from carbontube.clients import PipelineClient

# Connect to the address the pipeline pulls jobs from.
client = PipelineClient("tcp://127.0.0.1:5050")
client.connect()

# NOTE(review): the pipeline is declared and launched as "example-one";
# confirm that "example1" is the intended name here.
job = {
    'name': 'example1',
    'instructions': {},
}

# enqueue_job() returns a pair: a success flag and a payload.
ok, payload = client.enqueue_job(job)
if ok:
    print("JOB ENQUEUED!")
else:
    print("PIPELINE'S BUFFER IS BUSY, TRY AGAIN LATER")