geowatch.mlops.pipeline_nodes module

The core pipeline data structure for MLOps.

This module outlines the structure for a generic DAG of bash process nodes. It contains examples of generic test pipelines. For the SMART instantiation of project-specific DAGs, see: smart_pipeline.py

The basic idea is that each bash process knows about:

  • its filepath inputs

  • its filepath outputs

  • algorithm parameters

  • performance parameters

  • the command that invokes the job

Given a set of processes, a DAG is built by connecting process outputs to process inputs. This DAG can then be configured with customized input paths and parameters. The resulting jobs can then be submitted to a cmd_queue.Queue for actual execution.
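For instance, a minimal two-stage pipeline might be built and inspected as follows (a sketch; the executables here are placeholders, and the dotted config keys follow the node_name.param_name convention used in this module):

>>> from geowatch.mlops.pipeline_nodes import ProcessNode, Pipeline
>>> stage1 = ProcessNode(name='stage1', executable='my_stage1_prog',
>>>                      in_paths={'src'}, out_paths={'dst': 'stage1.txt'})
>>> stage2 = ProcessNode(name='stage2', executable='my_stage2_prog',
>>>                      in_paths={'src'}, out_paths={'dst': 'stage2.txt'})
>>> # Connect the output of stage1 to the input of stage2
>>> stage1.outputs['dst'].connect(stage2.inputs['src'])
>>> dag = Pipeline(nodes=[stage1, stage2])
>>> dag.configure(config={'stage1.src': 'input.txt'}, root_dpath='./my_dag_root')
>>> dag.print_commands()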

class geowatch.mlops.pipeline_nodes.Pipeline(nodes=None, config=None, root_dpath=None)[source]

Bases: object

A container for a group of nodes that have been connected.

Allows these connected nodes to be jointly configured and submitted to a cmd-queue for execution. Adds extra bookkeeping jobs that write invoke.sh and job_config.sh metadata files, as well as symlinks between node output directories.

Example

>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> node_A1 = ProcessNode(name='node_A1', in_paths={'src'}, out_paths={'dst': 'dst.txt'}, executable='node_A1')
>>> node_A2 = ProcessNode(name='node_A2', in_paths={'src'}, out_paths={'dst': 'dst.txt'}, executable='node_A2')
>>> node_A3 = ProcessNode(name='node_A3', in_paths={'src'}, out_paths={'dst': 'dst.txt'}, executable='node_A3')
>>> node_B1 = ProcessNode(name='node_B1', in_paths={'path1'}, out_paths={'path2': 'dst.txt'}, executable='node_B1')
>>> node_B2 = ProcessNode(name='node_B2', in_paths={'path2'}, out_paths={'path3': 'dst.txt'}, executable='node_B2')
>>> node_B3 = ProcessNode(name='node_B3', in_paths={'path3'}, out_paths={'path4': 'dst.txt'}, executable='node_B3')
>>> node_C1 = ProcessNode(name='node_C1', in_paths={'src1', 'src2'}, out_paths={'dst1': 'dst.txt', 'dst2': 'dst.txt'}, executable='node_C1')
>>> node_C2 = ProcessNode(name='node_C2', in_paths={'src1', 'src2'}, out_paths={'dst1': 'dst.txt', 'dst2': 'dst.txt'}, executable='node_C2')
>>> # You can connect outputs -> inputs directly (RECOMMENDED)
>>> node_A1.outputs['dst'].connect(node_A2.inputs['src'])
>>> node_A2.outputs['dst'].connect(node_A3.inputs['src'])
>>> # You can connect nodes to nodes that share input/output names (NOT RECOMMENDED)
>>> node_B1.connect(node_B2)
>>> node_B2.connect(node_B3)
>>> #
>>> # You can connect nodes to nodes that don't share input/output names
>>> # If you specify the mapping (NOT RECOMMENDED)
>>> node_A3.connect(node_B1, src_map={'dst': 'path1'})
>>> #
>>> # You can connect inputs to other inputs, which effectively
>>> # forwards the input path to the destination
>>> node_A1.inputs['src'].connect(node_C1.inputs['src1'])
>>> # The pipeline is just a container for the nodes
>>> nodes = [node_A1, node_A2, node_A3, node_B1, node_B2, node_B3, node_C1, node_C2]
>>> self = Pipeline(nodes=nodes)
>>> self.print_graphs()
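The connected pipeline can then be configured and its commands inspected, e.g. (a sketch continuing the example above; the input path and root directory are illustrative):

>>> self.configure(config={'node_A1.src': 'input.txt'}, root_dpath='./my_dag_root')
>>> self.print_commands(with_status=False)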
classmethod demo()[source]
submit(executable, **kwargs)[source]

Dynamically create a new unique process node and add it to the DAG.

Is this ever used? It may be possible to simplify and remove this.

property node_dict
build_nx_graphs()[source]
inspect_configurables()[source]

Show the user what config options should be specified.

Todo

The idea is that we want to give the user a list of options that they could configure for this pipeline, as well as mark the ones that are required / suggested / unnecessary. For now it gives a little bit of that information, but more work could be done to make it nicer.

Example

>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> self = Pipeline.demo()
>>> self.inspect_configurables()
configure(config=None, root_dpath=None, cache=True)[source]

Update the DAG configuration

Note

Currently, this will completely reset the config rather than update it. This behavior will change in the future.

Example

>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> self = Pipeline.demo()
>>> self.configure()
print_process_graph(shrink_labels=1, show_types=0, smart_colors=0)[source]

Draw the networkx process graph, which only shows if there exists a connection between processes, and does not show details of which output connects to which input. See PipelineDAG.print_io_graph() for that level of detail.

print_io_graph(shrink_labels=1, show_types=0, smart_colors=0)[source]

Draw the networkx IO graph, which shows the connections between the inputs and the outputs of the processes in the pipeline.

print_commands(**kwargs)[source]

Helper (mostly for debugging) to show the commands for the current pipeline configuration. This involves making a cmd_queue instance, which is the real object that knows how to structure the command sequence.

Parameters:

**kwargs – See cmd_queue.base_queue.Queue.print_commands(): with_status=False, with_gaurds=False, with_locks=1, exclude_tags=None, style='colors', **kwargs
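For example, to show the commands without status bookkeeping or lock handling (a sketch using the forwarded kwargs listed above):

>>> self = Pipeline.demo()
>>> self.configure()
>>> self.print_commands(with_status=False, with_locks=0)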

print_graphs(shrink_labels=1, show_types=0, smart_colors=0)[source]

Prints the Process and IO graph for the DAG.

submit_jobs(queue=None, skip_existing=False, enable_links=True, write_invocations=True, write_configs=True)[source]

Submits the jobs to an existing command queue or creates a new one.

Also takes care of adding special bookkeeping jobs that add helper files and symlinks to node output paths.

make_queue(queue=None, skip_existing=False, enable_links=True, write_invocations=True, write_configs=True)

Submits the jobs to an existing command queue or creates a new one.

Also takes care of adding special bookkeeping jobs that add helper files and symlinks to node output paths.
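For instance, a sketch of submitting to a freshly created queue (cmd_queue.Queue.create is the standard cmd_queue factory; the 'serial' backend runs jobs sequentially in-process):

>>> import cmd_queue
>>> self = Pipeline.demo()
>>> self.configure()
>>> queue = cmd_queue.Queue.create(backend='serial')
>>> self.submit_jobs(queue=queue, skip_existing=False)
>>> queue.print_commands()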

geowatch.mlops.pipeline_nodes.bash_printf_literal_string(text, escape_newlines=True)[source]

Not only do we need to make a bash literal string, we also need to make sure that it is interpreted as a literal by printf.

Example

json_text = '{"step1.param1": "- this: \\"is text 100% representing\\"\\n  some: \\"yaml config\\"\\n  omg: \\"single \' quote\\"\\n  eek: \'double \\" quote\'"}'
import json
import shlex
import ubelt as ub
json.loads(json_text)
text = json_text
literal_bash_json_text = bash_printf_literal_string(json_text)
print(json_text)
print(literal_bash_json_text)
command = f'printf {literal_bash_json_text} | jq'
print(command)
ub.cmd(command, verbose=3, shell=True)

geowatch.mlops.pipeline_nodes.glob_templated_path(template)[source]

Given an unformatted templated path, replace the format parts with "*" and return a glob.

Parameters:

template (str | PathLike) – a path with a {} template pattern

Example

template = '/foo{}/bar'
glob_templated_path(template)
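The result would be a glob string along the lines of '/foo*/bar', which can then be passed to glob.glob to discover concrete paths matching the template.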

class geowatch.mlops.pipeline_nodes.Node(name: str)[source]

Bases: NiceRepr

Abstract base class for a Process or IO Node.

connect(*others, param_mapping=None, src_map=None, dst_map=None)[source]

Connect the outputs of self to the inputs of others.

Conceptually, this creates an edge between the two nodes.

property key
class geowatch.mlops.pipeline_nodes.IONode(name, parent)[source]

Bases: Node

property final_value
property key
class geowatch.mlops.pipeline_nodes.InputNode(name, parent)[source]

Bases: IONode

class geowatch.mlops.pipeline_nodes.OutputNode(name, parent)[source]

Bases: IONode

property final_value
property template_value
matching_fpaths()[source]

Find all paths for this node.

class geowatch.mlops.pipeline_nodes.memoize_configured_method(func)[source]

Bases: object

ubelt memoize_method but uses a special cache name

geowatch.mlops.pipeline_nodes.memoize_configured_property(fget)[source]

ubelt memoize_property but uses a special cache name

class geowatch.mlops.pipeline_nodes.ProcessNode(*, name=None, executable=None, algo_params=None, perf_params=None, in_paths=None, out_paths=None, group_dname=None, root_dpath=None, config=None, node_dpath=None, group_dpath=None, primary_out_key=None, _overwrite_node_dpath=None, _overwrite_group_dpath=None, _no_outarg=False, _no_inarg=False, **aliases)[source]

Bases: Node

Represents a process in the pipeline.

ProcessNodes are connected via their input / output nodes.

You can create an instance of this directly, or inherit from it and set its class variables.

CommandLine

xdoctest -m geowatch.mlops.pipeline_nodes ProcessNode

Notes

When a ProcessNode is used for an evaluation node, it can / should be extended with the following methods:

  • load_result - for evaluation nodes

In your pipeline class you define a method def load_result(self, node_dpath) which returns a flat dot-dictionary of params and results from the node.

  • _default_metrics - returns Tuple of primary and display metric

    suffixes for the node that will be interpreted as the metrics. Note: this is likely to change so the user can specify if metrics need to be minimized / maximized.

  • _default_metrics2 - experimental new way of specifying metric info.

Should be a list of dictionaries with the following keys:

    suffix (str): the name of the metric.

    objective (Optional[str]): minimize or maximize (defaults to maximize).

    primary (Optional[bool]): if the metric is primary (defaults to False, unless no other metric is primary, in which case the first one defaults to True).

    display (Optional[bool]): show the column in the stdout table. Defaults to False. Note: any column marked as primary is also displayed.

    aggregator (Optional[str]): how to aggregate this metric when computing macro averages. Can be "mean", "gmean", "sum", "max", "min", or "ignore". Defaults to "mean".

  • default_vantage_points - should return a List[Dict] with

    metric1 and metric2, used for inspecting relationships between metrics.

These methods are currently used in aggregate_loader and aggregate, but they need to be more clearly defined here. Currently they are hacked in with hasattr, so we can’t define them as abstract methods yet, but in the future we will refactor to make it clearer that these methods should be implemented for evaluation nodes (i.e. nodes that produce metric results). A sketch of such a node follows.
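A sketch of what such an evaluation node might look like (the metric names, result file, and dotted key prefix here are hypothetical, not part of the actual API):

>>> import json
>>> import ubelt as ub
>>> class MyEvaluation(ProcessNode):
>>>     name = 'my_evaluation'
>>>     executable = 'my_eval_prog'
>>>     out_paths = {'eval_fpath': 'metrics.json'}
>>>     def load_result(self, node_dpath):
>>>         # Return a flat dot-dictionary of params and results
>>>         fpath = ub.Path(node_dpath) / 'metrics.json'
>>>         metrics = json.loads(fpath.read_text())
>>>         return {'metrics.my_evaluation.' + k: v for k, v in metrics.items()}
>>>     def _default_metrics2(self):
>>>         return [
>>>             {'suffix': 'f1', 'objective': 'maximize', 'primary': True, 'display': True},
>>>             {'suffix': 'rmse', 'objective': 'minimize', 'aggregator': 'mean'},
>>>         ]
>>>     def default_vantage_points(self):
>>>         return [{'metric1': 'f1', 'metric2': 'rmse'}]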

Example

>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> from geowatch.mlops.pipeline_nodes import _classvar_init
>>> dpath = ub.Path.appdir('geowatch/tests/pipeline/TestProcessNode')
>>> dpath.delete().ensuredir()
>>> pycode = ub.codeblock(
...     '''
...     import ubelt as ub
...     src_fpath = ub.Path(ub.argval('--src'))
...     dst_fpath = ub.Path(ub.argval('--dst'))
...     foo = ub.argval('--foo')
...     bar = ub.argval('--bar')
...     new_text = foo + src_fpath.read_text() + bar
...     dst_fpath.write_text(new_text)
...     ''')
>>> src_fpath = dpath / 'here.txt'
>>> src_fpath.write_text('valid input')
>>> dst_fpath = dpath / 'there.txt'
>>> self = ProcessNode(
>>>     name='proc1',
>>>     config={
>>>         'foo': 'baz',
>>>         'bar': 'biz',
>>>         'num_workers': 3,
>>>         'src': src_fpath,
>>>         'dst': dst_fpath
>>>     },
>>>     in_paths={'src'},
>>>     out_paths={'dst': 'there.txt'},
>>>     primary_out_key='dst',
>>>     perf_params={'num_workers'},
>>>     group_dname='predictions',
>>>     #node_dname='proc1/{proc1_algo_id}/{proc1_id}',
>>>     executable=f'python -c "{chr(10)}{pycode}{chr(10)}"',
>>>     root_dpath=dpath,
>>> )
>>> self._finalize_templates()
>>> print('self.command = {}'.format(ub.urepr(self.command, nl=1, sv=1)))
>>> print(f'self.algo_id={self.algo_id}')
>>> print(f'self.root_dpath={self.root_dpath}')
>>> print(f'self.template_node_dpath={self.template_node_dpath}')
>>> print('self.templates = {}'.format(ub.urepr(self.templates, nl=2)))
>>> print('self.final = {}'.format(ub.urepr(self.final, nl=2)))
>>> print('self.condensed = {}'.format(ub.urepr(self.condensed, nl=2)))
>>> print('self.primary_out_key = {}'.format(ub.urepr(self.primary_out_key, nl=2)))

Example

>>> # How to use a ProcessNode to handle an arbitrary process call
>>> # First let's write a program to disk
>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> import stat
>>> dpath = ub.Path.appdir('geowatch/tests/pipeline/TestProcessNode2')
>>> dpath.delete().ensuredir()
>>> pycode = ub.codeblock(
...     '''
...     #!/usr/bin/env python3
...     import scriptconfig as scfg
...     import ubelt as ub
...
...     class MyCLI(scfg.DataConfig):
...         src = None
...         dst = None
...         foo = None
...         bar = None
...
...         @classmethod
...         def main(cls, cmdline=1, **kwargs):
...             config = cls.cli(cmdline=cmdline, data=kwargs, strict=True)
...             print('config = ' + ub.urepr(config, nl=1))
...
...     if __name__ == '__main__':
...         MyCLI.main()
...     ''')
>>> fpath = dpath / 'mycli.py'
>>> fpath.write_text(pycode)
>>> fpath.chmod(fpath.stat().st_mode | stat.S_IXUSR)
>>> # Now that we have a script that accepts some cli arguments,
>>> # create a process node to represent it. We assume that
>>> # everything is passed as key/val style params, which you should
>>> # use for new programs, but this doesn't apply to a lot of programs
>>> # out there, so we will show how to handle non key/val arguments
>>> # later (todo).
>>> mynode = ProcessNode(command=str(fpath))
>>> # Get the invocation by running
>>> command = mynode.final_command()
>>> print(command)
>>> # Use a dictionary to configure key/value pairs
>>> mynode.configure({'src': 'a.txt', 'dst': 'b.txt'})
>>> command = mynode.final_command()
>>> # Note: currently because of backslash formatting
>>> # we need to use shell=1 or system=1 with ub.cmd;
>>> # in the future we will fix this in ubelt (todo).
>>> # Similarly this class should be able to provide the arglist
>>> # style of invocation.
>>> print(command)
>>> ub.cmd(command, verbose=3, shell=1)

name: str | None = None
executable: str | None = None
algo_params: Dict | Set | List | None = None
perf_params: Dict | Set | List | None = None
in_paths: Dict | Set | List | None = None
out_paths: Dict | Set | List | None = None
primary_out_key: str | None = None
group_dname: str | None = None
configure(config=None, cache=True, enabled=True)[source]

Update the node configuration.

This rebuilds the templates and formats them so the “final” variables take on directory names based on the given configuration.

FIXME:

Gotcha: Calling this twice with a new config will reset the configuration. These nodes are stateful, so we should maintain and update state, not reset it. If we need to reset to defaults, there should be a method for that. You can work around this by passing self.config as the first argument.

property condensed

This is the dictionary that supplies the templated strings with the values we will finalize them with. We may want to change the name.

property final_config

This is not really “final” in the aggregate sense. It is more of a “finalized” requested config.

property final_perf_config
property final_algo_config
property final_in_paths
property template_out_paths

Template out paths are not impacted by out path config overrides, but the final out paths are.

SeeAlso:

ProcessNode.final_out_paths()

property final_out_paths

These are the locations each output will actually be written to.

This is based on ProcessNode.template_out_paths() as well as any manual overrides specified in self.config.

property final_node_dpath

The configured directory where all outputs are relative to.

property final_root_dpath
property template_group_dpath

The template for the directory where the configured node dpath will be placed.

property template_node_dpath

The template for the configured directory where all outputs are relative to.

property template_root_dpath
predecessor_process_nodes()[source]

Process nodes that this one depends on.

successor_process_nodes()[source]

Process nodes that depend on this one.

ancestor_process_nodes()[source]

Example

>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> pipe = Pipeline.demo()
>>> self = pipe.node_dict['node_C1']
>>> ancestors = self.ancestor_process_nodes()
>>> print('ancestors = {}'.format(ub.urepr(ancestors, nl=1)))
property depends

The mapping from ancestor and self node names to their algorithm ids. Should probably rename this.

property algo_id: str

A unique id to represent the output of a deterministic process.

This does NOT have a dependency on the larger DAG.

property process_id: str

A unique id to represent the output of a deterministic process in a pipeline. This id combines the hashes of all ancestors in the DAG with its own hashed id.

This DOES have a dependency on the larger DAG.
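The distinction matters when comparing runs: two identically configured nodes placed after different ancestors share an algo_id but get distinct process_id values. A sketch using the demo pipeline:

>>> pipe = Pipeline.demo()
>>> node = pipe.node_dict['node_C1']
>>> print(node.algo_id)     # hash of this node's own configuration
>>> print(node.process_id)  # additionally folds in ancestor hashes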

property inputs

Input nodes representing specific input locations.

The output nodes of other processes can be connected to these. Also, input nodes of one process can connect to input nodes of another process, representing that they share the same input data.

Returns:

Dict[str, InputNode]

property outputs

Output nodes representing specific output locations. These can be connected to the input nodes of other processes.

Returns:

Dict[str, OutputNode]

property command: str

Returns the string shell command that will execute the process.

Basic version of the command; can be overridden.

test_is_computed_command()[source]

Generate a bash command that will test if all output paths exist

Example

>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> self = ProcessNode(out_paths={
>>>     'foo': 'foo.txt',
>>>     'bar': 'bar.txt',
>>>     'baz': 'baz.txt',
>>>     'biz': 'biz.txt',
>>> }, node_dpath='.')
>>> test_cmd = self.test_is_computed_command()
>>> print(test_cmd)
test -e foo.txt -a \
     -e bar.txt -a \
     -e baz.txt -a \
     -e biz.txt
>>> self = ProcessNode(out_paths={
>>>     'foo': 'foo.txt',
>>>     'bar': 'bar.txt',
>>> }, node_dpath='.')
>>> test_cmd = self.test_is_computed_command()
>>> print(test_cmd)
test -e foo.txt -a \
     -e bar.txt
>>> self = ProcessNode(out_paths={
>>>     'foo': 'foo.txt',
>>> }, node_dpath='.')
>>> test_cmd = self.test_is_computed_command()
>>> print(test_cmd)
test -e foo.txt
>>> self = ProcessNode(out_paths={}, node_dpath='.')
>>> test_cmd = self.test_is_computed_command()
>>> print(test_cmd)
None
property does_exist: bool

Check if all of the output paths that would be written by this node already exist.

property outputs_exist: bool

Alias for does_exist

Check if all of the output paths that would be written by this node already exist.

final_command()[source]

Wraps self.command with optional checks to prevent the command from executing if its outputs already exist.

find_template_outputs(workers=8)[source]

Look in the DAG root path for output paths that are complete or unfinished.

geowatch.mlops.pipeline_nodes.demodata_pipeline()[source]

A simple test pipeline.

Example

>>> # Self test
>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> demodata_pipeline()
geowatch.mlops.pipeline_nodes.demo_pipeline_run()[source]

A simple test pipeline.

Example

>>> # Self test
>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> demo_pipeline_run()
geowatch.mlops.pipeline_nodes.PipelineDAG

alias of Pipeline

geowatch.mlops.pipeline_nodes.coerce_pipeline(pipeline)[source]

Attempts to resolve a concise expression (typically from the command line) into a pre-defined pipeline.

Parameters:

pipeline (str) – a pre-registered name, or evaluatable code to construct a pipeline.

Returns:

Pipeline