Tributary

Python Data Streams


Tributary is a library for constructing dataflow graphs in Python. Unlike many other Python DAG libraries (airflow, luigi, prefect, dagster, dask, kedro, etc.), tributary is not designed with data/ETL pipelines or scheduling in mind. Instead, tributary is closer to libraries like mdf, pyungo, streamz, or pyfunctional, in that it is designed to be used as the implementation for a data model. One such example is the greeks library, which uses tributary to build data models for options pricing.

https://raw.githubusercontent.com/timkpaine/tributary/master/docs/img/example.gif

Installation

Install with pip:

pip install tributary

or with conda:

conda install -c conda-forge tributary

or from source:

python setup.py install

Note: When installing from source or with pip, you also need Graphviz itself to visualize the graph with the .graphviz() method.

Stream Types

Tributary offers several kinds of streams:

Streaming

These are synchronous, reactive data streams, built using asynchronous python generators. They are designed to mimic complex event processors in terms of event ordering.
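
To make the idea of chaining asynchronous generators concrete, here is a minimal standard-library sketch. It does not use tributary; the names source, apply, collect, and run are hypothetical and only illustrate how events can flow through a chain of async generators in order.

```python
import asyncio


async def source(values):
    """Hypothetical source node: emit each value in order."""
    for value in values:
        yield value


async def apply(upstream, func):
    """Hypothetical transform node: apply func to every upstream event."""
    async for value in upstream:
        yield func(value)


async def collect(upstream):
    """Drain a stream into a list, preserving event order."""
    return [value async for value in upstream]


def run(stream):
    """Drive the whole chain to completion on a fresh event loop."""
    return asyncio.run(collect(stream))


# Each event passes through the transform before the next one is pulled.
result = run(apply(source([1, 2, 3]), lambda x: x * 10))
print(result)
```

Although each stage is an async generator, the consumer pulls one event at a time, so downstream stages see events in the order the source emitted them.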

Functional

These are functional streams, built by currying python functions (callbacks).
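
As an illustration of currying callbacks, the sketch below builds a small pipeline with functools.partial from the standard library. The functions scale and offset are hypothetical stages, not part of tributary; each one receives a value and forwards its result to the callback it was curried with.

```python
from functools import partial


def scale(value, factor, callback):
    """Hypothetical stage: multiply, then hand the result downstream."""
    callback(value * factor)


def offset(value, amount, callback):
    """Hypothetical stage: add, then hand the result downstream."""
    callback(value + amount)


results = []

# Build the chain from the sink backwards: each stage is curried with
# its own settings and with the next stage as its callback.
sink = results.append
add_one = partial(offset, amount=1, callback=sink)
double_then_add = partial(scale, factor=2, callback=add_one)

for item in (1, 2, 3):
    double_then_add(item)

print(results)
```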

Lazy

These are lazily-evaluated Python streams, where outputs are propagated only as inputs change. They are implemented as directed acyclic graphs.
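
The dirty-flag idea behind lazy evaluation can be sketched in a few lines of plain Python. The LazyNode class below is a hypothetical illustration, not tributary's implementation: a node recomputes only when one of its inputs has changed since the last evaluation, and otherwise returns its cached value.

```python
class LazyNode:
    """Hypothetical lazily-evaluated node with a dirty flag."""

    def __init__(self, value=None, func=None, inputs=()):
        self._value = value
        self._func = func
        self._inputs = list(inputs)
        self._dependents = []
        # Computed nodes start dirty; plain value nodes start clean.
        self._dirty = func is not None
        self.evaluations = 0
        for node in self._inputs:
            node._dependents.append(self)

    def set(self, value):
        """Change a source value and flag everything downstream."""
        if value != self._value:
            self._value = value
            self._mark_dependents_dirty()

    def _mark_dependents_dirty(self):
        for node in self._dependents:
            if not node._dirty:
                node._dirty = True
                node._mark_dependents_dirty()

    def __call__(self):
        """Recompute only if dirty, otherwise return the cached value."""
        if self._dirty:
            args = [node() for node in self._inputs]
            self._value = self._func(*args)
            self._dirty = False
            self.evaluations += 1
        return self._value


a = LazyNode(value=2)
b = LazyNode(value=3)
total = LazyNode(func=lambda x, y: x + y, inputs=(a, b))

first = total()    # computed on first request
second = total()   # served from cache, no recomputation
a.set(10)          # marks total dirty
third = total()    # recomputed because an input changed
```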

Examples

  • Streaming: In this example, we construct a variety of forward propagating reactive graphs.
  • Lazy: In this example, we construct a variety of lazily-evaluated directed acyclic computation graphs.
  • Automatic Differentiation: In this example, we use tributary to perform automatic differentiation on both lazy and streaming graphs.

Graph Visualization

You can visualize the graph with Graphviz. All streaming and lazy nodes support a graphviz method.

Streaming and lazy nodes also support ipydagred3 for live update monitoring.

Streaming

https://raw.githubusercontent.com/timkpaine/tributary/master/docs/img/streaming/dagred3.gif

Here green indicates executing, yellow indicates stalled for backpressure, and red indicates that StreamEnd has been propagated (i.e. the stream has ended).

Lazy

https://raw.githubusercontent.com/timkpaine/tributary/master/docs/img/lazy/dagred3.gif

Here green indicates executing, and red indicates that the node is dirty. Note that the determination of whether a node is dirty is also done lazily (we can check with isDirty, which will update the node’s graph state).

Sources and Sinks

Sources

  • Python Function/Generator/Async Function/Async Generator
  • Random - generates a random dictionary of values
  • File - streams data from a file, optionally loading each line as a json
  • Kafka - streams data from kafka
  • Websocket - streams data from a websocket
  • Http - polls a url with GET requests, streams data out
  • SocketIO - streams data from a socketIO connection

Sinks

  • File - streams data to a file
  • Kafka - streams data to kafka
  • Http - POSTs data to a url
  • Websocket - streams data to a websocket
  • SocketIO - streams data to a socketIO connection

Transforms

Modulate

  • Delay - Streaming wrapper to delay a stream
  • Apply - Streaming wrapper to apply a function to an input stream
  • Window - Streaming wrapper to collect a window of values
  • Unroll - Streaming wrapper to unroll an iterable stream
  • UnrollDataFrame - Streaming wrapper to unroll a dataframe into a stream
  • Merge - Streaming wrapper to merge 2 inputs into a single output
  • ListMerge - Streaming wrapper to merge 2 input lists into a single output list
  • DictMerge - Streaming wrapper to merge 2 input dicts into a single output dict. Preference is given to the second input (e.g. if keys overlap)
  • Reduce - Streaming wrapper to merge any number of inputs
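
The merge and unroll semantics described above can be illustrated on plain Python values. This is a sketch under assumptions, not tributary code: list_merge, dict_merge, and unroll are hypothetical helpers that show what a single event would look like after ListMerge, DictMerge, or Unroll.

```python
from itertools import chain


def list_merge(first, second):
    """Concatenate two input lists into one output list."""
    return list(first) + list(second)


def dict_merge(first, second):
    """Combine two dicts; on overlapping keys the second input wins."""
    return {**first, **second}


def unroll(stream_of_iterables):
    """Flatten a stream of iterables into a stream of single items."""
    return chain.from_iterable(stream_of_iterables)


merged_list = list_merge([1, 2], [3])
merged_dict = dict_merge({"a": 1, "b": 2}, {"b": 20, "c": 3})
flattened = list(unroll([[1, 2], [3], []]))
```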

Calculations

Arithmetic Operators

  • Noop (unary) - Pass input to output
  • Negate (unary) - -1 * input
  • Invert (unary) - 1/input
  • Add (binary) - add 2 inputs
  • Sub (binary) - subtract second input from first
  • Mult (binary) - multiply inputs
  • Div (binary) - divide first input by second
  • RDiv (binary) - divide second input by first
  • Mod (binary) - first input % second input
  • Pow (binary) - first input^second input
  • Sum (n-ary) - sum all inputs
  • Average (n-ary) - average of all inputs

Boolean Operators

  • Not (unary) - Not input
  • And (binary) - And inputs
  • Or (binary) - Or inputs

Comparators

  • Equal (binary) - inputs are equal
  • NotEqual (binary) - inputs are not equal
  • Less (binary) - first input is less than second input
  • LessOrEqual (binary) - first input is less than or equal to second input
  • Greater (binary) - first input is greater than second input
  • GreaterOrEqual (binary) - first input is greater than or equal to second input

Math

  • Log (unary)
  • Sin (unary)
  • Cos (unary)
  • Tan (unary)
  • Arcsin (unary)
  • Arccos (unary)
  • Arctan (unary)
  • Sqrt (unary)
  • Abs (unary)
  • Exp (unary)
  • Erf (unary)

Converters

  • Int (unary)
  • Float (unary)
  • Bool (unary)
  • Str (unary)

Python Builtins

  • Len (unary)

Rolling

  • RollingCount - Node to count inputs
  • RollingMin - Node to take rolling min of inputs
  • RollingMax - Node to take rolling max of inputs
  • RollingSum - Node to take rolling sum of inputs
  • RollingAverage - Node to take the running average
  • SMA - Node to take the simple moving average over a window
  • EMA - Node to take an exponential moving average over a window

Node Type Converters

  • Lazy->Streaming

API Documentation

API

Streaming

tributary.streaming.run(node)[source]
class tributary.streaming.base.StreamingGraph(output_node)[source]

Bases: object

dagre()[source]
graph()[source]
graphviz()[source]
run()[source]
tributary.streaming.utils.Apply(node, foo, foo_kwargs=None)[source]

Streaming wrapper to apply a function to an input stream

Parameters:
  • node (node) – input stream
  • foo (callable) – function to apply
  • foo_kwargs (dict) – kwargs for function
tributary.streaming.utils.Delay(node, delay=1)[source]

Streaming wrapper to delay a stream

Parameters:
  • node (node) – input stream
  • delay (float) – time to delay input stream
tributary.streaming.utils.DictMerge(node1, node2)[source]
Streaming wrapper to merge 2 input dicts into a single output dict.
Preference is given to the second input (e.g. if keys overlap)
Parameters:
  • node1 (node) – input stream
  • node2 (node) – input stream
tributary.streaming.utils.FixedMap(node, count, mapper=None)[source]

Streaming wrapper to split stream into a fixed number of outputs

Parameters:
  • node (Node) – input stream
  • count (int) – number of output nodes to generate
  • mapper (function) – how to map the inputs into count streams
tributary.streaming.utils.ListMerge(node1, node2)[source]

Streaming wrapper to merge 2 input lists into a single output list

Parameters:
  • node1 (node) – input stream
  • node2 (node) – input stream
tributary.streaming.utils.Merge(node1, node2)[source]

Streaming wrapper to merge 2 inputs into a single output

Parameters:
  • node1 (node) – input stream
  • node2 (node) – input stream
tributary.streaming.utils.Reduce(*nodes, reducer=None)[source]

Streaming wrapper to merge any number of inputs

Parameters:
  • nodes (tuple) – input streams
  • reducer (function) – how to map the outputs into one stream
tributary.streaming.utils.Unroll(node)[source]

Streaming wrapper to unroll an iterable stream

Parameters:node (node) – input stream
tributary.streaming.utils.UnrollDataFrame(node, json=False, wrap=False)[source]

Streaming wrapper to unroll a dataframe into a stream

Parameters:node (node) – input stream
tributary.streaming.utils.Window(node, size=-1, full_only=False)[source]

Streaming wrapper to collect a window of values

Parameters:
  • node (node) – input stream
  • size (int) – size of windows to use
  • full_only (bool) – only return if list is full
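
A windowing transform of this shape can be sketched with a plain generator. The window function below is hypothetical and makes two assumptions that the documentation does not confirm: that a non-positive size means the window is unbounded, and that full_only suppresses output until the window holds size items.

```python
def window(values, size=-1, full_only=False):
    """Yield a snapshot of the most recent values after each input.

    Assumptions (not confirmed by the library docs): size <= 0 keeps
    every value, and full_only skips snapshots shorter than size.
    """
    buffer = []
    for value in values:
        buffer.append(value)
        if size > 0 and len(buffer) > size:
            buffer.pop(0)  # drop the oldest value
        if full_only and size > 0 and len(buffer) < size:
            continue
        yield list(buffer)


growing = list(window([1, 2, 3]))
sliding = list(window([1, 2, 3], size=2))
only_full = list(window([1, 2, 3], size=2, full_only=True))
```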

Input

class tributary.streaming.input.file.File(filename, json=True)[source]

Bases: tributary.streaming.input.input.Foo

Open up a file and yield back lines in the file

Parameters:
  • filename (str) – filename to read
  • json (bool) – load file line as json
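
For illustration, reading a file line by line and optionally decoding each line as JSON can be written as an async generator using only the standard library. The read_lines function and its load_json parameter are hypothetical names; this is a sketch of the behaviour described above, not the File class itself.

```python
import asyncio
import json
import os
import tempfile


async def read_lines(filename, load_json=True):
    """Yield each non-empty line, decoded as JSON when requested."""
    with open(filename) as handle:
        for line in handle:
            line = line.strip()
            if not line:
                continue
            yield json.loads(line) if load_json else line


async def collect(stream):
    """Drain an async generator into a list."""
    return [item async for item in stream]


# Write a small JSON-lines file to read back.
with tempfile.NamedTemporaryFile("w", suffix=".jsonl", delete=False) as tmp:
    tmp.write('{"a": 1}\n{"a": 2}\n')
    path = tmp.name

try:
    records = asyncio.run(collect(read_lines(path)))
    raw = asyncio.run(collect(read_lines(path, load_json=False)))
finally:
    os.remove(path)
```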
class tributary.streaming.input.http.HTTP(url, interval=1, repeat=1, json=False, wrap=False, field=None, proxies=None, cookies=None)[source]

Bases: tributary.streaming.input.input.Foo

Connect to url and yield results

Parameters:
  • url (str) – url to connect to
  • interval (int) – interval to re-query
  • repeat (int) – number of times to request
  • json (bool) – load http content data as json
  • wrap (bool) – wrap result in a list
  • field (str) – field to index result by
  • proxies (list) – list of URL proxies to pass to requests.get
  • cookies (list) – list of cookies to pass to requests.get
class tributary.streaming.input.kafka.Kafka(servers, group, topics, json=False, wrap=False, interval=1, **consumer_kwargs)[source]

Bases: tributary.streaming.input.input.Foo

Connect to kafka server and yield back results

Parameters:
  • servers (list) – kafka bootstrap servers
  • group (str) – kafka group id
  • topics (list) – list of kafka topics to connect to
  • json (bool) – load input data as json
  • wrap (bool) – wrap result in a list
  • interval (int) – kafka poll interval
class tributary.streaming.input.socketio.SocketIO(url, channel='', field='', sendinit=None, json=False, wrap=False, interval=1)[source]

Bases: tributary.streaming.input.input.Foo

Connect to socketIO server and yield back results

Parameters:
  • url (str) – url to connect to
  • channel (str) – socketio channel to connect through
  • field (str) – field to index result by
  • sendinit (list) – data to send on socketio connection open
  • json (bool) – load websocket data as json
  • wrap (bool) – wrap result in a list
  • interval (int) – socketio wait interval
class tributary.streaming.input.ws.WebSocket(url, json=False, wrap=False)[source]

Bases: tributary.streaming.input.input.Foo

Connect to websocket and yield back results

Parameters:
  • url (str) – websocket url to connect to
  • json (bool) – load websocket data as json
  • wrap (bool) – wrap result in a list

Output

tributary.streaming.output.http.HTTP(node, url='', json=False, wrap=False, field=None, proxies=None, cookies=None)[source]

Connect to url and post results to it

Parameters:
  • node (Node) – input tributary
  • url (str) – url to post to
  • json (bool) – dump data as json
  • wrap (bool) – wrap input in a list
  • field (str) – field to index result by
  • proxies (list) – list of URL proxies to pass to requests.post
  • cookies (list) – list of cookies to pass to requests.post
tributary.streaming.output.kafka.Kafka(node, servers='', topic='', json=False, wrap=False, **producer_kwargs)[source]

Connect to kafka server and send data

Parameters:
  • node (Node) – input tributary
  • servers (list) – kafka bootstrap servers
  • topic (str) – kafka topic to connect to
  • json (bool) – load input data as json
  • wrap (bool) – wrap result in a list
  • interval (int) – kafka poll interval
tributary.streaming.output.socketio.SocketIO(node, url, channel='', field='', sendinit=None, json=False, wrap=False, interval=1)[source]

Connect to socketIO server and send updates

Parameters:
  • node (Node) – input stream
  • url (str) – url to connect to
  • channel (str) – socketio channel to connect through
  • field (str) – field to index result by
  • sendinit (list) – data to send on socketio connection open
  • json (bool) – load websocket data as json
  • wrap (bool) – wrap result in a list
  • interval (int) – socketio wait interval
tributary.streaming.output.ws.WebSocket(node, url='', json=False, wrap=False, field=None, response=False, response_timeout=1)[source]

Connect to websocket and send data

Parameters:
  • node (Node) – input tributary
  • url (str) – websocket url to connect to
  • json (bool) – dump data as json
  • wrap (bool) – wrap result in a list

Calculations

tributary.streaming.calculations.ops.Abs(self)
tributary.streaming.calculations.ops.Add(self, other)
tributary.streaming.calculations.ops.And(self, other)
tributary.streaming.calculations.ops.Arccos(self)
tributary.streaming.calculations.ops.Arcsin(self)
tributary.streaming.calculations.ops.Arctan(self)
tributary.streaming.calculations.ops.Average(*others)
tributary.streaming.calculations.ops.Bool(self)
tributary.streaming.calculations.ops.Ceil(self)
tributary.streaming.calculations.ops.Cos(self)
tributary.streaming.calculations.ops.Div(self, other)
tributary.streaming.calculations.ops.Equal(self, other)
tributary.streaming.calculations.ops.Erf(self)
tributary.streaming.calculations.ops.Exp(self)
tributary.streaming.calculations.ops.Float(self)
tributary.streaming.calculations.ops.Floor(self)
tributary.streaming.calculations.ops.Ge(self, other)
tributary.streaming.calculations.ops.Gt(self, other)
tributary.streaming.calculations.ops.Int(self)
tributary.streaming.calculations.ops.Invert(self)
tributary.streaming.calculations.ops.Le(self, other)
tributary.streaming.calculations.ops.Len(self)
tributary.streaming.calculations.ops.Log(self)
tributary.streaming.calculations.ops.Lt(self, other)
tributary.streaming.calculations.ops.Mod(self, other)
tributary.streaming.calculations.ops.Mult(self, other)
tributary.streaming.calculations.ops.Negate(self)
tributary.streaming.calculations.ops.Noop(self)
tributary.streaming.calculations.ops.Not(self)
tributary.streaming.calculations.ops.NotEqual(self, other)
tributary.streaming.calculations.ops.Or(self, other)
tributary.streaming.calculations.ops.Pow(self, other)
tributary.streaming.calculations.ops.RDiv(self, other)
tributary.streaming.calculations.ops.Round(self, ndigits=0)[source]
tributary.streaming.calculations.ops.Sin(self)
tributary.streaming.calculations.ops.Sqrt(self)
tributary.streaming.calculations.ops.Str(self)
tributary.streaming.calculations.ops.Sub(self, other)
tributary.streaming.calculations.ops.Sum(*others)
tributary.streaming.calculations.ops.Tan(self)
tributary.streaming.calculations.ops.binary(foos, name)[source]
tributary.streaming.calculations.ops.n_ary(foos, name)[source]
tributary.streaming.calculations.ops.unary(foos, name)[source]
tributary.streaming.calculations.rolling.Average(node)[source]

Node to take the running average

If stream type is iterable, will do (average + sum(input))/(count+len(input)). If input stream type is not iterable, will do (average + input)/count

tributary.streaming.calculations.rolling.Count(node)[source]

Node to count inputs

Parameters:node (Node) – input stream
tributary.streaming.calculations.rolling.EMA(node, window_width=10, full_only=False)[source]

Node to take the exponential moving average over a window of ticks

Parameters:
  • node (node) – input stream
  • window_width (int) – size of window to use
  • full_only (bool) – only return if list is full
tributary.streaming.calculations.rolling.First(node)[source]

Node to return the first value encountered

tributary.streaming.calculations.rolling.Last(node)[source]

Node to return the last value encountered

tributary.streaming.calculations.rolling.Max(node)[source]

Node to take rolling max of inputs

Parameters:node (Node) – input stream
tributary.streaming.calculations.rolling.Min(node)[source]

Node to take rolling min of inputs

Parameters:node (Node) – input stream
tributary.streaming.calculations.rolling.SMA(node, window_width=10, full_only=False)[source]

Node to take the simple moving average over a window of ticks

Parameters:
  • node (node) – input stream
  • window_width (int) – size of window to use
  • full_only (bool) – only return if list is full
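
A simple moving average over a window of ticks can be sketched with collections.deque. The generator below is a hypothetical stand-in that borrows the window_width and full_only parameter names from the signature above; how the library itself handles a partially filled window is an assumption here (this sketch averages whatever it has unless full_only is set).

```python
from collections import deque


def simple_moving_average(values, window_width=10, full_only=False):
    """Yield the mean of the last window_width values after each tick."""
    window = deque(maxlen=window_width)  # old ticks fall off automatically
    for value in values:
        window.append(value)
        if full_only and len(window) < window_width:
            continue  # assumption: emit nothing until the window is full
        yield sum(window) / len(window)


partial_ok = list(simple_moving_average([1, 2, 3, 4], window_width=2))
full_windows = list(
    simple_moving_average([1, 2, 3, 4], window_width=2, full_only=True)
)
```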
tributary.streaming.calculations.rolling.Sum(node)[source]

Node to take rolling sum of inputs

If stream type is iterable, will do += sum(input). If input stream type is not iterable, will do += input.

Parameters:node (Node) – input stream
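
The iterable versus scalar rule described for the rolling sum can be sketched as a generator. The rolling_sum function is a hypothetical illustration; treating strings and bytes as scalars is an extra assumption made here so that text is never summed character by character.

```python
from collections.abc import Iterable


def rolling_sum(values):
    """Yield the running total, summing iterable inputs before adding."""
    total = 0
    for value in values:
        if isinstance(value, Iterable) and not isinstance(value, (str, bytes)):
            total += sum(value)   # iterable input: += sum(input)
        else:
            total += value        # scalar input: += input
        yield total


totals = list(rolling_sum([1, [2, 3], 4]))
```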

Functional

tributary.functional.pipeline(foos, foo_callbacks, foo_kwargs=None, on_data=<built-in function print>, on_data_kwargs=None)[source]

Pipeline a sequence of functions together via callbacks

Parameters:
  • foos (list of callables) – list of functions to pipeline
  • foo_callbacks (List[str]) – list of strings indicating the callback names (kwargs of the foos)
  • foo_kwargs (List[dict]) –
  • on_data (callable) – callable to call at the end of the pipeline
  • on_data_kwargs (dict) – kwargs to pass to the on_data function
tributary.functional.run_submit(fn, function_to_call, *args, **kwargs)[source]
tributary.functional.stop()[source]

Stop the executor for the pipeline runtime

tributary.functional.submit(fn, *args, **kwargs)[source]

Submit a function to be run on the executor (internal)

Parameters:
  • fn (callable) – function to call
  • args (tuple) – args to pass to function
  • kwargs (dict) – kwargs to pass to function
tributary.functional.wrap(function, *args, **kwargs)[source]

wrap a function in a partial

tributary.functional.utils.map(data, *callbacks)[source]

Pass data to multiple callbacks

Parameters:
  • data (any) – data to pass to all callbacks
  • callbacks (tuple) – callbacks to pass data to
tributary.functional.utils.merge(data1, data2, callback)[source]

merge two data sources into one callback

Parameters:
  • data1 (any) – first data to pass to callback
  • data2 (any) – second data to pass to callback
  • callback (callable) – callback to pass data to
tributary.functional.utils.reduce(callback, *datas)[source]

merge multiple data sources into one callback

Parameters:
  • callback (callable) – callback to pass data to
  • datas (tuple) – data to pass to callback
tributary.functional.utils.split(data, callback1, callback2)[source]

Pass data to 2 callbacks

Parameters:
  • data (any) – data to pass to both callbacks
  • callback1 (callable) – first function to call
  • callback2 (callable) – second function to call
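
The fan-out and fan-in helpers above can be pictured with two small functions. The names fan_out and fan_in are hypothetical, and the calling convention for the fan-in case (passing all data as positional arguments to one callback) is an assumption, since the documentation does not state how the data is handed over.

```python
def fan_out(data, *callbacks):
    """Pass the same data to every callback and gather the results."""
    return [callback(data) for callback in callbacks]


def fan_in(callback, *datas):
    """Pass several pieces of data to one callback.

    Assumption: the callback receives them as positional arguments.
    """
    return callback(*datas)


outputs = fan_out(3, lambda x: x + 1, lambda x: x * 2)
largest = fan_in(max, 1, 5, 2)
```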

Input

tributary.functional.input.http(url, callback, interval=1, repeat=1, json=False, wrap=False, field=None, proxies=None, cookies=None)[source]

Connect to url and pipe results through the callback

Parameters:
  • url (str) – url to connect to
  • callback (callable) – function to call on websocket data
  • interval (int) – interval to re-query
  • repeat (int) – number of times to request
  • json (bool) – load websocket data as json
  • wrap (bool) – wrap result in a list
  • field (str) – field to index result by
  • proxies (list) – list of URL proxies to pass to requests.get
  • cookies (list) – list of cookies to pass to requests.get
tributary.functional.input.kafka(callback, servers, group, topics, json=False, wrap=False, interval=1)[source]

Connect to kafka server and pipe results through the callback

Parameters:
  • callback (callable) – function to call on websocket data
  • servers (list) – kafka bootstrap servers
  • group (str) – kafka group id
  • topics (list) – list of kafka topics to connect to
  • json (bool) – load websocket data as json
  • wrap (bool) – wrap result in a list
  • interval (int) – kafka poll interval
tributary.functional.input.socketio(url, callback, channel='', field='', sendinit=None, json=False, wrap=False, interval=1)[source]

Connect to socketIO server and pipe results through the callback

Parameters:
  • url (str) – url to connect to
  • callback (callable) – function to call on websocket data
  • channel (str) – socketio channel to connect through
  • field (str) – field to index result by
  • sendinit (list) – data to send on socketio connection open
  • json (bool) – load websocket data as json
  • wrap (bool) – wrap result in a list
  • interval (int) – socketio wait interval
tributary.functional.input.ws(url, callback, json=False, wrap=False)[source]

Connect to websocket and pipe results through the callback

Parameters:
  • url (str) – websocket url to connect to
  • callback (callable) – function to call on websocket data
  • json (bool) – load websocket data as json
  • wrap (bool) – wrap result in a list

Output

Lazy

Input

Output

Calculations

tributary.lazy.calculations.ops.Abs(self)[source]
tributary.lazy.calculations.ops.Add(self, other)[source]
tributary.lazy.calculations.ops.And(self, other)[source]
tributary.lazy.calculations.ops.Arccos(self)[source]
tributary.lazy.calculations.ops.Arcsin(self)[source]
tributary.lazy.calculations.ops.Arctan(self)[source]
tributary.lazy.calculations.ops.Average(self, *others)[source]
tributary.lazy.calculations.ops.Bool(self)[source]
tributary.lazy.calculations.ops.Ceil(self)[source]
tributary.lazy.calculations.ops.Cos(self)[source]
tributary.lazy.calculations.ops.Div(self, other)[source]
tributary.lazy.calculations.ops.Equal(self, other)[source]
tributary.lazy.calculations.ops.Erf(self)[source]
tributary.lazy.calculations.ops.Exp(self)[source]
tributary.lazy.calculations.ops.Float(self)[source]
tributary.lazy.calculations.ops.Floor(self)[source]
tributary.lazy.calculations.ops.Ge(self, other)[source]
tributary.lazy.calculations.ops.Gt(self, other)[source]
tributary.lazy.calculations.ops.Int(self)[source]
tributary.lazy.calculations.ops.Invert(self)[source]
tributary.lazy.calculations.ops.Le(self, other)[source]
tributary.lazy.calculations.ops.Len(self)[source]
tributary.lazy.calculations.ops.Log(self)[source]
tributary.lazy.calculations.ops.Lt(self, other)[source]
tributary.lazy.calculations.ops.Mod(self, other)[source]
tributary.lazy.calculations.ops.Mult(self, other)[source]
tributary.lazy.calculations.ops.Negate(self)[source]
tributary.lazy.calculations.ops.Not(self)[source]
tributary.lazy.calculations.ops.NotEqual(self, other)[source]
tributary.lazy.calculations.ops.Or(self, other)[source]
tributary.lazy.calculations.ops.Pow(self, other)[source]
tributary.lazy.calculations.ops.Round(self, ndigits=0)[source]
tributary.lazy.calculations.ops.Sin(self)[source]
tributary.lazy.calculations.ops.Sqrt(self)[source]
tributary.lazy.calculations.ops.Str(self)[source]
tributary.lazy.calculations.ops.Sub(self, other)[source]
tributary.lazy.calculations.ops.Sum(self, *others)[source]
tributary.lazy.calculations.ops.Tan(self)[source]
tributary.lazy.calculations.ops.binary(node1, other, name, lam)[source]
tributary.lazy.calculations.ops.n_nary(node, others, name, lam)[source]
tributary.lazy.calculations.ops.unary(node, name, lam)[source]

Symbolic

tributary.symbolic.construct_lazy(expr, modules=None)[source]

Construct Lazy tributary class from sympy expression

Parameters:
  • expr (sympy expression) – A Sympy expression
  • modules (list) – a list of modules to use for sympy’s lambdify function
Returns:

tributary.lazy.LazyGraph

tributary.symbolic.construct_streaming(expr, modules=None)[source]

Construct streaming tributary class from sympy expression

Parameters:
  • expr (sympy expression) – A Sympy expression
  • modules (list) – a list of modules to use for sympy’s lambdify function

Returns:

tributary.symbolic.graphviz(expr)[source]

Plot sympy expression tree using graphviz

Parameters:expr (sympy expression)

tributary.symbolic.parse_expression(expr)[source]

Parse string as sympy expression

Parameters:expr (string) – string to convert to sympy expression

tributary.symbolic.symbols(expr)[source]

Get symbols used in sympy expression

Parameters:expr (sympy expression)

tributary.symbolic.traversal(expr)[source]

Traverse sympy expression tree

Parameters:expr (sympy expression)

Common

class tributary.base.StreamEnd[source]

Bases: object

Indicates that a stream has nothing left in it

instance = None
class tributary.base.StreamNone[source]

Bases: object

indicates that a stream does not have a value

all_bin_ops(other)[source]
all_un_ops()[source]
float()[source]
instance = None
int()[source]
class tributary.base.StreamRepeat[source]

Bases: object

Indicates that a stream has a gap, this object should be ignored and the previous action repeated

instance = None
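
How a consumer might react to marker objects like these can be sketched with three module-level sentinels. Everything here is hypothetical: the Sentinel class, the END, NONE, and REPEAT names, and in particular the reading of "repeat the previous action" as "emit the previous value again" are assumptions for illustration only.

```python
class Sentinel:
    """Hypothetical marker object, compared by identity."""

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return f"<{self.name}>"


END = Sentinel("END")        # nothing left in the stream
NONE = Sentinel("NONE")      # no value for this tick
REPEAT = Sentinel("REPEAT")  # gap: repeat the previous value


def consume(events):
    """Collect real values, honouring the three markers."""
    output = []
    last = None
    have_last = False
    for event in events:
        if event is END:
            break
        if event is NONE:
            continue
        if event is REPEAT:
            if have_last:
                output.append(last)  # assumption: re-emit previous value
            continue
        last, have_last = event, True
        output.append(event)
    return output


seen = consume([1, NONE, 2, REPEAT, END, 3])
```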
tributary.thread.run(target, timeout=1)[source]

Helper for running a thread

Parameters:
  • target (function) – function to run on a thread
  • timeout (int) – how long to wait for target to return
Returns:

result of the function

Return type:

data
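
A helper of this kind, which runs a function on a thread and waits a bounded time for its result, can be sketched with the standard threading module. The run_in_thread function below is a hypothetical illustration, not the library's code; raising TimeoutError when the target does not finish in time is an assumption.

```python
import threading


def run_in_thread(target, timeout=1):
    """Run target on a daemon thread and return its result.

    Assumption: raise TimeoutError if target has not returned
    within timeout seconds.
    """
    outcome = {}

    def worker():
        try:
            outcome["value"] = target()
        except Exception as exc:  # surface errors in the calling thread
            outcome["error"] = exc

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    thread.join(timeout)
    if thread.is_alive():
        raise TimeoutError(f"target did not finish within {timeout}s")
    if "error" in outcome:
        raise outcome["error"]
    return outcome["value"]


answer = run_in_thread(lambda: 6 * 7)
```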

tributary.utils.LazyToStreaming(lazy_node)[source]