## Introduction

Scikit-learn (or sklearn) is the machine learning tool of choice for exploratory analysis by data scientists. It has over 45k stars on GitHub and was downloaded over 7 million times in the last month (March 2021) Their fit / transform / predict API is now ubiquitous in the python machine learning ecosystem with many other open source projects choosing to be compatible with that API.

### Motivation

In order to leverage the deeper features of the sklearn platform, it is useful to build custom data transformation pipelines using the provided classes. In this blog post, we will focus on using Custom Transformers and Pipelines which are essential to delivering replicable results.

When dealing with real-world data, it is often difficult to convert data manipulations into repeatable steps: this is where custom transformers come into play. Additionally, you might then want to apply these custom transformations in sequence. This is where Pipelines become useful. Putting these two tools together, you have a powerful playbook to handle all the messy data the real world throws at you.

Note: This blog post assumes that you already have a basic understanding of the sklearn API and concepts such as fit / transform / predict and so on. If some of that is new to you, please check out this fantastic page in the sklearn docs.
# Install Required packages
# Python 3.8
!pip install kaggle==1.5.12
!pip install scikit-learn==0.24.1
!pip install pandas==1.2.3
!pip install python-dotenv==0.17.0
!pip install fastcore==1.3.19

from pathlib import Path
from pprint import pprint
from typing import Any, Union

import numpy as np
import pandas as pd
from fastcore.basics import patch
from pandas import json_normalize
from sklearn.base import BaseEstimator, ClassifierMixin, TransformerMixin, clone
from sklearn.feature_extraction import DictVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer, OrdinalEncoder
from sklearn.utils.validation import check_array, check_is_fitted

np.random.seed(42)

SklearnInput = Union[list, np.ndarray, pd.DataFrame, pd.Series]


## The Dataset

In this blog post we'll be using the motivating example of the Prescription Based Prediction (PBP) dataset. The PBP dataset combines the frequencies of prescribed medication under Medicare part D with practitioner data from the National Provider Identifier dataset. The authors of this aggregate dataset had a corresponding blog post explaining the dataset in a bit more detail. It is no longer live, but it is still available on The Wayback Machine.

Note: You'll need to configure your Kaggle API credentials to download the dataset or, you can simply go to Kaggle and manually unzip the data into the notebook folder. Simply uncomment the next line and replace it with your kaggle credentials. Read more about this here.


from dotenv import load_dotenv

import kaggle

kaggle.api.authenticate()
"roamresearch/prescriptionbasedprediction", unzip=True
)


The data itself is in the uncommon .jsonl format which is a newline delimited dataset. Each line contains a json object. We can use pandas to read this dataset into a pd.DataFrame.

FPATH = Path("/work/roam_prescription_based_prediction.jsonl")

