Predictive Analytics and User Behavior Forecasting

Predictive analytics has transformed from a competitive advantage into a business necessity. SaaS companies implementing predictive user behavior models report a 25% reduction in churn and a 40% improvement in conversion rates. According to McKinsey research, organizations using advanced analytics outperform their peers by 85% in sales growth and by more than 25% in gross margin.

In this comprehensive guide, we'll build intelligent analytics systems using Facebook Prophet and scikit-learn. You'll learn to predict user churn, forecast conversions, identify high-value users, optimize user journeys, detect anomalies, and create proactive engagement strategies. We'll cover complete implementation patterns including real-time prediction APIs, data pipelines, and production deployment.

Understanding Predictive Analytics for Web Applications

Predictive analytics applies statistical algorithms and machine learning to historical data to forecast future outcomes. For web applications, this means anticipating user behavior before it happens, enabling proactive rather than reactive strategies. According to Gartner, by 2025, 75% of companies will shift from piloting to operationalizing AI.

Core Prediction Categories

User behavior predictions fall into several key categories, each requiring different modeling approaches:

// prediction-types.ts

interface PredictionTypes {
    // Classification: Will this happen? (Yes/No)
    churn: ChurnPrediction;
    conversion: ConversionPrediction;

    // Regression: How much/when?
    lifetimeValue: LTVPrediction;
    nextPurchaseTime: TimePrediction;

    // Time Series: What's the trend?
    dailyActiveUsers: TimeSeriesForecast;
    revenueProjection: TimeSeriesForecast;

    // Anomaly Detection: Is this unusual?
    fraudDetection: AnomalyPrediction;
    usageSpike: AnomalyPrediction;
}

interface ChurnPrediction {
    userId: string;
    churnProbability: number;        // 0-1
    riskLevel: 'low' | 'medium' | 'high' | 'critical';
    keyRiskFactors: RiskFactor[];
    recommendedActions: RetentionAction[];
    predictedChurnDate?: Date;
}

interface RiskFactor {
    factor: string;
    impact: number;                  // -1 to 1
    description: string;
    currentValue: number;
    riskThreshold: number;
}

interface ConversionPrediction {
    userId: string;
    conversionProbability: number;
    expectedConversionTime: Date;
    optimalTouchpoint: string;
    recommendedOffer: Offer;
}

interface TimeSeriesForecast {
    metric: string;
    predictions: ForecastPoint[];
    confidenceInterval: {
        lower: number[];
        upper: number[];
    };
    trend: 'increasing' | 'decreasing' | 'stable';
    seasonality: SeasonalComponent[];
}
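Each category maps onto a different estimator family. As a minimal, self-contained illustration (synthetic data standing in for real user features), classification answers "will this happen?" with a probability, while regression answers "how much?" with a quantity:

```python
# Minimal sketch: the two workhorse prediction families on synthetic data.
import numpy as np
from sklearn.linear_model import LogisticRegression, LinearRegression

rng = np.random.default_rng(42)
X = rng.normal(size=(200, 3))            # stand-in for engagement features
churned = (X[:, 0] + rng.normal(scale=0.5, size=200) > 0).astype(int)
ltv = 50 + 30 * X[:, 1] + rng.normal(scale=5, size=200)

clf = LogisticRegression().fit(X, churned)   # classification: yes/no
reg = LinearRegression().fit(X, ltv)         # regression: how much

churn_prob = clf.predict_proba(X[:1])[0, 1]  # probability in [0, 1]
predicted_ltv = reg.predict(X[:1])[0]        # a continuous value
```

Time series forecasting and anomaly detection are covered with Prophet later in this guide.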

Building a Churn Prediction System

Churn prediction is often the highest-impact use case for predictive analytics. By identifying at-risk customers early, you can intervene before they leave. Research from Harvard Business Review shows that acquiring a new customer costs 5-25x more than retaining an existing one.

Feature Engineering for Churn

The quality of your features determines model performance. Here's a comprehensive feature engineering pipeline:

# churn_features.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Tuple
from dataclasses import dataclass

@dataclass
class UserFeatures:
    """Complete feature set for churn prediction"""
    user_id: str

    # Engagement metrics
    login_frequency_7d: float
    login_frequency_30d: float
    login_frequency_trend: float  # 7d vs 30d ratio
    sessions_per_login: float
    avg_session_duration_minutes: float
    session_duration_trend: float

    # Activity metrics
    features_used_count: int
    feature_usage_breadth: float  # unique features / total features
    core_feature_usage_7d: int
    actions_per_session: float

    # Value metrics
    current_plan: str
    plan_value_monthly: float
    days_since_signup: int
    total_lifetime_value: float
    payment_failures_count: int

    # Support metrics
    support_tickets_30d: int
    avg_ticket_resolution_hours: float
    negative_feedback_count: int
    nps_score: float

    # Social metrics
    team_size: int
    active_team_members_ratio: float
    shared_content_count: int

    # Temporal metrics
    days_since_last_login: int
    days_since_last_core_action: int

    # Derived metrics
    engagement_score: float
    health_score: float


class ChurnFeatureEngineer:
    """Engineer features for churn prediction from raw event data"""

    def __init__(self, events_df: pd.DataFrame, users_df: pd.DataFrame):
        self.events = events_df
        self.users = users_df
        self.now = datetime.now()

    def compute_features(self, user_id: str) -> UserFeatures:
        """Compute all features for a single user"""
        user_events = self.events[self.events['user_id'] == user_id]
        user_rows = self.users[self.users['user_id'] == user_id]
        if user_rows.empty:
            raise KeyError(f"No user record found for user_id: {user_id}")
        user_data = user_rows.iloc[0]

        return UserFeatures(
            user_id=user_id,
            **self._compute_engagement_features(user_events),
            **self._compute_activity_features(user_events),
            **self._compute_value_features(user_data),
            **self._compute_support_features(user_id),
            **self._compute_social_features(user_id),
            **self._compute_temporal_features(user_events),
            **self._compute_derived_features(user_events, user_data)
        )

    def _compute_engagement_features(self, events: pd.DataFrame) -> Dict:
        """Calculate engagement-related features"""
        logins = events[events['event_type'] == 'login']

        # Login frequency
        logins_7d = len(logins[logins['timestamp'] >= self.now - timedelta(days=7)])
        logins_30d = len(logins[logins['timestamp'] >= self.now - timedelta(days=30)])

        # Trend: if 7d rate is lower than expected from 30d, user is declining
        expected_7d = logins_30d * (7/30)
        login_trend = logins_7d / max(expected_7d, 1)

        # Session analysis
        sessions = events.groupby('session_id').agg({
            'timestamp': ['min', 'max', 'count']
        })
        sessions.columns = ['start', 'end', 'event_count']
        sessions['duration'] = (sessions['end'] - sessions['start']).dt.total_seconds() / 60

        return {
            'login_frequency_7d': logins_7d,
            'login_frequency_30d': logins_30d,
            'login_frequency_trend': login_trend,
            'sessions_per_login': len(sessions) / max(logins_30d, 1),
            # Note: mean() of an empty frame is NaN, and `NaN or 0` stays NaN
            'avg_session_duration_minutes': (
                float(sessions['duration'].mean()) if len(sessions) else 0.0
            ),
            'session_duration_trend': self._compute_duration_trend(sessions)
        }

    def _compute_activity_features(self, events: pd.DataFrame) -> Dict:
        """Calculate activity-related features"""
        feature_events = events[events['event_type'] == 'feature_use']

        unique_features = feature_events['feature_name'].nunique()
        total_features = 25  # Total features in product

        core_features = ['dashboard', 'reports', 'export', 'integration']
        core_usage = feature_events[
            (feature_events['feature_name'].isin(core_features)) &
            (feature_events['timestamp'] >= self.now - timedelta(days=7))
        ].shape[0]

        sessions = events.groupby('session_id').size()

        return {
            'features_used_count': unique_features,
            'feature_usage_breadth': unique_features / total_features,
            'core_feature_usage_7d': core_usage,
            'actions_per_session': sessions.mean() if len(sessions) > 0 else 0
        }

    def _compute_derived_features(
        self,
        events: pd.DataFrame,
        user: pd.Series
    ) -> Dict:
        """Calculate composite health and engagement scores"""

        # Engagement score (0-100)
        engagement_factors = {
            'login_recency': max(0, 100 - self._days_since_last_login(events) * 5),
            'session_frequency': min(100, self._sessions_last_7d(events) * 15),
            'feature_breadth': self._feature_breadth(events) * 100,
            'session_depth': min(100, self._avg_session_actions(events) * 10)
        }
        engagement_score = np.mean(list(engagement_factors.values()))

        # Health score (0-100) - weighted composite
        health_factors = {
            'engagement': engagement_score * 0.3,
            'support_sentiment': self._support_sentiment_score(user.user_id) * 0.2,
            'payment_health': (100 - user.get('payment_failures', 0) * 20) * 0.2,
            'growth_trend': self._growth_trend_score(events) * 0.3
        }
        health_score = sum(health_factors.values())

        return {
            'engagement_score': engagement_score,
            'health_score': health_score
        }

    def batch_compute_features(self, user_ids: List[str]) -> pd.DataFrame:
        """Compute features for multiple users efficiently"""
        features_list = []

        for user_id in user_ids:
            try:
                features = self.compute_features(user_id)
                features_list.append(features.__dict__)
            except Exception as e:
                print(f"Error computing features for {user_id}: {e}")
                continue

        return pd.DataFrame(features_list)

Training the Churn Model with scikit-learn

With features engineered, we can train a production-grade churn prediction model:

# churn_model.py
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.calibration import CalibratedClassifierCV
from sklearn.metrics import (
    classification_report, roc_auc_score, precision_recall_curve,
    average_precision_score
)
import joblib
from typing import Tuple, Dict, List
import shap

# ChurnPrediction, RiskFactor, and RetentionAction are assumed to be Python
# dataclasses mirroring the TypeScript interfaces shown earlier, living in a
# shared types module (the module name here is illustrative).
from churn_types import ChurnPrediction, RiskFactor, RetentionAction

class ChurnPredictionModel:
    """Production-ready churn prediction model"""

    def __init__(self):
        self.model = None
        self.scaler = StandardScaler()
        self.label_encoders = {}
        self.feature_names = []
        self.shap_explainer = None

        # Features to use
        self.numeric_features = [
            'login_frequency_7d', 'login_frequency_30d', 'login_frequency_trend',
            'sessions_per_login', 'avg_session_duration_minutes', 'session_duration_trend',
            'features_used_count', 'feature_usage_breadth', 'core_feature_usage_7d',
            'actions_per_session', 'plan_value_monthly', 'days_since_signup',
            'total_lifetime_value', 'payment_failures_count', 'support_tickets_30d',
            'avg_ticket_resolution_hours', 'negative_feedback_count', 'nps_score',
            'team_size', 'active_team_members_ratio', 'shared_content_count',
            'days_since_last_login', 'days_since_last_core_action',
            'engagement_score', 'health_score'
        ]
        self.categorical_features = ['current_plan']

    def prepare_features(
        self,
        df: pd.DataFrame,
        fit: bool = False
    ) -> np.ndarray:
        """Prepare features for training or prediction"""
        X_numeric = df[self.numeric_features].copy()

        # Handle categorical features
        X_categorical = pd.DataFrame()
        for col in self.categorical_features:
            if fit:
                self.label_encoders[col] = LabelEncoder()
                X_categorical[col] = self.label_encoders[col].fit_transform(
                    df[col].astype(str)
                )
            else:
                X_categorical[col] = self.label_encoders[col].transform(
                    df[col].astype(str)
                )

        X = pd.concat([X_numeric, X_categorical], axis=1)
        self.feature_names = list(X.columns)

        # Handle missing values
        X = X.fillna(X.median())

        # Scale features
        if fit:
            X_scaled = self.scaler.fit_transform(X)
        else:
            X_scaled = self.scaler.transform(X)

        return X_scaled

    def train(
        self,
        df: pd.DataFrame,
        target_col: str = 'churned'
    ) -> Dict:
        """Train the churn prediction model"""

        # Prepare data
        X = self.prepare_features(df, fit=True)
        y = df[target_col].values

        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        # Train base model with hyperparameters tuned for churn
        base_model = GradientBoostingClassifier(
            n_estimators=200,
            max_depth=5,
            learning_rate=0.1,
            min_samples_split=50,
            min_samples_leaf=20,
            subsample=0.8,
            random_state=42
        )

        # Calibrate probabilities for accurate confidence scores
        self.model = CalibratedClassifierCV(
            base_model,
            method='isotonic',
            cv=5
        )
        self.model.fit(X_train, y_train)

        # Evaluate
        y_pred = self.model.predict(X_test)
        y_proba = self.model.predict_proba(X_test)[:, 1]

        # Calculate metrics
        metrics = {
            'roc_auc': roc_auc_score(y_test, y_proba),
            'average_precision': average_precision_score(y_test, y_proba),
            'classification_report': classification_report(y_test, y_pred),
            'cross_val_scores': cross_val_score(
                self.model, X, y, cv=5, scoring='roc_auc'
            ).tolist()
        }

        # Initialize SHAP explainer for feature importance.
        # CalibratedClassifierCV clones its base estimator, so base_model is
        # never fitted by the calibration step above; fit a copy explicitly
        # for explanations.
        base_model.fit(X_train, y_train)
        self.shap_explainer = shap.TreeExplainer(
            base_model,
            feature_names=self.feature_names
        )

        return metrics

    def predict(self, df: pd.DataFrame) -> List[ChurnPrediction]:
        """Predict churn probability for users"""
        X = self.prepare_features(df, fit=False)
        probabilities = self.model.predict_proba(X)[:, 1]

        predictions = []
        for idx, (_, row) in enumerate(df.iterrows()):
            prob = probabilities[idx]

            # Get SHAP values for explanation. Binary classifiers may return
            # a list [class_0, class_1]; take the positive (churn) class.
            shap_values = self.shap_explainer.shap_values(X[idx:idx+1])
            if isinstance(shap_values, list):
                shap_values = shap_values[1]
            risk_factors = self._get_risk_factors(shap_values[0], row)

            prediction = ChurnPrediction(
                user_id=row['user_id'],
                churn_probability=float(prob),
                risk_level=self._get_risk_level(prob),
                key_risk_factors=risk_factors,
                recommended_actions=self._get_recommended_actions(risk_factors, prob),
                predicted_churn_date=self._estimate_churn_date(prob, row)
            )
            predictions.append(prediction)

        return predictions

    def _get_risk_level(self, probability: float) -> str:
        """Categorize churn risk level"""
        if probability < 0.2:
            return 'low'
        elif probability < 0.5:
            return 'medium'
        elif probability < 0.8:
            return 'high'
        return 'critical'

    def _get_risk_factors(
        self,
        shap_values: np.ndarray,
        row: pd.Series
    ) -> List[RiskFactor]:
        """Extract top risk factors from SHAP values"""
        feature_importance = list(zip(self.feature_names, shap_values))
        # Sort by absolute impact (negative SHAP = reduces churn)
        sorted_factors = sorted(
            feature_importance,
            key=lambda x: abs(x[1]),
            reverse=True
        )[:5]

        risk_factors = []
        for feature, impact in sorted_factors:
            risk_factors.append(RiskFactor(
                factor=feature,
                impact=float(impact),
                description=self._get_factor_description(feature, impact),
                current_value=float(row.get(feature, 0)),
                risk_threshold=self._get_risk_threshold(feature)
            ))

        return risk_factors

    def _get_recommended_actions(
        self,
        risk_factors: List[RiskFactor],
        probability: float
    ) -> List[RetentionAction]:
        """Generate recommended retention actions based on risk factors"""
        actions = []

        for factor in risk_factors:
            if factor.impact > 0:  # Positive impact = increases churn
                if 'login' in factor.factor.lower():
                    actions.append(RetentionAction(
                        action='re_engagement_email',
                        priority='high' if probability > 0.5 else 'medium',
                        description='Send personalized re-engagement email with feature highlights',
                        expected_impact=0.15
                    ))
                elif 'support' in factor.factor.lower():
                    actions.append(RetentionAction(
                        action='customer_success_call',
                        priority='high',
                        description='Schedule proactive customer success check-in',
                        expected_impact=0.20
                    ))
                elif 'feature' in factor.factor.lower():
                    actions.append(RetentionAction(
                        action='feature_adoption_campaign',
                        priority='medium',
                        description='Trigger in-app feature discovery tour',
                        expected_impact=0.10
                    ))

        return actions[:3]  # Top 3 actions

    def save(self, path: str):
        """Save model artifacts"""
        joblib.dump({
            'model': self.model,
            'scaler': self.scaler,
            'label_encoders': self.label_encoders,
            'feature_names': self.feature_names
        }, path)

    @classmethod
    def load(cls, path: str) -> 'ChurnPredictionModel':
        """Load model from disk"""
        instance = cls()
        artifacts = joblib.load(path)
        instance.model = artifacts['model']
        instance.scaler = artifacts['scaler']
        instance.label_encoders = artifacts['label_encoders']
        instance.feature_names = artifacts['feature_names']
        return instance
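The calibration step in `train` matters: raw gradient-boosting scores are often over-confident, and `CalibratedClassifierCV` remaps them so a predicted 0.8 really corresponds to roughly 80% of similar users churning, which makes the risk-level thresholds meaningful. An isolated, self-contained sketch on synthetic imbalanced data:

```python
# Sketch: isotonic probability calibration in isolation, on synthetic data.
from sklearn.calibration import CalibratedClassifierCV
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split

# Imbalanced binary problem (~20% positives), like typical churn data
X, y = make_classification(
    n_samples=2000, n_features=10, weights=[0.8, 0.2], random_state=0
)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=0, stratify=y
)

# Wrap the boosted trees so raw scores become calibrated probabilities
calibrated = CalibratedClassifierCV(
    GradientBoostingClassifier(random_state=0), method='isotonic', cv=3
)
calibrated.fit(X_train, y_train)
probs = calibrated.predict_proba(X_test)[:, 1]
```

Because the probabilities are calibrated, they can be bucketed directly into the low/medium/high/critical risk levels used by the model above.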

Time Series Forecasting with Prophet

Facebook Prophet excels at forecasting business metrics with strong seasonal patterns. It's particularly useful for predicting user activity, revenue trends, and capacity planning.

# time_series_forecasting.py
from prophet import Prophet
from prophet.diagnostics import cross_validation, performance_metrics
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

class UserActivityForecaster:
    """Forecast user activity metrics using Prophet"""

    def __init__(self):
        self.models: Dict[str, Prophet] = {}
        self.metrics_config = {
            'daily_active_users': {
                'seasonality_mode': 'multiplicative',
                'changepoint_prior_scale': 0.1,
                'yearly_seasonality': True,
                'weekly_seasonality': True,
                'daily_seasonality': False
            },
            'revenue': {
                'seasonality_mode': 'multiplicative',
                'changepoint_prior_scale': 0.05,
                'yearly_seasonality': True,
                'weekly_seasonality': True,
                'daily_seasonality': False
            },
            'signups': {
                'seasonality_mode': 'additive',
                'changepoint_prior_scale': 0.15,
                'yearly_seasonality': True,
                'weekly_seasonality': True,
                'daily_seasonality': False
            }
        }

    def prepare_data(self, df: pd.DataFrame, metric_col: str) -> pd.DataFrame:
        """Prepare data for Prophet (requires 'ds' and 'y' columns)"""
        prophet_df = df[['date', metric_col]].copy()
        prophet_df.columns = ['ds', 'y']
        prophet_df['ds'] = pd.to_datetime(prophet_df['ds'])
        return prophet_df

    def train(
        self,
        df: pd.DataFrame,
        metric: str,
        holidays_df: pd.DataFrame = None,
        custom_seasonalities: List[Dict] = None
    ) -> Dict:
        """Train Prophet model for a specific metric"""

        # Prepare data
        prophet_df = self.prepare_data(df, metric)

        # Get config for this metric
        config = self.metrics_config.get(metric, {
            'seasonality_mode': 'additive',
            'changepoint_prior_scale': 0.1
        })

        # Initialize Prophet with configuration. Holidays must be passed to
        # the constructor; building the model once avoids the earlier pattern
        # of re-creating it (which would silently drop interval_width).
        model = Prophet(
            holidays=holidays_df,
            seasonality_mode=config.get('seasonality_mode', 'additive'),
            changepoint_prior_scale=config.get('changepoint_prior_scale', 0.1),
            yearly_seasonality=config.get('yearly_seasonality', True),
            weekly_seasonality=config.get('weekly_seasonality', True),
            daily_seasonality=config.get('daily_seasonality', False),
            interval_width=0.95  # 95% uncertainty interval
        )

        # Add custom seasonalities (e.g., monthly billing cycles)
        if custom_seasonalities:
            for seasonality in custom_seasonalities:
                model.add_seasonality(**seasonality)

        # Add regressor for external factors
        # e.g., marketing spend, product launches
        if 'marketing_spend' in df.columns:
            prophet_df['marketing_spend'] = df['marketing_spend']
            model.add_regressor('marketing_spend')

        # Train model
        model.fit(prophet_df)
        self.models[metric] = model

        # Cross-validation for performance metrics (requires roughly 15 months
        # of history: a 365-day initial window plus a 90-day horizon)
        cv_results = cross_validation(
            model,
            initial='365 days',
            period='30 days',
            horizon='90 days'
        )
        perf_metrics = performance_metrics(cv_results)

        return {
            'mape': perf_metrics['mape'].mean(),
            'rmse': perf_metrics['rmse'].mean(),
            'coverage': perf_metrics['coverage'].mean(),
            'cv_results': cv_results
        }

    def forecast(
        self,
        metric: str,
        periods: int = 90,
        include_history: bool = False
    ) -> pd.DataFrame:
        """Generate forecast for specified periods ahead"""

        if metric not in self.models:
            raise ValueError(f"No model trained for metric: {metric}")

        model = self.models[metric]

        # Create future dataframe
        future = model.make_future_dataframe(periods=periods)

        # If we have regressors, need to provide future values
        # This would typically come from planned marketing spend, etc.

        # Generate forecast
        forecast = model.predict(future)

        if not include_history:
            forecast = forecast.tail(periods)

        return forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper', 'trend']]

    def get_components(self, metric: str) -> Dict:
        """Get forecast components (trend, seasonality, etc.)"""
        if metric not in self.models:
            raise ValueError(f"No model trained for metric: {metric}")

        model = self.models[metric]

        # Get the last forecast
        future = model.make_future_dataframe(periods=30)
        forecast = model.predict(future)

        components = {
            'trend': forecast[['ds', 'trend']],
            'weekly': forecast[['ds', 'weekly']] if 'weekly' in forecast else None,
            'yearly': forecast[['ds', 'yearly']] if 'yearly' in forecast else None
        }

        return components

    def detect_anomalies(
        self,
        df: pd.DataFrame,
        metric: str,
        threshold: float = 0.95
    ) -> pd.DataFrame:
        """Detect anomalies by comparing actual vs predicted values"""

        if metric not in self.models:
            raise ValueError(f"No model trained for metric: {metric}")

        model = self.models[metric]
        prophet_df = self.prepare_data(df, metric)

        # Predict on historical data
        forecast = model.predict(prophet_df)

        # Merge with actual values
        result = prophet_df.merge(
            forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']],
            on='ds'
        )

        # Flag anomalies (outside confidence interval)
        result['is_anomaly'] = (
            (result['y'] < result['yhat_lower']) |
            (result['y'] > result['yhat_upper'])
        )

        # Calculate deviation
        result['deviation'] = (result['y'] - result['yhat']) / result['yhat']
        result['deviation_severity'] = np.abs(result['deviation'])

        return result[result['is_anomaly']].sort_values(
            'deviation_severity',
            ascending=False
        )


# Example: Forecasting with holidays and marketing spend
def create_holiday_dataframe() -> pd.DataFrame:
    """Create holiday dataframe for Prophet"""
    holidays = pd.DataFrame({
        'holiday': 'black_friday',
        'ds': pd.to_datetime([
            '2023-11-24', '2024-11-29', '2025-11-28'
        ]),
        'lower_window': 0,
        'upper_window': 3  # Effect lasts through Cyber Monday
    })

    # Add more holidays
    christmas = pd.DataFrame({
        'holiday': 'christmas_season',
        'ds': pd.to_datetime([
            '2023-12-20', '2024-12-20', '2025-12-20'
        ]),
        'lower_window': -5,
        'upper_window': 5
    })

    return pd.concat([holidays, christmas])
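The core of `detect_anomalies` reduces to an interval test against the forecast bounds. A self-contained sketch with a toy forecast frame (the `ds`/`y`/`yhat` column names follow Prophet's output conventions):

```python
# Sketch: flagging anomalies as points outside the forecast interval.
import pandas as pd

df = pd.DataFrame({
    'ds': pd.date_range('2024-01-01', periods=5, freq='D'),
    'y':          [100, 105, 180,  98,  40],
    'yhat':       [101, 103, 104, 100,  99],
    'yhat_lower': [ 90,  92,  93,  89,  88],
    'yhat_upper': [112, 114, 115, 111, 110],
})

# A point is anomalous when the actual value escapes the interval
df['is_anomaly'] = (df['y'] < df['yhat_lower']) | (df['y'] > df['yhat_upper'])
df['deviation'] = (df['y'] - df['yhat']) / df['yhat']
df['deviation_severity'] = df['deviation'].abs()

anomalies = df[df['is_anomaly']].sort_values(
    'deviation_severity', ascending=False
)
# Day 3 (spike to 180) and day 5 (drop to 40) fall outside the interval
```

Sorting by relative deviation surfaces the most severe excursions first, which is what you want when routing alerts.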

Conversion Probability Forecasting

Predicting which users will convert enables targeted engagement at the right moment. Here's a complete conversion prediction system:

# conversion_prediction.py
import pandas as pd
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
from typing import Dict, List, Tuple
import xgboost as xgb

class ConversionPredictor:
    """Predict user conversion probability and optimal intervention timing"""

    def __init__(self):
        self.model = None
        self.feature_pipeline = None
        self.conversion_funnel_stages = [
            'anonymous',
            'registered',
            'activated',
            'engaged',
            'converted'
        ]

    def compute_funnel_features(self, user_events: pd.DataFrame) -> Dict:
        """Compute conversion funnel-specific features"""

        features = {}

        # Time in each funnel stage
        stage_times = self._compute_stage_durations(user_events)
        for stage, duration in stage_times.items():
            features[f'{stage}_duration_hours'] = duration

        # Actions per funnel stage
        stage_actions = self._compute_stage_actions(user_events)
        for stage, count in stage_actions.items():
            features[f'{stage}_action_count'] = count

        # Key conversion indicators
        features['viewed_pricing_page'] = int(
            'pricing_page_view' in user_events['event_type'].values
        )
        features['pricing_page_time_seconds'] = self._get_page_time(
            user_events, 'pricing'
        )
        features['compared_plans'] = int(
            'plan_comparison' in user_events['event_type'].values
        )
        features['started_trial'] = int(
            'trial_started' in user_events['event_type'].values
        )
        features['trial_days_remaining'] = self._get_trial_days_remaining(
            user_events
        )
        features['features_during_trial'] = self._count_trial_features(
            user_events
        )

        # Engagement quality
        features['return_visit_count'] = user_events['session_id'].nunique()
        features['avg_session_depth'] = (
            user_events.groupby('session_id').size().mean()
        )

        # Social proof interaction
        features['viewed_testimonials'] = int(
            'testimonials_view' in user_events['event_type'].values
        )
        features['viewed_case_studies'] = int(
            'case_study_view' in user_events['event_type'].values
        )

        return features

    def train(
        self,
        features_df: pd.DataFrame,
        converted: pd.Series
    ) -> Dict:
        """Train conversion prediction model"""

        # Use XGBoost for better handling of imbalanced data:
        # scale_pos_weight = negatives / positives. Early stopping is set on
        # the estimator, as required by xgboost >= 2.0 (the deprecated
        # use_label_encoder flag is gone in that version as well).
        self.model = xgb.XGBClassifier(
            n_estimators=200,
            max_depth=6,
            learning_rate=0.1,
            scale_pos_weight=(converted == 0).sum() / max((converted == 1).sum(), 1),
            eval_metric='auc',
            early_stopping_rounds=20,
            random_state=42
        )

        # Create preprocessing pipeline
        self.feature_pipeline = Pipeline([
            ('scaler', StandardScaler())
        ])

        X = self.feature_pipeline.fit_transform(features_df)
        y = converted.values

        # Train with early stopping on a held-out validation split
        X_train, X_val, y_train, y_val = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        self.model.fit(
            X_train, y_train,
            eval_set=[(X_val, y_val)],
            verbose=False
        )

        # Return metrics
        y_pred_proba = self.model.predict_proba(X_val)[:, 1]
        return {
            'auc_roc': roc_auc_score(y_val, y_pred_proba),
            'feature_importance': dict(zip(
                features_df.columns,
                self.model.feature_importances_
            ))
        }

    def predict_conversion(
        self,
        user_features: pd.DataFrame
    ) -> List[ConversionPrediction]:
        """Predict conversion probability for users"""

        X = self.feature_pipeline.transform(user_features)
        probabilities = self.model.predict_proba(X)[:, 1]

        predictions = []
        for idx, (_, row) in enumerate(user_features.iterrows()):
            prob = probabilities[idx]

            predictions.append(ConversionPrediction(
                user_id=row['user_id'],
                conversion_probability=float(prob),
                expected_conversion_time=self._estimate_conversion_time(row, prob),
                optimal_touchpoint=self._determine_optimal_touchpoint(row, prob),
                recommended_offer=self._recommend_offer(row, prob)
            ))

        return predictions

    def _determine_optimal_touchpoint(
        self,
        features: pd.Series,
        probability: float
    ) -> str:
        """Determine the best intervention point"""

        # Only nudge about trial expiry for users who actually started a trial
        # (the default of 0 would otherwise match every non-trial user)
        if features.get('started_trial', 0) and features.get('trial_days_remaining', 0) <= 3:
            return 'trial_expiry_reminder'
        elif features.get('viewed_pricing_page', 0) and probability > 0.6:
            return 'sales_outreach'
        elif features.get('features_during_trial', 0) < 3:
            return 'feature_discovery_email'
        elif probability > 0.4:
            return 'personalized_demo_offer'
        else:
            return 'educational_content'

    def _recommend_offer(
        self,
        features: pd.Series,
        probability: float
    ) -> Offer:
        """Recommend the best conversion offer"""

        if probability > 0.7:
            # High probability - minimal discount needed
            return Offer(
                type='standard',
                discount_percent=0,
                message='Start your subscription today'
            )
        elif probability > 0.5:
            # Medium probability - small incentive
            return Offer(
                type='time_limited',
                discount_percent=10,
                message='Get 10% off if you subscribe this week',
                valid_days=7
            )
        else:
            # Lower probability - stronger incentive
            return Offer(
                type='extended_trial',
                discount_percent=20,
                message='Extend your trial by 14 days or get 20% off annual plan',
                valid_days=14
            )
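The offer tiers above can be exercised in isolation. A minimal standalone sketch (the thresholds and Offer fields mirror the class above; the free function is purely illustrative):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Offer:
    type: str
    discount_percent: int
    message: str
    valid_days: Optional[int] = None

def recommend_offer(probability: float) -> Offer:
    # Mirrors the probability tiers used in _recommend_offer above
    if probability > 0.7:
        return Offer('standard', 0, 'Start your subscription today')
    elif probability > 0.5:
        return Offer('time_limited', 10,
                     'Get 10% off if you subscribe this week', valid_days=7)
    return Offer('extended_trial', 20,
                 'Extend your trial by 14 days or get 20% off annual plan',
                 valid_days=14)
```

Note the inverse relationship: the higher the predicted probability, the smaller the incentive needed, which keeps discounts from eroding margin on users who would convert anyway.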

Building a Real-Time Prediction API

Production systems need real-time predictions served via APIs. Here's a complete FastAPI implementation with caching and monitoring:

# prediction_api.py
from fastapi import FastAPI, HTTPException, BackgroundTasks, Response
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional
import redis
import json
import pandas as pd
from datetime import datetime
from prometheus_client import Counter, Histogram, generate_latest

app = FastAPI(title="Predictive Analytics API")

# Metrics
prediction_counter = Counter(
    'predictions_total',
    'Total predictions made',
    ['model_type', 'risk_level']
)
prediction_latency = Histogram(
    'prediction_latency_seconds',
    'Prediction latency in seconds',
    ['model_type']
)

# Redis cache
redis_client = redis.Redis(host='localhost', port=6379, db=0)
CACHE_TTL = 300  # 5 minutes

# Load models at startup
churn_model = ChurnPredictionModel.load('models/churn_model.joblib')
conversion_model = ConversionPredictor.load('models/conversion_model.joblib')

class UserPredictionRequest(BaseModel):
    user_id: str
    include_explanations: bool = True

class BatchPredictionRequest(BaseModel):
    user_ids: List[str]
    model_type: str  # 'churn' or 'conversion'

class PredictionResponse(BaseModel):
    user_id: str
    prediction_type: str
    probability: float
    risk_level: str
    risk_factors: Optional[List[dict]]
    recommended_actions: Optional[List[dict]]
    cached: bool
    timestamp: str


@app.get("/health")
async def health_check():
    return {"status": "healthy", "models_loaded": True}


@app.post("/predict/churn", response_model=PredictionResponse)
async def predict_churn(request: UserPredictionRequest):
    """Get churn prediction for a single user"""

    cache_key = f"churn:{request.user_id}"

    # Check cache
    cached = redis_client.get(cache_key)
    if cached:
        response = json.loads(cached)
        response['cached'] = True
        return response

    # Get user features
    with prediction_latency.labels(model_type='churn').time():
        features = await get_user_features(request.user_id)

        if features is None:
            raise HTTPException(status_code=404, detail="User not found")

        # Make prediction
        prediction = churn_model.predict(features)[0]

        response = PredictionResponse(
            user_id=request.user_id,
            prediction_type='churn',
            probability=prediction.churn_probability,
            risk_level=prediction.risk_level,
            risk_factors=[rf.__dict__ for rf in prediction.key_risk_factors]
                if request.include_explanations else None,
            recommended_actions=[ra.__dict__ for ra in prediction.recommended_actions]
                if request.include_explanations else None,
            cached=False,
            timestamp=datetime.utcnow().isoformat()
        )

    # Update metrics
    prediction_counter.labels(
        model_type='churn',
        risk_level=prediction.risk_level
    ).inc()

    # Cache result
    redis_client.setex(
        cache_key,
        CACHE_TTL,
        json.dumps(response.dict())
    )

    return response


@app.post("/predict/conversion", response_model=PredictionResponse)
async def predict_conversion(request: UserPredictionRequest):
    """Get conversion prediction for a single user"""

    cache_key = f"conversion:{request.user_id}"

    # Check cache
    cached = redis_client.get(cache_key)
    if cached:
        response = json.loads(cached)
        response['cached'] = True
        return response

    with prediction_latency.labels(model_type='conversion').time():
        features = await get_user_features(request.user_id, for_conversion=True)

        if features is None:
            raise HTTPException(status_code=404, detail="User not found")

        prediction = conversion_model.predict_conversion(features)[0]

        response = PredictionResponse(
            user_id=request.user_id,
            prediction_type='conversion',
            probability=prediction.conversion_probability,
            risk_level=_get_conversion_urgency(prediction.conversion_probability),
            risk_factors=None,
            recommended_actions=[{
                'touchpoint': prediction.optimal_touchpoint,
                'offer': prediction.recommended_offer.__dict__
            }] if request.include_explanations else None,
            cached=False,
            timestamp=datetime.utcnow().isoformat()
        )

    prediction_counter.labels(
        model_type='conversion',
        risk_level=response.risk_level
    ).inc()

    redis_client.setex(cache_key, CACHE_TTL, json.dumps(response.dict()))

    return response


@app.post("/predict/batch")
async def batch_predict(
    request: BatchPredictionRequest,
    background_tasks: BackgroundTasks
):
    """Batch prediction for multiple users (async processing)"""

    job_id = f"batch_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}_{len(request.user_ids)}"

    # Store job status
    redis_client.hset(f"job:{job_id}", mapping={
        'status': 'processing',
        'total': len(request.user_ids),
        'completed': 0,
        'model_type': request.model_type
    })

    # Process in background
    background_tasks.add_task(
        process_batch_predictions,
        job_id,
        request.user_ids,
        request.model_type
    )

    return {
        "job_id": job_id,
        "status": "processing",
        "status_url": f"/jobs/{job_id}"
    }


@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str):
    """Check batch prediction job status"""

    job_data = redis_client.hgetall(f"job:{job_id}")
    if not job_data:
        raise HTTPException(status_code=404, detail="Job not found")

    return {
        "job_id": job_id,
        "status": job_data.get(b'status', b'unknown').decode(),
        "progress": {
            "completed": int(job_data.get(b'completed', 0)),
            "total": int(job_data.get(b'total', 0))
        }
    }


@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint"""
    return Response(
        generate_latest(),
        media_type="text/plain"
    )


async def process_batch_predictions(
    job_id: str,
    user_ids: List[str],
    model_type: str
):
    """Background task for batch predictions"""

    results = []

    for i, user_id in enumerate(user_ids):
        try:
            features = await get_user_features(user_id)

            if model_type == 'churn':
                prediction = churn_model.predict(features)[0]
                results.append({
                    'user_id': user_id,
                    'probability': prediction.churn_probability,
                    'risk_level': prediction.risk_level
                })
            else:
                prediction = conversion_model.predict_conversion(features)[0]
                results.append({
                    'user_id': user_id,
                    'probability': prediction.conversion_probability
                })

            # Update progress
            redis_client.hset(f"job:{job_id}", 'completed', i + 1)

        except Exception as e:
            results.append({
                'user_id': user_id,
                'error': str(e)
            })

    # Store results
    redis_client.hset(f"job:{job_id}", mapping={
        'status': 'completed',
        'results': json.dumps(results)
    })
    redis_client.expire(f"job:{job_id}", 3600)  # Keep for 1 hour


async def get_user_features(
    user_id: str,
    for_conversion: bool = False
) -> Optional[pd.DataFrame]:
    """Fetch user features from feature store"""
    # In production, this would query your feature store
    # (e.g., Feast, Tecton, or custom solution)
    pass
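Both endpoints follow the same cache-aside pattern: check Redis, compute on a miss, store the result with a TTL, and flag cached responses. That pattern can be illustrated with a standard-library, in-memory stand-in for Redis (TTLCache and cached_prediction are illustrative names, not part of the API above):

```python
import json
import time
from typing import Callable, Optional

class TTLCache:
    """Minimal in-memory stand-in for the Redis GET/SETEX calls above."""
    def __init__(self):
        self._store = {}  # key -> (expires_at, payload)

    def get(self, key: str) -> Optional[str]:
        entry = self._store.get(key)
        if entry is None or entry[0] < time.monotonic():
            return None
        return entry[1]

    def setex(self, key: str, ttl: int, value: str) -> None:
        self._store[key] = (time.monotonic() + ttl, value)

def cached_prediction(cache: TTLCache, user_id: str,
                      compute: Callable[[str], dict],
                      ttl: int = 300) -> dict:
    key = f"churn:{user_id}"
    hit = cache.get(key)
    if hit is not None:
        result = json.loads(hit)
        result['cached'] = True  # mark cache hits, as the endpoints do
        return result
    result = compute(user_id)
    result['cached'] = False
    cache.setex(key, ttl, json.dumps(result))
    return result

cache = TTLCache()
first = cached_prediction(cache, 'u1', lambda uid: {'probability': 0.42})
second = cached_prediction(cache, 'u1', lambda uid: {'probability': 0.99})
```

The second call returns the cached 0.42, not 0.99: within the TTL window, stale predictions are served deliberately in exchange for latency. Pick the TTL to match how fast your features actually move.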

User Journey Optimization

Optimizing user journeys requires understanding the paths users take and predicting optimal next actions:

// user-journey-optimizer.ts
import * as tf from '@tensorflow/tfjs-node';

interface JourneyStep {
    userId: string;
    stepIndex: number;
    action: string;
    timestamp: number;
    metadata: Record<string, unknown>;
}

interface JourneyPrediction {
    userId: string;
    currentStep: string;
    predictedNextSteps: Array<{
        action: string;
        probability: number;
    }>;
    optimalPath: string[];
    dropoffRisk: number;
    recommendedIntervention?: Intervention;
}

interface Intervention {
    type: 'tooltip' | 'modal' | 'email' | 'notification';
    content: string;
    triggerCondition: string;
    expectedImpact: number;
}

class UserJourneyOptimizer {
    private model: tf.LayersModel | null = null;
    private actionEncoder: Map<string, number> = new Map();
    private actionDecoder: Map<number, string> = new Map();
    private sequenceLength: number = 10;
    private embeddingDim: number = 32;

    async initialize(): Promise<void> {
        // Load pre-trained model or create new one
        try {
            this.model = await tf.loadLayersModel('file://models/journey-model/model.json');
        } catch {
            this.model = this.buildModel();
        }
    }

    private buildModel(): tf.LayersModel {
        // LSTM-based sequence model for next action prediction
        const input = tf.input({ shape: [this.sequenceLength, 1] });

        // Embedding layer for actions
        let x = tf.layers.embedding({
            inputDim: 100, // Max unique actions
            outputDim: this.embeddingDim
        }).apply(input) as tf.SymbolicTensor;

        // Reshape for LSTM
        x = tf.layers.reshape({
            targetShape: [this.sequenceLength, this.embeddingDim]
        }).apply(x) as tf.SymbolicTensor;

        // Bidirectional LSTM for better context understanding
        x = tf.layers.bidirectional({
            layer: tf.layers.lstm({
                units: 64,
                returnSequences: true,
                dropout: 0.2,
                recurrentDropout: 0.2
            })
        }).apply(x) as tf.SymbolicTensor;

        x = tf.layers.bidirectional({
            layer: tf.layers.lstm({
                units: 32,
                returnSequences: false,
                dropout: 0.2
            })
        }).apply(x) as tf.SymbolicTensor;

        // Dense layers
        x = tf.layers.dense({ units: 64, activation: 'relu' }).apply(x) as tf.SymbolicTensor;
        x = tf.layers.dropout({ rate: 0.3 }).apply(x) as tf.SymbolicTensor;

        // Output: probability distribution over next actions
        const output = tf.layers.dense({
            units: 100, // Number of possible actions
            activation: 'softmax'
        }).apply(x) as tf.SymbolicTensor;

        const model = tf.model({ inputs: input, outputs: output });

        model.compile({
            optimizer: tf.train.adam(0.001),
            loss: 'categoricalCrossentropy',
            metrics: ['accuracy']
        });

        return model;
    }

    async predictNextSteps(
        userId: string,
        recentActions: string[]
    ): Promise<JourneyPrediction> {
        if (!this.model) {
            throw new Error('Model not initialized');
        }

        // Encode actions to numbers
        const encodedSequence = recentActions
            .slice(-this.sequenceLength)
            .map(action => this.encodeAction(action));

        // Pad sequence if needed
        while (encodedSequence.length < this.sequenceLength) {
            encodedSequence.unshift(0); // Pad with zeros
        }

        // Predict
        const inputTensor = tf.tensor3d([encodedSequence.map(x => [x])]);
        const predictions = this.model.predict(inputTensor) as tf.Tensor;
        const probs = await predictions.data();
        inputTensor.dispose();
        predictions.dispose();

        // Get top 5 predicted actions
        const actionProbs = Array.from(probs)
            .map((prob, idx) => ({ action: this.decodeAction(idx), probability: prob }))
            .filter(ap => ap.action !== 'unknown')
            .sort((a, b) => b.probability - a.probability)
            .slice(0, 5);

        // Calculate dropoff risk
        const dropoffRisk = this.calculateDropoffRisk(recentActions, actionProbs);

        // Determine optimal path
        const optimalPath = await this.computeOptimalPath(
            recentActions[recentActions.length - 1],
            'conversion' // Target state
        );

        return {
            userId,
            currentStep: recentActions[recentActions.length - 1],
            predictedNextSteps: actionProbs,
            optimalPath,
            dropoffRisk,
            recommendedIntervention: dropoffRisk > 0.5
                ? this.getIntervention(recentActions, dropoffRisk)
                : undefined
        };
    }

    private calculateDropoffRisk(
        recentActions: string[],
        predictions: Array<{ action: string; probability: number }>
    ): number {
        // High-risk indicators
        const riskSignals = {
            stagnation: this.detectStagnation(recentActions),
            backtracking: this.detectBacktracking(recentActions),
            exitIntent: predictions.some(p =>
                ['exit', 'close', 'abandon'].includes(p.action) && p.probability > 0.2
            ),
            lowEngagement: recentActions.length < 3
        };

        // Weighted risk score
        return (
            riskSignals.stagnation * 0.3 +
            riskSignals.backtracking * 0.25 +
            (riskSignals.exitIntent ? 0.35 : 0) +
            (riskSignals.lowEngagement ? 0.1 : 0)
        );
    }

    private detectStagnation(actions: string[]): number {
        // Check if user is repeating same actions
        const uniqueActions = new Set(actions.slice(-5));
        return 1 - (uniqueActions.size / Math.min(5, actions.length));
    }

    private detectBacktracking(actions: string[]): number {
        // Check if user is going back in the funnel
        const funnelOrder = ['landing', 'features', 'pricing', 'signup', 'onboarding'];
        let backtrackCount = 0;

        for (let i = 1; i < actions.length; i++) {
            const prevIdx = funnelOrder.indexOf(actions[i - 1]);
            const currIdx = funnelOrder.indexOf(actions[i]);

            if (prevIdx > -1 && currIdx > -1 && currIdx < prevIdx) {
                backtrackCount++;
            }
        }

        return Math.min(1, backtrackCount / actions.length);
    }

    private getIntervention(
        actions: string[],
        dropoffRisk: number
    ): Intervention {
        const lastAction = actions[actions.length - 1];

        // Context-aware interventions
        if (lastAction === 'pricing' && dropoffRisk > 0.6) {
            return {
                type: 'modal',
                content: 'Have questions about pricing? Chat with us!',
                triggerCondition: 'pricing_page_dwell > 30s',
                expectedImpact: 0.15
            };
        }

        if (this.detectStagnation(actions) > 0.5) {
            return {
                type: 'tooltip',
                content: 'Not sure what to do next? Try our guided tour.',
                triggerCondition: 'idle_time > 10s',
                expectedImpact: 0.12
            };
        }

        return {
            type: 'notification',
            content: 'Need help? Our support team is here for you.',
            triggerCondition: 'session_time > 120s && no_conversion',
            expectedImpact: 0.08
        };
    }

    async computeOptimalPath(
        currentState: string,
        targetState: string
    ): Promise<string[]> {
        // Use beam search to find optimal path
        const beamWidth = 5;
        const maxDepth = 10;

        interface PathState {
            path: string[];
            probability: number;
        }

        let beam: PathState[] = [{
            path: [currentState],
            probability: 1.0
        }];

        for (let depth = 0; depth < maxDepth; depth++) {
            const candidates: PathState[] = [];

            for (const state of beam) {
                if (state.path[state.path.length - 1] === targetState) {
                    candidates.push(state);
                    continue;
                }

                // Get predictions for current path
                const prediction = await this.predictNextSteps(
                    'optimizer',
                    state.path
                );

                for (const nextStep of prediction.predictedNextSteps) {
                    candidates.push({
                        path: [...state.path, nextStep.action],
                        probability: state.probability * nextStep.probability
                    });
                }
            }

            // Keep top beamWidth candidates
            beam = candidates
                .sort((a, b) => b.probability - a.probability)
                .slice(0, beamWidth);

            // Early exit if all paths reached target
            if (beam.every(s => s.path[s.path.length - 1] === targetState)) {
                break;
            }
        }

        return beam[0]?.path || [currentState];
    }

    private encodeAction(action: string): number {
        if (!this.actionEncoder.has(action)) {
            const id = this.actionEncoder.size + 1;
            this.actionEncoder.set(action, id);
            this.actionDecoder.set(id, action);
        }
        return this.actionEncoder.get(action)!;
    }

    private decodeAction(id: number): string {
        return this.actionDecoder.get(id) || 'unknown';
    }
}
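The encode-and-pad preprocessing in predictNextSteps translates directly to other languages. A Python sketch of the same scheme, with incremental integer IDs and 0 reserved for left-padding (ActionCodec is an illustrative name):

```python
from typing import Dict, List

class ActionCodec:
    """Incremental integer encoding for action names; 0 is the padding token."""
    def __init__(self):
        self.encoder: Dict[str, int] = {}
        self.decoder: Dict[int, str] = {}

    def encode(self, action: str) -> int:
        if action not in self.encoder:
            idx = len(self.encoder) + 1  # start at 1, leaving 0 for padding
            self.encoder[action] = idx
            self.decoder[idx] = action
        return self.encoder[action]

    def decode(self, idx: int) -> str:
        return self.decoder.get(idx, 'unknown')

def prepare_sequence(codec: ActionCodec, actions: List[str],
                     sequence_length: int = 10) -> List[int]:
    # Keep only the most recent actions, encode, then left-pad with zeros
    encoded = [codec.encode(a) for a in actions[-sequence_length:]]
    return [0] * (sequence_length - len(encoded)) + encoded

codec = ActionCodec()
seq = prepare_sequence(codec, ['landing', 'features', 'pricing'], sequence_length=5)
```

One caveat with incremental encoding: IDs depend on the order actions are first seen, so the codec must be persisted alongside the model, otherwise serving-time encodings will not match training-time ones.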

Anomaly Detection in User Behavior

Detecting unusual patterns helps identify fraud, system issues, or opportunities. Here's an anomaly detection system using Isolation Forest and statistical methods:

# anomaly_detection.py
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from typing import List, Dict
from dataclasses import dataclass
from datetime import datetime

@dataclass
class Anomaly:
    timestamp: datetime
    metric: str
    actual_value: float
    expected_value: float
    deviation_score: float
    anomaly_type: str  # 'spike', 'drop', 'pattern_change'
    severity: str  # 'low', 'medium', 'high', 'critical'
    context: Dict

class AnomalyDetector:
    """Multi-method anomaly detection for user behavior"""

    def __init__(self):
        self.isolation_forest = IsolationForest(
            contamination=0.05,  # Expected 5% anomalies
            random_state=42,
            n_estimators=200
        )
        self.scaler = StandardScaler()
        self.baseline_stats: Dict[str, Dict] = {}

    def fit(self, df: pd.DataFrame, metric_columns: List[str]):
        """Fit detector on historical data"""

        # Scale features
        X = self.scaler.fit_transform(df[metric_columns])

        # Train Isolation Forest
        self.isolation_forest.fit(X)

        # Compute baseline statistics for each metric
        for col in metric_columns:
            self.baseline_stats[col] = {
                'mean': df[col].mean(),
                'std': df[col].std(),
                'median': df[col].median(),
                'q1': df[col].quantile(0.25),
                'q3': df[col].quantile(0.75),
                'iqr': df[col].quantile(0.75) - df[col].quantile(0.25),
                'hourly_pattern': df.groupby(df['timestamp'].dt.hour)[col].mean().to_dict(),
                'daily_pattern': df.groupby(df['timestamp'].dt.dayofweek)[col].mean().to_dict()
            }

    def detect(
        self,
        df: pd.DataFrame,
        metric_columns: List[str]
    ) -> List[Anomaly]:
        """Detect anomalies using multiple methods"""

        anomalies = []

        # Method 1: Isolation Forest for multivariate anomalies
        X = self.scaler.transform(df[metric_columns])
        if_predictions = self.isolation_forest.predict(X)
        if_scores = self.isolation_forest.decision_function(X)

        for idx, (prediction, score) in enumerate(zip(if_predictions, if_scores)):
            if prediction == -1:  # Anomaly
                row = df.iloc[idx]
                # Find which metric is most anomalous
                most_anomalous = self._find_most_anomalous_metric(
                    row, metric_columns
                )

                anomalies.append(Anomaly(
                    timestamp=row['timestamp'],
                    metric=most_anomalous['metric'],
                    actual_value=most_anomalous['value'],
                    expected_value=most_anomalous['expected'],
                    deviation_score=abs(score),
                    anomaly_type=most_anomalous['type'],
                    severity=self._get_severity(abs(score)),
                    context={'isolation_forest_score': float(score)}
                ))

        # Method 2: Statistical anomalies (z-score based)
        for col in metric_columns:
            stats_anomalies = self._detect_statistical_anomalies(df, col)
            anomalies.extend(stats_anomalies)

        # Method 3: Pattern-based anomalies
        for col in metric_columns:
            pattern_anomalies = self._detect_pattern_anomalies(df, col)
            anomalies.extend(pattern_anomalies)

        # Deduplicate and rank
        return self._deduplicate_anomalies(anomalies)

    def _detect_statistical_anomalies(
        self,
        df: pd.DataFrame,
        metric: str
    ) -> List[Anomaly]:
        """Detect anomalies using z-scores and IQR"""

        anomalies = []
        stats = self.baseline_stats[metric]

        for idx, row in df.iterrows():
            value = row[metric]

            # Z-score method
            z_score = (value - stats['mean']) / stats['std'] if stats['std'] > 0 else 0

            # IQR method
            lower_bound = stats['q1'] - 1.5 * stats['iqr']
            upper_bound = stats['q3'] + 1.5 * stats['iqr']

            if abs(z_score) > 3 or value < lower_bound or value > upper_bound:
                anomaly_type = 'spike' if value > stats['mean'] else 'drop'

                anomalies.append(Anomaly(
                    timestamp=row['timestamp'],
                    metric=metric,
                    actual_value=value,
                    expected_value=stats['mean'],
                    deviation_score=abs(z_score),
                    anomaly_type=anomaly_type,
                    severity=self._get_severity(abs(z_score)),
                    context={
                        'z_score': float(z_score),
                        'lower_bound': float(lower_bound),
                        'upper_bound': float(upper_bound)
                    }
                ))

        return anomalies

    def _detect_pattern_anomalies(
        self,
        df: pd.DataFrame,
        metric: str
    ) -> List[Anomaly]:
        """Detect anomalies based on hourly/daily patterns"""

        anomalies = []
        stats = self.baseline_stats[metric]

        for idx, row in df.iterrows():
            value = row[metric]
            hour = row['timestamp'].hour
            day = row['timestamp'].dayofweek

            # Expected value based on time patterns
            hourly_expected = stats['hourly_pattern'].get(hour, stats['mean'])
            daily_expected = stats['daily_pattern'].get(day, stats['mean'])
            expected = (hourly_expected + daily_expected) / 2

            # Check deviation from expected pattern
            if stats['std'] > 0:
                pattern_z = abs(value - expected) / stats['std']

                if pattern_z > 2.5:  # Significant deviation from pattern
                    anomalies.append(Anomaly(
                        timestamp=row['timestamp'],
                        metric=metric,
                        actual_value=value,
                        expected_value=expected,
                        deviation_score=pattern_z,
                        anomaly_type='pattern_change',
                        severity=self._get_severity(pattern_z),
                        context={
                            'hour': hour,
                            'day': day,
                            'hourly_expected': float(hourly_expected),
                            'daily_expected': float(daily_expected)
                        }
                    ))

        return anomalies

    def _get_severity(self, score: float) -> str:
        """Map deviation score to severity level"""
        if score > 5:
            return 'critical'
        elif score > 3:
            return 'high'
        elif score > 2:
            return 'medium'
        return 'low'

    def _find_most_anomalous_metric(
        self,
        row: pd.Series,
        metrics: List[str]
    ) -> Dict:
        """Find which metric contributed most to anomaly"""

        max_deviation = 0
        result = None

        for metric in metrics:
            stats = self.baseline_stats[metric]
            value = row[metric]
            expected = stats['mean']

            if stats['std'] > 0:
                deviation = abs(value - expected) / stats['std']

                if deviation > max_deviation:
                    max_deviation = deviation
                    result = {
                        'metric': metric,
                        'value': value,
                        'expected': expected,
                        'type': 'spike' if value > expected else 'drop'
                    }

        return result or {'metric': metrics[0], 'value': 0, 'expected': 0, 'type': 'unknown'}

    def _deduplicate_anomalies(
        self,
        anomalies: List[Anomaly]
    ) -> List[Anomaly]:
        """Remove duplicate anomalies, keeping highest severity"""

        seen = {}

        for anomaly in anomalies:
            key = (anomaly.timestamp, anomaly.metric)

            if key not in seen or self._severity_rank(anomaly.severity) > \
               self._severity_rank(seen[key].severity):
                seen[key] = anomaly

        return sorted(
            seen.values(),
            key=lambda a: (self._severity_rank(a.severity), a.deviation_score),
            reverse=True
        )

    def _severity_rank(self, severity: str) -> int:
        return {'low': 1, 'medium': 2, 'high': 3, 'critical': 4}.get(severity, 0)
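The z-score and IQR checks in _detect_statistical_anomalies reduce to a few lines of standard-library Python. A self-contained sketch of the same thresholds (|z| > 3, 1.5 * IQR Tukey fences), useful for sanity-checking the detector against a single metric:

```python
import statistics
from typing import Dict, List

def baseline(values: List[float]) -> Dict[str, float]:
    # quantiles(n=4) returns [q1, median, q3]
    qs = statistics.quantiles(values, n=4)
    return {
        'mean': statistics.fmean(values),
        'std': statistics.pstdev(values),
        'q1': qs[0], 'q3': qs[2],
        'iqr': qs[2] - qs[0],
    }

def is_anomalous(value: float, stats: Dict[str, float]) -> bool:
    # Flag on |z| > 3 or a point outside the 1.5*IQR Tukey fences,
    # matching the thresholds in the detector above
    z = (value - stats['mean']) / stats['std'] if stats['std'] > 0 else 0.0
    lower = stats['q1'] - 1.5 * stats['iqr']
    upper = stats['q3'] + 1.5 * stats['iqr']
    return abs(z) > 3 or value < lower or value > upper

history = [100, 102, 98, 101, 99, 103, 97, 100, 102, 98]
stats = baseline(history)
```

Combining both rules matters: the z-score assumes roughly normal data, while the IQR fences are robust to skew, so each catches cases the other misses.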

Key Takeaways

Remember These Points

  • Feature engineering is critical: The quality of your predictive models depends heavily on well-crafted features that capture user behavior patterns
  • Use calibrated probabilities: Ensure your churn and conversion scores are well-calibrated for accurate risk assessment and intervention planning
  • Prophet excels at time series: Use Facebook Prophet for forecasting metrics with seasonal patterns, holidays, and external regressors
  • Explain predictions with SHAP: Provide interpretable risk factors using SHAP values to enable actionable interventions
  • Build real-time prediction APIs: Use FastAPI with Redis caching for low-latency predictions in production
  • Optimize user journeys proactively: Predict next actions and identify dropoff risk to intervene before users leave
  • Detect anomalies with multiple methods: Combine Isolation Forest, statistical methods, and pattern analysis for comprehensive anomaly detection
  • Monitor model performance continuously: Track prediction accuracy and retrain models as user behavior evolves

Conclusion

Predictive analytics transforms web applications from reactive to proactive systems. By implementing churn prediction, conversion forecasting, and user journey optimization, SaaS companies have reported results on the order of 25% churn reduction and 40% conversion improvement. The combination of Prophet for time series forecasting and scikit-learn for classification provides a robust foundation for production-grade predictive systems.

The key to success is starting with high-quality feature engineering, building explainable models that enable action, and deploying real-time prediction APIs that integrate seamlessly with your product. For deeper learning, explore Andrew Ng's Machine Learning course and Google's ML Crash Course.

Remember that predictive models require continuous monitoring and retraining as user behavior evolves. Implement drift detection, track prediction accuracy over time, and establish automated retraining pipelines. The investment in predictive analytics infrastructure pays dividends in customer retention, conversion optimization, and data-driven decision making.
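One common way to implement the drift detection mentioned above is the population stability index (PSI), which compares the training-time and live distributions of a feature or score. A standard-library sketch, assuming fixed bins and the conventional 0.2 alert threshold (neither the bin edges nor the threshold comes from this article):

```python
import math
from typing import List, Sequence

def psi(expected: Sequence[float], actual: Sequence[float],
        bin_edges: Sequence[float]) -> float:
    """Population Stability Index between two samples over fixed bins."""
    def proportions(values: Sequence[float]) -> List[float]:
        counts = [0] * (len(bin_edges) - 1)
        for v in values:
            for i in range(len(bin_edges) - 1):
                if bin_edges[i] <= v < bin_edges[i + 1]:
                    counts[i] += 1
                    break
        total = max(1, sum(counts))
        # Floor empty bins so the log term stays finite
        return [max(c / total, 1e-4) for c in counts]

    p = proportions(expected)
    q = proportions(actual)
    return sum((pi - qi) * math.log(pi / qi) for pi, qi in zip(p, q))

edges = [0.0, 0.25, 0.5, 0.75, 1.0001]
train_scores = [0.1, 0.2, 0.3, 0.4, 0.6, 0.7, 0.8, 0.9]
live_scores = [0.1, 0.2, 0.3, 0.4, 0.6, 0.7, 0.8, 0.9]  # unchanged -> PSI ~ 0
drifted = [0.8, 0.85, 0.9, 0.95, 0.7, 0.75, 0.9, 0.99]  # shifted upward
```

A PSI above roughly 0.2 is a widely used trigger for investigation or retraining; wiring this check into a scheduled job against the prediction API's inputs closes the monitoring loop described above.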