Internals Reference
Servers
-
class carbontube.servers.Pipeline(name, concurrency=10, backend_class=<class 'carbontube.storage.inmemory.EphemeralStorageBackend'>)
Pipeline server class.
A pipeline must be defined only after you have already defined at least one Phase.
-
handle_finished_job(job)
Called when a job has just finished processing.
When overriding this method, make sure to call super() first.
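The call order asked for above can be sketched as follows. So that the snippet runs without carbontube installed, `Pipeline` here is a stand-in class written for illustration, not the real one. `AuditedPipeline`, its `finished` list, and the `"id"` key on the job dictionary are likewise hypothetical; only the method name `handle_finished_job(job)` comes from this reference.

```python
class Pipeline:
    """Stand-in for carbontube.servers.Pipeline (assumption, not the real class)."""

    def __init__(self, name):
        self.name = name
        self.base_calls = []

    def handle_finished_job(self, job):
        # The real method does the library's own bookkeeping;
        # this stand-in only records that it was called.
        self.base_calls.append(job)


class AuditedPipeline(Pipeline):
    """Hypothetical subclass that remembers the ids of finished jobs."""

    def __init__(self, name):
        super().__init__(name)
        self.finished = []

    def handle_finished_job(self, job):
        # Call the parent implementation first, as the reference asks.
        super().handle_finished_job(job)
        # Custom behaviour comes afterwards ("id" is an assumed key).
        self.finished.append(job.get("id"))


pipeline = AuditedPipeline("example1")
pipeline.handle_finished_job({"id": "job-1", "instructions": {"size": 100}})
```

Calling `super()` first means any custom logic runs only after the parent class has finished its own handling of the job.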
-
initialize()
Initializes the backend.
Subclasses can override this in order to define their own backends.
-
Clients
-
class carbontube.clients.PipelineClient(address, hwm=10)
Pipeline client.
Has the ability to push jobs to a pipeline server.
-
enqueue_job(data)
Pushes a job to the pipeline.
Note that the data must be a dictionary with the following keys:
- name: the pipeline name
- instructions: a dictionary with instructions for the first phase to execute
Parameters: data – the dictionary with the formatted payload.
Returns: the payload sent to the server, which contains the job id.
EXAMPLE:
>>> from carbontube.clients import PipelineClient
>>> properly_formatted = {
...     "name": "example1",
...     "instructions": {
...         "size": 100,
...     },
... }
>>> client = PipelineClient('tcp://127.0.0.1:5050')
>>> client.connect()
>>> ok, payload_sent = client.enqueue_job(properly_formatted)
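Because the payload shape matters, it can help to check it before calling `enqueue_job`. The helper below is a minimal sketch of such a check. `validate_job_payload` is a hypothetical name and is not part of carbontube; it only encodes the two keys listed above.

```python
def validate_job_payload(data):
    """Hypothetical helper: check the payload shape described for enqueue_job()."""
    if not isinstance(data, dict):
        raise TypeError("job payload must be a dictionary")
    # Both keys listed in the reference must be present.
    missing = [key for key in ("name", "instructions") if key not in data]
    if missing:
        raise ValueError("missing keys: " + ", ".join(missing))
    if not isinstance(data["instructions"], dict):
        raise TypeError("'instructions' must be a dictionary")
    return data


payload = validate_job_payload(
    {"name": "example1", "instructions": {"size": 100}}
)
```

The validated dictionary could then be passed to `client.enqueue_job(payload)` as in the example above.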
-
Storage Backends
-
class carbontube.storage.BaseStorageBackend(name, *args, **kw)
Base class for storage backends.
-
connect()
This method is called by the pipeline once it has started to listen on zmq sockets, so this is also an appropriate time to implement your own connection to a database in a backend subclass.
-
consume_job_of_type(job_type)
Dequeues a job for the given type. Must return None when no job is ready.
Make sure to requeue this job in case it could not be fed immediately to a worker.
-
get_next_available_worker_for_type(job_type)
Randomly picks a worker that is currently available.
-
initialize()
Backend-specific constructor. This method must be overridden by subclasses in order to set up database connections and such.
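The backend contract described above can be illustrated with a small in-process sketch. Everything here is an assumption made for illustration:

- `BaseStorageBackend` is a stand-in, so the snippet runs without carbontube.
- The stand-in constructor calls `initialize()`, which the real one may or may not do.
- `ListBackend`, `enqueue_job`, `register_worker` and `dispatch` are hypothetical names.
- Returning None when no worker is available is a guess at reasonable behaviour.

```python
import random
from collections import defaultdict, deque


class BaseStorageBackend:
    """Stand-in for carbontube.storage.BaseStorageBackend (assumption)."""

    def __init__(self, name, *args, **kw):
        self.name = name
        self.initialize(*args, **kw)  # assumed call order

    def initialize(self):
        pass


class ListBackend(BaseStorageBackend):
    """Hypothetical backend that keeps jobs and workers in memory."""

    def initialize(self):
        self.queues = defaultdict(deque)
        self.available_workers = defaultdict(list)

    def connect(self):
        # Nothing to connect to for in-process data structures.
        pass

    def enqueue_job(self, job_type, job):  # hypothetical helper
        self.queues[job_type].append(job)

    def register_worker(self, job_type, worker_id):  # hypothetical helper
        self.available_workers[job_type].append(worker_id)

    def consume_job_of_type(self, job_type):
        # Return None when no job is ready, as the reference requires.
        queue = self.queues[job_type]
        return queue.popleft() if queue else None

    def get_next_available_worker_for_type(self, job_type):
        # Random pick among available workers (None if there are none).
        workers = self.available_workers[job_type]
        return random.choice(workers) if workers else None

    def dispatch(self, job_type):  # hypothetical: shows the requeue step
        job = self.consume_job_of_type(job_type)
        if job is None:
            return None
        worker = self.get_next_available_worker_for_type(job_type)
        if worker is None:
            # No worker could take the job: put it back at the front.
            self.queues[job_type].appendleft(job)
            return None
        return worker, job


backend = ListBackend("example1")
backend.connect()
backend.enqueue_job("resize", {"size": 100})
first_attempt = backend.dispatch("resize")  # no worker yet, job is requeued
backend.register_worker("resize", "worker-1")
second_attempt = backend.dispatch("resize")
```

Requeueing at the front of the queue keeps the original job order when a worker becomes available later.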
-
-
class carbontube.storage.EphemeralStorageBackend(name, *args, **kw)
In-memory storage backend. It dies with the process and has no option for persistence whatsoever. Used only for testing purposes.
-
connect()
This method is called by the pipeline once it has started to listen on zmq sockets, so this is also an appropriate time to implement your own connection to a database in a backend subclass.
-
consume_job_of_type(job_type)
Dequeues a job for the given type. Must return None when no job is ready.
Make sure to requeue this job in case it could not be fed immediately to a worker.
-
get_next_available_worker_for_type(job_type)
Randomly picks a worker that is currently available.
-
initialize()
Backend-specific constructor. This method must be overridden by subclasses in order to set up database connections and such.
-
-
class carbontube.storage.RedisStorageBackend(name, *args, **kw)
Redis storage backend.
-
connect()
This method is called by the pipeline once it has started to listen on zmq sockets, so this is also an appropriate time to implement your own connection to a database in a backend subclass.
-
get_next_available_worker_for_type(job_type)
Randomly picks a worker that is currently available.
-
initialize(redis_uri='redis://', worker_availability_timeout=300)
Backend-specific constructor. This method must be overridden by subclasses in order to set up database connections and such.
-
Utilities
-
class carbontube.util.CompressedPickle(*args, **kw)
Serializes to and from zlib-compressed pickle.
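The idea of a zlib-compressed pickle can be sketched with the standard library alone. This is an illustration of the technique, not carbontube's implementation. The class name `CompressedPickleSketch` and the method names `pack` and `unpack` are assumptions, since this reference does not list the real methods.

```python
import pickle
import zlib


class CompressedPickleSketch:
    """Hypothetical illustration; not carbontube's CompressedPickle."""

    def pack(self, item):
        # Pickle first, then compress the resulting bytes.
        return zlib.compress(pickle.dumps(item))

    def unpack(self, data):
        # Reverse order: decompress, then unpickle.
        return pickle.loads(zlib.decompress(data))


codec = CompressedPickleSketch()
job = {"name": "example1", "instructions": {"size": 100}}
blob = codec.pack(job)
restored = codec.unpack(blob)
```

As with any pickle-based format, unpickling data from an untrusted source is unsafe, so a serializer of this kind is only appropriate between trusted peers.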
-
carbontube.util.parse_port(address)
Parses the port from a zmq tcp address.
Parameters: address – the address string.
Returns: an int or None.
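The documented behaviour (an int, or None when no port can be parsed) can be approximated with `urllib.parse`. The function below is a hypothetical re-implementation named `parse_port_sketch`. carbontube's own `parse_port` may be written differently and may treat edge cases differently.

```python
from urllib.parse import urlparse


def parse_port_sketch(address):
    """Hypothetical re-implementation; carbontube's parse_port may differ."""
    try:
        # .port is an int when the address carries a port, otherwise None.
        return urlparse(address).port
    except ValueError:
        # Raised by urlparse for a non-numeric or out-of-range port.
        return None


port = parse_port_sketch("tcp://127.0.0.1:5050")
no_port = parse_port_sketch("tcp://127.0.0.1")
bad_port = parse_port_sketch("tcp://127.0.0.1:abc")
```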