Introduction

This assignment focused on implementing a Decision Tree classifier that predicts heart disease from the same real-world medical dataset used in the first homework.

Key components included choosing and justifying an impurity measure, handling both categorical and continuous attributes, and designing an efficient tree-based data structure.

Reference: https://www.datacamp.com/tutorial/decision-tree-classification-python


Code Explanation

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

''' -------------- Preprocessing: Used equal-width bucketing to discretize continuous variable --------------------- '''

def bucketize_continuous_values(value, thresholds):
    """
    Bucketizes a continuous value into an ordinal bucket index based on given thresholds.
    With two thresholds the result is 0 (low), 1 (medium) or 2 (high); any other number of
    thresholds works the same way and yields len(thresholds) + 1 buckets.
    :param value: The value to bucketize.
    :param thresholds: Ascending list of inclusive upper bounds, one per bucket except the last.
    :return: Index of the first threshold that is >= value, or len(thresholds) if value
             exceeds every threshold.
    """
    for bucket, upper_bound in enumerate(thresholds):
        if value <= upper_bound:
            return bucket
    # Above every threshold. NaN also ends up here because every comparison with NaN is False.
    return len(thresholds)

def preprocessing(df):
    """
    Preprocesses the dataset by bucketizing continuous attributes into three categories.
    The input dataframe is not modified; a bucketized copy is returned.
    :param df: Original dataset (dataframe).
    :return: New dataframe with ordinal attributes (0, 1, 2) instead of continuous attributes.
             Columns that have no entry in the threshold table are copied unchanged.
    """
    # Define the bucketing thresholds
    bucketization_pillar_dict = {
        'age': [40, 60],  # Bucketization for age (e.g. 0-40, 41-60, >60)
        'resting blood pressure': [129, 164],  # min:94, max:200
        'maximum heart rate achieved': [115, 158],  # min:71, max:202
        'serum cholesterol in mg/dl': [272, 418],  # min:126, max:564
        'oldpeak = ST depression induced by exercise relative to rest': [2.1, 4.1]  # min:0.0, max:6.2
    }

    # Work on a copy so the caller's dataframe keeps its continuous values
    df = df.copy()

    for column, thresholds in bucketization_pillar_dict.items():
        # Tolerate datasets that lack some of the continuous attributes
        if column not in df.columns:
            continue
        # 'args' hands the thresholds to the helper as its second positional argument
        df[column] = df[column].apply(bucketize_continuous_values, args=(thresholds,))

    return df

''' --------------------------------- Used Information Gain as impurity measure ------------------------------------ '''

def entropy(df):
    """
    Computes the entropy (base 2) of the value distribution of a target column.
    :param df: The target column; its distinct values are tallied with value_counts().
    :return: Entropy value.
    """
    counts = df.value_counts().to_numpy()
    # Relative frequency of every distinct value
    probabilities = counts / counts.sum()
    return -np.sum(probabilities * np.log2(probabilities))

def information_gain(df, feature):
    """
    Computes how much the entropy of the target column drops when the data is split on a feature.
    :param df: The dataframe to split.
    :param feature: The feature column on which to perform the split.
    :return: Information gain for the feature.
    """
    target_column = 'Has heart disease? (Prediction Target)'
    row_count = len(df)

    # Size-weighted entropy of the partitions created by each distinct value of 'feature'
    entropy_after_split = 0
    for value in df[feature].unique():
        partition_labels = df.loc[df[feature] == value, target_column]
        entropy_after_split += (len(partition_labels) / row_count) * entropy(partition_labels)

    # Entropy at the current node minus the entropy remaining after the split
    return entropy(df[target_column]) - entropy_after_split

''' ---------------------------------- Build Tree using dictionary datastructure ----------------------------------- '''

def build_tree(df, depth=0, max_depth=5, min_samples=15, entropy_threshold=0.3):
    """
    Recursively builds a decision tree using Information Gain as the splitting criterion.
    :param df: The dataset to build the tree on (must contain the target column).
    :param depth: Current depth of the tree (used for stopping condition).
    :param max_depth: Depth at which a leaf is forced.
    :param min_samples: Nodes with fewer samples than this become leaves.
    :param entropy_threshold: Nodes whose target entropy is below this become leaves.
    :return: The decision tree: either a class label (leaf) or a nested dictionary of the
             form {feature: {value: subtree, ...}}.
    """
    target_column = 'Has heart disease? (Prediction Target)'
    labels = df[target_column]

    # Stopping conditions
    if (len(labels.unique()) == 1 or  # All samples belong to one class
            entropy(labels) < entropy_threshold or  # Entropy below threshold
            len(df) < min_samples or  # Too few samples
            depth >= max_depth):  # Maximum depth reached
        return labels.mode()[0]  # Most common class

    # Used features are dropped on the way down, so a node can run out of candidates
    candidate_features = [column for column in df.columns
                          if column != target_column and column != 'person ID']
    if not candidate_features:
        return labels.mode()[0]

    # Feature with the highest information gain (the first one wins ties)
    best_feature = max(candidate_features, key=lambda feature: information_gain(df, feature))

    branches = {}
    for value in df[best_feature].unique():
        subset = df[df[best_feature] == value]
        if subset.empty:
            # A NaN value never equals itself, so it selects no rows; recursing on an
            # empty partition would fail when looking up the most common class.
            continue
        # Drop the used feature and recursively build the subtree for this value branch
        branches[value] = build_tree(subset.drop(columns=[best_feature]), depth + 1,
                                     max_depth, min_samples, entropy_threshold)

    if not branches:
        # Every value of the chosen feature was missing: fall back to a leaf
        return labels.mode()[0]

    return {best_feature: branches}

