Running The System In Smartflow
Note
This file is currently a working document and contains a lot of notes
This goes over how to run the GEOWATCH system in smartflow.
The outline of this document is:
Prerequisites
Be sure you have
Section 0: NOTICE OF AUTOMATION
This entire process has been scripted and lives in the watch-smartflow-dags repo.
The prepare_system.sh script is the main driver. TODO: We should document how to use that script here instead of these manual instructions.
Other scripts of interest are:
run_smartflow_dags.py - This is used by prepare_system.sh to trigger DAGs and can be used as a standalone script to trigger DAGs that have already been uploaded.
pull_results.py - This pulls results down from smartflow DAG runs and can optionally summarize metrics.
Section 1: The GEOWATCH Docker Image
In this section we will go over how to build the docker image used in a submission:
There are two images that need to be built: first the pyenv image, from the pyenv dockerfile, and then the watch image, from the watch dockerfile, which builds on top of it. The heredocs in these files provide further instructions. We will also need to add models to the image.
Building the Pyenv Base Image
Here we will go over the basic use case for a specific version of Python /
dependencies. We will use Python 3.11 and strict dependencies. We will assume
that your watch repo is in ~/code/watch; if it is not, change the
WATCH_REPO_DPATH environment variable:
export WATCH_REPO_DPATH=$HOME/code/watch
export STAGING_DPATH=$HOME/temp/container-staging
# The name/tag of the image we will create
export PYENV_IMAGE=pyenv:3.11.2
# The Python version we want
export PYTHON_VERSION=3.11.2
# Create a directory on the host for context
mkdir -p $STAGING_DPATH
# For pyenv we will use an empty directory
mkdir -p $STAGING_DPATH/empty
DOCKER_BUILDKIT=1 docker build --progress=plain \
-t $PYENV_IMAGE \
--build-arg PYTHON_VERSION=$PYTHON_VERSION \
-f $WATCH_REPO_DPATH/dockerfiles/pyenv.Dockerfile \
$STAGING_DPATH/empty
# Optional: push this image to kitware and smartgitlab registries
# optional: Push to smartgitlab
docker tag $PYENV_IMAGE registry.smartgitlab.com/kitware/$PYENV_IMAGE
docker push registry.smartgitlab.com/kitware/$PYENV_IMAGE
# optional: Push to gitlab.kitware.com
docker tag $PYENV_IMAGE gitlab.kitware.com:4567/smart/watch/$PYENV_IMAGE
docker push gitlab.kitware.com:4567/smart/watch/$PYENV_IMAGE
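The tag-and-push pattern above repeats for every image in this document. As a minimal sketch, the hypothetical helper below (tag_and_push_commands is not part of the watch repo) builds the same docker tag / docker push commands for both registries as argument lists, so you can print them for review or hand each one to subprocess.run yourself.

```python
import shlex

# Registry prefixes that appear in this document
REGISTRY_PREFIXES = (
    "registry.smartgitlab.com/kitware",
    "gitlab.kitware.com:4567/smart/watch",
)


def tag_and_push_commands(image, prefixes=REGISTRY_PREFIXES):
    """Return ``docker tag`` / ``docker push`` argv lists for ``image``."""
    commands = []
    for prefix in prefixes:
        remote = f"{prefix}/{image}"
        commands.append(["docker", "tag", image, remote])
        commands.append(["docker", "push", remote])
    return commands


if __name__ == "__main__":
    # Print copy-pasteable shell lines instead of running anything
    for argv in tag_and_push_commands("pyenv:3.11.2"):
        print(shlex.join(argv))
```

Generating argument lists rather than shell strings avoids quoting problems if a tag ever contains unusual characters.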
Now that the pyenv image pyenv:3.11.2 has been created, we can quickly test it:
export PYENV_IMAGE=pyenv:3.11.2
# Hello world should always run
docker run -it $PYENV_IMAGE echo "hello world"
# Ensure the right python is exposed by default
docker run -it $PYENV_IMAGE python --version
# If you have a GPU (and the NVIDIA container runtime), you can run
docker run --runtime=nvidia -it $PYENV_IMAGE nvidia-smi
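The python --version check can also be automated. A minimal sketch, assuming you capture the container's output yourself (for example with subprocess.run(..., capture_output=True, text=True)); python_version_matches is a hypothetical helper. Stripping whitespace matters because docker run -it allocates a TTY, which emits CRLF line endings.

```python
import re


def python_version_matches(version_output, expected):
    """Check that ``python --version`` output reports exactly ``expected``.

    ``version_output`` is the text printed inside the container, e.g.
    "Python 3.11.2"; surrounding whitespace (including CRLF) is ignored.
    """
    match = re.fullmatch(r"Python\s+(\S+)", version_output.strip())
    return match is not None and match.group(1) == expected


if __name__ == "__main__":
    print(python_version_matches("Python 3.11.2\r\n", "3.11.2"))
```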
Building the GEOWATCH Image
Now we build the watch image on top of the pyenv image. To do this cleanly, we make a fresh clone of your local repo, which ensures you don't accidentally bake in any secrets or other large files.
export WATCH_REPO_DPATH=$HOME/code/watch
export STAGING_DPATH=$HOME/temp/container-staging
export PYENV_IMAGE=pyenv:3.11.2
export WATCH_VERSION=$(python -c "import watch; print(watch.__version__)")
export BUILD_STRICT=1
# A descriptive name for our watch image
PYENV_TAG_SUFFIX=$(python -c "print('$PYENV_IMAGE'.replace(':', ''))")
if [[ "$BUILD_STRICT" == "1" ]]; then
export WATCH_IMAGE=watch:$WATCH_VERSION-strict-$PYENV_TAG_SUFFIX
else
export WATCH_IMAGE=watch:$WATCH_VERSION-loose-$PYENV_TAG_SUFFIX
fi
echo "
===========
WATCH_REPO_DPATH = $WATCH_REPO_DPATH
STAGING_DPATH = $STAGING_DPATH
WATCH_VERSION = $WATCH_VERSION
PYENV_IMAGE = $PYENV_IMAGE
BUILD_STRICT = $BUILD_STRICT
-----------
WATCH_IMAGE=$WATCH_IMAGE
===========
"
# Create a directory on the host for context
mkdir -p $STAGING_DPATH
# For watch we make a fresh clone of our local repo
[ -d $STAGING_DPATH/watch ] && rm -rf $STAGING_DPATH/watch
git clone --origin=host-$HOSTNAME $WATCH_REPO_DPATH/.git $STAGING_DPATH/watch
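# Use the fresh clone as the build context so nothing else from the host is sent to docker
DOCKER_BUILDKIT=1 docker build --progress=plain \
-t "$WATCH_IMAGE" \
--build-arg "BUILD_STRICT=$BUILD_STRICT" \
--build-arg "BASE_IMAGE=$PYENV_IMAGE" \
-f $STAGING_DPATH/watch/dockerfiles/watch.Dockerfile \
$STAGING_DPATH/watch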
# Optional: push this image to kitware and smartgitlab registries
# optional: Push to smartgitlab
docker tag $WATCH_IMAGE registry.smartgitlab.com/kitware/$WATCH_IMAGE
docker push registry.smartgitlab.com/kitware/$WATCH_IMAGE
# optional: Push to gitlab.kitware.com
docker tag $WATCH_IMAGE gitlab.kitware.com:4567/smart/watch/$WATCH_IMAGE
docker push gitlab.kitware.com:4567/smart/watch/$WATCH_IMAGE
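For scripting, or to double-check a tag before pushing, the WATCH_IMAGE naming rule can be restated as a small Python function. This is a hypothetical helper that mirrors the shell logic above; for example, watch version 0.4.5 with strict dependencies on pyenv:3.11.2 yields the watch:0.4.5-strict-pyenv3.11.2 tag used later in this document.

```python
def watch_image_name(watch_version, build_strict, pyenv_image):
    """Mirror the WATCH_IMAGE naming logic from the shell snippet above."""
    # Same as the Python one-liner: drop the ':' from e.g. "pyenv:3.11.2"
    pyenv_tag_suffix = pyenv_image.replace(":", "")
    mode = "strict" if build_strict else "loose"
    return f"watch:{watch_version}-{mode}-{pyenv_tag_suffix}"


if __name__ == "__main__":
    print(watch_image_name("0.4.5", True, "pyenv:3.11.2"))
```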
It is a good idea to run some tests to ensure the image built properly:
# Hello world should always run
docker run -it $WATCH_IMAGE echo "hello world"
# Ensure the right python is exposed by default
docker run -it $WATCH_IMAGE python --version
# Ensure the geowatch CLI is exposed by default
docker run -it $WATCH_IMAGE geowatch --version
# If you have a GPU (and the NVIDIA container runtime), you can run
docker run --runtime=nvidia -it $WATCH_IMAGE nvidia-smi
# run the full test suite
docker run --runtime=nvidia -it $WATCH_IMAGE ./run_tests.py
You may wish to upload this base image to the smartgitlab registry. Because we still need to bake in models, this step is optional, but it is useful if you want to build the base image on one machine and bake in models on a different machine.
# Push the container to smartgitlab
docker tag $WATCH_IMAGE registry.smartgitlab.com/kitware/$WATCH_IMAGE
docker push registry.smartgitlab.com/kitware/$WATCH_IMAGE
How to Bake a Model into a Pyenv Dockerfile
Assuming that you have already built a pyenv-based watch docker image, we will add models to it.
# Set this to the name of the pyenv watch image that you built
IMAGE_NAME=watch:0.4.5-strict-pyenv3.11.2
NEW_IMAGE_NAME=${IMAGE_NAME}-models-2023-03-28
echo $NEW_IMAGE_NAME
# These are more models than we really need, but it will let us reuse this image for more experiments
MODELS_OF_INTEREST="
models/fusion/Drop6-MeanYear10GSD/packages/Drop6_TCombo1Year_BAS_10GSD_split6_V42_cont2/Drop6_TCombo1Year_BAS_10GSD_split6_V42_cont2_epoch3_step941.pt
models/fusion/Drop4-BAS/packages/Drop4_BAS_2022_12_15GSD_BGRN_V10/Drop4_BAS_2022_12_15GSD_BGRN_V10_v0_epoch0_step0.pt
models/fusion/Drop4-BAS/packages/Drop4_BAS_15GSD_BGRNSH_invar_V8/Drop4_BAS_15GSD_BGRNSH_invar_V8_epoch=16-step=8704.pt
models/fusion/Drop4-BAS/packages/Drop4_TuneV323_BAS_30GSD_BGRNSH_V2/package_epoch0_step41.pt.pt
models/fusion/Drop4-SC/packages/Drop4_tune_V30_8GSD_V3/Drop4_tune_V30_8GSD_V3_epoch=2-step=17334.pt.pt
models/uky/uky_invariants_2022_03_21/pretext_model/pretext_pca_104.pt
models/uky/uky_invariants_2022_12_17/TA1_pretext_model/pretext_package.pt
models/landcover/sentinel2.pt
"
DVC_EXPT_DPATH=$(geowatch_dvc --tags='phase2_expt' --hardware=auto)
# Ensure the models of interest are pulled locally on your machine
(cd $DVC_EXPT_DPATH && dvc pull -r aws $MODELS_OF_INTEREST)
# We are also going to bake the metrics and data DVC into the repo too for
# completeness
DVC_DATA_DPATH=$(geowatch_dvc --tags='phase2_data' --hardware=auto)
METRICS_REPO_DPATH=$(python -c "import iarpa_smart_metrics, pathlib; print(pathlib.Path(iarpa_smart_metrics.__file__).parent.parent)")
# Run the base image as a container so we can put stuff into it
# We will use DVC to facilitate the transfer to keep things consistent
# We mount our local experiment directory, and pull relevant files
docker run \
--volume $DVC_EXPT_DPATH:/host-smart_expt_dvc:ro \
--volume $DVC_DATA_DPATH:/host-smart_data_dvc:ro \
--volume $METRICS_REPO_DPATH:/host-metrics_repo:ro \
-td --name temp_container $IMAGE_NAME
docker exec -t temp_container pip install dvc
docker exec -t temp_container mkdir -p /root/data
docker exec -t temp_container git clone /host-smart_expt_dvc/.git /root/data/smart_expt_dvc
docker exec -t temp_container git clone /host-smart_data_dvc/.git /root/data/smart_data_dvc
docker exec -t temp_container git clone /host-metrics_repo/.git /root/code/metrics-and-test-framework
docker exec -w /root/data/smart_expt_dvc -t temp_container \
dvc remote add host /host-smart_expt_dvc/.dvc/cache
# Workaround DVC Issue by removing aws remote
# References: https://github.com/iterative/dvc/issues/9264
docker exec -w /root/data/smart_expt_dvc -t temp_container \
dvc remote remove aws
# Pull in relevant models you want to bake into the container
# These will be specified relative to the experiment DVC repo
docker exec -w /root/data/smart_expt_dvc -t temp_container \
dvc pull --remote host $MODELS_OF_INTEREST
# Save the modified container as a new image
docker commit temp_container $NEW_IMAGE_NAME
# Cleanup the temp container
docker stop temp_container
docker rm temp_container
# Push the container to smartgitlab
docker tag $NEW_IMAGE_NAME registry.smartgitlab.com/kitware/$NEW_IMAGE_NAME
docker push registry.smartgitlab.com/kitware/$NEW_IMAGE_NAME
echo $NEW_IMAGE_NAME
# optional: Push to gitlab.kitware.com
docker tag $NEW_IMAGE_NAME gitlab.kitware.com:4567/smart/watch/$NEW_IMAGE_NAME
docker push gitlab.kitware.com:4567/smart/watch/$NEW_IMAGE_NAME
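A quick sanity check for the block above is to confirm that every entry of MODELS_OF_INTEREST exists in your local experiment repo after the host-side dvc pull; running it before the docker steps catches a missing model early. A minimal sketch, assuming you export MODELS_OF_INTEREST and DVC_EXPT_DPATH first (the block above sets them without exporting); parse_model_list and missing_models are hypothetical helpers.

```python
import os
from pathlib import Path


def parse_model_list(text):
    """Split a MODELS_OF_INTEREST-style string into relative paths.

    Tokens are whitespace separated; stray line-continuation backslashes
    are ignored.
    """
    return [tok for tok in text.split() if tok != "\\"]


def missing_models(expt_dpath, model_paths):
    """Return the entries of ``model_paths`` that do not exist under ``expt_dpath``."""
    root = Path(expt_dpath)
    return [p for p in model_paths if not (root / p).exists()]


if __name__ == "__main__":
    # Unset variables fall back to an empty list / the current directory
    models = parse_model_list(os.environ.get("MODELS_OF_INTEREST", ""))
    expt_dpath = os.environ.get("DVC_EXPT_DPATH", ".")
    for path in missing_models(expt_dpath, models):
        print(f"missing: {path}")
```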
Update An Existing Image
Say you need to make a small change to the code, but don’t want to rebuild the entire image. We can handle that case by mounting the latest repos onto the container, pointing the container repo's remotes at those mounts, pulling the latest code, and committing the change as a new image.
export WATCH_REPO_DPATH=$HOME/code/watch
export DVC_EXPT_DPATH=$(geowatch_dvc --tags='phase2_expt' --hardware=auto)
IMAGE_NAME=watch:0.4.5-strict-pyenv3.11.2-models-2023-03-28
NEW_IMAGE_NAME=watch:0.4.5-strict-pyenv3.11.2-models-2023-03-28-v04
# Start a temporary container from the image with the host repos mounted
docker run \
--volume $DVC_EXPT_DPATH:/host-smart_expt_dvc:ro \
--volume $WATCH_REPO_DPATH:/host-watch_repo:ro \
-td --name temp_container $IMAGE_NAME
docker exec -w /root/code/watch -t temp_container \
git remote add host /host-watch_repo/.git
docker exec -w /root/code/watch -t temp_container \
git pull host dev/0.4.5
# Save the modified container as a new image
docker commit temp_container $NEW_IMAGE_NAME
docker stop temp_container
docker rm temp_container
# Push the container to smartgitlab
echo $NEW_IMAGE_NAME
docker tag $NEW_IMAGE_NAME registry.smartgitlab.com/kitware/$NEW_IMAGE_NAME
docker push registry.smartgitlab.com/kitware/$NEW_IMAGE_NAME
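Each update produces a new tag, as with the -v04 suffix on NEW_IMAGE_NAME above. If you do this repeatedly, a small hypothetical helper can derive the next name from the previous one. The -vNN convention is only inferred from that example tag and is not enforced anywhere.

```python
import re


def bump_image_version(image_name):
    """Increment a trailing ``-vNN`` suffix, or append ``-v01`` if there is none.

    Hypothetical naming helper; the zero-padding width of an existing
    suffix is preserved.
    """
    match = re.search(r"-v(\d+)$", image_name)
    if match is None:
        return image_name + "-v01"
    digits = match.group(1)
    bumped = str(int(digits) + 1).zfill(len(digits))
    return image_name[: match.start()] + f"-v{bumped}"


if __name__ == "__main__":
    print(bump_image_version("watch:0.4.5-strict-pyenv3.11.2-models-2023-03-28-v04"))
```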
How to Submit a DAG
We maintain the airflow DAGs in the watch-smartflow-dags repo. Ensure that you have a clone of the DAG repo:
# This is the repo containing the smartflow dags
git clone git@gitlab.kitware.com:smart/watch-smartflow-dags.git $HOME/code/watch-smartflow-dags
Choose a DAG file and modify it as necessary
Note
TODO: Describe in more detail
Once your DAG file is ready, upload it to AWS via:
# The path to our DAG repo
LOCAL_DAG_DPATH=$HOME/code/watch-smartflow-dags
# The name of the DAG file we edited
DAG_FNAME=KIT_TA2_PREEVAL10_PYENV_V13.py
# Upload the DAG file to AWS
aws s3 --profile iarpa cp $LOCAL_DAG_DPATH/$DAG_FNAME \
s3://smartflow-023300502152-us-west-2/smartflow/env/kitware-prod-v4/dags/$DAG_FNAME
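The upload destination is just the DAG file name appended to a fixed S3 prefix. A minimal sketch that builds the same aws s3 cp command from LOCAL_DAG_DPATH and DAG_FNAME; dag_upload_command is a hypothetical helper, and the kitware-prod-v4 prefix is copied from the command above (the old notes below use kitware-prod-v2, so check which environment you are targeting).

```python
import os
import shlex
from pathlib import Path

# Bucket and environment path copied from the aws command above
DAG_BUCKET_PREFIX = "s3://smartflow-023300502152-us-west-2/smartflow/env/kitware-prod-v4/dags"


def dag_upload_command(local_dag_dpath, dag_fname, profile="iarpa"):
    """Build the ``aws s3 cp`` argv for uploading one DAG file."""
    src = Path(local_dag_dpath) / dag_fname
    dst = f"{DAG_BUCKET_PREFIX}/{dag_fname}"
    return ["aws", "s3", "--profile", profile, "cp", str(src), dst]


if __name__ == "__main__":
    local_dpath = os.path.expanduser("~/code/watch-smartflow-dags")
    print(shlex.join(dag_upload_command(local_dpath, "KIT_TA2_PREEVAL10_PYENV_V13.py")))
```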
If you have not done so already, forward the smartflow web service to your machine:
kubectl -n airflow port-forward service/airflow-webserver 2746:8080
Now, navigate to your airflow GUI in the browser at localhost:2746/home, which can be done via the command:
# The port-forward above typically serves plain HTTP, so use http:// rather than https://
python -c "import webbrowser; webbrowser.open('http://localhost:2746/home', new=1)"
Building / Modifying a DAG
Our smartflow DAGs are built as sequences of smartflow CLI commands that wrap our local CLI commands. These smartflow CLI commands live in geowatch/cli/smartflow.
Each of these uses fsspec to grab manifests of available assets from an s3 bucket, which point to the data the task could use. It is the script's job to pull the data, perform the computation, print debugging info, and push results and debug data back to a new output bucket.
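The pull, compute, push pattern that each smartflow CLI step follows can be sketched in plain Python. This is a minimal illustration, not the actual geowatch/cli/smartflow code: local directories stand in for the input and output s3 buckets (the real scripts go through fsspec), and the manifest format here, a JSON object with an "assets" list of relative paths, is a hypothetical placeholder.

```python
import json
import shutil
import tempfile
from pathlib import Path


def run_step(input_bucket, output_bucket, compute):
    """Pull inputs listed in a manifest, compute, and push results.

    ``compute`` maps (list of local input paths, output dir) to a list of
    local output paths.
    """
    input_bucket, output_bucket = Path(input_bucket), Path(output_bucket)
    # 1. Read the manifest of available assets (hypothetical format)
    manifest = json.loads((input_bucket / "manifest.json").read_text())
    with tempfile.TemporaryDirectory() as workdir:
        workdir = Path(workdir)
        # 2. Pull the referenced data into a local working directory
        local_inputs = []
        for rel_path in manifest["assets"]:
            dst = workdir / "inputs" / rel_path
            dst.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy(input_bucket / rel_path, dst)
            local_inputs.append(dst)
        # 3. Perform the computation and print some debugging info
        outputs = compute(local_inputs, workdir / "outputs")
        print(f"computed {len(outputs)} outputs from {len(local_inputs)} inputs")
        # 4. Push results plus a new manifest to the output bucket
        output_bucket.mkdir(parents=True, exist_ok=True)
        pushed = []
        for out_path in outputs:
            shutil.copy(out_path, output_bucket / out_path.name)
            pushed.append(out_path.name)
        (output_bucket / "manifest.json").write_text(json.dumps({"assets": pushed}))
    return pushed


def count_lines(inputs, out_dpath):
    """Toy computation: count lines in each input file."""
    out_dpath.mkdir(parents=True, exist_ok=True)
    out_path = out_dpath / "line_counts.json"
    counts = {p.name: len(p.read_text().splitlines()) for p in inputs}
    out_path.write_text(json.dumps(counts))
    return [out_path]


def demo():
    """Run one step end to end on throwaway directories."""
    with tempfile.TemporaryDirectory() as tmp:
        tmp = Path(tmp)
        in_bucket = tmp / "in_bucket"
        (in_bucket / "data").mkdir(parents=True)
        (in_bucket / "data" / "a.txt").write_text("one\ntwo\n")
        (in_bucket / "manifest.json").write_text(json.dumps({"assets": ["data/a.txt"]}))
        out_bucket = tmp / "out_bucket"
        pushed = run_step(in_bucket, out_bucket, count_lines)
        return pushed, json.loads((out_bucket / "line_counts.json").read_text())


if __name__ == "__main__":
    print(demo())
```

Writing the output manifest last means a downstream step that looks for it only sees a bucket whose results have already been copied.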
See [ComputeInstanceTypes] for details on available instance types.
- References:
Running DAGs
In the GUI you can simply search for your DAG and hit the run button.
To programmatically interact with airflow on the command line, you need to exec into the airflow scheduler pod.
JQ_QUERY='.items[] | select(.metadata.name | startswith("airflow-scheduler-")) | .metadata.name'
AIRFLOW_SCHEDULER_POD_NAME=$(kubectl -n airflow get pods -o json | jq -r "$JQ_QUERY")
echo "AIRFLOW_SCHEDULER_POD_NAME=$AIRFLOW_SCHEDULER_POD_NAME"
# Get a shell into the scheduler to run airflow commands
kubectl -n airflow exec -it pods/$AIRFLOW_SCHEDULER_POD_NAME -- /bin/bash
# Inside the airflow shell
echo '
airflow dags list
airflow dags list -o json > dags.json
airflow dags list-jobs
# To run a dag you need to trigger and unpause it.
airflow dags trigger kit_ta2_preeval10_pyenv_t29_batch_AE_R001
airflow dags unpause kit_ta2_preeval10_pyenv_t29_batch_AE_R001
airflow dags trigger kit_ta2_preeval10_pyenv_t29_batch_KW_R001
airflow dags unpause kit_ta2_preeval10_pyenv_t29_batch_KW_R001
REGION_IDS=("KR_R002" "KR_R001" "NZ_R001")
for REGION_ID in "${REGION_IDS[@]}"; do
echo "trigger $REGION_ID"
airflow dags trigger kit_ta2_preeval10_pyenv_t29_batch_$REGION_ID
airflow dags unpause kit_ta2_preeval10_pyenv_t29_batch_$REGION_ID
done
REGION_IDS=("KR_R002" "KR_R001" "NZ_R001" "KW_R001" "AE_R001")
for REGION_ID in "${REGION_IDS[@]}"; do
echo "trigger $REGION_ID"
airflow dags trigger kit_ta2_preeval10_pyenv_t31_batch_$REGION_ID
airflow dags unpause kit_ta2_preeval10_pyenv_t31_batch_$REGION_ID
done
# Status queries
airflow dags list-jobs -d kit_ta2_preeval10_pyenv_t33_post1_batch_KR_R001 -o yaml
airflow dags list-runs -d kit_ta2_preeval10_pyenv_t33_post1_batch_KR_R001 -o yaml
airflow dags list-runs -d kit_eval_11_rerun_batch_AE_R001 -o yaml
'
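The scheduler-pod lookup and the trigger loops above, as well as the color stripping and fixed text[86:] offset used by the local-shell alternative, can also be handled from Python. A minimal sketch with hypothetical helper names: the regex is the same one the sed command uses, and the JSON loader is a heuristic that skips a non-JSON preamble (such as log lines) by trying to decode at each [ or { until one parses.

```python
import json
import re

# Same pattern as the sed expression in the local-shell alternative
ANSI_ESCAPE = re.compile(r"\x1B\[([0-9]{1,3}(;[0-9]{1,2};?)?)?[mGK]")


def strip_ansi(text):
    """Remove ANSI color / erase-line escape sequences from ``text``."""
    return ANSI_ESCAPE.sub("", text)


def load_airflow_json(text):
    """Parse ``-o json`` output that may carry a non-JSON preamble."""
    cleaned = strip_ansi(text)
    decoder = json.JSONDecoder()
    for idx, char in enumerate(cleaned):
        if char in "[{":
            try:
                value, _ = decoder.raw_decode(cleaned, idx)
            except json.JSONDecodeError:
                continue
            return value
    raise ValueError("no JSON array or object found")


def find_scheduler_pods(pods_json):
    """Python equivalent of the jq query: names starting with "airflow-scheduler-"."""
    items = load_airflow_json(pods_json)["items"]
    names = [item["metadata"]["name"] for item in items]
    return [name for name in names if name.startswith("airflow-scheduler-")]


def trigger_commands(dag_ids):
    """Build the trigger + unpause pair for each DAG id, as in the loops above."""
    commands = []
    for dag_id in dag_ids:
        commands.append(f"airflow dags trigger {dag_id}")
        commands.append(f"airflow dags unpause {dag_id}")
    return commands


if __name__ == "__main__":
    regions = ["KR_R002", "KR_R001", "NZ_R001"]
    dag_ids = [f"kit_ta2_preeval10_pyenv_t29_batch_{r}" for r in regions]
    for line in trigger_commands(dag_ids):
        print(line)
```

Like the jq query, find_scheduler_pods can return more than one name if several scheduler pods exist, so check the list before picking one.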
### Alternative - execute commands from local shell
# Because -it allocates a TTY, the output tends to contain ANSI color codes that we need to strip out.
JQ_QUERY='.items[] | select(.metadata.name | startswith("airflow-scheduler-")) | .metadata.name'
AIRFLOW_SCHEDULER_POD_NAME=$(kubectl -n airflow get pods -o json | jq -r "$JQ_QUERY")
export AIRFLOW_SCHEDULER_POD_NAME
kubectl -n airflow exec -it pods/$AIRFLOW_SCHEDULER_POD_NAME -- airflow dags list -o json > dags.json
sed -r "s/\x1B\[([0-9]{1,3}(;[0-9]{1,2};?)?)?[mGK]//g" dags.json > dags_nocolor.json
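# Airflow 2 equivalent of the old dag_state command (run inside the scheduler pod):
# airflow dags state kit_eval_11_rerun_batch_AE_C002 <execution_date>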
# Note:
# This idea will be further developed in
# ~/code/watch-smartflow-dags/monitor_dags.py
python -c "if True:
    import json
    import pathlib
    import cmd_queue
    import ubelt as ub
    # Build pattern to identify the jobs you want to run
    import xdev
    pattern = xdev.MultiPattern.coerce([
        'kit_eval_11_rerun_batch*'
        #f'kit_ta2_preeval10_pyenv_t{t}*'
        #for t in [31, 35]
    ])
    # FIXME: the json can be output with an error, need to strip it.
    text = pathlib.Path('dags_nocolor.json').read_text()
    data = json.loads(text[86:])
    valid_rows = []
    for item in data:
        if pattern.match(item['dag_id']):
            valid_rows.append(item)
    if 0:
        # Query the status of the selected dags
        import os
        AIRFLOW_SCHEDULER_POD_NAME = os.environ['AIRFLOW_SCHEDULER_POD_NAME']
        prefix = f'kubectl -n airflow exec -it pods/{AIRFLOW_SCHEDULER_POD_NAME} -- '
        import base64
        # easy-to-represent char encoding of the strip ansi pattern
        pat = base64.b32decode(b'DNOFWKC3GAWTSXL3GEWDG7JIHNNTALJZLV5TCLBSPU5T6KJ7FE7VW3KHJNOQ====').decode('utf8')
        import re
        pat = re.compile(pat)
        from watch.utils.util_yaml import Yaml
        row_to_states = {}
        for row in valid_rows:
            dag_id = row['dag_id']
            info = ub.cmd(prefix + f'airflow dags list-runs -d {dag_id} -o yaml', shell=True)
            text = pat.sub('', info['out'])
            states = Yaml.loads(text)
            print(ub.urepr(states))
            row_to_states[dag_id] = states
        orig_row = {r['dag_id']: r for r in valid_rows}
        dag_info_rows = []
        for dag_id, states in row_to_states.items():
            row = orig_row[dag_id]
            if len(states) == 0:
                row['status'] = None
            else:
                mrs = states[-1]
                row['status'] = mrs['state']
                row['execution_date'] = mrs['execution_date']
                row['run_id'] = mrs['run_id']
                row['start_date'] = mrs['start_date']
                row['end_date'] = mrs['end_date']
            dag_info_rows.append(row)
        import pandas as pd
        df = pd.DataFrame(dag_info_rows)
        import rich
        rich.print(df)
        num_need_run = pd.isna(df['status']).sum()
        num_running = (df['status'] == 'running').sum()
        print(f'num_need_run={num_need_run}')
        print(f'num_running={num_running}')
    import pandas as pd
    df = pd.DataFrame(valid_rows)
    import rich
    rich.print(df)
    # Build cmd-queue with the commands to execute
    queue = cmd_queue.Queue.create(backend='serial')
    prefix = 'kubectl -n airflow exec -it pods/$AIRFLOW_SCHEDULER_POD_NAME -- '
    for item in data:
        if pattern.match(item['dag_id']):
            print(item['dag_id'])
            queue.submit(prefix + 'airflow dags trigger ' + item['dag_id'])
            queue.submit(prefix + 'airflow dags unpause ' + item['dag_id'])
    # It is a good idea to comment out the run to check that you
    # are doing what you want to do before you actually execute.
    queue.print_commands()
    queue.run()
"
Debugging DAGs
Here is a useful command to get a list of running pods that contain jobs.
kubectl -n airflow get pods
Given a pod name, these commands are useful:
# Pod logs
kubectl -n airflow logs pods/{pod_addr}
# Exec into a pod
kubectl -n airflow exec -it pods/{pod_addr} -- bash
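When many DAG pods are running, it can help to filter the listing by phase (for example to find Failed pods). A minimal sketch that summarizes the JSON form of kubectl -n airflow get pods -o json; summarize_pods is a hypothetical helper, and SAMPLE_PODS_JSON is made-up data containing only the fields the function reads.

```python
import json

# Made-up example of `kubectl -n airflow get pods -o json` output (trimmed)
SAMPLE_PODS_JSON = json.dumps({
    "items": [
        {
            "metadata": {"name": "example-running-pod"},
            "status": {"phase": "Running", "containerStatuses": [{"restartCount": 2}]},
        },
        {
            "metadata": {"name": "example-pending-pod"},
            "status": {"phase": "Pending"},
        },
    ]
})


def summarize_pods(pods_json, phase=None):
    """Return (name, phase, restarts) tuples, optionally filtered by phase.

    Pods that have not started yet may lack ``containerStatuses``; they
    are counted as having 0 restarts.
    """
    rows = []
    for item in json.loads(pods_json)["items"]:
        status = item.get("status", {})
        pod_phase = status.get("phase")
        if phase is not None and pod_phase != phase:
            continue
        restarts = sum(cs.get("restartCount", 0) for cs in status.get("containerStatuses", []))
        rows.append((item["metadata"]["name"], pod_phase, restarts))
    return rows


if __name__ == "__main__":
    for row in summarize_pods(SAMPLE_PODS_JSON):
        print(row)
```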
Here is a snippet to automatically list pods and let you select one to exec into:
kubectl -n airflow get pods
# Find your POD_ADDR
# POD_ADDR=site-cropped-kwcoco-6254ac27fab04f0b8eb302ac19b09745
# kubectl -n airflow exec -it pods/$POD_ADDR -- bash
# Script to list and exec into a running pod
python -c "if True:
    import json
    import pandas as pd
    import rich
    import ubelt as ub
    info = ub.cmd('kubectl -n airflow get pods -o json')
    data = json.loads(info['out'])
    from dateutil.parser import isoparse
    from datetime import datetime as datetime_cls
    utc_now = datetime_cls.utcnow()
    rows = []
    for item in data['items']:
        # Pods that have not started yet may not report containerStatuses
        restart_count = sum([cs['restartCount'] for cs in item['status'].get('containerStatuses', [])])
        start_time = item['status']['startTime']
        start_dt = isoparse(start_time)
        utc_now = utc_now.replace(tzinfo=start_dt.tzinfo)
        age_delta = utc_now - start_dt
        row = {
            'name': item['metadata']['name'],
            'status': item['status']['phase'],
            'startTime': start_time,
            'restarts': restart_count,
            'age': str(age_delta),
        }
        rows.append(row)
    df = pd.DataFrame(rows)
    rich.print(df.to_string())
    import rich.prompt
    ans = rich.prompt.Prompt.ask('which one?', choices=list(map(str, df.index.to_list())))
    idx = int(ans)
    pod_addr = df.iloc[idx]['name']
    ub.cmd(f'kubectl -n airflow exec -it pods/{pod_addr} -- bash', system=True)
"
Old Notes
How to Bake a Model into a Dockerfile (OLD)
This must be run in the repo root.
Ensure whatever variant of the repo you want to run is checked out.
You need a base directory with a model in ./models.
DOCKER_BUILDKIT=1 \
docker build --build-arg BUILD_STRICT=1 -f dockerfiles/ta2_features.Dockerfile . \
--tag registry.smartgitlab.com/kitware/watch/ta2:post-jan31-invariant-rescaled-debug4
In the DAG, you need to change the path to point to the newly baked-in model.
You also need to push the container to smartgitlab.
Running Dags After Containers are Using (OLD)
Now we edit a DAG file for airflow.
- Choose a DAG file in ~/code/watch-smartflow-dags/ and give it a unique file name, e.g. ~/code/watch-smartflow-dags/KIT_TA2_20221121_BATCH.py.
- After renaming the file, change EVALUATION to a unique string that names the DAG.
- Change the image names / tags, e.g. image="registry.smartgitlab.com/kitware/watch/ta2:Ph2Nov21EvalBatch". These are all “pod tasks” (create_pod_task).
- The purpose setting is something about the node that the task runs on. For a subset of valid options see: https://smartgitlab.com/blacksky/smartflow/-/blob/118140a81362c5721b5e9bb65ab967fb8bd28163/CHANGELOG.md
- Make the cpu limit a bit less than what is available on the pod.
Copy the DAG to smartflow S3:
aws s3 --profile iarpa cp Kit_DatasetGeneration.py s3://smartflow-023300502152-us-west-2/smartflow/env/kitware-prod-v2/dags/Kit_DatasetGeneration.py
Port-forward the airflow webserver service to access the airflow GUI:
kubectl -n airflow port-forward service/airflow-webserver 2746:8080
Navigate to localhost:2746/home; the DAGs should now show up in the GUI.