geowatch.mlops.pipeline_nodes module
The core pipeline data structure for MLOps.
This module outlines the structure for a generic DAG of bash process nodes. It contains examples of generic test pipelines. For the SMART instantiation of project-specific dags see: smart_pipeline.py
The basic idea is that each bash process knows about:
its filepath inputs
its filepath outputs
algorithm parameters
performance parameters
the command that invokes the job
Given a set of processes, a DAG is built by connecting process outputs to process inputs. This DAG can then be configured with customized input paths and parameters. The resulting jobs can then be submitted to a cmd_queue.Queue for actual execution.
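As a rough illustration of the idea above, independent of geowatch itself, the following sketch models each process as a record of named inputs and outputs, derives dependencies from explicit output-to-input connections, and orders the jobs with the standard-library graphlib module. The Proc class, the build_order helper, and the node names are hypothetical and are not part of pipeline_nodes.

```python
from dataclasses import dataclass, field
from graphlib import TopologicalSorter


@dataclass
class Proc:
    """Hypothetical stand-in for a bash process node (not the geowatch class)."""
    name: str
    in_paths: set = field(default_factory=set)
    out_paths: dict = field(default_factory=dict)
    command: str = ''


def build_order(procs, connections):
    """
    Order processes so that producers run before consumers.

    connections: iterable of ((src_proc, out_key), (dst_proc, in_key)) pairs.
    """
    by_name = {p.name: p for p in procs}
    # Map each process name to the set of processes it depends on.
    deps = {p.name: set() for p in procs}
    for (src, out_key), (dst, in_key) in connections:
        # Validate that the connection refers to declared outputs / inputs.
        assert out_key in by_name[src].out_paths
        assert in_key in by_name[dst].in_paths
        deps[dst].add(src)
    return list(TopologicalSorter(deps).static_order())


procs = [
    Proc('train', in_paths={'data'}, out_paths={'model': 'model.pt'}),
    Proc('predict', in_paths={'model'}, out_paths={'pred': 'pred.json'}),
    Proc('evaluate', in_paths={'pred'}, out_paths={'metrics': 'metrics.json'}),
]
connections = [
    (('train', 'model'), ('predict', 'model')),
    (('predict', 'pred'), ('evaluate', 'pred')),
]
order = build_order(procs, connections)
print(order)
```

In this toy version the execution order is the only thing computed; a real queue would additionally receive the configured command for each process.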
- class geowatch.mlops.pipeline_nodes.Pipeline(nodes=None, config=None, root_dpath=None)
Bases: object
A container for a group of nodes that have been connected.
Allows these connected nodes to be jointly configured and submitted to a cmd-queue for execution. Adds extra bookkeeping jobs that write invoke.sh and job_config.sh metadata, as well as symlinks between node output directories.
Example
>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> node_A1 = ProcessNode(name='node_A1', in_paths={'src'}, out_paths={'dst': 'dst.txt'}, executable='node_A1')
>>> node_A2 = ProcessNode(name='node_A2', in_paths={'src'}, out_paths={'dst': 'dst.txt'}, executable='node_A2')
>>> node_A3 = ProcessNode(name='node_A3', in_paths={'src'}, out_paths={'dst': 'dst.txt'}, executable='node_A3')
>>> node_B1 = ProcessNode(name='node_B1', in_paths={'path1'}, out_paths={'path2': 'dst.txt'}, executable='node_B1')
>>> node_B2 = ProcessNode(name='node_B2', in_paths={'path2'}, out_paths={'path3': 'dst.txt'}, executable='node_B2')
>>> node_B3 = ProcessNode(name='node_B3', in_paths={'path3'}, out_paths={'path4': 'dst.txt'}, executable='node_B3')
>>> node_C1 = ProcessNode(name='node_C1', in_paths={'src1', 'src2'}, out_paths={'dst1': 'dst.txt', 'dst2': 'dst.txt'}, executable='node_C1')
>>> node_C2 = ProcessNode(name='node_C2', in_paths={'src1', 'src2'}, out_paths={'dst1': 'dst.txt', 'dst2': 'dst.txt'}, executable='node_C2')
>>> # You can connect outputs -> inputs directly (RECOMMENDED)
>>> node_A1.outputs['dst'].connect(node_A2.inputs['src'])
>>> node_A2.outputs['dst'].connect(node_A3.inputs['src'])
>>> # You can connect nodes to nodes that share input/output names (NOT RECOMMENDED)
>>> node_B1.connect(node_B2)
>>> node_B2.connect(node_B3)
>>> #
>>> # You can connect nodes to nodes that don't share input/output names
>>> # if you specify the mapping (NOT RECOMMENDED)
>>> node_A3.connect(node_B1, src_map={'dst': 'path1'})
>>> #
>>> # You can connect inputs to other inputs, which effectively
>>> # forwards the input path to the destination
>>> node_A1.inputs['src'].connect(node_C1.inputs['src1'])
>>> # The pipeline is just a container for the nodes
>>> nodes = [node_A1, node_A2, node_A3, node_B1, node_B2, node_B3, node_C1, node_C2]
>>> self = Pipeline(nodes=nodes)
>>> self.print_graphs()
- submit(executable, **kwargs)
Dynamically create a new unique process node and add it to the DAG.
- property node_dict
- inspect_configurables()
Show the user what config options should be specified.
Todo
The idea is that we want to give the user a list of options that they could configure for this pipeline, and mark the ones that are required / suggested / unnecessary. For now it gives a little of that information, but more work could be done to make it nicer.
Example
>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> self = Pipeline.demo()
>>> self.inspect_configurables()
- configure(config=None, root_dpath=None, cache=True)
Update the DAG configuration.
Note
Currently, this completely resets the config rather than updating it. This behavior will change in the future.
Example
>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> self = Pipeline.demo()
>>> self.configure()
- print_graphs(shrink_labels=1, show_types=0, smart_colors=0)
Prints the Process and IO graph for the DAG.
- submit_jobs(queue=None, skip_existing=False, enable_links=True, write_invocations=True, write_configs=True)
Submits the jobs to an existing command queue or creates a new one.
Also takes care of adding special bookkeeping jobs that add helper files and symlinks to node output paths.
- make_queue(queue=None, skip_existing=False, enable_links=True, write_invocations=True, write_configs=True)
Submits the jobs to an existing command queue or creates a new one.
Also takes care of adding special bookkeeping jobs that add helper files and symlinks to node output paths.
- geowatch.mlops.pipeline_nodes.glob_templated_path(template)
Given an unformatted templated path, replace the format parts with "*" and return a glob.
- Parameters:
template (str | PathLike) – a path with a {} template pattern
Example
>>> template = '/foo{}/bar'
>>> glob_templated_path(template)
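To make the substitution step concrete, here is a minimal standard-library sketch of turning a format template into a glob pattern. The helper name template_to_glob_pattern is hypothetical and this is not the geowatch implementation; it assumes every replacement field, named or not, should become a single "*" and does not escape glob metacharacters that appear in the literal text.

```python
import glob
import string


def template_to_glob_pattern(template):
    """Hypothetical helper: replace each format field with '*'."""
    parts = []
    for literal, field_name, _spec, _conv in string.Formatter().parse(str(template)):
        parts.append(literal)
        if field_name is not None:
            # A replacement field such as {} or {name} was found here.
            parts.append('*')
    return ''.join(parts)


pattern = template_to_glob_pattern('/foo{}/bar')
print(pattern)
# Paths on disk that match the pattern, if any exist.
matches = glob.glob(pattern)
```

Using string.Formatter().parse rather than a regular expression means escaped braces such as "{{" are handled the same way str.format would handle them.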
- class geowatch.mlops.pipeline_nodes.Node(name: str)
Bases: NiceRepr
Abstract base class for a Process or IO Node.
- connect(*others, param_mapping=None, src_map=None, dst_map=None)
Connect the outputs of self to the inputs of others.
Conceptually, this creates an edge between the two nodes.
- property key
- class geowatch.mlops.pipeline_nodes.IONode(name, parent)
Bases: Node
- property final_value
- property key
- class geowatch.mlops.pipeline_nodes.OutputNode(name, parent)
Bases: IONode
- property final_value
- property template_value
- class geowatch.mlops.pipeline_nodes.memoize_configured_method(func)
Bases: object
Like ubelt's memoize_method, but uses a special cache name.
- geowatch.mlops.pipeline_nodes.memoize_configured_property(fget)
Like ubelt's memoize_property, but uses a special cache name.
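The source does not say why a special cache name is used. One plausible reason, stated here as an assumption, is that keeping memoized values in a dedicated, named cache lets all of them be discarded at once when a node is reconfigured. The sketch below illustrates that pattern with the standard library only; memoize_in_named_cache, the _configured_cache attribute, and the Demo class are hypothetical names, not the geowatch or ubelt API.

```python
import functools


def memoize_in_named_cache(func, cache_name='_configured_cache'):
    """
    Hypothetical sketch: memoize a zero-argument method in a dict stored
    on the instance under ``cache_name`` so it can be cleared on demand.
    """
    @functools.wraps(func)
    def wrapper(self):
        cache = self.__dict__.setdefault(cache_name, {})
        if func.__name__ not in cache:
            cache[func.__name__] = func(self)
        return cache[func.__name__]
    return wrapper


class Demo:
    def __init__(self, config):
        self.config = config
        self.num_calls = 0

    def configure(self, config):
        self.config = config
        # Reconfiguring invalidates everything derived from the old config.
        self.__dict__.pop('_configured_cache', None)

    @property
    @memoize_in_named_cache
    def final_name(self):
        self.num_calls += 1
        return 'node_{}'.format(self.config['id'])


demo = Demo({'id': 1})
first = (demo.final_name, demo.final_name)  # computed once, then cached
demo.configure({'id': 2})
second = demo.final_name  # recomputed after the cache was dropped
```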
- class geowatch.mlops.pipeline_nodes.ProcessNode(*, name=None, executable=None, algo_params=None, perf_params=None, in_paths=None, out_paths=None, group_dname=None, root_dpath=None, config=None, node_dpath=None, group_dpath=None, _overwrite_node_dpath=None, _overwrite_group_dpath=None, _no_outarg=False, _no_inarg=False, **aliases)
Bases: Node
Represents a process in the pipeline.
ProcessNodes are connected via their input / output nodes.
You can create an instance of this directly, or inherit from it and set its class variables.
CommandLine
xdoctest -m geowatch.mlops.pipeline_nodes ProcessNode
Example
>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> from geowatch.mlops.pipeline_nodes import _classvar_init
>>> dpath = ub.Path.appdir('geowatch/tests/pipeline/TestProcessNode')
>>> dpath.delete().ensuredir()
>>> pycode = ub.codeblock(
...     '''
...     import ubelt as ub
...     src_fpath = ub.Path(ub.argval('--src'))
...     dst_fpath = ub.Path(ub.argval('--dst'))
...     foo = ub.argval('--foo')
...     bar = ub.argval('--bar')
...     new_text = foo + src_fpath.read_text() + bar
...     dst_fpath.write_text(new_text)
...     ''')
>>> src_fpath = dpath / 'here.txt'
>>> src_fpath.write_text('valid input')
>>> dst_fpath = dpath / 'there.txt'
>>> self = ProcessNode(
>>>     name='proc1',
>>>     config={
>>>         'foo': 'baz',
>>>         'bar': 'biz',
>>>         'num_workers': 3,
>>>         'src': src_fpath,
>>>         'dst': dst_fpath
>>>     },
>>>     in_paths={'src'},
>>>     out_paths={'dst': 'there.txt'},
>>>     perf_params={'num_workers'},
>>>     group_dname='predictions',
>>>     #node_dname='proc1/{proc1_algo_id}/{proc1_id}',
>>>     executable=f'python -c "{chr(10)}{pycode}{chr(10)}"',
>>>     root_dpath=dpath,
>>> )
>>> self._finalize_templates()
>>> print('self.command = {}'.format(ub.urepr(self.command, nl=1, sv=1)))
>>> print(f'self.algo_id={self.algo_id}')
>>> print(f'self.root_dpath={self.root_dpath}')
>>> print(f'self.template_node_dpath={self.template_node_dpath}')
>>> print('self.templates = {}'.format(ub.urepr(self.templates, nl=2)))
>>> print('self.final = {}'.format(ub.urepr(self.final, nl=2)))
>>> print('self.condensed = {}'.format(ub.urepr(self.condensed, nl=2)))
Example
>>> # How to use a ProcessNode to handle an arbitrary process call
>>> # First let's write a program to disk
>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> import stat
>>> dpath = ub.Path.appdir('geowatch/tests/pipeline/TestProcessNode2')
>>> dpath.delete().ensuredir()
>>> pycode = ub.codeblock(
...     '''
...     #!/usr/bin/env python3
...     import scriptconfig as scfg
...     import ubelt as ub
...
...     class MyCLI(scfg.DataConfig):
...         src = None
...         dst = None
...         foo = None
...         bar = None
...
...         @classmethod
...         def main(cls, cmdline=1, **kwargs):
...             config = cls.cli(cmdline=cmdline, data=kwargs, strict=True)
...             print('config = ' + ub.urepr(config, nl=1))
...
...     if __name__ == '__main__':
...         MyCLI.main()
...     ''')
>>> fpath = dpath / 'mycli.py'
>>> fpath.write_text(pycode)
>>> fpath.chmod(fpath.stat().st_mode | stat.S_IXUSR)
>>> # Now that we have a script that accepts some cli arguments,
>>> # create a process node to represent it. We assume that
>>> # everything is passed as key/val style params, which you should
>>> # use for new programs, but this doesn't apply to a lot of programs
>>> # out there, so we will show how to handle non key/val arguments
>>> # later (todo).
>>> mynode = ProcessNode(command=str(fpath))
>>> # Get the invocation by running
>>> command = mynode.final_command()
>>> print(command)
>>> # Use a dictionary to configure key/value pairs
>>> mynode.configure({'src': 'a.txt', 'dst': 'b.txt'})
>>> command = mynode.final_command()
>>> # Note: currently, because of backslash formatting,
>>> # we need to use shell=1 or system=1 with ub.cmd.
>>> # In the future we will fix this in ubelt (todo).
>>> # Similarly, this class should be able to provide the arglist
>>> # style of invocation.
>>> print(command)
>>> ub.cmd(command, verbose=3, shell=1)
- configure(config=None, cache=True, enabled=True)
Update the node configuration.
This rebuilds the templates and formats them so the "final" variables take on directory names based on the given configuration.
- FIXME:
Gotcha: Calling this twice with a new config will reset the configuration. These nodes are stateful, so we should maintain and update state, not reset it. If we need to reset to defaults, there should be a method for that. Can work around by passing self.config as the first argument.
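The gotcha and its workaround can be illustrated with a toy class. ToyNode below is a hypothetical stand-in that only mimics the reset-on-configure behavior described above; it is not ProcessNode, and merging with a dict literal is just one assumed way of passing the existing config back in.

```python
class ToyNode:
    """Hypothetical stand-in that mimics the reset-on-configure behavior."""

    def __init__(self):
        self.config = {}

    def configure(self, config=None):
        # The whole config is replaced, not merged into the previous one.
        self.config = dict(config or {})


node = ToyNode()
node.configure({'src': 'a.txt', 'dst': 'b.txt'})

# Gotcha: a second call forgets 'src' and 'dst'.
node.configure({'foo': 'baz'})
after_reset = dict(node.config)

# Workaround: start from the existing config and layer the new values on top.
node.configure({'src': 'a.txt', 'dst': 'b.txt'})
node.configure({**node.config, 'foo': 'baz'})
after_merge = dict(node.config)

print(after_reset)
print(after_merge)
```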
- property condensed
The dictionary that supplies the templated strings with the values used to finalize them. We may want to change the name.
- property final_config
This is not really "final" in the aggregate sense. It is more of a "finalized" requested config.
- property final_perf_config
- property final_algo_config
- property final_in_paths
- property template_out_paths
Note
Template out paths are not impacted by out path config overrides, but the final out paths are.
- property final_out_paths
These are the locations each output will actually be written to.
This is based on ProcessNode.template_out_paths() as well as any manual overrides specified in self.config.
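The relationship between templated and final output paths can be sketched as follows. This is an illustration under assumptions, not the geowatch implementation: resolve_out_paths and its arguments are hypothetical, the template is assumed to be a str.format-style string filled from a dictionary of condensed values, and an override is assumed to be a config entry keyed by the output name.

```python
from pathlib import PurePosixPath


def resolve_out_paths(template_node_dpath, out_paths, condensed, config):
    """
    Hypothetical sketch: fill in a templated node directory, place each
    default output under it, then let explicit config entries override.
    """
    node_dpath = PurePosixPath(template_node_dpath.format(**condensed))
    final = {key: node_dpath / fname for key, fname in out_paths.items()}
    for key in out_paths:
        if config.get(key) is not None:
            # A manual override wins over the template-derived location.
            final[key] = PurePosixPath(config[key])
    return final


final = resolve_out_paths(
    template_node_dpath='/data/pred/proc1/{proc1_id}',
    out_paths={'dst': 'there.txt', 'log': 'log.txt'},
    condensed={'proc1_id': 'abc123'},
    config={'log': '/tmp/custom_log.txt'},
)
print(final)
```

Here 'dst' lands under the formatted node directory, while 'log' follows the explicit override.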
- property final_node_dpath
The configured directory that all outputs are relative to.
- property final_root_dpath
- property template_group_dpath
The template for the directory where the configured node dpath will be placed.
- property template_node_dpath
The template for the configured directory that all outputs are relative to.
- property template_root_dpath
- ancestor_process_nodes()
Example
>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> pipe = Pipeline.demo()
>>> self = pipe.node_dict['node_C1']
>>> ancestors = self.ancestor_process_nodes()
>>> print('ancestors = {}'.format(ub.urepr(ancestors, nl=1)))
- property depends
The mapping from ancestor and self node names to their algorithm ids. Should probably be renamed.
- property algo_id: str
A unique id to represent the output of a deterministic process.
This does NOT have a dependency on the larger DAG.
- property process_id: str
A unique id to represent the output of a deterministic process in a pipeline. This id combines the hashes of all ancestors in the DAG with its own hashed id.
This DOES have a dependency on the larger DAG.
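The difference between the two ids can be shown with a small hashing sketch. The scheme below is an assumption made for illustration (JSON serialization plus a truncated SHA-256 digest); the function names are hypothetical and the actual hashing used by ProcessNode may differ.

```python
import hashlib
import json


def hash_config(config, length=12):
    """Hash a JSON-serializable config deterministically (hypothetical scheme)."""
    text = json.dumps(config, sort_keys=True)
    return hashlib.sha256(text.encode('utf8')).hexdigest()[:length]


def toy_algo_id(name, algo_config):
    # Depends only on this node's own algorithm parameters.
    return '{}_{}'.format(name, hash_config(algo_config))


def toy_process_id(name, algo_config, ancestor_algo_ids):
    # Also folds in the ids of every ancestor, so upstream changes propagate.
    depends = {
        'self': toy_algo_id(name, algo_config),
        'ancestors': sorted(ancestor_algo_ids),
    }
    return '{}_{}'.format(name, hash_config(depends))


up_v1 = toy_algo_id('upstream', {'thresh': 0.1})
up_v2 = toy_algo_id('upstream', {'thresh': 0.2})

algo = toy_algo_id('down', {'k': 3})
proc_v1 = toy_process_id('down', {'k': 3}, [up_v1])
proc_v2 = toy_process_id('down', {'k': 3}, [up_v2])
```

Changing the upstream threshold leaves the downstream algorithm id untouched but yields a different process id, which is the property that lets outputs from different upstream configurations live in separate directories.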
- property inputs
Input nodes representing specific input locations.
The output nodes of other processes can be connected to these. Input nodes for one process can also connect to input nodes of another process, indicating that they share the same input data.
- Returns:
Dict[str, InputNode]
- property outputs
Output nodes representing specific output locations. These can be connected to the input nodes of other processes.
- Returns:
Dict[str, OutputNode]
- property command: str
Returns the string shell command that will execute the process.
This is the basic version of the command; it can be overridden.
- test_is_computed_command()
Generate a bash command that will test if all output paths exist.
Example
>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> self = ProcessNode(out_paths={
>>>     'foo': 'foo.txt',
>>>     'bar': 'bar.txt',
>>>     'baz': 'baz.txt',
>>>     'biz': 'biz.txt',
>>> }, node_dpath='.')
>>> test_cmd = self.test_is_computed_command()
>>> print(test_cmd)
test -e foo.txt -a \
     -e bar.txt -a \
     -e baz.txt -a \
     -e biz.txt
>>> self = ProcessNode(out_paths={
>>>     'foo': 'foo.txt',
>>>     'bar': 'bar.txt',
>>> }, node_dpath='.')
>>> test_cmd = self.test_is_computed_command()
>>> print(test_cmd)
test -e foo.txt -a \
     -e bar.txt
>>> self = ProcessNode(out_paths={
>>>     'foo': 'foo.txt',
>>> }, node_dpath='.')
>>> test_cmd = self.test_is_computed_command()
>>> print(test_cmd)
test -e foo.txt
>>> self = ProcessNode(out_paths={}, node_dpath='.')
>>> test_cmd = self.test_is_computed_command()
>>> print(test_cmd)
None
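A command of the shape printed above could be assembled roughly as follows. This is a minimal sketch, not the geowatch implementation: toy_test_is_computed_command is a hypothetical name, and the use of shlex.quote and the exact continuation-line indentation are assumptions.

```python
import shlex


def toy_test_is_computed_command(paths):
    """
    Hypothetical sketch: build a ``test`` command that succeeds only if every
    path exists. Returns None when there is nothing to check.
    """
    paths = list(paths)
    if not paths:
        return None
    checks = ['-e ' + shlex.quote(str(p)) for p in paths]
    # Join with the ``-a`` (logical and) operator, one check per line.
    return 'test ' + ' -a \\\n     '.join(checks)


cmd = toy_test_is_computed_command(['foo.txt', 'bar.txt'])
print(cmd)
```

A scheduler can run such a command before a job and skip the job when it exits with status 0, which is presumably how an option like skip_existing would make use of it.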
- property does_exist: bool
Check if all of the output paths that would be written by this node already exist.
- property outputs_exist: bool
Alias for does_exist.
Check if all of the output paths that would be written by this node already exist.
- geowatch.mlops.pipeline_nodes.demodata_pipeline()
A simple test pipeline.
Example
>>> # Self test
>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> demodata_pipeline()