Wine Data Mining: Machine Learning Analysis of Wine Quality
In this post, I’ll share insights from my Wine Data Mining project, which applies data mining and machine learning techniques to wine quality datasets, covering predictive modeling, statistical analysis, and visualization.
Project Overview
The Wine Data Mining project applies advanced machine learning and statistical analysis techniques to wine quality datasets. This project demonstrates data preprocessing, feature engineering, model selection, evaluation, and visualization using Python’s scientific computing ecosystem.
Technical Architecture
Project Structure
```
WineDataMining/
├── data/
│   ├── raw/
│   │   ├── winequality-red.csv
│   │   ├── winequality-white.csv
│   │   └── winequality-combined.csv
│   ├── processed/
│   │   ├── cleaned_data.csv
│   │   ├── features.csv
│   │   └── scaled_data.csv
│   └── external/
│       ├── wine_reviews.csv
│       └── wine_metadata.json
├── src/
│   ├── data_preprocessing/
│   │   ├── data_loader.py
│   │   ├── data_cleaner.py
│   │   ├── feature_engineering.py
│   │   └── data_scaler.py
│   ├── analysis/
│   │   ├── exploratory_analysis.py
│   │   ├── statistical_analysis.py
│   │   ├── correlation_analysis.py
│   │   └── hypothesis_testing.py
│   ├── modeling/
│   │   ├── model_selection.py
│   │   ├── model_training.py
│   │   ├── model_evaluation.py
│   │   └── hyperparameter_tuning.py
│   ├── visualization/
│   │   ├── data_visualization.py
│   │   ├── model_visualization.py
│   │   └── report_generation.py
│   └── utils/
│       ├── config.py
│       ├── logger.py
│       └── helpers.py
├── notebooks/
│   ├── 01_data_exploration.ipynb
│   ├── 02_feature_engineering.ipynb
│   ├── 03_model_development.ipynb
│   └── 04_model_evaluation.ipynb
├── models/
│   ├── trained_models/
│   ├── model_artifacts/
│   └── model_metadata/
├── reports/
│   ├── analysis_reports/
│   ├── model_reports/
│   └── visualizations/
├── tests/
│   ├── unit/
│   ├── integration/
│   └── fixtures/
├── requirements.txt
├── setup.py
└── README.md
```

Core Implementation
Data Preprocessing Pipeline
```python
# src/data_preprocessing/data_loader.py
import logging
from pathlib import Path
from typing import Dict

import pandas as pd


class WineDataLoader:
    def __init__(self, data_dir: str = "data/raw"):
        self.data_dir = Path(data_dir)
        self.logger = logging.getLogger(__name__)

    def load_red_wine_data(self) -> pd.DataFrame:
        """Load the red wine quality dataset."""
        file_path = self.data_dir / "winequality-red.csv"
        if not file_path.exists():
            raise FileNotFoundError(f"Red wine data file not found: {file_path}")
        df = pd.read_csv(file_path, sep=';')
        df['wine_type'] = 'red'
        self.logger.info(f"Loaded red wine data: {df.shape}")
        return df

    def load_white_wine_data(self) -> pd.DataFrame:
        """Load the white wine quality dataset."""
        file_path = self.data_dir / "winequality-white.csv"
        if not file_path.exists():
            raise FileNotFoundError(f"White wine data file not found: {file_path}")
        df = pd.read_csv(file_path, sep=';')
        df['wine_type'] = 'white'
        self.logger.info(f"Loaded white wine data: {df.shape}")
        return df

    def load_combined_data(self) -> pd.DataFrame:
        """Load and combine the red and white wine datasets."""
        red_wine = self.load_red_wine_data()
        white_wine = self.load_white_wine_data()
        combined_df = pd.concat([red_wine, white_wine], ignore_index=True)
        self.logger.info(f"Combined dataset shape: {combined_df.shape}")
        return combined_df

    def load_external_data(self) -> pd.DataFrame:
        """Load external wine review data."""
        # external/ is a sibling of raw/, not inside it
        file_path = self.data_dir.parent / "external" / "wine_reviews.csv"
        if not file_path.exists():
            self.logger.warning("External wine review data not found")
            return pd.DataFrame()
        df = pd.read_csv(file_path)
        self.logger.info(f"Loaded external data: {df.shape}")
        return df

    def get_data_summary(self, df: pd.DataFrame) -> Dict:
        """Get a comprehensive data summary."""
        summary = {
            'shape': df.shape,
            'columns': list(df.columns),
            'dtypes': df.dtypes.to_dict(),
            'missing_values': df.isnull().sum().to_dict(),
            'duplicate_rows': df.duplicated().sum(),
            'memory_usage': df.memory_usage(deep=True).sum(),
            'numeric_summary': df.describe().to_dict(),
            'categorical_summary': {}
        }
        # Categorical columns summary
        categorical_cols = df.select_dtypes(include=['object', 'category']).columns
        for col in categorical_cols:
            summary['categorical_summary'][col] = {
                'unique_values': df[col].nunique(),
                'value_counts': df[col].value_counts().to_dict()
            }
        return summary
```
```python
# src/data_preprocessing/data_cleaner.py
import logging

import numpy as np
import pandas as pd


class WineDataCleaner:
    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Comprehensive data cleaning pipeline."""
        original_shape = df.shape
        # Remove duplicates
        df = self.remove_duplicates(df)
        # Handle missing values
        df = self.handle_missing_values(df)
        # Handle outliers
        df = self.handle_outliers(df)
        # Standardize column names
        df = self.standardize_columns(df)
        # Validate data types
        df = self.validate_data_types(df)
        self.logger.info(f"Data cleaning completed: {original_shape} -> {df.shape}")
        return df

    def remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame:
        """Remove duplicate rows."""
        initial_count = len(df)
        df = df.drop_duplicates()
        removed_count = initial_count - len(df)
        if removed_count > 0:
            self.logger.info(f"Removed {removed_count} duplicate rows")
        return df

    def handle_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
        """Handle missing values using appropriate strategies."""
        missing_counts = df.isnull().sum()
        for column, missing_count in missing_counts.items():
            if missing_count > 0:
                if df[column].dtype in ['float64', 'int64']:
                    # Use the median for numeric columns
                    # (assignment instead of inplace fillna avoids chained-assignment issues)
                    median_value = df[column].median()
                    df[column] = df[column].fillna(median_value)
                    self.logger.info(f"Filled {missing_count} missing values in {column} with median: {median_value}")
                else:
                    # Use the mode for categorical columns
                    mode_value = df[column].mode().iloc[0] if not df[column].mode().empty else 'Unknown'
                    df[column] = df[column].fillna(mode_value)
                    self.logger.info(f"Filled {missing_count} missing values in {column} with mode: {mode_value}")
        return df

    def handle_outliers(self, df: pd.DataFrame, method: str = 'iqr') -> pd.DataFrame:
        """Handle outliers using the specified method."""
        numeric_columns = df.select_dtypes(include=[np.number]).columns
        for column in numeric_columns:
            if column == 'quality':  # Don't remove outliers from the target variable
                continue
            if method == 'iqr':
                df = self.remove_outliers_iqr(df, column)
            elif method == 'zscore':
                df = self.remove_outliers_zscore(df, column)
            elif method == 'isolation_forest':
                df = self.remove_outliers_isolation_forest(df, column)
        return df

    def remove_outliers_iqr(self, df: pd.DataFrame, column: str) -> pd.DataFrame:
        """Remove outliers using the IQR method."""
        Q1 = df[column].quantile(0.25)
        Q3 = df[column].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        initial_count = len(df)
        df = df[(df[column] >= lower_bound) & (df[column] <= upper_bound)]
        removed_count = initial_count - len(df)
        if removed_count > 0:
            self.logger.info(f"Removed {removed_count} outliers from {column} using IQR method")
        return df

    def remove_outliers_zscore(self, df: pd.DataFrame, column: str, threshold: float = 3.0) -> pd.DataFrame:
        """Remove outliers using the Z-score method."""
        z_scores = np.abs((df[column] - df[column].mean()) / df[column].std())
        initial_count = len(df)
        df = df[z_scores < threshold]
        removed_count = initial_count - len(df)
        if removed_count > 0:
            self.logger.info(f"Removed {removed_count} outliers from {column} using Z-score method")
        return df

    def remove_outliers_isolation_forest(self, df: pd.DataFrame, column: str) -> pd.DataFrame:
        """Remove outliers using an Isolation Forest."""
        from sklearn.ensemble import IsolationForest
        # Reshape data for the Isolation Forest
        data = df[[column]].values
        # Fit the Isolation Forest (label 1 = inlier, -1 = outlier)
        iso_forest = IsolationForest(contamination=0.1, random_state=42)
        outlier_labels = iso_forest.fit_predict(data)
        initial_count = len(df)
        df = df[outlier_labels == 1]
        removed_count = initial_count - len(df)
        if removed_count > 0:
            self.logger.info(f"Removed {removed_count} outliers from {column} using Isolation Forest")
        return df

    def standardize_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Standardize column names."""
        df.columns = df.columns.str.lower().str.replace(' ', '_').str.replace('-', '_')
        return df

    def validate_data_types(self, df: pd.DataFrame) -> pd.DataFrame:
        """Validate and convert data types."""
        # Convert numeric columns
        numeric_columns = ['fixed_acidity', 'volatile_acidity', 'citric_acid',
                           'residual_sugar', 'chlorides', 'free_sulfur_dioxide',
                           'total_sulfur_dioxide', 'density', 'ph', 'sulphates', 'alcohol']
        for col in numeric_columns:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce')
        # Convert categorical columns
        if 'wine_type' in df.columns:
            df['wine_type'] = df['wine_type'].astype('category')
        return df
```
```python
# src/data_preprocessing/feature_engineering.py
import logging
from typing import List, Tuple

import numpy as np
import pandas as pd
from sklearn.feature_selection import SelectKBest, f_regression, mutual_info_regression
from sklearn.preprocessing import PolynomialFeatures, StandardScaler


class WineFeatureEngineer:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.scaler = StandardScaler()
        self.poly_features = PolynomialFeatures(degree=2, include_bias=False)

    def engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Comprehensive feature engineering pipeline."""
        df = df.copy()
        df = self.create_derived_features(df)
        df = self.create_interaction_features(df)
        df = self.create_polynomial_features(df)
        df = self.create_statistical_features(df)
        df = self.create_quality_features(df)
        self.logger.info(f"Feature engineering completed. New shape: {df.shape}")
        return df

    def create_derived_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Create derived features from existing ones (epsilon avoids division by zero)."""
        # Acidity ratio
        df['acidity_ratio'] = df['fixed_acidity'] / (df['volatile_acidity'] + 1e-8)
        # Sulfur dioxide ratio
        df['sulfur_ratio'] = df['free_sulfur_dioxide'] / (df['total_sulfur_dioxide'] + 1e-8)
        # Alcohol to acidity ratio
        df['alcohol_acidity_ratio'] = df['alcohol'] / (df['fixed_acidity'] + 1e-8)
        # Sugar to alcohol ratio
        df['sugar_alcohol_ratio'] = df['residual_sugar'] / (df['alcohol'] + 1e-8)
        # Total acidity
        df['total_acidity'] = df['fixed_acidity'] + df['volatile_acidity'] + df['citric_acid']
        # Sulfur dioxide efficiency
        df['sulfur_efficiency'] = df['free_sulfur_dioxide'] / (df['total_sulfur_dioxide'] + 1e-8)
        return df

    def create_interaction_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Create interaction features between important variables."""
        df['alcohol_volatile_acidity'] = df['alcohol'] * df['volatile_acidity']
        df['alcohol_sulfur'] = df['alcohol'] * df['total_sulfur_dioxide']
        df['ph_acidity'] = df['ph'] * df['fixed_acidity']
        df['density_alcohol'] = df['density'] * df['alcohol']
        return df

    def create_polynomial_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Create polynomial features for important variables."""
        important_features = ['alcohol', 'volatile_acidity', 'sulphates', 'total_sulfur_dioxide']
        for feature in important_features:
            if feature in df.columns:
                df[f'{feature}_squared'] = df[feature] ** 2
                df[f'{feature}_cubed'] = df[feature] ** 3
        return df

    def create_statistical_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Create statistical features based on distribution properties."""
        numeric_columns = df.select_dtypes(include=[np.number]).columns
        for col in numeric_columns:
            if col != 'quality':  # Don't create features from the target
                # Z-score normalization
                df[f'{col}_zscore'] = (df[col] - df[col].mean()) / df[col].std()
                # Min-max normalization
                df[f'{col}_minmax'] = (df[col] - df[col].min()) / (df[col].max() - df[col].min())
        return df

    def create_quality_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Create features based on quality categories."""
        # Quality categories
        df['quality_category'] = pd.cut(df['quality'],
                                        bins=[0, 4, 6, 10],
                                        labels=['low', 'medium', 'high'])
        # Quality binary classification
        df['quality_binary'] = (df['quality'] >= 6).astype(int)
        # Quality percentiles
        df['quality_percentile'] = df['quality'].rank(pct=True)
        return df

    def select_features(self, X: pd.DataFrame, y: pd.Series,
                        method: str = 'mutual_info', k: int = 20) -> Tuple[pd.DataFrame, List[str]]:
        """Select the best features using the specified method."""
        if method == 'mutual_info':
            selector = SelectKBest(score_func=mutual_info_regression, k=k)
        elif method == 'f_regression':
            selector = SelectKBest(score_func=f_regression, k=k)
        else:
            raise ValueError("Method must be 'mutual_info' or 'f_regression'")
        X_selected = selector.fit_transform(X, y)
        selected_features = X.columns[selector.get_support()].tolist()
        self.logger.info(f"Selected {len(selected_features)} features using {method}")
        return pd.DataFrame(X_selected, columns=selected_features, index=X.index), selected_features

    def scale_features(self, X: pd.DataFrame, fit: bool = True) -> pd.DataFrame:
        """Scale features using StandardScaler."""
        if fit:
            X_scaled = self.scaler.fit_transform(X)
        else:
            X_scaled = self.scaler.transform(X)
        return pd.DataFrame(X_scaled, columns=X.columns, index=X.index)
```

Machine Learning Models
```python
# src/modeling/model_training.py
import logging
from pathlib import Path
from typing import Any, Dict, List, Tuple

import joblib
import numpy as np
import pandas as pd
from sklearn.ensemble import AdaBoostRegressor, GradientBoostingRegressor, RandomForestRegressor
from sklearn.linear_model import ElasticNet, Lasso, LinearRegression, Ridge
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import GridSearchCV, cross_val_score, train_test_split
from sklearn.neighbors import KNeighborsRegressor
from sklearn.svm import SVR
from sklearn.tree import DecisionTreeRegressor


class WineModelTrainer:
    def __init__(self, models_dir: str = "models/trained_models"):
        self.models_dir = Path(models_dir)
        self.models_dir.mkdir(parents=True, exist_ok=True)
        self.logger = logging.getLogger(__name__)
        self.models = {}
        self.model_scores = {}

    def prepare_data(self, df: pd.DataFrame, target_column: str = 'quality') -> Tuple[pd.DataFrame, pd.Series]:
        """Prepare data for training."""
        # Separate features and target
        X = df.drop(columns=[target_column])
        y = df[target_column]
        # Keep only numeric columns for now
        X = X.select_dtypes(include=[np.number])
        self.logger.info(f"Prepared data: X shape {X.shape}, y shape {y.shape}")
        return X, y

    def train_models(self, X: pd.DataFrame, y: pd.Series,
                     test_size: float = 0.2, random_state: int = 42) -> Dict[str, Any]:
        """Train multiple models and compare their performance."""
        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state
        )
        # Define models
        models = {
            'linear_regression': LinearRegression(),
            'ridge': Ridge(alpha=1.0),
            'lasso': Lasso(alpha=0.1),
            'elastic_net': ElasticNet(alpha=0.1, l1_ratio=0.5),
            'random_forest': RandomForestRegressor(n_estimators=100, random_state=random_state),
            'gradient_boosting': GradientBoostingRegressor(n_estimators=100, random_state=random_state),
            'ada_boost': AdaBoostRegressor(n_estimators=100, random_state=random_state),
            'svr': SVR(kernel='rbf'),
            'knn': KNeighborsRegressor(n_neighbors=5),
            'decision_tree': DecisionTreeRegressor(random_state=random_state)
        }
        results = {}
        for name, model in models.items():
            self.logger.info(f"Training {name}...")
            # Train model
            model.fit(X_train, y_train)
            # Make predictions
            y_pred_train = model.predict(X_train)
            y_pred_test = model.predict(X_test)
            # Calculate metrics
            train_metrics = self.calculate_metrics(y_train, y_pred_train)
            test_metrics = self.calculate_metrics(y_test, y_pred_test)
            # Cross-validation score (negated back to a positive MSE)
            cv_scores = cross_val_score(model, X_train, y_train, cv=5, scoring='neg_mean_squared_error')
            cv_mean = -cv_scores.mean()
            cv_std = cv_scores.std()
            # Store results
            results[name] = {
                'model': model,
                'train_metrics': train_metrics,
                'test_metrics': test_metrics,
                'cv_mean': cv_mean,
                'cv_std': cv_std,
                'predictions': {
                    'train': y_pred_train,
                    'test': y_pred_test
                }
            }
            # Save model
            self.save_model(model, name)
            self.logger.info(f"{name} - Test RMSE: {test_metrics['rmse']:.3f}, R²: {test_metrics['r2']:.3f}")
        self.models = results
        return results

    def hyperparameter_tuning(self, X: pd.DataFrame, y: pd.Series,
                              model_name: str = 'random_forest') -> Dict[str, Any]:
        """Perform hyperparameter tuning for the specified model."""
        # Define parameter grids
        param_grids = {
            'random_forest': {
                'n_estimators': [50, 100, 200],
                'max_depth': [None, 10, 20, 30],
                'min_samples_split': [2, 5, 10],
                'min_samples_leaf': [1, 2, 4]
            },
            'gradient_boosting': {
                'n_estimators': [50, 100, 200],
                'learning_rate': [0.01, 0.1, 0.2],
                'max_depth': [3, 5, 7],
                'subsample': [0.8, 0.9, 1.0]
            },
            'svr': {
                'C': [0.1, 1, 10, 100],
                'gamma': ['scale', 'auto', 0.001, 0.01, 0.1, 1],
                'kernel': ['rbf', 'linear', 'poly']
            },
            'ridge': {
                'alpha': [0.1, 1, 10, 100, 1000]
            },
            'lasso': {
                'alpha': [0.01, 0.1, 1, 10, 100]
            }
        }
        if model_name not in param_grids:
            raise ValueError(f"Hyperparameter tuning not supported for {model_name}")
        # Get base model
        base_models = {
            'random_forest': RandomForestRegressor(random_state=42),
            'gradient_boosting': GradientBoostingRegressor(random_state=42),
            'svr': SVR(),
            'ridge': Ridge(),
            'lasso': Lasso()
        }
        base_model = base_models[model_name]
        param_grid = param_grids[model_name]
        # Perform grid search
        grid_search = GridSearchCV(
            base_model,
            param_grid,
            cv=5,
            scoring='neg_mean_squared_error',
            n_jobs=-1,
            verbose=1
        )
        self.logger.info(f"Starting hyperparameter tuning for {model_name}...")
        grid_search.fit(X, y)
        # Get best model and results
        best_model = grid_search.best_estimator_
        best_params = grid_search.best_params_
        best_score = -grid_search.best_score_
        self.logger.info(f"Best parameters for {model_name}: {best_params}")
        self.logger.info(f"Best CV score: {best_score:.3f}")
        return {
            'best_model': best_model,
            'best_params': best_params,
            'best_score': best_score,
            'cv_results': grid_search.cv_results_
        }

    def calculate_metrics(self, y_true: pd.Series, y_pred: np.ndarray) -> Dict[str, float]:
        """Calculate regression metrics."""
        mse = mean_squared_error(y_true, y_pred)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(y_true, y_pred)
        r2 = r2_score(y_true, y_pred)
        return {
            'mse': mse,
            'rmse': rmse,
            'mae': mae,
            'r2': r2
        }

    def save_model(self, model: Any, name: str) -> None:
        """Save a trained model."""
        model_path = self.models_dir / f"{name}_model.pkl"
        joblib.dump(model, model_path)
        self.logger.info(f"Model saved: {model_path}")

    def load_model(self, name: str) -> Any:
        """Load a trained model."""
        model_path = self.models_dir / f"{name}_model.pkl"
        if not model_path.exists():
            raise FileNotFoundError(f"Model not found: {model_path}")
        model = joblib.load(model_path)
        self.logger.info(f"Model loaded: {model_path}")
        return model

    def get_feature_importance(self, model_name: str, feature_names: List[str]) -> pd.DataFrame:
        """Get feature importance for tree-based models."""
        if model_name not in self.models:
            raise ValueError(f"Model {model_name} not found in trained models")
        model = self.models[model_name]['model']
        if hasattr(model, 'feature_importances_'):
            importance_df = pd.DataFrame({
                'feature': feature_names,
                'importance': model.feature_importances_
            }).sort_values('importance', ascending=False)
            return importance_df
        else:
            self.logger.warning(f"Model {model_name} does not have feature importance")
            return pd.DataFrame()

    def predict_quality(self, X: pd.DataFrame, model_name: str = 'random_forest') -> np.ndarray:
        """Predict wine quality using the specified model."""
        model = self.load_model(model_name)
        predictions = model.predict(X)
        return predictions

    def get_model_comparison(self) -> pd.DataFrame:
        """Get a comparison of all trained models."""
        comparison_data = []
        for name, results in self.models.items():
            comparison_data.append({
                'model': name,
                'train_rmse': results['train_metrics']['rmse'],
                'test_rmse': results['test_metrics']['rmse'],
                'train_r2': results['train_metrics']['r2'],
                'test_r2': results['test_metrics']['r2'],
                'cv_mean': results['cv_mean'],
                'cv_std': results['cv_std']
            })
        comparison_df = pd.DataFrame(comparison_data)
        comparison_df = comparison_df.sort_values('test_r2', ascending=False)
        return comparison_df
```

Advanced Analysis and Visualization
```python
# src/analysis/exploratory_analysis.py
import logging
import os
from typing import Dict

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from scipy import stats


class WineExploratoryAnalysis:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        plt.style.use('seaborn-v0_8')

    def comprehensive_analysis(self, df: pd.DataFrame) -> Dict:
        """Perform comprehensive exploratory data analysis."""
        analysis_results = {}
        # Basic statistics
        analysis_results['basic_stats'] = self.basic_statistics(df)
        # Quality distribution analysis
        analysis_results['quality_analysis'] = self.analyze_quality_distribution(df)
        # Correlation analysis
        analysis_results['correlation'] = self.correlation_analysis(df)
        # Feature distribution analysis
        analysis_results['feature_distributions'] = self.analyze_feature_distributions(df)
        # Wine type comparison
        analysis_results['wine_type_comparison'] = self.compare_wine_types(df)
        # Outlier analysis
        analysis_results['outlier_analysis'] = self.analyze_outliers(df)
        return analysis_results

    def basic_statistics(self, df: pd.DataFrame) -> Dict:
        """Calculate basic statistics."""
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        stats_summary = {
            'shape': df.shape,
            'missing_values': df.isnull().sum().to_dict(),
            'duplicate_rows': df.duplicated().sum(),
            'numeric_summary': df[numeric_cols].describe().to_dict(),
            'categorical_summary': {}
        }
        # Categorical columns
        categorical_cols = df.select_dtypes(include=['object', 'category']).columns
        for col in categorical_cols:
            stats_summary['categorical_summary'][col] = {
                'unique_values': df[col].nunique(),
                'value_counts': df[col].value_counts().to_dict()
            }
        return stats_summary

    def analyze_quality_distribution(self, df: pd.DataFrame) -> Dict:
        """Analyze the quality distribution."""
        quality_stats = {
            'distribution': df['quality'].value_counts().sort_index().to_dict(),
            'mean': df['quality'].mean(),
            'median': df['quality'].median(),
            'std': df['quality'].std(),
            'skewness': stats.skew(df['quality']),
            'kurtosis': stats.kurtosis(df['quality'])
        }
        # Quality categories
        quality_stats['categories'] = {
            'low_quality': len(df[df['quality'] <= 4]),
            'medium_quality': len(df[(df['quality'] >= 5) & (df['quality'] <= 6)]),
            'high_quality': len(df[df['quality'] >= 7])
        }
        return quality_stats

    def correlation_analysis(self, df: pd.DataFrame) -> Dict:
        """Perform correlation analysis."""
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        correlation_matrix = df[numeric_cols].corr()
        # Find strong correlations with quality
        quality_correlations = correlation_matrix['quality'].abs().sort_values(ascending=False)
        # Find strongly correlated feature pairs
        feature_correlations = []
        for i in range(len(correlation_matrix.columns)):
            for j in range(i + 1, len(correlation_matrix.columns)):
                corr_value = correlation_matrix.iloc[i, j]
                if abs(corr_value) > 0.5:  # Strong correlation threshold
                    feature_correlations.append({
                        'feature1': correlation_matrix.columns[i],
                        'feature2': correlation_matrix.columns[j],
                        'correlation': corr_value
                    })
        return {
            'correlation_matrix': correlation_matrix,
            'quality_correlations': quality_correlations,
            'feature_correlations': feature_correlations
        }

    def analyze_feature_distributions(self, df: pd.DataFrame) -> Dict:
        """Analyze feature distributions."""
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        distributions = {}
        for col in numeric_cols:
            if col != 'quality':  # Skip the target variable
                distributions[col] = {
                    'mean': df[col].mean(),
                    'median': df[col].median(),
                    'std': df[col].std(),
                    'skewness': stats.skew(df[col]),
                    'kurtosis': stats.kurtosis(df[col]),
                    'normality_test': stats.normaltest(df[col])
                }
        return distributions

    def compare_wine_types(self, df: pd.DataFrame) -> Dict:
        """Compare red and white wine characteristics."""
        if 'wine_type' not in df.columns:
            return {}
        comparison = {}
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            red_data = df[df['wine_type'] == 'red'][col]
            white_data = df[df['wine_type'] == 'white'][col]
            # Two-sample t-test
            t_stat, p_value = stats.ttest_ind(red_data, white_data)
            comparison[col] = {
                'red_mean': red_data.mean(),
                'white_mean': white_data.mean(),
                'red_std': red_data.std(),
                'white_std': white_data.std(),
                't_statistic': t_stat,
                'p_value': p_value,
                'significant': p_value < 0.05
            }
        return comparison

    def analyze_outliers(self, df: pd.DataFrame) -> Dict:
        """Analyze outliers in the dataset."""
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        outlier_analysis = {}
        for col in numeric_cols:
            if col != 'quality':
                Q1 = df[col].quantile(0.25)
                Q3 = df[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR
                outliers = df[(df[col] < lower_bound) | (df[col] > upper_bound)]
                outlier_analysis[col] = {
                    'outlier_count': len(outliers),
                    'outlier_percentage': len(outliers) / len(df) * 100,
                    'lower_bound': lower_bound,
                    'upper_bound': upper_bound,
                    'outlier_indices': outliers.index.tolist()
                }
        return outlier_analysis

    def create_visualizations(self, df: pd.DataFrame, save_path: str = "reports/visualizations") -> None:
        """Create comprehensive visualizations."""
        os.makedirs(save_path, exist_ok=True)
        # Quality distribution
        self.plot_quality_distribution(df, save_path)
        # Feature distributions
        self.plot_feature_distributions(df, save_path)
        # Correlation heatmap
        self.plot_correlation_heatmap(df, save_path)
        # Wine type comparison
        if 'wine_type' in df.columns:
            self.plot_wine_type_comparison(df, save_path)
        # Quality vs. features scatter plots
        self.plot_quality_vs_features(df, save_path)
        # Box plots for outlier analysis
        self.plot_outlier_analysis(df, save_path)

    def plot_quality_distribution(self, df: pd.DataFrame, save_path: str) -> None:
        """Plot the quality distribution."""
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        # Quality histogram
        axes[0, 0].hist(df['quality'], bins=10, alpha=0.7, color='skyblue', edgecolor='black')
        axes[0, 0].set_title('Quality Distribution')
        axes[0, 0].set_xlabel('Quality Score')
        axes[0, 0].set_ylabel('Frequency')
        # Quality box plot
        axes[0, 1].boxplot(df['quality'])
        axes[0, 1].set_title('Quality Box Plot')
        axes[0, 1].set_ylabel('Quality Score')
        # Quality by wine type
        if 'wine_type' in df.columns:
            df.boxplot(column='quality', by='wine_type', ax=axes[1, 0])
            axes[1, 0].set_title('Quality by Wine Type')
            axes[1, 0].set_xlabel('Wine Type')
            axes[1, 0].set_ylabel('Quality Score')
        # Quality categories
        quality_categories = pd.cut(df['quality'], bins=[0, 4, 6, 10], labels=['Low', 'Medium', 'High'])
        quality_categories.value_counts().plot(kind='bar', ax=axes[1, 1], color='lightcoral')
        axes[1, 1].set_title('Quality Categories')
        axes[1, 1].set_xlabel('Quality Category')
        axes[1, 1].set_ylabel('Count')
        axes[1, 1].tick_params(axis='x', rotation=45)
        plt.tight_layout()
        plt.savefig(f"{save_path}/quality_distribution.png", dpi=300, bbox_inches='tight')
        plt.close()

    def plot_correlation_heatmap(self, df: pd.DataFrame, save_path: str) -> None:
        """Plot a correlation heatmap."""
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        correlation_matrix = df[numeric_cols].corr()
        plt.figure(figsize=(12, 10))
        mask = np.triu(np.ones_like(correlation_matrix, dtype=bool))
        sns.heatmap(correlation_matrix, mask=mask, annot=True, cmap='coolwarm', center=0,
                    square=True, linewidths=0.5, cbar_kws={"shrink": 0.8})
        plt.title('Feature Correlation Heatmap')
        plt.tight_layout()
        plt.savefig(f"{save_path}/correlation_heatmap.png", dpi=300, bbox_inches='tight')
        plt.close()

    def plot_quality_vs_features(self, df: pd.DataFrame, save_path: str) -> None:
        """Plot quality against important features."""
        important_features = ['alcohol', 'volatile_acidity', 'sulphates', 'total_sulfur_dioxide']
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        axes = axes.ravel()
        for i, feature in enumerate(important_features):
            if feature in df.columns:
                axes[i].scatter(df[feature], df['quality'], alpha=0.6, color='steelblue')
                axes[i].set_xlabel(feature)
                axes[i].set_ylabel('Quality')
                axes[i].set_title(f'Quality vs {feature}')
                # Add a linear trend line
                z = np.polyfit(df[feature], df['quality'], 1)
                p = np.poly1d(z)
                axes[i].plot(df[feature], p(df[feature]), "r--", alpha=0.8)
        plt.tight_layout()
        plt.savefig(f"{save_path}/quality_vs_features.png", dpi=300, bbox_inches='tight')
        plt.close()
```

Lessons Learned
Data Mining and Machine Learning
- Data Preprocessing: Comprehensive data cleaning and feature engineering
- Model Selection: Systematic comparison of multiple algorithms
- Hyperparameter Tuning: Optimizing model performance through grid search
- Evaluation Metrics: Proper use of regression metrics and cross-validation
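As a minimal illustration of the model-comparison workflow described above, the sketch below cross-validates two of the regressors used in the project on synthetic data (a stand-in for the wine features, not the project's actual datasets):

```python
# Hedged sketch: compare two regressors with 5-fold cross-validation.
# The synthetic data is illustrative only; the real project uses the wine CSVs.
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import Ridge
from sklearn.model_selection import cross_val_score

X, y = make_regression(n_samples=300, n_features=11, noise=10.0, random_state=42)

results = {}
for name, model in [("ridge", Ridge(alpha=1.0)),
                    ("random_forest", RandomForestRegressor(n_estimators=50, random_state=42))]:
    # scoring is negated MSE, so negate back before taking the root
    scores = cross_val_score(model, X, y, cv=5, scoring="neg_mean_squared_error")
    results[name] = (-scores.mean()) ** 0.5  # approximate cross-validated RMSE

best = min(results, key=results.get)
print({k: round(v, 2) for k, v in results.items()}, "best:", best)
```

The same loop generalizes to the ten-model dictionary used in `WineModelTrainer.train_models`.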
Statistical Analysis
- Exploratory Data Analysis: Comprehensive EDA techniques
- Correlation Analysis: Understanding feature relationships
- Hypothesis Testing: Statistical significance testing
- Outlier Detection: Multiple outlier detection methods
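The significance testing mentioned above boils down to a two-sample t-test per feature. A small self-contained sketch (with synthetic red/white samples whose means and spreads are invented for illustration; the Welch variant is used here since the groups have unequal variance):

```python
# Hedged sketch: two-sample t-test of the kind used to compare red vs. white wines.
import numpy as np
from scipy import stats

rng = np.random.default_rng(42)
red = rng.normal(loc=8.3, scale=1.7, size=500)    # illustrative "red" feature values
white = rng.normal(loc=6.9, scale=0.8, size=500)  # illustrative "white" feature values

# equal_var=False gives Welch's t-test, robust to unequal group variances
t_stat, p_value = stats.ttest_ind(red, white, equal_var=False)
significant = p_value < 0.05
print(f"t={t_stat:.2f}, p={p_value:.3g}, significant={significant}")
```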
Python Data Science
- Pandas: Advanced data manipulation and analysis
- Scikit-learn: Machine learning pipeline implementation
- Matplotlib/Seaborn: Data visualization techniques
- Jupyter Notebooks: Interactive data analysis
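Much of the pandas work in this project follows one pattern: group by a categorical column and aggregate. A tiny sketch on a hand-made frame (the values are made up, not taken from the dataset):

```python
# Hedged sketch: the groupby/aggregate pattern used throughout the analysis.
import pandas as pd

df = pd.DataFrame({
    "wine_type": ["red", "red", "white", "white", "white"],
    "quality":   [5, 6, 6, 7, 5],
    "alcohol":   [9.4, 10.2, 11.0, 12.1, 9.8],
})

# Mean quality and alcohol per wine type
summary = df.groupby("wine_type")[["quality", "alcohol"]].mean().round(2)
print(summary)
```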
Future Enhancements
Advanced Features
- Deep Learning: Neural network implementation for wine quality prediction
- Ensemble Methods: Advanced ensemble techniques
- Feature Selection: Automated feature selection algorithms
- Model Interpretability: SHAP and LIME for model explanation
Data Sources
- External Data: Integration with wine review datasets
- Time Series: Analysis of wine aging and quality over time
- Geographic Data: Regional wine quality analysis
- Sensory Data: Integration with sensory evaluation data
Conclusion
The Wine Data Mining project demonstrates comprehensive data science and machine learning skills applied to real-world datasets. Key achievements include:
- Data Preprocessing: Advanced data cleaning and feature engineering
- Machine Learning: Multiple algorithm implementation and comparison
- Statistical Analysis: Comprehensive exploratory data analysis
- Visualization: Advanced data visualization techniques
- Model Evaluation: Proper model evaluation and validation
- Documentation: Clear documentation and reproducible analysis
The project is available on GitHub as an end-to-end example of data science project structure and machine learning best practices.
This project reflects my approach to data science and shows how machine learning can be applied to real-world problems. The lessons learned here continue to shape how I tackle data analysis and predictive modeling.