{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Foundations of Machine Learning - Session 05\n",
"\n",
"- *Course*: Foundations of Machine Learning\n",
"- *Session*: 04\n",
"- *Unit*: Classification Trees\n",
"\n",
"This notebook develops tree-based classifiers using Numpy. Both the ID3 and CART algorithm will be constructed and their results compared."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"from collections import Counter # Counter([\"a\", \"b\", \"a\"]) == {\"a\": 2, \"b\":1}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Data\n",
"\n",
"For this notebooks' classifiction task, we are going to use the famous Iris toy dataset. The data set contains 3 classes of 50 instances each, where each class refers to a type of iris plant. Each sample consist of four features, each describing one property of the flower: `sepal_width`, `sepal_length`, `petal_width`, `petal_length`. One class is linearly separable from the other 2; the latter are NOT linearly separable from each other.\n",
"\n",
"The code below is pre-filled to load the Iris dataset using `sklearn`, and randomly split it into 80:20 train/test subset."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"from sklearn.datasets import load_iris\n",
"# Load Iris dataset from sklearn\n",
"data = load_iris()\n",
"# Assemble D\n",
"D = list(zip(\n",
" data.data,\n",
" data.target\n",
"))\n",
"# Shuffle D\n",
"np.random.shuffle(D)\n",
"# 80:20 split into train and test set\n",
"D_train = D[:int(len(D) * 0.8)]\n",
"D_test = D[int(len(D) * 0.8):]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Since we want to train both CART and ID3, one problem arises: ID3 can only deal with categorical data, yet the Iris dataset features numerical values. We therefore need to *quantize* the data.\n",
"\n",
"**Exercise**: quantize each feature separately by representing each value by the index of its quartile, i.e. a value in the 4th quartile is represented by `3`. Then, quantize the train and test sets to form `D_train_quant` and `D_test_quant`.\n",
"\n",
"*Hints*:\n",
"- use `np.quantile` to find the quartiles for each feature\n",
"- use `np.digitize` to find the correct quartile for each value\n",
"- `np.apply_along_axis` allows you to apply a function separately to each axis of an array."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"def quantize(arr, q=4):\n",
" \"\"\"\n",
" Quantizes the input array into the specified number of quantiles.\n",
" :param arr: array to quantize\n",
" :param q: number of quantiles to quantize with\n",
" :return: quantized array\n",
" \"\"\"\n",
" bins = np.quantile(arr, np.linspace(0, 1, q+1)[1:-1])\n",
" return np.digitize(arr, bins)\n",
"\n",
"# Quantize data\n",
"D_quant = list(zip(\n",
" np.apply_along_axis(quantize, 0, data.data),\n",
" data.target\n",
"))\n",
"\n",
"# Shuffle D_quant\n",
"np.random.shuffle(D_quant)\n",
"# 80:20 split into quantized train and test set\n",
"D_quant_train = D_quant[:int(len(D) * 0.8)]\n",
"D_quant_test = D_quant[int(len(D) * 0.8):]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Entropy\n",
"\n",
"For both ID3 and CART, we will use entropy as splitting criterion. It is given as:\n",
"\n",
"$$ \\iota(D) = -\\sum_{i=1}^k \\frac{|\\{\\mathbf{x}, c_i)\\in D\\}|}{|D|}\\cdot\\log_2\\frac{|\\{\\mathbf{x}, c_i)\\in D\\}|}{|D|} $$\n",
"\n",
"**Exercise**: implement a function to calculate the entropy of a given dataset D.\n",
"\n",
"*Hints*:\n",
"- use `Counter` to get the number of instances for each class."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"def entropy(D):\n",
" \"\"\"\n",
" Computes the entropy for a given set of data\n",
" :param D: the dataset to compute entropy for. Consists of (feature_vector, class) tuples.\n",
" :return:\n",
" \"\"\"\n",
" # Compute total number of samples\n",
" n_samples = len(D)\n",
" # Count class occurrences\n",
" class_counts = Counter([c for _, c in D])\n",
" # Transform class counts into probabilities\n",
" class_counts = {k: v/n_samples for k,v in class_counts.items()}\n",
" # Compute entropy of complete Dataset\n",
" h = -sum(class_counts[k] * np.log2(class_counts[k]) for k in class_counts.keys())\n",
" return h"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## ID3\n",
"\n",
"The first tree algorithm we will implement is ID3. The implementation is analogous to the lecture and split into the actual ID3 algorithm `train_id3`, and the information gain function that returns the entropy delta for a given feature on D (`information_gain_id3`). Information gain is formulated as:\n",
"\n",
"$$\\Delta\\iota = \\iota(D) - \\sum_{l=1}^m\\frac{|D_l|}{D}\\cdot\\iota(D_l) $$\n",
"\n",
"\n",
"\n",
"**Exercise** implement `information_gain_id3`, which calculates the information gain of D and a give feature."
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"def information_gain_id3(D, feature):\n",
" \"\"\"\n",
" Computes the information gain of feature f on dataset D.\n",
" :param D: dataset to compute the information gain with.\n",
" :param feature: feature to compute the information gain for.\n",
" :return: the information gain.\n",
" \"\"\"\n",
" # Compute total entropy\n",
" h_t = entropy(D)\n",
" # Compute feature expressions\n",
" F = Counter([d[0][feature] for d in D])\n",
" # Add weighted entropy of each feature expression to total entropy\n",
" h_f = 0\n",
" for f in F.keys():\n",
" h_f += F[feature]/len(D) * -entropy(list(filter(lambda x: x[0][feature] == f, D)))\n",
" return h_t - h_f"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, we will implement the (recursive) ID3 training function. The tree itself is to be represented by nested dictionaries. Each has three keys:\n",
"1. `label` – specifies the label of this tree node\n",
"2. `feature` - specifies the feature that is used for splitting at this node\n",
"3. `children` - contains a nested dict, where keys are the feature values of the splitting feature, and values are the subtrees (dictionaries) for each.\n",
"\n",
"The `train_id3` function is recursive: the `children` of a node are determined by recursively applying `train_id3` on the reduced data- and feature set. Don't forget the base case and exit criteria as specified in the lecture, to avoid infinite recursion!\n",
"\n",
"**Exercise**: implement `train_id3`.\n",
"\n",
"*Hints*:\n",
"- leave the `feature` and `children` keys of the node dictionary at `None` for leaf nodes\n",
"- use `np.argmax` to find the index of the maximum element in a list/array\n",
"- you can use set comprehensions to calculate the domain of a feature\n",
"- you can use either list comprehensions of `filter()` to build the data subsets for recursion"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"\n",
"def train_id3(D, features: list):\n",
" # Create a new node\n",
" t = {\n",
" \"label\": None,\n",
" \"feature\": None,\n",
" \"children\": {}\n",
" }\n",
" # Count the occurrences of each class in D\n",
" class_counts = Counter([c for _, c in D])\n",
" # Find the most common class in D and assign it to t\n",
" t[\"label\"] = max(class_counts, key=class_counts.get)\n",
" # If D is pure, return t\n",
" if len(class_counts) == 1:\n",
" return t\n",
" # No features are left\n",
" if not features:\n",
" return t\n",
" # Compute feature with maximum information gain\n",
" idx_max_f = np.argmax(list(map(lambda f: information_gain_id3(D, f), features)))\n",
" t[\"feature\"] = features[idx_max_f]\n",
" # Remove max feature from feature set\n",
" features.remove(t[\"feature\"])\n",
" # Calculate domain of max feature on D\n",
" dom_A = {d[0][t[\"feature\"]] for d in D}\n",
" # Calculate subtrees for every feature expression\n",
" for a in dom_A:\n",
" # Subset data\n",
" D_a = list(filter(lambda x: x[0][t[\"feature\"]] == a, D))\n",
" # If no data is left\n",
" if len(D_a) > 0:\n",
" # Recursive call\n",
" t[\"children\"][a] = train_id3(D_a, features)\n",
" return t"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Exercise**: apply `train_id3` to `D_train_quant` and all classes (`[0,1,2]`)."
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [],
"source": [
"id3_model = train_id3(D_quant_train, [0,1,2])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To predict a class for new data using the tree inferred with ID3, we need to write a function that traverses the tree until it reaches a leaf node, i.e. class decision.\n",
"\n",
"**Exercise** implement `predict_id3`, which should traverse the decision tree and follow each node's decision rule given the input data.\n",
"\n",
"*Hints*:\n",
"- the implementation should be recursive\n",
"- leaf nodes can be identified by checking if `feature is None`, i.e., a node has no decision rule"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
"def predict_id3(model, d):\n",
" \"\"\"\n",
" Predict a class for a given sample using the given ID3 model.\n",
" :param model: an ID3 decision tree model\n",
" :param d: a feature vector\n",
" :return: the class prediction\n",
" \"\"\"\n",
" feature = model[\"feature\"]\n",
" if feature is None:\n",
" return model[\"label\"]\n",
" else:\n",
" return predict_id3(model[\"children\"][d[feature]], d)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Like in previous weeks, apply either your own or `sklearns` implementation of evaluation functions to evaluate the effectiveness of your ID3 implementation.\n",
"\n",
"**Exercise** evaluate the ID3 model.\n",
"\n",
"*Hints*:\n",
"- pay attention to use the correct train/test splits\n",
"- use the quantized version of both train and test data"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" precision recall f1-score support\n",
"\n",
" 0 1.00 0.58 0.74 12\n",
" 1 0.62 0.71 0.67 7\n",
" 2 0.53 0.73 0.62 11\n",
"\n",
" accuracy 0.67 30\n",
" macro avg 0.72 0.67 0.67 30\n",
"weighted avg 0.74 0.67 0.68 30\n",
"\n"
]
}
],
"source": [
"from sklearn.metrics import classification_report\n",
"\n",
"print(classification_report([d[1] for d in D_quant_test], [predict_id3(id3_model, d[0]) for d in D_quant_test]))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## CART\n",
"\n",
"Next, implement the CART algorithm. As for ID3, we will implement it in two steps: one function to calculate the information gain (`information_gain_cart`) and one for the actual training loop (`train_cart`). CART differs from ID3 in that it will figure out an optimal decision boundary at each node, thus the information gain function should not only return the entropy delta, but also the optimal threshold for the specified feature.\n",
"\n",
"The optimal threshold is determined by the following formula:\n",
"\n",
"$$ \\Delta\\iota(D(t), \\{D(t_L), D(t_R)\\} = \\iota(D(t)) - \\frac{|D(t_L)|}{|D|} \\cdot \\iota(D(t_L)) - \\frac{|D(t_R|}{|D|} \\cdot \\iota(D(t_R))$$\n",
"\n",
"**Exercise**: implement `information_gain_cart`.\n",
"\n",
"*Hints*:\n",
"- use the `entropy` function defined previously, applying it to the relevant subsets (left/right) of each split\n",
"- calculate the entropy for each possible threshold to identify the optimal one"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [],
"source": [
"def information_gain_cart(D, feature):\n",
" \"\"\"\n",
" Computes the information gain of feature f on dataset D by finding and optimal splitting.\n",
" :param D: dataset to compute the information gain with.\n",
" :param feature: feature to compute the information gain for.\n",
" :return: the information gain and the threshold\n",
" \"\"\"\n",
" # Compute total entropy\n",
" h_t = entropy(D)\n",
" # Compute splittings\n",
" values = sorted(list(set([d[0][feature] for d in D])))\n",
" splits = [0.5 * (values[i] + values[i+1]) for i in range(len(values) - 1)]\n",
"\n",
" # Find threshold with maximum information gain\n",
" h_max, threshold = 0, 0\n",
" for split in splits:\n",
" # Build synthetic data labels for each side of the split\n",
" D_l = list(filter(lambda x: x[0][feature] < split, D))\n",
" D_r = list(filter(lambda x: x[0][feature] > split, D))\n",
" h = h_t - (len(D_l)/len(D) * entropy(D_l)) - (len(D_r)/len(D) * entropy(D_r))\n",
" if h > h_max:\n",
" h_max = h\n",
" threshold = split\n",
" # Return delta\n",
" return h_max, threshold"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Given the modified information gain formula, the training function also has to be modified. Make three important changes to transform ID3 into CART:\n",
"1. the tree representation is different to ID3. The dictionary now contains 5 keys; `label` and `feature` stay the same as before, `threshold` specifies the numerical decision boundary in a node, and `right child`/`left_child` the two subtrees.\n",
"2. the information gain function also returns the threshold. Remember to track not only the optimal feature, but also its threshold and persist it in the node dictionary.\n",
"3. CART is a binary tree, i.e. the recursion is applied once to the left (feature lower than threshold) and right (feature higher than threshold) data subset. Each is a separate recursion call and is to be persisted in the node dictionary.\n",
"\n",
"**Exercise**: implement `train_cart`"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [],
"source": [
"def train_cart(D):\n",
" # Create a new node (\n",
" t = {\n",
" \"label\": None,\n",
" \"feature\": None,\n",
" \"threshold\": 0,\n",
" \"left_child\": None,\n",
" \"right_child\": None\n",
" }\n",
" # Compute the set of features\n",
" features = [0,1,2]\n",
" # Count the occurrences of each class in D\n",
" class_counts = Counter([c for _, c in D])\n",
" # Find the most common class in D and assign it to t\n",
" t[\"label\"] = max(class_counts, key=class_counts.get)\n",
" # If D is pure, return t\n",
" if len(class_counts) == 1:\n",
" return t\n",
" # No features are left\n",
" if not features:\n",
" return t\n",
"\n",
" # Compute max feature and threshold\n",
" splits = [information_gain_cart(D, f) for f in features]\n",
" entropies = [x[0] for x in splits]\n",
" thresholds = [x[1] for x in splits]\n",
" idx_max_f = np.argmax(entropies)\n",
" t[\"feature\"] = features[idx_max_f]\n",
" t[\"threshold\"] = thresholds[idx_max_f]\n",
"\n",
" # Calculate subtrees for left side of the threshold\n",
" D_l = list(filter(lambda x: x[0][t[\"feature\"]] < t[\"threshold\"], D))\n",
" if len(D_l) > 0:\n",
" t[\"left_child\"] = train_cart(D_l)\n",
"\n",
" # Calculate subtrees for right side of the threshold\n",
" D_r = list(filter(lambda x: x[0][t[\"feature\"]] > t[\"threshold\"], D))\n",
" if len(D_r) > 0:\n",
" t[\"right_child\"] = train_cart(D_r)\n",
"\n",
" return t"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Exercise** train a CART model by applying `train_cart` to `D_train`.\n",
"\n",
"*Hint*\n",
"- use the unquantized version of the training data\n",
"- remember to use only the train split"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [],
"source": [
"cart_model = train_cart(D_train)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Exercise**: implement `predict_cart` to traverse the CART tree and assign a class to data samples.\n",
"\n",
"*Hints*:\n",
"- implementation is similar to `predict_id3`\n",
"- for the specified feature, determine whether to traverse the right or left subtree based on the decision threshold (left: <, right: >)"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [],
"source": [
"def predict_cart(model, d):\n",
" \"\"\"\n",
" Predict a class for a given sample using the CART given model.\n",
" :param model: an ID3 decision tree model\n",
" :param d: a feature vector\n",
" :return: the class prediction\n",
" \"\"\"\n",
" feature = model[\"feature\"]\n",
" if feature is None:\n",
" return model[\"label\"]\n",
" if d[feature] < model[\"threshold\"]:\n",
" return predict_cart(model[\"left_child\"], d)\n",
" else:\n",
" return predict_cart(model[\"right_child\"], d)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Exercise**: evaluate CART on `D_test`."
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" precision recall f1-score support\n",
"\n",
" 0 1.00 1.00 1.00 8\n",
" 1 0.90 1.00 0.95 9\n",
" 2 1.00 0.92 0.96 13\n",
"\n",
" accuracy 0.97 30\n",
" macro avg 0.97 0.97 0.97 30\n",
"weighted avg 0.97 0.97 0.97 30\n",
"\n"
]
}
],
"source": [
"from sklearn.metrics import classification_report\n",
"\n",
"print(classification_report([d[1] for d in D_test], [predict_cart(cart_model, d[0]) for d in D_test]))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Exercise**: compare the evaluation results of ID3 and CART. Guess why CART performs better."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.6"
}
},
"nbformat": 4,
"nbformat_minor": 1
}