geowatch.mlops.pipeline_nodes module¶
The core pipeline data structure for MLOps.
This module outlines the structure for a generic DAG of bash process nodes. It contains examples of generic test pipelines. For the SMART instantiation of project-specific DAGs, see smart_pipeline.py.
The basic idea is that each bash process knows about:
- its filepath inputs
- its filepath outputs
- algorithm parameters
- performance parameters
- the command that invokes the job
Given a set of processes, a DAG is built by connecting process outputs to process inputs. This DAG can then be configured with customized input paths and parameters. The resulting jobs can then be submitted to a cmd_queue.Queue for actual execution.
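A minimal sketch of that flow follows. The executables here are placeholders, and the dotted node.param config keys mirror the convention seen in the bash_printf_literal_string example below; the return value of submit_jobs is an assumption.

from geowatch.mlops.pipeline_nodes import ProcessNode, Pipeline
# Two toy stages where the second consumes the output of the first.
stage1 = ProcessNode(name='stage1', in_paths={'src'},
                     out_paths={'dst': 'stage1.txt'}, executable='stage1.sh')
stage2 = ProcessNode(name='stage2', in_paths={'src'},
                     out_paths={'dst': 'stage2.txt'}, executable='stage2.sh')
stage1.outputs['dst'].connect(stage2.inputs['src'])
dag = Pipeline(nodes=[stage1, stage2])
dag.configure(config={'stage1.src': 'input.txt'}, root_dpath='./runs')
queue = dag.submit_jobs()  # assumed to return the populated cmd_queue queue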
- class geowatch.mlops.pipeline_nodes.Pipeline(nodes=None, config=None, root_dpath=None)[source]¶
Bases:
object
A container for a group of nodes that have been connected.
Allows these connected nodes to be jointly configured and submitted to a cmd-queue for execution. Adds extra bookkeeping jobs that write invoke.sh job_config.sh metadata as well as symlinks between node output directories.
Example
>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> node_A1 = ProcessNode(name='node_A1', in_paths={'src'}, out_paths={'dst': 'dst.txt'}, executable='node_A1')
>>> node_A2 = ProcessNode(name='node_A2', in_paths={'src'}, out_paths={'dst': 'dst.txt'}, executable='node_A2')
>>> node_A3 = ProcessNode(name='node_A3', in_paths={'src'}, out_paths={'dst': 'dst.txt'}, executable='node_A3')
>>> node_B1 = ProcessNode(name='node_B1', in_paths={'path1'}, out_paths={'path2': 'dst.txt'}, executable='node_B1')
>>> node_B2 = ProcessNode(name='node_B2', in_paths={'path2'}, out_paths={'path3': 'dst.txt'}, executable='node_B2')
>>> node_B3 = ProcessNode(name='node_B3', in_paths={'path3'}, out_paths={'path4': 'dst.txt'}, executable='node_B3')
>>> node_C1 = ProcessNode(name='node_C1', in_paths={'src1', 'src2'}, out_paths={'dst1': 'dst.txt', 'dst2': 'dst.txt'}, executable='node_C1')
>>> node_C2 = ProcessNode(name='node_C2', in_paths={'src1', 'src2'}, out_paths={'dst1': 'dst.txt', 'dst2': 'dst.txt'}, executable='node_C2')
>>> # You can connect outputs -> inputs directly (RECOMMENDED)
>>> node_A1.outputs['dst'].connect(node_A2.inputs['src'])
>>> node_A2.outputs['dst'].connect(node_A3.inputs['src'])
>>> # You can connect nodes to nodes that share input/output names (NOT RECOMMENDED)
>>> node_B1.connect(node_B2)
>>> node_B2.connect(node_B3)
>>> #
>>> # You can connect nodes to nodes that don't share input/output names
>>> # if you specify the mapping (NOT RECOMMENDED)
>>> node_A3.connect(node_B1, src_map={'dst': 'path1'})
>>> #
>>> # You can connect inputs to other inputs, which effectively
>>> # forwards the input path to the destination
>>> node_A1.inputs['src'].connect(node_C1.inputs['src1'])
>>> # The pipeline is just a container for the nodes
>>> nodes = [node_A1, node_A2, node_A3, node_B1, node_B2, node_B3, node_C1, node_C2]
>>> self = Pipeline(nodes=nodes)
>>> self.print_graphs()
- submit(executable, **kwargs)[source]¶
Dynamically create a new unique process node and add it to the DAG.
Is this ever used? May be able to simplify and remove this.
- property node_dict¶
- inspect_configurables()[source]¶
Show the user what config options should be specified.
Todo
The idea is that we want to give the user a list of options that they could configure for this pipeline, as well as mark the ones that are required / suggested / unnecessary. For now it gives a little bit of that information, but more work could be done to make it nicer.
Example
>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> self = Pipeline.demo()
>>> self.inspect_configurables()
- configure(config=None, root_dpath=None, cache=True)[source]¶
Update the DAG configuration
Note
Currently, this will completely reset the config rather than update it. This behavior will change in the future.
Example
>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> self = Pipeline.demo()
>>> self.configure()
- print_process_graph(shrink_labels=1, show_types=0, smart_colors=0)[source]¶
Draw the networkx process graph, which only shows whether a connection exists between processes; it does not show which output connects to which input. See PipelineDAG.print_io_graph() for that level of detail.
- print_io_graph(shrink_labels=1, show_types=0, smart_colors=0)[source]¶
Draw the networkx IO graph, which shows the connections between the inputs and the outputs of the processes in the pipeline.
- print_commands(**kwargs)[source]¶
Helper (mostly for debugging) to show the commands for the current pipeline configuration. This involves making a cmd_queue instance, which is the real object that knows how to structure the command sequence.
- Parameters:
**kwargs – See cmd_queue.base_queue.Queue.print_commands(), which accepts: with_status=False, with_gaurds=False, with_locks=1, exclude_tags=None, style='colors', **kwargs
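For instance, a typical debugging call might look like this (kwargs are simply forwarded to cmd_queue; a sketch):

self.print_commands(with_status=False, with_locks=0, style='colors')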
- print_graphs(shrink_labels=1, show_types=0, smart_colors=0)[source]¶
Prints the Process and IO graph for the DAG.
- submit_jobs(queue=None, skip_existing=False, enable_links=True, write_invocations=True, write_configs=True)[source]¶
Submits the jobs to an existing command queue or creates a new one.
Also takes care of adding special bookkeeping jobs that add helper files and symlinks to node output paths.
- make_queue(queue=None, skip_existing=False, enable_links=True, write_invocations=True, write_configs=True)¶
Submits the jobs to an existing command queue or creates a new one.
Also takes care of adding special bookkeeping jobs that add helper files and symlinks to node output paths.
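A sketch of the intended flow, assuming the returned cmd_queue object exposes the usual run() method:

queue = self.make_queue(skip_existing=True)
queue.run()  # hand off to cmd_queue for actual execution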
- geowatch.mlops.pipeline_nodes.bash_printf_literal_string(text, escape_newlines=True)[source]¶
Not only do we need to make a bash literal string, we also need to make sure that printf interprets it as literal.
Example
json_text = '{"step1.param1": "- this: \\"is text 100% representing\\"\\n some: \\"yaml config\\"\\n omg: \\"single \' quote\\"\\n eek: \'double \\" quote\'"}'
json.loads(json_text)
import shlex
text = json_text
literal_bash_json_text = bash_printf_literal_string(json_text)
print(json_text)
print(literal_bash_json_text)
command = f"printf {literal_bash_json_text} | jq"
print(command)
ub.cmd(command, verbose=3, shell=True)
- geowatch.mlops.pipeline_nodes.glob_templated_path(template)[source]¶
Given an unformatted template path, replace the format parts with "*" and return a glob pattern.
- Parameters:
template (str | PathLike) – a path with a {} template pattern
Example
template = '/foo{}/bar'
glob_templated_path(template)
- class geowatch.mlops.pipeline_nodes.Node(name: str)[source]¶
Bases:
NiceRepr
Abstract base class for a Process or IO Node.
- connect(*others, param_mapping=None, src_map=None, dst_map=None)[source]¶
Connect the outputs of self to the inputs of others. Conceptually, this creates an edge between the two nodes.
- property key¶
- class geowatch.mlops.pipeline_nodes.IONode(name, parent)[source]¶
Bases:
Node
- property final_value¶
- property key¶
- class geowatch.mlops.pipeline_nodes.OutputNode(name, parent)[source]¶
Bases:
IONode
- property final_value¶
- property template_value¶
- class geowatch.mlops.pipeline_nodes.memoize_configured_method(func)[source]¶
Bases:
object
Like ubelt.memoize_method, but uses a special cache name.
- geowatch.mlops.pipeline_nodes.memoize_configured_property(fget)[source]¶
Like ubelt.memoize_property, but uses a special cache name.
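A sketch of how these decorators would be applied inside a node class (the property name is hypothetical; the caching behavior follows the docstrings above):

class MyNode(ProcessNode):
    @memoize_configured_property
    def expensive_summary(self):
        # Cached under a configuration-aware name rather than a plain
        # memoized attribute, per the description above.
        return sorted(self.config.items())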
- class geowatch.mlops.pipeline_nodes.ProcessNode(*, name=None, executable=None, algo_params=None, perf_params=None, in_paths=None, out_paths=None, group_dname=None, root_dpath=None, config=None, node_dpath=None, group_dpath=None, primary_out_key=None, _overwrite_node_dpath=None, _overwrite_group_dpath=None, _no_outarg=False, _no_inarg=False, **aliases)[source]¶
Bases:
Node
Represents a process in the pipeline.
ProcessNodes are connected via their input / output nodes.
You can create an instance of this directly, or inherit from it and set its class variables.
CommandLine
xdoctest -m geowatch.mlops.pipeline_nodes ProcessNode
Notes
When a ProcessNode is used for an evaluation node, it can / should be extended with the following methods:
- load_result - for evaluation nodes
In your pipeline class you define a method
def load_result(self, node_dpath):
which returns a flat dot-dictionary of params and results from the node.
- _default_metrics - returns a Tuple of primary and display metric
suffixes for the node that will be interpreted as the metrics. Note: this is likely to change so that the user can specify whether metrics need to be minimized / maximized.
- _default_metrics2 - experimental new way of specifying metric info.
Should be a list of dictionaries with the following keys:
- suffix (str):
the name of the metric
- objective (Optional[str]):
minimize or maximize (defaults to maximize)
- primary (Optional[bool]):
if the metric is primary (defaults to False, unless no other metric is primary in which case the first one defaults to True).
- display (Optional[bool]):
show the column in the stdout table. Defaults to False. Note: any column marked as primary is also displayed.
- aggregator (Optional[str]):
how to aggregate this metric when computing macro averages. Can be "mean", "gmean", "sum", "max", "min", or "ignore". Defaults to "mean".
- default_vantage_points - should return a List[Dict] with
metric1 and metric2, used for inspecting relationships between metrics.
These methods are currently used in aggregate_loader and aggregate, but they need to be more clearly defined here. Currently they are hacked in with hasattr, so we can’t define them as abstract methods yet, but in the future we will refactor to make it more clear that these methods should be implemented for evaluation nodes (i.e. nodes that produce metric results).
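A sketch of what such an evaluation node might look like (the node, its paths, and the metric names are hypothetical; the hook shapes follow the description above):

class MyEvalNode(ProcessNode):
    name = 'my_eval'
    executable = 'python -m mypkg.evaluate'
    in_paths = {'pred'}
    out_paths = {'metrics': 'metrics.json'}

    def load_result(self, node_dpath):
        # Return a flat dot-dictionary of params and results.
        import json
        import ubelt as ub
        fpath = ub.Path(node_dpath) / 'metrics.json'
        data = json.loads(fpath.read_text())
        return {f'metrics.{self.name}.{k}': v for k, v in data.items()}

    def _default_metrics2(self):
        # One dictionary per metric, using the keys documented above.
        return [
            {'suffix': 'f1', 'objective': 'maximize', 'primary': True},
            {'suffix': 'loss', 'objective': 'minimize', 'display': True},
        ]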
Example
>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> from geowatch.mlops.pipeline_nodes import _classvar_init
>>> dpath = ub.Path.appdir('geowatch/tests/pipeline/TestProcessNode')
>>> dpath.delete().ensuredir()
>>> pycode = ub.codeblock(
...     '''
...     import ubelt as ub
...     src_fpath = ub.Path(ub.argval('--src'))
...     dst_fpath = ub.Path(ub.argval('--dst'))
...     foo = ub.argval('--foo')
...     bar = ub.argval('--bar')
...     new_text = foo + src_fpath.read_text() + bar
...     dst_fpath.write_text(new_text)
...     ''')
>>> src_fpath = dpath / 'here.txt'
>>> src_fpath.write_text('valid input')
>>> dst_fpath = dpath / 'there.txt'
>>> self = ProcessNode(
>>>     name='proc1',
>>>     config={
>>>         'foo': 'baz',
>>>         'bar': 'biz',
>>>         'num_workers': 3,
>>>         'src': src_fpath,
>>>         'dst': dst_fpath
>>>     },
>>>     in_paths={'src'},
>>>     out_paths={'dst': 'there.txt'},
>>>     primary_out_key='dst',
>>>     perf_params={'num_workers'},
>>>     group_dname='predictions',
>>>     #node_dname='proc1/{proc1_algo_id}/{proc1_id}',
>>>     executable=f'python -c "{chr(10)}{pycode}{chr(10)}"',
>>>     root_dpath=dpath,
>>> )
>>> self._finalize_templates()
>>> print('self.command = {}'.format(ub.urepr(self.command, nl=1, sv=1)))
>>> print(f'self.algo_id={self.algo_id}')
>>> print(f'self.root_dpath={self.root_dpath}')
>>> print(f'self.template_node_dpath={self.template_node_dpath}')
>>> print('self.templates = {}'.format(ub.urepr(self.templates, nl=2)))
>>> print('self.final = {}'.format(ub.urepr(self.final, nl=2)))
>>> print('self.condensed = {}'.format(ub.urepr(self.condensed, nl=2)))
>>> print('self.primary_out_key = {}'.format(ub.urepr(self.primary_out_key, nl=2)))
Example
>>> # How to use a ProcessNode to handle an arbitrary process call
>>> # First let's write a program to disk
>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> import stat
>>> dpath = ub.Path.appdir('geowatch/tests/pipeline/TestProcessNode2')
>>> dpath.delete().ensuredir()
>>> pycode = ub.codeblock(
...     '''
...     #!/usr/bin/env python3
...     import scriptconfig as scfg
...     import ubelt as ub
...
...     class MyCLI(scfg.DataConfig):
...         src = None
...         dst = None
...         foo = None
...         bar = None
...
...         @classmethod
...         def main(cls, cmdline=1, **kwargs):
...             config = cls.cli(cmdline=cmdline, data=kwargs, strict=True)
...             print('config = ' + ub.urepr(config, nl=1))
...
...     if __name__ == '__main__':
...         MyCLI.main()
...     ''')
>>> fpath = dpath / 'mycli.py'
>>> fpath.write_text(pycode)
>>> fpath.chmod(fpath.stat().st_mode | stat.S_IXUSR)
>>> # Now that we have a script that accepts some cli arguments,
>>> # create a process node to represent it. We assume that
>>> # everything is passed as key/val style params, which you should
>>> # use for new programs, but this doesn't apply to a lot of programs
>>> # out there, so we will show how to handle non key/val arguments
>>> # later (todo).
>>> mynode = ProcessNode(command=str(fpath))
>>> # Get the invocation by running
>>> command = mynode.final_command()
>>> print(command)
>>> # Use a dictionary to configure key/value pairs
>>> mynode.configure({'src': 'a.txt', 'dst': 'b.txt'})
>>> command = mynode.final_command()
>>> # Note: currently because of backslash formatting
>>> # we need to use shell=1 or system=1 with ub.cmd;
>>> # in the future we will fix this in ubelt (todo).
>>> # Similarly this class should be able to provide the arglist
>>> # style of invocation.
>>> print(command)
>>> ub.cmd(command, verbose=3, shell=1)
- configure(config=None, cache=True, enabled=True)[source]¶
Update the node configuration.
This rebuilds the templates and formats them so the "final" variables take on directory names based on the given configuration.
- FIXME:
Gotcha: Calling this twice with a new config will reset the configuration rather than update it. These nodes are stateful, so we should maintain and update state, not reset it. If we need to reset to defaults, there should be a method for that. You can work around this by passing self.config, updated with the new values, as the first argument, as sketched below.
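A sketch of that workaround (the extra key is hypothetical):

node.configure({**node.config, 'extra_param': 'value'})  # keeps prior settings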
- property condensed¶
This is the dictionary of values used to fill in the templated strings when finalizing them. We may want to change the name.
- property final_config¶
This is not really “final” in the aggregate sense. It is more of a “finalized” requested config.
- property final_perf_config¶
- property final_algo_config¶
- property final_in_paths¶
- property template_out_paths¶
Template out paths are not affected by out-path config overrides, but the final out paths are.
- property final_out_paths¶
These are the locations each output will actually be written to.
This is based on ProcessNode.template_out_paths() as well as any manual overrides specified in self.config.
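A sketch of the relationship, reusing the ProcessNode example above and assuming both properties behave as dicts keyed by the out-path names:

print(self.template_out_paths['dst'])  # unexpanded template path
print(self.final_out_paths['dst'])     # resolved path, honoring config overrides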
- property final_node_dpath¶
The configured directory where all outputs are relative to.
- property final_root_dpath¶
- property template_group_dpath¶
The template for the directory where the configured node dpath will be placed.
- property template_node_dpath¶
The template for the configured directory where all outputs are relative to.
- property template_root_dpath¶
- ancestor_process_nodes()[source]¶
Example
>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> pipe = Pipeline.demo()
>>> self = pipe.node_dict['node_C1']
>>> ancestors = self.ancestor_process_nodes()
>>> print('ancestors = {}'.format(ub.urepr(ancestors, nl=1)))
- property depends¶
The mapping from ancestor (and self) node names to their algorithm ids. Should probably be renamed.
- property algo_id: str¶
A unique id to represent the output of a deterministic process.
This does NOT have a dependency on the larger DAG.
- property process_id: str¶
A unique id to represent the output of a deterministic process in a pipeline. This id combines the hashes of all ancestors in the DAG with its own hashed id.
This DOES have a dependency on the larger DAG.
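A sketch of the distinction using the demo pipeline:

pipe = Pipeline.demo()
node = pipe.node_dict['node_C1']
print(node.algo_id)     # hash of this node's own algorithm config only
print(node.process_id)  # additionally folds in the ids of all ancestors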
- property inputs¶
Input nodes representing specific input locations.
The output nodes of other processes can be connected to these. Input nodes of one process can also connect to input nodes of another process, indicating that they share the same input data.
- Returns:
Dict[str, InputNode]
- property outputs¶
Output nodes representing specific output locations. These can be connected to the input nodes of other processes.
- Returns:
Dict[str, OutputNode]
- property command: str¶
Returns the string shell command that will execute the process.
This is the basic version of the command; subclasses can overwrite it, as sketched below.
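A sketch of overriding it in a subclass (the executable and flag are hypothetical, and we assume final_in_paths behaves as a dict of resolved input paths):

class MyNode(ProcessNode):
    name = 'my_node'
    executable = 'python -m mypkg.cli'

    @property
    def command(self):
        # Build a custom invocation instead of the default key/val style.
        src = self.final_in_paths['src']
        return f'{self.executable} --src={src}'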
- test_is_computed_command()[source]¶
Generate a bash command that will test if all output paths exist
Example
>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> self = ProcessNode(out_paths={
>>>     'foo': 'foo.txt',
>>>     'bar': 'bar.txt',
>>>     'baz': 'baz.txt',
>>>     'biz': 'biz.txt',
>>> }, node_dpath='.')
>>> test_cmd = self.test_is_computed_command()
>>> print(test_cmd)
test -e foo.txt -a \
     -e bar.txt -a \
     -e baz.txt -a \
     -e biz.txt
>>> self = ProcessNode(out_paths={
>>>     'foo': 'foo.txt',
>>>     'bar': 'bar.txt',
>>> }, node_dpath='.')
>>> test_cmd = self.test_is_computed_command()
>>> print(test_cmd)
test -e foo.txt -a \
     -e bar.txt
>>> self = ProcessNode(out_paths={
>>>     'foo': 'foo.txt',
>>> }, node_dpath='.')
>>> test_cmd = self.test_is_computed_command()
>>> print(test_cmd)
test -e foo.txt
>>> self = ProcessNode(out_paths={}, node_dpath='.')
>>> test_cmd = self.test_is_computed_command()
>>> print(test_cmd)
None
- property does_exist: bool¶
Check if all of the output paths that would be written by this node already exist.
- property outputs_exist: bool¶
Alias for does_exist.
Check if all of the output paths that would be written by this node already exist.
- geowatch.mlops.pipeline_nodes.demodata_pipeline()[source]¶
A simple test pipeline.
Example
>>> # Self test
>>> from geowatch.mlops.pipeline_nodes import *  # NOQA
>>> demodata_pipeline()