This is the second part in a series of articles demonstrating best practices for engineering ML pipelines and deploying them to production. In the first part we focused on project setup - everything from codebase structure to configuring a CI/CD pipeline and making an initial deployment of a skeleton pipeline.
In this part we are going to focus on developing a fully-operational pipeline and will cover:
All of the code referred to in this series of posts is available on GitHub, with a dedicated branch for each part, so you can explore the code in its various stages of development. Have a quick look before reading on.
To recap, the data engineering team will deliver the latest tranche of training data to an AWS S3 bucket, in CSV format. They will take responsibility for verifying that these files have the correct schema and contain no unexpected errors. Each filename will contain the timestamp of its creation, in ISO format, so that the datasets in the bucket will look as follows:
The train-model stage of the pipeline will only need to download the latest file for training a new model. We could stop here and rely solely on the filenames as a lightweight versioning strategy. It is safer, however, to enable versioning for the S3 bucket and to keep track of the hash of the dataset used for training. S3 computes an Entity Tag (ETag) for every object it stores. For objects uploaded in a single part (without SSE-C or SSE-KMS encryption), the ETag is the MD5 hash of the object's contents. Together, these let us defend against accidental deletes and/or overwrites, and locate the precise dataset associated with a trained model.
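The following sketch shows what the ETag gives us. It checks a downloaded dataset against its ETag using only `hashlib`. It is not part of `bodywork-pipeline-utils`, and the function name `etag_matches` is hypothetical. The check assumes a single-part upload without SSE-C or SSE-KMS encryption. For other uploads the ETag is not a plain MD5 digest, so the sketch refuses to compare multipart-style ETags.

```python
import hashlib


def etag_matches(data: bytes, etag: str) -> bool:
    """Return True if ``data`` hashes to the MD5 digest held in an S3 ETag.

    S3 returns ETags wrapped in double quotes, so these are stripped first.
    Multipart uploads produce ETags of the form "<hex>-<part count>", which are
    not a plain MD5 of the content, so they are rejected rather than compared.
    """
    digest = etag.strip('"')
    if "-" in digest:
        raise ValueError(f"multipart ETag is not a plain MD5 digest: {etag}")
    return hashlib.md5(data).hexdigest() == digest


# Example: the MD5 digest of b"abc" is a standard RFC 1321 test vector.
print(etag_matches(b"abc", '"900150983cd24fb0d6963f7d28e17f72"'))  # True
```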
Because this concept of a dataset is bigger than just an arbitrarily named file on S3, we will need to develop a custom `Dataset` class for representing files on S3 and retrieving their hashes, together with functions/methods for getting and putting `Dataset` objects to S3. All of this can be developed on top of the boto3 AWS client library for Python.
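The sketch below shows one way such a `Dataset` class could look. It is not the `bodywork-pipeline-utils` implementation. The class fields, the `get_latest_dataset` helper and the key format `<prefix><name>_<ISO timestamp>.csv` are all assumptions made for illustration. The S3 client is passed in as an argument (in practice, `boto3.client("s3")`), which makes the function easy to test with a fake client. It relies only on the real boto3 S3 client methods `list_objects_v2` and `get_object`.

```python
import re
from dataclasses import dataclass
from datetime import datetime
from typing import Any

# Assumed key format (hypothetical): "<prefix><name>_<YYYY-MM-DDTHH:MM:SS>.csv"
TIMESTAMP_PATTERN = re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}")


@dataclass(frozen=True)
class Dataset:
    """Hypothetical versioned dataset: the raw bytes plus where they came from."""

    key: str
    etag: str  # S3 ETag with its surrounding quotes removed, used as the hash
    timestamp: datetime
    data: bytes


def timestamp_from_key(key: str) -> datetime:
    """Extract the ISO-format creation timestamp embedded in an S3 key."""
    match = TIMESTAMP_PATTERN.search(key)
    if match is None:
        raise ValueError(f"no ISO timestamp found in key: {key}")
    return datetime.fromisoformat(match.group(0))


def get_latest_dataset(s3_client: Any, bucket: str, prefix: str) -> Dataset:
    """Download the CSV under ``prefix`` whose filename has the latest timestamp.

    ``s3_client`` is expected to behave like ``boto3.client("s3")``. For brevity,
    only the first page of results (up to 1,000 keys) is considered.
    """
    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
    csv_objects = [
        obj for obj in response.get("Contents", []) if obj["Key"].endswith(".csv")
    ]
    if not csv_objects:
        raise RuntimeError(f"no CSV datasets found in s3://{bucket}/{prefix}")
    latest = max(csv_objects, key=lambda obj: timestamp_from_key(obj["Key"]))
    body = s3_client.get_object(Bucket=bucket, Key=latest["Key"])["Body"].read()
    return Dataset(
        key=latest["Key"],
        etag=latest["ETag"].strip('"'),
        timestamp=timestamp_from_key(latest["Key"]),
        data=body,
    )
```

In a pipeline stage, this might be called as `get_latest_dataset(boto3.client("s3"), bucket_name, "datasets/")`, where the bucket name and prefix are placeholders.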
Trained models will be serialised to file using Python’s pickle module (this works well for Scikit-Learn models) and uploaded to the same S3 bucket, using the same timestamped file-naming convention:
When triggered, the serve-model stage of the pipeline will only need to download the most recently persisted model. This ensures that it generates predictions using the model produced by the latest run of the train-model stage. As with the datasets, we could stop here and rely solely on the filenames as a lightweight versioning strategy. However, auditing and debugging predictions will be much easier if we can access model metadata, such as the details of the exact dataset used for training.
The concept of a model becomes bigger than just the trained model in isolation, so we will also need to develop a custom `Model` class. This needs to ‘wrap’ the trained model object, so that it can be associated with all of the metadata that we need to operate our basic model versioning system. As with the custom `Dataset` class, we will need to develop functions/methods for getting and putting the `Model` object to S3.
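Here is a minimal sketch of such a wrapper. It uses a hypothetical `VersionedModel` class rather than the article's `Model` class, and its field names are assumptions. One design choice of this sketch is that it pickles a dictionary of its fields rather than the wrapper instance itself. The wrapped model object must still be picklable, which is the case for Scikit-Learn estimators.

```python
import pickle
from dataclasses import dataclass, field, fields
from datetime import datetime, timezone
from typing import Any


@dataclass
class VersionedModel:
    """Hypothetical wrapper tying a trained model to the metadata needed to audit it."""

    name: str
    model: Any  # e.g. a fitted Scikit-Learn estimator
    dataset_key: str
    dataset_hash: str
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    @property
    def filename(self) -> str:
        """Timestamped filename, following the same convention as the datasets."""
        return f"{self.name}_{self.created_at.isoformat(timespec='seconds')}.pkl"

    def to_bytes(self) -> bytes:
        """Serialise the wrapped model together with its metadata using pickle."""
        return pickle.dumps({f.name: getattr(self, f.name) for f in fields(self)})

    @classmethod
    def from_bytes(cls, data: bytes) -> "VersionedModel":
        """Rebuild a wrapper from bytes produced by ``to_bytes``.

        Only unpickle data from a trusted source: pickle can execute arbitrary code.
        """
        return cls(**pickle.loads(data))
```

The bytes returned by `to_bytes` could then be uploaded with the boto3 S3 client's `put_object(Bucket=..., Key=vm.filename, Body=vm.to_bytes())`.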
Implementing the functionality described above requires significant development effort, and the same work is likely to be repeated in many projects. We cover how to handle reusable code in the section below. In the meantime, you can see our implementations of the `Dataset` and `Model` classes using the links below; we have also reproduced them at the end of this article.
The canonical way to distribute reusable Python modules is to implement them within a Python package that can be installed into any project that benefits from the functionality. This is what we have done for the dataset and model versioning functionality described in the previous section. We have done the same for configuring the logger used in both stages, so that we can enforce a common log format across projects. You can explore the codebase for this package, named `bodywork-pipeline-utils`, on GitHub. The functions and classes within it are shown below,
A discussion of best practices for developing a Python package is beyond the scope of these articles, but you can use `bodywork-pipeline-utils` as a template and/or refer to the Python Packaging Authority. The Scikit-Learn team has also published their insights into API design for machine learning software, which we recommend reading.
The easiest way to distribute Python packages within an organisation is directly from your Version Control System (VCS) - e.g. a remote Git repository hosted on GitHub. You do not need to host an internal PyPI server, unless you have a specific reason to do so. To install a Python package from a remote Git repo you can use,
Here, `v0.1.5` is the release tag, but it could also be a Git commit hash. This will need to be specified in `requirements_pipe.txt` as,
Pip supports many VCSs and protocols - e.g. private Git repositories can be accessed via SSH by using git+ssh and ensuring that the machine making the request has the appropriate SSH keys available. Refer to the documentation for pip for more information.
Pipelines can experience many types of error - here are some examples:
When developing pipeline stages, it is critical that error events such as these are identified and logged to aid with debugging, and that the pipeline is not allowed to proceed. Our chosen pattern for handling errors is demonstrated in this snippet from `train_model.py`,
The pipeline is defined in the `main` function, which is executed within a `try... except` block. If it executes without error, then we signal this to Kubernetes with an exit-code of `0`. If any error is encountered, then the exception is caught. We log the details and signal the failure to Kubernetes with an exit-code of `1`, so that it can attempt a retry if this has been configured.
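Here is a minimal sketch of this pattern. It is not the article's code: it assumes a hypothetical `run_stage` helper that returns the exit code instead of calling `sys.exit` directly, which makes the mapping from outcome to exit code easy to test. The actual `sys.exit` call is shown as a comment, as it would appear at the bottom of the stage's module.

```python
import logging
from typing import Callable

log = logging.getLogger("train-model")


def run_stage(stage: Callable[[], None]) -> int:
    """Run a pipeline stage and map its outcome to a process exit code.

    ``except Exception`` deliberately does not catch ``SystemExit`` or
    ``KeyboardInterrupt``, which derive from ``BaseException``.
    """
    try:
        stage()
        return 0
    except Exception as e:
        # log.exception records the full traceback alongside the message.
        log.exception(f"stage failed: {e}")
        return 1


def main() -> None:
    """Placeholder for the stage body: get data, train, validate, persist."""
    log.info("stage complete")


# In the stage's module, the entry point would then be:
#
# if __name__ == "__main__":
#     sys.exit(run_stage(main))  # Kubernetes sees 0 on success, 1 on failure
```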
Exceptions within `main` are likely to be raised from within 3rd party packages that we’ve installed, e.g. if `bodywork-pipeline-utils` can’t access AWS or if Scikit-Learn fails to train a model. We recommend reading the documentation (or source code) for external functions and classes. This helps you understand which exceptions they raise and whether the pipeline would benefit from custom handling and logging.
Sometimes, however, we need to look for the error ourselves and raise the exception manually, as shown below when the key test metric falls below a pre-configured threshold level,
This works as follows:
Using logs to communicate pipeline state will take on additional importance later on in Part Three of this series, when we add monitoring, observability and alerting to our pipeline.
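The threshold check described above can be sketched as follows. This sketch makes several assumptions that are not taken from the article: the key test metric is assumed to be R², and the `TaskMetrics` fields and the `check_metrics` name are hypothetical. Falling below the error threshold raises an exception, which stops the pipeline via the `try... except` pattern described earlier. Falling below only the warning threshold logs a warning and lets the pipeline continue.

```python
import logging
from typing import NamedTuple

log = logging.getLogger("train-model")


class TaskMetrics(NamedTuple):
    """Hypothetical metrics container; the article's fields may differ."""

    r_squared: float
    mean_absolute_error: float


def check_metrics(
    metrics: TaskMetrics, warning_threshold: float, error_threshold: float
) -> None:
    """Fail below ``error_threshold``; log a warning below ``warning_threshold``."""
    if metrics.r_squared < error_threshold:
        # Raising stops the pipeline before the model can be deployed.
        raise RuntimeError(
            f"r-squared of {metrics.r_squared:.3f} is below the error threshold "
            f"of {error_threshold}"
        )
    if metrics.r_squared < warning_threshold:
        log.warning(
            f"r-squared of {metrics.r_squared:.3f} is below the warning threshold "
            f"of {warning_threshold}"
        )
```

Because the thresholds are arguments rather than constants, a test can force each branch by choosing thresholds above or below a known metric value.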
Pipelines can benefit from parametrisation to make them reusable across deployment environments (and potentially tenants, if this makes sense for your project). For example, passing the S3 bucket as an external argument to each stage enables the pipeline to operate in a staging environment as well as in production. Similarly, external arguments can set the thresholds at which model training metrics trigger warnings and alerts, which can make testing the pipeline much easier.
Each stage of our pipeline is defined by an executable Python module. The easiest way to pass arguments to a module is via the command line. For example,
This passes a list of strings, `["time-to-dispatch", "0.9", "0.8"]`, to `train_model.py`. These can be retrieved from `sys.argv`, where they occupy indices 1 to 3 because index 0 holds the path to the script itself. This is demonstrated in the excerpt from `train_model.py` below.
Note how we cast the numeric arguments to `float` before performing basic input validation. This ensures that users can’t accidentally specify invalid arguments that could lead to unintended consequences.
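A minimal sketch of this kind of argument handling follows. It assumes that the first argument is the S3 bucket name, as suggested by the discussion of parametrisation above, and that the two numbers are warning and error thresholds for a metric. The validation rules are also assumptions of this sketch: both thresholds must lie in [0, 1], and the error threshold must not exceed the warning threshold. The names `StageArgs` and `parse_args` are hypothetical.

```python
from typing import List, NamedTuple


class StageArgs(NamedTuple):
    s3_bucket: str
    warning_threshold: float
    error_threshold: float


def parse_args(argv: List[str]) -> StageArgs:
    """Parse and validate the stage's command-line arguments (``sys.argv[1:]``)."""
    if len(argv) != 3:
        raise ValueError(
            "expected 3 arguments: S3_BUCKET WARNING_THRESHOLD ERROR_THRESHOLD"
        )
    bucket = argv[0]
    try:
        warning_threshold = float(argv[1])
        error_threshold = float(argv[2])
    except ValueError:
        raise ValueError("thresholds must be numbers") from None
    for threshold in (warning_threshold, error_threshold):
        # NaN fails this comparison too, so it is rejected as well.
        if not 0.0 <= threshold <= 1.0:
            raise ValueError(f"thresholds must lie in [0, 1], got {threshold}")
    if error_threshold > warning_threshold:
        raise ValueError("the error threshold must not exceed the warning threshold")
    return StageArgs(bucket, warning_threshold, error_threshold)
```

Inside a stage module this would be called as `parse_args(sys.argv[1:])`, so that the script path in `sys.argv[0]` is skipped.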
When deployed by Bodywork, `train_model.py` will be executed in a dedicated container on Kubernetes. The required arguments can be passed via the `args` parameter in the `bodywork.yaml` file that describes the deployment, as shown below.
The core task here is to re-engineer the ML solution from the `time_to_dispatch_model.ipynb` notebook, provided to us by the data scientist who worked on this task, into the pipeline stage defined in `pipeline/train_model.py` (reproduced in the Appendix below). The central workflow is defined in the `main` function,
This splits the job into smaller sub-tasks, such as preparing the data, that can be delegated to specialised functions that are easier to write (unit) tests for. All interaction with cloud object storage (AWS S3), for retrieving datasets and persisting trained models, is handled by functions imported from the bodywork-pipeline-utils package, leaving three key functions that we will discuss in turn:
The `persist_model` function creates the `Model` object and calls its `put_model_to_S3` method. It will be tested implicitly in the functional tests for `main`, which we will look at later on.
The purpose of this function is to start with the dataset as a `DataFrame`, split the features from the labels and then partition each of these into ‘test’ and ‘train’ subsets. We return the results as a `NamedTuple` called `FeaturesAndLabels`, which makes each subset easy to access by name within the functions that consume it.
This is tested in tests/test_train_model.py as follows,
To help with testing, we have saved a snapshot of CSV data to `tests/resources/dataset.csv` within the project repository. This snapshot is made available as a `DataFrame` to all tests in this module, via a Pytest fixture called `dataset`. There is only one unit test for this function. It checks that `prepare_data` splits labels from features, for both the ‘test’ and ‘train’ sets, and that it doesn’t lose any rows of data in the process. If we refactor `prepare_data` in the future, then this test will help prevent us from accidentally leaking the label into the features.
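The splitting logic described above can be tried without pandas. The sketch below uses plain lists of dicts instead of a `DataFrame` and a seeded shuffle to partition the rows. The column names used in the usage example are hypothetical, and the field order of this `FeaturesAndLabels` is an assumption that may differ from the article's.

```python
import random
from typing import Any, Dict, List, NamedTuple, Tuple

Row = Dict[str, Any]


class FeaturesAndLabels(NamedTuple):
    X_train: List[Row]
    X_test: List[Row]
    y_train: List[Any]
    y_test: List[Any]


def _split_label(rows: List[Row], label: str) -> Tuple[List[Row], List[Any]]:
    """Separate the label column from the feature columns, row by row."""
    features = [{k: v for k, v in row.items() if k != label} for row in rows]
    labels = [row[label] for row in rows]
    return features, labels


def prepare_data(
    rows: List[Row], label: str, test_fraction: float = 0.2, seed: int = 42
) -> FeaturesAndLabels:
    """Split features from labels, then partition both into train and test subsets."""
    shuffled = list(rows)  # copy, so the caller's data is left untouched
    random.Random(seed).shuffle(shuffled)  # seeded, so the split is reproducible
    n_test = int(len(shuffled) * test_fraction)
    X_test, y_test = _split_label(shuffled[:n_test], label)
    X_train, y_train = _split_label(shuffled[n_test:], label)
    return FeaturesAndLabels(X_train, X_test, y_train, y_test)
```

A test in the spirit of the one described above would check that no rows are lost and that the label never appears among the features. For example, with rows like `{"orders_placed": 3, "hours_to_dispatch": 3.0}` and `label="hours_to_dispatch"`, every row in `X_train` and `X_test` should contain only `orders_placed`.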
Given a `FeaturesAndLabels` object together with a grid of hyper-parameters, this function returns a trained model, together with the model’s performance metrics for the ‘test’ set. The hyper-parameter grid is an input to this function. This means that tests can use a single point, while the actual job can specify many more points, because training time is less of a constraint there. The metrics are contained within a `NamedTuple` called `TaskMetrics`, to make passing them between functions easier and less prone to error.
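To show why the grid makes a good input parameter, here is a standard-library sketch of grid expansion, similar to what Scikit-Learn's `ParameterGrid` provides. The `select_best` helper and its `fit_and_score` callable are hypothetical. Scikit-Learn's `GridSearchCV` would score each point with cross-validation, whereas this sketch leaves scoring entirely to the caller.

```python
from itertools import product
from typing import Any, Callable, Dict, Iterator, List, Tuple

Params = Dict[str, Any]


def expand_grid(grid: Dict[str, List[Any]]) -> Iterator[Params]:
    """Yield every combination of the hyper-parameter values in ``grid``."""
    names = sorted(grid)
    for values in product(*(grid[name] for name in names)):
        yield dict(zip(names, values))


def select_best(
    grid: Dict[str, List[Any]], fit_and_score: Callable[[Params], float]
) -> Tuple[Params, float]:
    """Fit one model per grid point and return the best params with their score."""
    scored = [(params, fit_and_score(params)) for params in expand_grid(grid)]
    if not scored:
        raise ValueError("hyper-parameter grid is empty")
    return max(scored, key=lambda item: item[1])
```

A test can pass a single-point grid such as `{"max_depth": [5]}`, so only one model is fitted. The production job can pass something like `{"max_depth": [5, 10, 20], "n_estimators": [100, 200]}`, which expands to six points. These hyper-parameter names are only examples.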
We have further delegated the task of pre-processing the features for the model (in this case, just mapping categories to integers) to a dedicated function called `preprocess`. The `train_model` function is tested in tests/test_train_model.py as follows,
This tests that `train_model` returns a fitted model and acceptable performance metrics, given a reasonably sized tranche of data.
Note that we haven’t relied on `prepare_data` to create the `FeaturesAndLabels` object. Instead, we have created it manually in another fixture that relies on the `dataset` fixture discussed earlier. This is a deliberate choice, made to decouple the outcome of this test from the behaviour of `prepare_data`. Tests that depend on multiple functions can be ‘brittle’ and lead to cascades of failing tests when only a single function or method is raising an error. We cannot stress enough how important it is to structure your code in such a way that it can be easily tested.
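Here is a minimal sketch of the category-to-integer mapping that `preprocess` performs. The vocabulary of product codes is hypothetical. Raising an error on unseen categories is a design choice of this sketch and not necessarily the article's behaviour. Keeping the mapping fixed means training and serving encode categories identically.

```python
from typing import Dict, List

# Hypothetical vocabulary, fixed when the model is trained so that serving
# uses exactly the same category-to-integer mapping.
PRODUCT_CODE_MAP: Dict[str, int] = {"SKU001": 0, "SKU002": 1, "SKU003": 2}


def preprocess(
    product_codes: List[str], mapping: Dict[str, int] = PRODUCT_CODE_MAP
) -> List[int]:
    """Map categorical values to integers, failing loudly on unseen categories."""
    unknown = sorted(set(product_codes) - set(mapping))
    if unknown:
        raise ValueError(f"unknown categories: {unknown}")
    return [mapping[code] for code in product_codes]
```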
For completeness, we also provide a simple test for `preprocess`,
The goal of the pipeline is to automate the process of training a new model and deploying it, i.e. to take the data scientist out of the loop. Consequently, we need to exercise caution before deploying the latest model. The final go/no-go decision on deploying the model will be based on performance metrics, but we should also sense-check the model against basic behaviours we expect it to have. The `validate_trained_model_logic` function performs three logical tests of the model. It raises an exception if it finds an issue, which terminates the pipeline before deployment. The three checks are:
Note that we perform all three checks before raising the exception. This makes the error message, and the logs generated from it, as informative as possible when it comes to debugging.
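The "run every check, then raise once" approach can be sketched as follows. The three checks in `check_model_logic` are hypothetical placeholders, not the checks used in the article, and both function names are illustrative. The model only needs a `predict` method that returns numbers.

```python
import math
from typing import Any, Callable, List, Sequence, Tuple

Check = Tuple[str, Callable[[], bool]]


def run_logic_checks(checks: List[Check]) -> None:
    """Evaluate every check first, then raise once, listing all failures."""
    failures = [description for description, passed in checks if not passed()]
    if failures:
        raise RuntimeError("model failed logic checks: " + "; ".join(failures))


def check_model_logic(model: Any, X_probe: Sequence[Any]) -> None:
    """Hypothetical sanity checks on the predictions for a small probe dataset."""
    predictions = list(model.predict(X_probe))
    run_logic_checks([
        ("predictions must be finite", lambda: all(math.isfinite(p) for p in predictions)),
        ("predictions must be non-negative", lambda: all(p >= 0 for p in predictions)),
        ("one prediction per input row", lambda: len(predictions) == len(X_probe)),
    ])
```

A stub model that always returns the same value (the role played by Scikit-Learn's `DummyRegressor(strategy="constant", constant=...)`) is enough to trigger each failure deliberately. A constant of `-inf`, for example, fails the first two checks, and both appear in a single error message.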
The associated test can also be found in tests/test_train_model.py. This is the most complex test so far, because we have to use Scikit-Learn’s `DummyRegressor` to create models that will fail each of the checks individually, as can be seen below.
We’ve tested the individual sub-tasks within `main`, but how do we know that we’ve assembled them correctly, so that `persist_model` will upload the expected `Model` object to cloud storage? We now need to turn our attention to testing `main` end-to-end, i.e. to writing functional tests for the train-model stage.
The `main` function will try to access AWS S3 to get a dataset and then save a pickled `Model` object to S3. We could set up an S3 bucket for testing this, but that would constitute an integration test, which is not our current aim. Instead, we will disable the calls to AWS by mocking the `bodywork_pipeline_utils.aws` module. To do this, we use the `patch` function from the `unittest.mock` module in the Python standard library.
Decorating our test with `@patch("pipeline.train_model.aws")` replaces the `aws` name within `pipeline.train_model` for the duration of the test. That name refers to the `bodywork_pipeline_utils.aws` module imported into `train_model.py`, and it is swapped for a `MagicMock` object that we’ve called `mock_aws`. This allows us to perform a number of useful tasks:
You can see this in action below.
This test also makes use of Pytest’s caplog fixture, enabling us to test that `main` yields the expected log records when everything goes according to plan (i.e. the ‘happy path’). This gives us confidence that model artefacts will be persisted as expected, when run in production.
What about the ‘unhappy paths’, when performance metrics fall below the warning and error thresholds? We need to test that `main` behaves as we expect it to in these scenarios as well, so we will have to write tests for them too.
These tests work by setting the thresholds artificially high (or low) and checking that exceptions are raised or that warning messages are logged. Note that this testing strategy only works because `main` accepts the thresholds as arguments, which was one of the key motivations for designing it in this way.
The train-model stage works by executing `train_model.py`, which requires three arguments to be passed to it (as discussed earlier on). These inputs are validated and this validation needs to be tested for completeness. This is a long and boring test, so we will not reproduce the whole thing, but instead discuss the testing strategy (which is a bit more interesting).
The approach to testing input validation is to run `train_model.py` as Bodywork would run it within a container on Kubernetes, by calling `python pipeline/train_model.py` from the command line. We can replicate this using `subprocess.run` from the Python standard library and capture the output. We can then pass invalid arguments and check the output for the expected error messages. You can see this pattern in action below, for the case when no arguments are passed.
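A self-contained sketch of this pattern follows. It uses a hypothetical toy script in place of `pipeline/train_model.py`, and the error message it prints is an assumption. The test's `tmp_path` argument mirrors Pytest's fixture of the same name; under Pytest it would be injected automatically.

```python
import subprocess
import sys
from pathlib import Path

# Toy stand-in for pipeline/train_model.py: it only validates its arguments.
TOY_STAGE = """
import sys

if len(sys.argv) != 4:
    print("ERROR: expected 3 arguments: S3_BUCKET WARNING_THRESHOLD ERROR_THRESHOLD")
    sys.exit(1)
print("arguments OK")
"""


def run_script(script: Path, *args: str) -> subprocess.CompletedProcess:
    """Execute ``script`` as a separate process, as it would run in its container."""
    return subprocess.run(
        [sys.executable, str(script), *args], capture_output=True, text=True
    )


def test_no_arguments_is_rejected(tmp_path: Path) -> None:
    script = tmp_path / "train_model.py"
    script.write_text(TOY_STAGE)
    result = run_script(script)
    output = result.stdout + result.stderr  # the message may go to either stream
    assert result.returncode == 1
    assert "expected 3 arguments" in output
```

Using `sys.executable` rather than a bare `python` guarantees that the subprocess runs with the same interpreter, and therefore the same virtual environment, as the test suite.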
In Part One of this series we developed a skeleton web service that returned a hard-coded value whenever the API was called. Our task in this part is to extend it so that it downloads the latest model persisted to cloud object storage (AWS S3) and uses that model to generate predictions. Unlike the train-model stage, this task requires relatively little effort. We will therefore reproduce `serve_model.py` in full and then discuss it in more detail afterwards.
The key changes from the version in Part One are as follows:
If we start the server locally,
Then we can send a test request,
Which should return a response along the lines of,
We only need to add one more (small) test to tests/test_serve_model.py, but we will have to modify the existing tests to take into account that we are now using a trained model to generate predictions, as opposed to returning fixed values. This introduces a complication, because we need to inject a working model into the module.
To facilitate testing, we have persisted a valid `Model` object to `tests/resources/model.pkl`. This object is loaded in a function called `wrapped_model` and injected into the module at test-time as a new object, using `unittest.mock.patch`. We are unable to use `patch` as we did in the tests for `train_model.py`, because the model is only loaded when `serve_model.py` is executed, whereas our tests rely only on the FastAPI test client.
The modified test for a valid request is shown below,
This works by checking the output from the API against the output from the model loaded from the test resources, to make sure that they are identical. Next, we modify the test that covers the API data validation, to reflect the extra constraints we have placed on requests.
Finally, we add one more test to cover the input validation for the `serve_model.py` module, using the same strategy as we did for the equivalent test for `train_model.py`.
The last task we need to complete before we can commit all changes, push to GitHub and trigger the CI/CD pipeline, is to update the deployment configuration in `bodywork.yaml`. This requires three changes:
This will instruct Bodywork to look for `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` and `AWS_DEFAULT_REGION` in a secret record called `aws-credentials`. Bodywork can then inject these secrets into the containers running the stages of our pipeline as environment variables, which boto3 will detect automatically. These secrets will therefore have to be created first, which can be done as follows,
Now you’re ready to push this branch to your remote Git repo! If your tests pass and your colleagues approve the merge, the CD part of the CI/CD pipeline we set up in Part One will take over. It will ensure that Bodywork deploys the new pipeline to Kubernetes and executes it immediately. Bodywork will perform a rolling deployment that ensures zero downtime and automatically rolls back failed deployments to the previous version. When Bodywork has finished, test the new web API,
Where you should observe the same response you received when testing locally,
At this point, the pipeline will have deployed a model using the most recent dataset made available for this task. We know, however, that new data will arrive every Friday evening and so we’d like to schedule the pipeline to run just after the data is expected. We can achieve this using Bodywork cronjobs, as follows,
In this second part we have gone from a skeleton “Hello, Production!” deployment to a fully-functional train-and-deploy pipeline. This pipeline automates re-training and re-deployment in a production environment, on a periodic basis. We have factored-out common code so that it can be re-used across projects. We have also discussed various strategies for developing automated tests for both stages of the pipeline, ensuring that subsequent modifications can be reliably integrated and deployed with relative ease.
In the final part of this series we will cover monitoring and observability and aim to answer the question, “How will I know when something has gone wrong?”.
Reproduced from the ml-pipeline-engineering repository.