This assignment focused on implementing a Decision Tree classifier to predict heart disease from the real-world medical dataset used in the first HW.
Key components included choosing and justifying an impurity measure, handling both categorical and continuous attributes, and designing an efficient tree-based data structure.

# Reference: https://www.datacamp.com/tutorial/decision-tree-classification-python
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
''' -------------- Preprocessing: Used equal-width bucketing to discretize continuous variables -------------------- '''
def bucketize_continuous_values(value, thresholds):
"""
Bucketizes a continuous value into 0, 1, 2 based on given thresholds.
:param value: The value to bucketize.
:param thresholds: A list of 2 thresholds that split the range into low/medium/high buckets.
:return: 0, 1, or 2 depending on the value's range.
"""
if value <= thresholds[0]:
return 0 # Low
elif value <= thresholds[1]:
return 1 # Medium
else:
return 2 # High
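
# A quick sanity check (illustrative values, not taken from the dataset): a reading of 150
# against the resting blood pressure thresholds [129, 164] falls in the middle bucket.
#   >>> bucketize_continuous_values(150, [129, 164])
#   1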
def preprocessing(df):
"""
Preprocesses the dataset by bucketizing continuous attributes into three categories.
:param df: Original dataset (dataframe).
:return: Preprocessed dataframe with ordinal attributes instead of continuous attributes.
"""
# Define the bucketing thresholds
bucketization_pillar_dict = {
'age': [40, 60], # Bucketization for age (e.g. 0-40, 41-60, >60)
'resting blood pressure': [129, 164], # min:94, max:200
'maximum heart rate achieved': [115, 158], # min:71, max:202
'serum cholesterol in mg/dl': [272, 418], # min:126, max:564
'oldpeak = ST depression induced by exercise relative to rest': [2.1, 4.1] # min:0.0, max:6.2
}
# Iterate through each column in the dataframe
for column in df.columns:
# Check if the column needs to be bucketized
if column in bucketization_pillar_dict:
thresholds = bucketization_pillar_dict[column]
# Apply the bucketization for each row in the column
df[column] = df[column].apply(lambda x: bucketize_continuous_values(x, thresholds))
return df
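
# Example of the transformation on a toy frame (hypothetical ages, not real records):
#   >>> preprocessing(pd.DataFrame({'age': [35, 52, 71]}))['age'].tolist()
#   [0, 1, 2]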
''' --------------------------------- Used Information Gain as impurity measure ------------------------------------ '''
def entropy(column):
    """
    Calculates the entropy (in bits) of a column of class labels.
    :param column: Series containing the class labels.
    :return: Entropy value.
    """
    value_count = column.value_counts()
    total = value_count.sum()
    probability = [count_i / total for count_i in value_count]
    return -np.sum([p_i * np.log2(p_i) for p_i in probability])
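
# Worked example: a perfectly balanced two-class column carries exactly 1 bit of entropy.
#   >>> entropy(pd.Series(['Yes', 'No', 'Yes', 'No']))
#   1.0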
def information_gain(df, feature):
"""
Calculates the information gain for a given feature based on the entropy of the dataset.
:param df: The dataframe to split.
:param feature: The feature column on which to perform the split.
:return: Information gain for the feature.
"""
# Entropy at current node before splitting
total_entropy = entropy(df['Has heart disease? (Prediction Target)'])
values = df[feature].unique()
weighted_entropy = 0
for value in values:
subset = df[df[feature] == value]
# Entropy after splitting with 'feature'
weighted_entropy += (len(subset) / len(df)) * entropy(subset['Has heart disease? (Prediction Target)'])
return total_entropy - weighted_entropy
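
# Worked example (hypothetical four-row frame): a feature that separates the classes
# perfectly recovers the full 1 bit of parent entropy.
#   >>> toy = pd.DataFrame({
#   ...     'sex': [0, 0, 1, 1],
#   ...     'Has heart disease? (Prediction Target)': ['No', 'No', 'Yes', 'Yes']})
#   >>> information_gain(toy, 'sex')
#   1.0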
''' ---------------------------------- Build Tree using dictionary datastructure ----------------------------------- '''
def build_tree(df, depth=0):
"""
Recursively builds a decision tree using Information Gain as the splitting criterion.
:param df: The dataset to build the tree on.
:param depth: Current depth of the tree (used for stopping condition).
:return: The decision tree (recursive structure).
"""
# Stopping conditions
if (len(df['Has heart disease? (Prediction Target)'].unique()) == 1 or # All samples belong to one class
entropy(df['Has heart disease? (Prediction Target)']) < 0.3 or # Entropy below threshold
len(df) < 15 or # Too few samples
depth == 5): # Maximum depth reached
return df['Has heart disease? (Prediction Target)'].mode()[0] # Most common class
# Find the best feature based on information gain
best_gain = -1
best_feature = None
for feature in df.columns:
if feature != 'Has heart disease? (Prediction Target)' and feature != 'person ID':
gain = information_gain(df, feature)
if gain > best_gain:
best_gain = gain
best_feature = feature
    # No feature left to split on (each feature is dropped after it is used),
    # so fall back to the majority class
    if best_feature is None:
        return df['Has heart disease? (Prediction Target)'].mode()[0]
# Split the dataset based on the best feature
values = df[best_feature].unique()
branches = {}
for value in values:
branches[value] = df[df[best_feature] == value]
tree = {best_feature: {}}
for value, subset in branches.items():
# Drop the used feature
subset = subset.drop(columns=[best_feature])
# Recursively build subtrees for each value branch
tree[best_feature][value] = build_tree(subset, depth + 1)
return tree
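
# The returned structure is a nested dictionary: each internal node maps a feature name
# to a {value: subtree} dict, and each leaf is a bare class label. A hypothetical
# (not actual) result might look like:
#   {'chest pain type': {0: 'No',
#                        1: {'age': {0: 'No', 1: 'Yes', 2: 'Yes'}},
#                        2: 'Yes'}}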
''' ------------------------------------ Make Predictions and Evaluate the Tree ------------------------------------ '''
def predict(tree, sample):
"""
Predicts the class label for a given sample using the decision tree.
:param tree: The decision tree.
:param sample: The sample to predict (dictionary format).
:return: Predicted class label.
"""
while isinstance(tree, dict): # Continue until a leaf node is reached
feature = next(iter(tree)) # Get the first key (best feature)
value = sample.get(feature) # Get the feature value in the sample
        if value not in tree[feature]:  # Unseen feature value at this branch
            return 'Yes'  # Default to the positive class (the safer call for a medical screen)
tree = tree[feature][value]
return tree # Leaf node, return class label
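
# Example traversal on a hand-built two-leaf tree (hypothetical feature and values):
#   >>> tiny_tree = {'sex': {0: 'No', 1: 'Yes'}}
#   >>> predict(tiny_tree, {'sex': 1})
#   'Yes'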
''' ---------------------------------------- Print tree structure ---------------------------------------- '''
def print_tree(tree, depth=0):
"""
Recursively prints the decision tree structure in a readable format.
:param tree: The decision tree dictionary.
:param depth: Current depth level for indentation.
"""
if not isinstance(tree, dict): # If the tree is a leaf node (a class label)
print(" " * depth + f"--> {tree}")
return
# Extract the feature used for splitting
feature = next(iter(tree))
print(" " * depth + f"[Feature: {feature}]") # Print the feature at current depth
for value, subtree in tree[feature].items(): # Iterate through branches
print(" " * depth + f" ├── {feature} = {value}")
print_tree(subtree, depth + 1) # Recursively print subtrees
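
# For the hypothetical tiny_tree above, the printed structure would be:
#   [Feature: sex]
#    ├── sex = 0
#     --> No
#    ├── sex = 1
#     --> Yes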
''' ----------------------------------------------- Main Execution ------------------------------------------------- '''
def main():
"""
Main function to run the decision tree algorithm using data.csv file with 80-20 stratified split.
"""
# Load the dataset
file_path = 'data.csv'
df = pd.read_csv(file_path)
# Preprocess the data
df = preprocessing(df)
# Separate features and target
X = df.drop('Has heart disease? (Prediction Target)', axis=1)
y = df['Has heart disease? (Prediction Target)']
# Create stratified train-test split (80% train, 20% test)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
# Recombine features and target for training
train_df = pd.concat([X_train, y_train], axis=1)
# Build the decision tree
decision_tree = build_tree(train_df)
# Print the tree structure
print("\\n================ Decision Tree Structure ================\\n")
print_tree(decision_tree)
print("\\n========================================================\\n")
# Create test dataframe for evaluation
test_df = pd.concat([X_test, y_test], axis=1)
# Model Evaluation
tp = 0 # True Positive
tn = 0 # True Negative
fp = 0 # False Positive
fn = 0 # False Negative
predictions = []
for i in range(len(test_df)):
sample = test_df.iloc[i].to_dict() # Convert row to dictionary
actual = sample['Has heart disease? (Prediction Target)']
predicted = predict(decision_tree, sample)
# Store prediction
patient_id = sample.get('person ID', i)
predictions.append(f"{patient_id} {predicted}")
# Update the counts based on the prediction
if actual == 'Yes' and predicted == 'Yes': # TP
tp += 1
elif actual == 'No' and predicted == 'No': # TN
tn += 1
elif actual == 'No' and predicted == 'Yes': # FP
fp += 1
elif actual == 'Yes' and predicted == 'No': # FN
fn += 1
# Save predictions to file
with open('predictions.txt', 'w') as f:
for pred in predictions:
f.write(f"{pred}\\n")
# Calculate and print the metrics
total_tests = len(test_df)
accuracy = (tp + tn) / total_tests if total_tests > 0 else 0
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
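    # Worked example with hypothetical counts (tp=40, tn=30, fp=10, fn=20):
    # accuracy = 70/100 = 0.70, precision = 40/50 = 0.80, recall = 40/60 ≈ 0.67,
    # f1 ≈ 0.73, specificity = 30/40 = 0.75.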
print("------------------Model Evaluation Result----------------------")
print(f"True Positive (TP): {tp}")
print(f"True Negative (TN): {tn}")
print(f"False Positive (FP): {fp}")
print(f"False Negative (FN): {fn}")
print(f"\\nAccuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall (Sensitivity): {recall:.4f}")
print(f"F1 Score: {f1_score:.4f}")
print(f"Specificity: {specificity:.4f}")
print("------------------------------------------------------------------")
if __name__ == "__main__":
main()