cms_prescription_counts provider_variables npi
0 {'DOXAZOSIN MESYLATE': 26, 'MIDODRINE HCL': 12... {'settlement_type': 'non-urban', 'generic_rx_c... 1295763035
1 {'CEPHALEXIN': 23, 'AMOXICILLIN': 52, 'HYDROCO... {'settlement_type': 'non-urban', 'generic_rx_c... 1992715205
2 {'CEPHALEXIN': 28, 'AMOXICILLIN': 73, 'CLINDAM... {'settlement_type': 'non-urban', 'generic_rx_c... 1578587630

To understand this data a bit better, lets explode both of the json variables in the first row. We can use the pandas builtin function json_normalize which converts a json dictionary into a flat table.

json_normalize(df.iloc[0].cms_prescription_counts)

DOXAZOSIN MESYLATE MIDODRINE HCL MEGESTROL ACETATE BENAZEPRIL HCL METOLAZONE NOVOLOG DIAZEPAM HYDRALAZINE HCL SENSIPAR LABETALOL HCL ... RENVELA ABILIFY SERTRALINE HCL CIPROFLOXACIN HCL SIMVASTATIN PRAVASTATIN SODIUM ATENOLOL-CHLORTHALIDONE ALPRAZOLAM AZITHROMYCIN TRAMADOL HCL
0 26 12 11 11 73 12 24 50 94 28 ... 117 11 14 19 14 13 53 45 18 11

1 rows × 68 columns

json_normalize(df.iloc[0].provider_variables)

settlement_type generic_rx_count specialty years_practicing gender region brand_name_rx_count
0 non-urban 2287 Nephrology 7 M South 384

Cool, let's now see what the distribution of those variables looks like.

json_normalize(df["provider_variables"]).settlement_type.value_counts()

non-urban    152270
urban         87660
Name: settlement_type, dtype: int64
json_normalize(df["provider_variables"]).region.value_counts()

South        80562
Northeast    59012
West         50279
Midwest      50077
Name: region, dtype: int64

It looks like the cms_prescription_counts column contains a json dictionary with the names of each drug followed by the count. The provider_variables column seems to have a json dictionary that contains information about the provider who prescribed the drugs.

It might be interesting to build a few models that predict some of these variables given the prescribed drugs and counts. In this blog post, we'll focus on estimating the settlement_type and the region.

Let's set the feature and response variables to their appropriate column and convert them to native Python lists. Let's also split the data into a train and validation set.

TEST_PCT = 0.2
X = df.cms_prescription_counts.tolist()
y = df.provider_variables.tolist()
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=TEST_PCT)


## Custom Transformers

Developing sklearn API compatible transformers will allow us to make full use of other aspects of the library. In this example, we'll implement two custom transformers that leverage sklearn base classes to transform the response variable in the dataset.

### JSONTargetSelector

First, let's build a way to quickly extract a desired response variable from y. We start by defining a class that inherits from TransformerMixin which gives us the fit_transform method if we define the fit and transform methods.

Next up, we define the constructor which requires two variables that tell us the key to extract from each json row and the dimensions of the data output. We also include an is_fit_ attribute which lets sklearn helper functions know that this transformer is already been "fitted" (i.e. it doesn't need to be fit)

class JSONTargetSelector(TransformerMixin):
"""From a list of json blobs select a particular key and compose a new list of those values.

Args:
target_name: The json key to extract in each row
two_d_out: Whether the output transformation should be 2d

"""

def __init__(self, target_name: str, two_d_out: bool = False):
self.target_name = target_name
self.two_d_out = two_d_out
self.is_fit_ = True


Now, let's add the fit method. It is empty because we already have all the information we need to transform input data from the constructor alone. We return self (the object instance) as per convention.

Note: Here we use the neat @patch decorator to add instance methods to the class so that we don't have to have all of the code in a single jupyter cell.
@patch
def fit(self: JSONTargetSelector, X: SklearnInput, y: SklearnInput = None):
return self


Now let's add the transform functionality. We break this up into the two methods. The _extract_target method allows us to pull a particular key out of an input dictionary while also validating that the inputs are as we expect them. The transform method does the meat of the data conversion. It turns a list of json objects to a flat list of selected values.

@patch
def _extract_target(self: JSONTargetSelector, index: int, json_dict: dict):
"""Get the target key from a dictionary.

Args:
index: the index of the current row
json_dict: potentially a json dictionary which has target_name as a key

Returns:
The value of the target_name key.

Raises:
ValueError: If json_dict is not a dictionary
ValueError: If target_name is not in the json_dict

"""
if not isinstance(json_dict, dict):
raise ValueError(f"item at index {index} is not a dictionary.")
if self.target_name not in json_dict:
raise ValueError(f"index {index} is missing target {self.target_name}.")
return json_dict.get(self.target_name)

@patch
def transform(self: JSONTargetSelector, X: SklearnInput, y: SklearnInput = None):
X = check_array(
X, accept_sparse=False, force_all_finite=True, ensure_2d=False, dtype="object"
)
out = np.array([self._extract_target(i, Xi) for i, Xi in enumerate(np.squeeze(X))])
if self.two_d_out:
out = out.reshape((-1, 1))
return out


Lastly, let's add the identity function as the inverse transform. There is no meaningful inverse since we lose data during the transform and future sections will require this function to have an implementation.

@patch
def inverse_transform(self: JSONTargetSelector, X: SklearnInput):
return X


We can test that the above implementation works with some fake data. Given we selects "c" as the desired key, we should see [2 4 8] in the output which is exactly what we observe.

d = {k: i for i, k in enumerate(list("abcd"))}
list_of_dict = [d, {k: v**2 for k, v in d.items()}, {k: v**3 for k, v in d.items()}]

print("List of Dicts")
pprint(list_of_dict)
print()

selected_key = "c"
jts = JSONTargetSelector(selected_key)
jts_trans = jts.fit_transform(list_of_dict)

print("Transformed Data")
print(jts_trans)

List of Dicts
[{'a': 0, 'b': 1, 'c': 2, 'd': 3},
{'a': 0, 'b': 1, 'c': 4, 'd': 9},
{'a': 0, 'b': 1, 'c': 8, 'd': 27}]

Transformed Data
[2 4 8]


### TransformedTargetClassifier

Phew, alright now that we've got that class completed we need to build a wrapper class that will allow us to actually use it in downstream applications.

Why, you may ask? In our case, we want a simple way to both transform the response variable and estimate on the response variable. Building a composite meta estimator is the way to go here considering the broader API constraints. To be honest, it's a bit of a quirk in the way the sklearn API was originally designed. By that, I mean that the original API design does not really take into account the possibility of transforming the response variable (specifically using Pipelines which we'll see in just a bit).

sklearn already has addressed that flaw with a built-in meta estimator called TransformedTargetRegressor, but unfortunately, it only works for regression estimators. (e.g. applying a log transform to your response variable) In our case, we are looking at a binary classification problem, so we'll need to roll our own.

First, let's define a constructor which takes in an instance of a classifier and a transformer.

class TransformedTargetClassifier(ClassifierMixin, BaseEstimator):
"""A meta estimator that both transforms the response variable.

Args:
classifier: An instance of a class that inherits from BaseEstimator
transformer: An instance of a class that inherits from TransformerMixin

"""

def __init__(
self, classifier: BaseEstimator, transformer: Union[TransformerMixin, Pipeline]
):
self.classifier = classifier
self.transformer = transformer


Now when fitting this meta estimator, we need to fit both the classifier and the transformer. But first, we need to quickly validate the input y array. Stealing a bit of code from TransformTargetRegressor we can validate both the datatypes and shape of the inputs.

We use check_array which is a validation function from sklearn which converts inputs into np.ndarray form and validates a bunch of other things for us. We follow this with a bit of code to make sure that the dimensions of the response variable are correct. This is so that we can handle either the user passing a row vector or a column vector.

@patch
def fit(  # noqa: F811
self: TransformedTargetClassifier,
X: SklearnInput,
y: SklearnInput,
**fit_params: Any,
):
y = check_array(
y, accept_sparse=False, force_all_finite=True, ensure_2d=False, dtype="object"
)
# store the original dimensions of y
self._training_dim = y.ndim
if y.ndim == 1:
y_2d = y.reshape(-1, 1)
else:
y_2d = y
self.transformer.fit(y_2d)
y_trans = self.transformer.transform(y_2d)
self.classifier.fit(X, y_trans, **fit_params)
self.is_fit_ = True

return self


With the fit method in place, we turn our attention to predict which is where the magic happens.

We use the check_is_fitted method to make sure that the particular transformer is fit. sklearn has an internal convention which checks for a variable to exist with a trailing underscore (_) to know if an instance has been fit. Since we don't know the exact classifier or transformer, we cannot get any more specific than that. Unfortunately, Pipelines do not set the conventional attributes at fit time so we have no way of knowing if it has been fitted.

Next, we run the predict step of the classifier and run an inverse transformation on its output. (running the fit steps in reverse) This was why we made sure to define the inverse_transform method in our previous custom transformer. After that, we make sure to handle data dimension issues before returning the output.

@patch
def predict(self: TransformedTargetClassifier, X: SklearnInput):
check_is_fitted(self.classifier)
if not isinstance(self.transformer, Pipeline):
check_is_fitted(self.transformer)

pred = self.classifier.predict(X)

if pred.ndim == 1:
pred = pred.reshape(-1, 1)

pred_trans = self.transformer.inverse_transform(pred)

if self._training_dim == 1 and pred_trans.ndim == 2 and pred_trans.shape[1] == 1:
pred_trans = pred_trans.squeeze(axis=1)

return pred_trans


Lastly, we have to add an updated score method. We cannot simply just use the inherited scoring method since we need to transform the ground truth variable before scoring. We use accuracy just the same as the super class.

@patch
def score(
self: TransformedTargetClassifier,
X: SklearnInput,
y: SklearnInput,
sample_weight: Union[list, np.ndarray] = None,
):
y = check_array(
y, accept_sparse=False, force_all_finite=True, ensure_2d=False, dtype=None
)
if y.ndim == 1:
y = y.reshape((-1, 1))
y = self.transformer.transform(y)

return accuracy_score(y, self.classifier.predict(X), sample_weight=sample_weight)


Finally, we're ready to see if this class works!

Let's generate some random data with 100 samples to test in a Pipeline. We'll cover exactly how Pipelines work in the next section so don't worry about the details here. We just want to make sure that we see strings instead of numbers as the output of a prediction, which, we do indeed see.

_y = np.array(["a", "b"] * 50)
_X = np.random.normal(size=(100, 3))
_X_test = np.random.normal(size=(3, 3))

oe = Pipeline(
[
("oe", OrdinalEncoder()),
("reshaper", FunctionTransformer(lambda x: np.squeeze(x))),
]
)

lr = LogisticRegression()
ttc = TransformedTargetClassifier(lr, oe)

ttc.fit(_X, _y)
ttc.predict(_X_test)

array(['a', 'a', 'a'], dtype=object)

## Pipelines

Pipelines allow you to run multiple operations on an input dataset in succession before an estimation is performed. It was originally designed to be a linear step-by-step transformation template but there are now additional tools in the Pipeline toolkit that allow for horizontal joining of "columns". This is out of scope for this blog post but you can check out the docs for that here.

The main use case for this approach becomes apparent when we look to perform experiments on a particular machine learning approach. Pipelines make it straightforward to perform cross validation and grid searching across both the feature engineering and estimator hyperparameters. Pipelines make it very hard to accidentally get your data into an improper state which often happens when you have the same input variable being modified across multiple jupyter notebook cells.

In the next two cells we define a Pipeline which is applied to our PBP dataset. Since the last step in the pipeline inherits from BaseEstimtor the pipeline behaves exactly like an estimator. (enabling methods like predict and score) When we call fit, each of the steps fits the transformer and transforms the data before passing it to the next step.

The following are descriptions of each of the steps across both estimators

• Feature Transformation
• DictVectorizer: Converts the input column of dictionary values into a sparse nxm matrix where n is the number of rows and m is the number of unique prescriptions. The value in each cell is the frequency with which the drug was prescribed by a practitioner.
• TfidfTransformer: We can think of the above dictionary almost like a "document" with frequency counts of terms. Hence, we can use term frequency, inverse document frequency (TFIDF) to process those terms into a form that would be more meaningful for a classification model
• Response Transformation ( TransformedTargetClassifier )
• Transformer (also a Pipeline)
• JSONTargetSelector: We supply this step a chosen response variable
• OrdinalEncoder: This step converts a list of strings into numbers incrementing one by one.
• FunctionTransformer: This step is a convenience method that allows us to apply a lambda function to the entire dataset. We make sure to reshape the data to a column vector as is required by most sklearn estimators
• Classifier
• LogisticRegression: We choose to use logistic regression because it is the simplest classification model

Here we also clone settlement_pipe which duplicates its structure and parameters without keeping its state. We then swap out JSONTargetSelector with an instance that has the right target for the second pipeline.

settlement_pipe = Pipeline(
[
("dv", DictVectorizer()),
("tfidf", TfidfTransformer()),
(
"ttc",
TransformedTargetClassifier(
classifier=LogisticRegression(max_iter=1000),
transformer=Pipeline(
[
["jts", JSONTargetSelector("settlement_type", two_d_out=True)],
("enc", OrdinalEncoder()),
("ft", FunctionTransformer(lambda x: np.squeeze(x))),
]
),
),
),
]
)
region_pipe = clone(settlement_pipe)
region_pipe.steps[2][1].transformer.steps[0][1] = JSONTargetSelector(
"region", two_d_out=True
)

settlement_pipe.fit(X_train, y_train);

region_pipe.fit(X_train, y_train);


Let's do some quick evaluations to see what the output looks like. Carefully selecting some specific rows which do not have too many prescriptions, we find the following output. Looks like 100% accuracy on this cherry picked, dataset... time to call it a day?

N = slice(1, 4)
sample_X = json_normalize(X[N])

preds_df = pd.DataFrame(
zip(
json_normalize(y[N]).settlement_type,
settlement_pipe.predict(X[N]),
json_normalize(y[N]).region,
region_pipe.predict(X[N]),
),
columns=["settlement_type", "settlement_type_hat", "region", "region_hat"],
)
rslts_df = pd.concat([sample_X, preds_df], axis=1)
rslts_df

CEPHALEXIN AMOXICILLIN HYDROCODONE-ACETAMINOPHEN CLINDAMYCIN HCL settlement_type settlement_type_hat region region_hat
0 23.0 52 28.0 NaN non-urban non-urban South South
1 28.0 73 NaN 11.0 non-urban non-urban Midwest Midwest
2 NaN 63 NaN NaN non-urban non-urban South South

Anyways, when we actually score the models, we find the following training accuracies. Not too bad for no hyperparameter tuning.

s_train_score = settlement_pipe.score(X_train, y_train)
r_train_score = region_pipe.score(X_train, y_train)
s_test_score = settlement_pipe.score(X_test, y_test)
r_test_score = region_pipe.score(X_test, y_test)

scores_df = pd.DataFrame(
{
"settlement_type": [f"{s_train_score:.3f}", f"{s_test_score:.3f}"],
"region": [f"{r_train_score:.3f}", f"{r_test_score:.3f}"],
},
index=["train", "test"],
)

scores_df

settlement_type region
train 0.646 0.462
test 0.636 0.453

## Further Resources

Future Work

• One interesting improvement here could be to add an extra custom transformer before the DictVectorizer. One that removes frequent and infrequent medications may improve performance. You should have all the tools to add this step to the pipeline.
• Another improvement to deepen your understanding of pipelines would be to try and see how you could use GridSearchCV to find a more optimal model. It would be especially useful to see how tuning interplays with your custom transformer from the previous step

The best place to learn more about these advanced sklearn features is actually their documentation. They have a user guide which is quite good, and their API is well documented as well. Additionally, I picked up most of the transformer tricks by reading the source code of the feature extraction transformers. If you have access to the PyCharm debugger, it is also really useful to step through the code as Pipeline.fit runs since a lot of functionality is abstracted away.

## Appendix

from nbformat import read

# Default path for this notebook
with open("/work/notebook.ipynb", "r", encoding="utf-8") as f:

# Count the number of lines in markdown or heading cells
word_count = sum(
[
len(cell["source"].replace("#", "").lstrip().split(" "))
for cell in nb["cells"]
]
)

# Count number of lines in the notebook and subtract the number of
# lines in this cell
line_count = (
sum(
[
# Filter out cells that are comments or are empty
len(
list(
filter(
lambda line: not (line.lstrip().startswith("#")),
cell["source"].split("\n"),
)
)
)
for cell in nb["cells"]
if cell.cell_type == "code"
]
)
- 27
)

print(f"Word Count: {word_count:,}")
print(f"Line Count: {line_count:,}")

Word Count: 2,114
Line Count: 206