Unit Testing Data Validation Microservices for Production ML Pipelines

Unit testing is a vital element of production software engineering. After all, how do we know for sure that our code always returns the expected result regardless of input?

Unit testing is especially important in production machine learning because model training and pre-processing functions do not always throw exceptions when they should. Instead, the errors are often “absorbed” and ML pipeline execution seems to complete successfully.

For example, if a neural network layer expects a 3D tensor as input, and we have a bug where two of the dimensions are in the wrong order, training will complete “successfully” and yield a reasonable model (sad but true according to Andrej Karpathy, Senior Director of AI at Tesla).
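A minimal sketch of this failure mode, written in plain Python so it runs without a tensor library. The `mean_pool` function and the (batch, time, features) layout are assumptions made for illustration; they do not come from any particular framework. The point is only that a pooling step accepts swapped axes without complaint, while an explicit shape check fails loudly.

```python
def shape(nested):
    """Return the shape of a rectangular nested list, e.g. (2, 3, 4)."""
    dims = []
    while isinstance(nested, list):
        dims.append(len(nested))
        nested = nested[0]
    return tuple(dims)


def mean_pool(batch):
    """Average over axis 1 of a 3D nested list: (batch, time, features) -> (batch, features)."""
    return [[sum(col) / len(col) for col in zip(*sample)] for sample in batch]


def checked_mean_pool(batch, time_steps, n_features):
    """Same computation, but fail loudly if the layout is not (batch, time, features)."""
    actual = shape(batch)
    assert len(actual) == 3 and actual[1:] == (time_steps, n_features), (
        f"expected (batch, {time_steps}, {n_features}), got {actual}"
    )
    return mean_pool(batch)


# Correct layout: 2 samples, 3 time steps, 4 features.
good = [[[float(t + f) for f in range(4)] for t in range(3)] for _ in range(2)]
# Bug: time and feature axes swapped, giving shape (2, 4, 3).
swapped = [[list(row) for row in zip(*sample)] for sample in good]

pooled_good = mean_pool(good)        # shape (2, 4)
pooled_swapped = mean_pool(swapped)  # shape (2, 3): no exception, just the wrong result

try:
    checked_mean_pool(swapped, time_steps=3, n_features=4)
    caught = False
except AssertionError:
    caught = True  # the explicit shape check surfaces the bug
```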

Or, if an entire feature column comes in null due to a broken data engineering pipeline, df.fillna(…) will naively handle it without us knowing about the root problem. Validating upstream data quality systematically is paramount before deploying machine learning models to production.
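To make the point concrete, here is a small plain-Python stand-in for the imputation step. `fill_nulls`, `null_fraction`, and `verify_not_all_null` are hypothetical helpers, and the 50% threshold is an arbitrary assumption; the sketch only shows that imputation succeeds on a fully null column while a check placed before it does not.

```python
def fill_nulls(rows, column, value):
    """Stand-in for df.fillna on one column: replaces None and never complains."""
    return [
        {**row, column: value if row[column] is None else row[column]}
        for row in rows
    ]


def null_fraction(rows, column):
    """Fraction of rows where the column is None."""
    return sum(row[column] is None for row in rows) / len(rows)


def verify_not_all_null(rows, column, max_null_fraction=0.5):
    """Fail before imputation if too much of the column is missing."""
    fraction = null_fraction(rows, column)
    assert fraction <= max_null_fraction, (
        f"{column}: {fraction:.0%} null exceeds the {max_null_fraction:.0%} threshold"
    )


# Upstream breakage: every value of 'income' arrives as None.
rows = [{"id": i, "income": None} for i in range(5)]

# Imputation "works": the column is now all zeros and the root problem is hidden.
filled = fill_nulls(rows, "income", 0.0)

try:
    verify_not_all_null(rows, "income")
    caught = False
except AssertionError as err:
    caught = True
    message = str(err)
```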

In our previous article, we covered how to validate upstream data quality as the second step of a machine learning training pipeline. This Validator microservice must run before the Transformer microservice, which handles pre-processing and feature engineering.

As we write data validation functions, a best practice is to structure the code so that each function can be unit tested independently and incrementally as we go.

The data validation microservice is special in the sense that each function contains assert statements to validate upstream data quality. However, these assert statements are not unit tests. Real unit testing requires several test cases per function to verify that different inputs produce success or failure as expected.
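The distinction can be illustrated with Python's built-in `unittest` module. In this sketch, `verify_label_values` is a hypothetical validator (not one of the functions from our microservice) and the allowed labels are assumed to be 0 and 1. The asserts live inside the validator; the unit tests sit outside it and feed it several inputs, each with an expected outcome.

```python
import unittest


def verify_label_values(labels, allowed=(0, 1)):
    """Hypothetical validator: assert statements guard upstream data quality."""
    assert len(labels) > 0, "label column is empty"
    unexpected = set(labels) - set(allowed)
    assert not unexpected, f"unexpected label values: {sorted(unexpected)}"
    assert len(set(labels)) > 1, "label column is constant"


class VerifyLabelValuesTest(unittest.TestCase):
    """Unit tests: several inputs per validator, each with an expected outcome."""

    def test_passes_on_expected_labels(self):
        verify_label_values([0, 1, 1, 0])  # must not raise

    def test_fails_on_out_of_range_labels(self):
        with self.assertRaises(AssertionError):
            verify_label_values([0, 1, 7])

    def test_fails_on_constant_labels(self):
        with self.assertRaises(AssertionError):
            verify_label_values([1, 1, 1])

    def test_fails_on_empty_column(self):
        with self.assertRaises(AssertionError):
            verify_label_values([])


# Run the suite programmatically so the result can be inspected.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(VerifyLabelValuesTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

If someone later loosens one of the asserts by mistake, the matching failure test stops passing, which is exactly the signal the asserts alone cannot give.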

These are the steps my team and I follow to unit test each data validation function:

  1. Make it succeed with the expected input dataset quality
  2. Brainstorm the most important test cases where we would want the data validation microservice to fail and notify upstream data engineers
  3. Make it fail correctly with a “low quality” input dataset for each test case to confirm the function does catch the data quality issues as expected
  4. Expand the unit test suite incrementally as new errors or edge case bugs appear in production (similar to the incremental improvement of error handling workflows)
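The four steps above can be sketched as a small harness. This is a plain-Python illustration under stated assumptions: datasets are lists of dicts rather than PySpark DataFrames, and `verify_columns` and `run_validator_tests` are hypothetical names introduced for this example.

```python
def verify_columns(rows, expected_columns):
    """Hypothetical schema validator over a list of dict rows."""
    assert rows, "dataset is empty"
    for row in rows:
        assert list(row) == list(expected_columns), (
            f"expected columns {list(expected_columns)}, got {list(row)}"
        )


def run_validator_tests(validator, baseline, failure_cases, **kwargs):
    """Step 1: the baseline must pass. Step 3: every degraded case must fail."""
    validator(baseline, **kwargs)  # raises if the good dataset is rejected
    missed = []
    for name, degraded in failure_cases.items():
        try:
            validator(degraded, **kwargs)
        except AssertionError:
            continue          # failed correctly
        missed.append(name)   # the validator let a bad dataset through
    return missed


expected = ["Label", "Feature_1"]
baseline = [{"Label": 1, "Feature_1": 0.5}, {"Label": 0, "Feature_1": 1.5}]

# Step 2: one degraded copy of the baseline per failure mode we care about.
failure_cases = {
    "missing_column": [{"Label": r["Label"]} for r in baseline],
    "extra_column": [{**r, "Extra": 10} for r in baseline],
    "different_order": [
        {"Feature_1": r["Feature_1"], "Label": r["Label"]} for r in baseline
    ],
}

# Step 4: a case added later, for example after an empty extract reached production.
failure_cases["empty_dataset"] = []

missed = run_validator_tests(
    verify_columns, baseline, failure_cases, expected_columns=expected
)
```

An empty `missed` list means every degraded dataset was rejected. Any name that shows up in it points at a data quality issue the validator does not catch yet.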

We begin by extracting a high quality training dataset snapshot from our transactional data lake and subsampling a relatively small number of records:

df = dataset.extract()

# Subsample roughly 1% of the records; the fixed seed keeps the test dataset reproducible
test_df = df.sample(withReplacement=True, fraction=0.01, seed=1)

baseline_counts = test_df.count()

Next, we add a few test cases per data validation function (showing a subset for simplicity). Each test case is a “fake” PySpark DataFrame that has been transformed to lower data quality for the given test:

from pyspark.sql import functions as f
from pyspark.sql.types import IntegerType, StringType

# Schema test cases: wrong data type, missing column, extra column, reordered columns
data_type_error = test_df.withColumn('StringFeature', f.col('StringFeature').cast(IntegerType()))

missing_column_error = test_df.drop('StringFeature')

extra_column_error = test_df.withColumn('ExtraBaseFeature', f.lit(10))

different_order = test_df.select("Label","Feature_2","Feature_3", "Feature_4", ... , "Feature_1")

schema_test_cases = {
    'data_type_error': data_type_error,
    'missing_column_error': missing_column_error,
    'extra_column_error': extra_column_error,
    'different_order': different_order,
}

constant_column_error = test_df.withColumn('Label', f.lit(1))
categorical_column_error = test_df.withColumn('StringFeature', f.lit('WA'))

useful_test_cases = {
    'constant_column_error': constant_column_error,
    'categorical_column_error': categorical_column_error,
}

empty_num_col_error = test_df.withColumn('DecimalFeature', f.lit(None).cast(StringType()))

empty_array_col_error = test_df.withColumn('ArrayFeature', f.lit(None).cast(StringType()))

empty_test_cases = {
    'empty_num_col_error': empty_num_col_error,
    'empty_array_col_error': empty_array_col_error,
}

negative_decimal_error = test_df.withColumn('DecimalFeature', f.abs(f.rand(seed=1)) * -10.0)

negative_int_error = test_df.withColumn('IntegerFeature', f.round(f.abs(f.rand(seed=1)) * -10.0, 0).cast(IntegerType()))

invalid_dates_error = test_df.withColumn('DateFeature', f.date_sub(f.col('DateFeature'), 3650))

invalid_labels_error = test_df.withColumn('Label', f.round(f.abs(f.rand(seed=1)) * 10.0, 0).cast(IntegerType()))

numeric_test_cases = {
    'negative_decimal_error': negative_decimal_error,
    'negative_int_error': negative_int_error,
    'invalid_dates_error': invalid_dates_error,
    'invalid_labels_error': invalid_labels_error,
}

Finally, we execute all unit tests per data validation function and verify they all “fail correctly”:

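def expect_failure(validator, name, *args):
    # Validators signal bad data with assert statements, so a low-quality input
    # should raise AssertionError; anything else means the test case slipped through.
    try:
        validator(*args)
    except AssertionError:
        print(f'{validator.__name__} correctly failed with {name}')
    else:
        raise RuntimeError(f'{validator.__name__} did not fail with {name}')

print("Verifying input dataset schema...\n")

# Assumes verify_input_schema takes the DataFrame as its only argument
for name, test_case_df in schema_test_cases.items():
    expect_failure(verify_input_schema, name, test_case_df)

print("Verifying useful columns...\n")

# Assumes verify_useful_columns takes the DataFrame as its only argument
for name, test_case_df in useful_test_cases.items():
    expect_failure(verify_useful_columns, name, test_case_df)

print("Verifying no empty columns...\n")

for name, test_case_df in empty_test_cases.items():
    expect_failure(verify_no_empty_columns, name, test_case_df, baseline_counts)

print("Verifying numeric feature ranges...\n")

for name, test_case_df in numeric_test_cases.items():
    expect_failure(verify_numeric_features, name, test_case_df, baseline_counts)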

We run the unit tests for a given ML pipeline microservice during the test phase of our cross-account CI/CD pipeline, which is triggered by git commit events. The build phase containerizes the microservice, the test phase runs unit and integration tests, and the deploy phase pushes the code to production.

Make sure each unit test is meaningful and useful. Write as many as possible (within reason) during the brainstorming step and prune as you go. We recommend establishing a unit test suite baseline and iteratively enhancing it as needed.

I like to add unit testing to the acceptance criteria of technical tasks in each 2-week sprint, so that tests are written naturally during development. This way my team and I build the habit and continuously release well-tested, high quality code.

We will cover unit testing of more “traditional” functions, such as feature engineering functions and window functions, in a future article.

What do you think about this unit testing approach for data validation microservices? Is there anything you would do differently? Let us know in the comments!

Subscribe to our weekly LinkedIn newsletter: Machine Learning In Production

Reach out if you need help:

  • Maximizing the business value of your data to improve core business KPIs
  • Deploying & monetizing your ML models in production
  • Building Well-Architected production ML software solutions
  • Implementing cloud-native MLOps
  • Training your teams to systematically take models from research to production
  • Identifying new DS/ML opportunities in your company or taking existing projects to the next level
  • Anything else we can help you with

Would you like me to speak at your event? Email me at info@carloslaraai.com

Subscribe to our blog: https://gradientgroup.ai/blog/