''' ------------------------------------ Make Predictions and Evaluate the Tree ------------------------------------ '''

def predict(tree, sample, default='Yes'):
    """
    Predicts the class label for a given sample using the decision tree.
    :param tree: The decision tree (nested {feature: {value: subtree}} dictionaries, or a
                 bare class label for a single-leaf tree).
    :param sample: The sample to predict (dictionary format).
    :param default: Label returned when the sample's value for a split feature has no
                    branch in the tree (unseen or missing value).
    :return: Predicted class label.
    """
    while isinstance(tree, dict):  # Continue until a leaf node is reached
        feature = next(iter(tree))  # Each node dictionary has a single key: the split feature
        branches = tree[feature]
        value = sample.get(feature)  # None when the sample lacks the feature

        if value not in branches:  # Handle unseen values
            return default

        tree = branches[value]

    return tree  # Leaf node, return class label

''' ---------------------------------------- Print tree structure ---------------------------------------- '''

def print_tree(tree, depth=0):
    """
    Prints the decision tree to stdout, indenting each level by two spaces.
    :param tree: The decision tree dictionary, or a class label for a leaf.
    :param depth: Current depth level for indentation.
    """
    indent = "  " * depth

    if isinstance(tree, dict):
        # Internal node: its single key is the feature used for splitting
        feature = next(iter(tree))
        print(f"{indent}[Feature: {feature}]")
        for value, subtree in tree[feature].items():
            print(f"{indent}  ├── {feature} = {value}")
            print_tree(subtree, depth + 1)
    else:
        # Leaf node: the tree is just a class label
        print(f"{indent}--> {tree}")

''' ----------------------------------------------- Main Execution ------------------------------------------------- '''

def main():
    """
    Main function to run the decision tree algorithm using data.csv file with 80-20 stratified split.
    Prints the tree structure and the evaluation metrics, and writes one
    "<person ID> <prediction>" line per test sample to predictions.txt.
    """
    target_column = 'Has heart disease? (Prediction Target)'

    # Load the dataset
    file_path = 'data.csv'
    df = pd.read_csv(file_path)

    # Preprocess the data
    df = preprocessing(df)

    # Create stratified train-test split (80% train, 20% test); stratifying on the target
    # keeps the class ratio the same in both parts
    train_df, test_df = train_test_split(
        df, test_size=0.2, random_state=42, stratify=df[target_column])

    # Build the decision tree
    decision_tree = build_tree(train_df)

    # Print the tree structure
    print("\n================ Decision Tree Structure ================\n")
    print_tree(decision_tree)
    print("\n========================================================\n")

    # Model Evaluation
    tp = 0  # True Positive
    tn = 0  # True Negative
    fp = 0  # False Positive
    fn = 0  # False Negative

    predictions = []

    for i, (_, row) in enumerate(test_df.iterrows()):
        sample = row.to_dict()  # Convert row to dictionary
        actual = sample[target_column]
        predicted = predict(decision_tree, sample)

        # Store prediction; fall back to the row position when there is no ID column
        patient_id = sample.get('person ID', i)
        predictions.append(f"{patient_id} {predicted}")

        # Update the counts based on the prediction
        if actual == 'Yes' and predicted == 'Yes':  # TP
            tp += 1
        elif actual == 'No' and predicted == 'No':  # TN
            tn += 1
        elif actual == 'No' and predicted == 'Yes':  # FP
            fp += 1
        elif actual == 'Yes' and predicted == 'No':  # FN
            fn += 1

    # Save predictions to file, one per line
    with open('predictions.txt', 'w', encoding='utf-8') as f:
        f.writelines(f"{pred}\n" for pred in predictions)

    # Calculate and print the metrics (each ratio is 0 when its denominator is 0)
    total_tests = len(test_df)
    accuracy = (tp + tn) / total_tests if total_tests > 0 else 0
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    specificity = tn / (tn + fp) if (tn + fp) > 0 else 0

    print("------------------Model Evaluation Result----------------------")
    print(f"True Positive (TP): {tp}")
    print(f"True Negative (TN): {tn}")
    print(f"False Positive (FP): {fp}")
    print(f"False Negative (FN): {fn}")
    print(f"\nAccuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall (Sensitivity): {recall:.4f}")
    print(f"F1 Score: {f1_score:.4f}")
    print(f"Specificity: {specificity:.4f}")
    print("------------------------------------------------------------------")

# Run the pipeline only when this file is executed as a script, not when it is imported
if __name__ == "__main__":
    main()

1. Preprocessing – Equal-Width Bucketing

2. Impurity Measure – Information Gain