Predictive analytics has transformed from a competitive advantage into a business necessity. SaaS companies implementing predictive user behavior models report a 25% reduction in churn and a 40% improvement in conversion rates. According to McKinsey research, organizations using advanced analytics outperform their peers by 85% in sales growth and more than 25% in gross margin.
In this comprehensive guide, we'll build intelligent analytics systems using Facebook Prophet and scikit-learn. You'll learn to predict user churn, forecast conversions, identify high-value users, optimize user journeys, detect anomalies, and create proactive engagement strategies. We'll cover complete implementation patterns including real-time prediction APIs, data pipelines, and production deployment.
Understanding Predictive Analytics for Web Applications
Predictive analytics applies statistical algorithms and machine learning to historical data to forecast future outcomes. For web applications, this means anticipating user behavior before it happens, enabling proactive rather than reactive strategies. According to Gartner, by 2025, 75% of companies will shift from piloting to operationalizing AI.
Core Prediction Categories
User behavior predictions fall into several key categories, each requiring different modeling approaches:
// prediction-types.ts
interface PredictionTypes {
  // Classification: Will this happen? (Yes/No)
  churn: ChurnPrediction;
  conversion: ConversionPrediction;

  // Regression: How much/when?
  lifetimeValue: LTVPrediction;
  nextPurchaseTime: TimePrediction;

  // Time Series: What's the trend?
  dailyActiveUsers: TimeSeriesForecast;
  revenueProjection: TimeSeriesForecast;

  // Anomaly Detection: Is this unusual?
  fraudDetection: AnomalyPrediction;
  usageSpike: AnomalyPrediction;
}

interface ChurnPrediction {
  userId: string;
  churnProbability: number; // 0-1
  riskLevel: 'low' | 'medium' | 'high' | 'critical';
  keyRiskFactors: RiskFactor[];
  recommendedActions: RetentionAction[];
  predictedChurnDate?: Date;
}

interface RiskFactor {
  factor: string;
  impact: number; // -1 to 1
  description: string;
  currentValue: number;
  riskThreshold: number;
}

interface ConversionPrediction {
  userId: string;
  conversionProbability: number;
  expectedConversionTime: Date;
  optimalTouchpoint: string;
  recommendedOffer: Offer;
}

interface TimeSeriesForecast {
  metric: string;
  predictions: ForecastPoint[];
  confidenceInterval: {
    lower: number[];
    upper: number[];
  };
  trend: 'increasing' | 'decreasing' | 'stable';
  seasonality: SeasonalComponent[];
}
Building a Churn Prediction System
Churn prediction is often the highest-impact use case for predictive analytics. By identifying at-risk customers early, you can intervene before they leave. Research from Harvard Business Review shows that acquiring a new customer costs 5-25x more than retaining an existing one.
Feature Engineering for Churn
The quality of your features determines model performance. Here's a comprehensive feature engineering pipeline:
# churn_features.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List
from dataclasses import dataclass

@dataclass
class UserFeatures:
    """Complete feature set for churn prediction"""
    user_id: str
    # Engagement metrics
    login_frequency_7d: float
    login_frequency_30d: float
    login_frequency_trend: float  # 7d vs 30d ratio
    sessions_per_login: float
    avg_session_duration_minutes: float
    session_duration_trend: float
    # Activity metrics
    features_used_count: int
    feature_usage_breadth: float  # unique features / total features
    core_feature_usage_7d: int
    actions_per_session: float
    # Value metrics
    current_plan: str
    plan_value_monthly: float
    days_since_signup: int
    total_lifetime_value: float
    payment_failures_count: int
    # Support metrics
    support_tickets_30d: int
    avg_ticket_resolution_hours: float
    negative_feedback_count: int
    nps_score: float
    # Social metrics
    team_size: int
    active_team_members_ratio: float
    shared_content_count: int
    # Temporal metrics
    days_since_last_login: int
    days_since_last_core_action: int
    # Derived metrics
    engagement_score: float
    health_score: float

class ChurnFeatureEngineer:
    """Engineer features for churn prediction from raw event data"""

    def __init__(self, events_df: pd.DataFrame, users_df: pd.DataFrame):
        self.events = events_df
        self.users = users_df
        self.now = datetime.now()

    def compute_features(self, user_id: str) -> UserFeatures:
        """Compute all features for a single user"""
        user_events = self.events[self.events['user_id'] == user_id]
        user_data = self.users[self.users['user_id'] == user_id].iloc[0]
        return UserFeatures(
            user_id=user_id,
            **self._compute_engagement_features(user_events),
            **self._compute_activity_features(user_events),
            **self._compute_value_features(user_data),
            **self._compute_support_features(user_id),
            **self._compute_social_features(user_id),
            **self._compute_temporal_features(user_events),
            **self._compute_derived_features(user_events, user_data)
        )

    def _compute_engagement_features(self, events: pd.DataFrame) -> Dict:
        """Calculate engagement-related features"""
        logins = events[events['event_type'] == 'login']
        # Login frequency
        logins_7d = len(logins[logins['timestamp'] >= self.now - timedelta(days=7)])
        logins_30d = len(logins[logins['timestamp'] >= self.now - timedelta(days=30)])
        # Trend: if the 7d rate is lower than expected from 30d, the user is declining
        expected_7d = logins_30d * (7 / 30)
        login_trend = logins_7d / max(expected_7d, 1)
        # Session analysis
        sessions = events.groupby('session_id').agg({
            'timestamp': ['min', 'max', 'count']
        })
        sessions.columns = ['start', 'end', 'event_count']
        sessions['duration'] = (sessions['end'] - sessions['start']).dt.total_seconds() / 60
        # mean() returns NaN for an empty frame; `or 0` would not catch that
        avg_duration = sessions['duration'].mean()
        return {
            'login_frequency_7d': logins_7d,
            'login_frequency_30d': logins_30d,
            'login_frequency_trend': login_trend,
            'sessions_per_login': len(sessions) / max(logins_30d, 1),
            'avg_session_duration_minutes': 0.0 if pd.isna(avg_duration) else avg_duration,
            'session_duration_trend': self._compute_duration_trend(sessions)
        }

    def _compute_activity_features(self, events: pd.DataFrame) -> Dict:
        """Calculate activity-related features"""
        feature_events = events[events['event_type'] == 'feature_use']
        unique_features = feature_events['feature_name'].nunique()
        total_features = 25  # Total features in the product
        core_features = ['dashboard', 'reports', 'export', 'integration']
        core_usage = feature_events[
            (feature_events['feature_name'].isin(core_features)) &
            (feature_events['timestamp'] >= self.now - timedelta(days=7))
        ].shape[0]
        sessions = events.groupby('session_id').size()
        return {
            'features_used_count': unique_features,
            'feature_usage_breadth': unique_features / total_features,
            'core_feature_usage_7d': core_usage,
            'actions_per_session': sessions.mean() if len(sessions) > 0 else 0
        }

    def _compute_derived_features(
        self,
        events: pd.DataFrame,
        user: pd.Series
    ) -> Dict:
        """Calculate composite health and engagement scores"""
        # Engagement score (0-100)
        engagement_factors = {
            'login_recency': max(0, 100 - self._days_since_last_login(events) * 5),
            'session_frequency': min(100, self._sessions_last_7d(events) * 15),
            'feature_breadth': self._feature_breadth(events) * 100,
            'session_depth': min(100, self._avg_session_actions(events) * 10)
        }
        engagement_score = np.mean(list(engagement_factors.values()))
        # Health score (0-100) - weighted composite
        health_factors = {
            'engagement': engagement_score * 0.3,
            'support_sentiment': self._support_sentiment_score(user.user_id) * 0.2,
            'payment_health': (100 - user.get('payment_failures', 0) * 20) * 0.2,
            'growth_trend': self._growth_trend_score(events) * 0.3
        }
        health_score = sum(health_factors.values())
        return {
            'engagement_score': engagement_score,
            'health_score': health_score
        }

    def batch_compute_features(self, user_ids: List[str]) -> pd.DataFrame:
        """Compute features for multiple users efficiently"""
        features_list = []
        for user_id in user_ids:
            try:
                features = self.compute_features(user_id)
                features_list.append(features.__dict__)
            except Exception as e:
                print(f"Error computing features for {user_id}: {e}")
                continue
        return pd.DataFrame(features_list)

    # The remaining helpers (_compute_value_features, _compute_support_features,
    # _compute_social_features, _compute_temporal_features, _compute_duration_trend,
    # and the per-factor scoring methods) are omitted for brevity.
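As a sanity check, the login-frequency trend used above is just the observed 7-day login count measured against the count the 30-day rate would predict. A minimal standalone illustration with made-up timestamps:

```python
from datetime import datetime, timedelta

import pandas as pd

# Hypothetical login timestamps for one user (one entry per login event)
now = datetime(2025, 1, 31)
logins = pd.Series(pd.to_datetime(
    [now - timedelta(days=2)] +                          # 1 login this week
    [now - timedelta(days=d) for d in range(9, 30, 2)]   # 11 older logins
))

logins_7d = int((logins >= now - timedelta(days=7)).sum())
logins_30d = int((logins >= now - timedelta(days=30)).sum())
expected_7d = logins_30d * (7 / 30)        # 2.8 logins expected this week
trend = logins_7d / max(expected_7d, 1)    # ~0.36: engagement is declining
```

A trend near 1.0 means the user is logging in at their usual rate; values well below 1.0 are an early churn signal even before the user goes fully inactive.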
Training the Churn Model with scikit-learn
With features engineered, we can train a production-grade churn prediction model:
# churn_model.py
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.calibration import CalibratedClassifierCV
from sklearn.metrics import (
    classification_report, roc_auc_score, average_precision_score
)
import joblib
from typing import Dict, List
import shap

# ChurnPrediction, RiskFactor, and RetentionAction are assumed to be Python
# dataclass counterparts of the TypeScript interfaces defined earlier.

class ChurnPredictionModel:
    """Production-ready churn prediction model"""

    def __init__(self):
        self.model = None
        self.scaler = StandardScaler()
        self.label_encoders = {}
        self.feature_names = []
        self.shap_explainer = None
        # Features to use
        self.numeric_features = [
            'login_frequency_7d', 'login_frequency_30d', 'login_frequency_trend',
            'sessions_per_login', 'avg_session_duration_minutes', 'session_duration_trend',
            'features_used_count', 'feature_usage_breadth', 'core_feature_usage_7d',
            'actions_per_session', 'plan_value_monthly', 'days_since_signup',
            'total_lifetime_value', 'payment_failures_count', 'support_tickets_30d',
            'avg_ticket_resolution_hours', 'negative_feedback_count', 'nps_score',
            'team_size', 'active_team_members_ratio', 'shared_content_count',
            'days_since_last_login', 'days_since_last_core_action',
            'engagement_score', 'health_score'
        ]
        self.categorical_features = ['current_plan']

    def prepare_features(
        self,
        df: pd.DataFrame,
        fit: bool = False
    ) -> np.ndarray:
        """Prepare features for training or prediction"""
        X_numeric = df[self.numeric_features].copy()
        # Handle categorical features
        X_categorical = pd.DataFrame()
        for col in self.categorical_features:
            if fit:
                self.label_encoders[col] = LabelEncoder()
                X_categorical[col] = self.label_encoders[col].fit_transform(
                    df[col].astype(str)
                )
            else:
                X_categorical[col] = self.label_encoders[col].transform(
                    df[col].astype(str)
                )
        X = pd.concat([X_numeric, X_categorical], axis=1)
        self.feature_names = list(X.columns)
        # Handle missing values
        X = X.fillna(X.median())
        # Scale features
        if fit:
            X_scaled = self.scaler.fit_transform(X)
        else:
            X_scaled = self.scaler.transform(X)
        return X_scaled

    def train(
        self,
        df: pd.DataFrame,
        target_col: str = 'churned'
    ) -> Dict:
        """Train the churn prediction model"""
        # Prepare data
        X = self.prepare_features(df, fit=True)
        y = df[target_col].values
        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )
        # Train base model with hyperparameters tuned for churn
        base_model = GradientBoostingClassifier(
            n_estimators=200,
            max_depth=5,
            learning_rate=0.1,
            min_samples_split=50,
            min_samples_leaf=20,
            subsample=0.8,
            random_state=42
        )
        # Calibrate probabilities for accurate confidence scores
        self.model = CalibratedClassifierCV(
            base_model,
            method='isotonic',
            cv=5
        )
        self.model.fit(X_train, y_train)
        # Evaluate
        y_pred = self.model.predict(X_test)
        y_proba = self.model.predict_proba(X_test)[:, 1]
        # Calculate metrics
        metrics = {
            'roc_auc': roc_auc_score(y_test, y_proba),
            'average_precision': average_precision_score(y_test, y_proba),
            'classification_report': classification_report(y_test, y_pred),
            'cross_val_scores': cross_val_score(
                self.model, X, y, cv=5, scoring='roc_auc'
            ).tolist()
        }
        # Initialize SHAP explainer for feature importance.
        # CalibratedClassifierCV fits internal clones, so fit the base model
        # itself on the training split before handing it to SHAP.
        base_model.fit(X_train, y_train)
        self.shap_explainer = shap.TreeExplainer(base_model)
        return metrics

    def predict(self, df: pd.DataFrame) -> List[ChurnPrediction]:
        """Predict churn probability for users"""
        X = self.prepare_features(df, fit=False)
        probabilities = self.model.predict_proba(X)[:, 1]
        predictions = []
        for idx, (_, row) in enumerate(df.iterrows()):
            prob = probabilities[idx]
            # Get SHAP values for explanation; older SHAP versions return a
            # per-class list, in which case we take the positive class
            shap_values = self.shap_explainer.shap_values(X[idx:idx + 1])
            row_shap = shap_values[1][0] if isinstance(shap_values, list) else shap_values[0]
            risk_factors = self._get_risk_factors(row_shap, row)
            prediction = ChurnPrediction(
                user_id=row['user_id'],
                churn_probability=float(prob),
                risk_level=self._get_risk_level(prob),
                key_risk_factors=risk_factors,
                recommended_actions=self._get_recommended_actions(risk_factors, prob),
                predicted_churn_date=self._estimate_churn_date(prob, row)
            )
            predictions.append(prediction)
        return predictions

    def _get_risk_level(self, probability: float) -> str:
        """Categorize churn risk level"""
        if probability < 0.2:
            return 'low'
        elif probability < 0.5:
            return 'medium'
        elif probability < 0.8:
            return 'high'
        return 'critical'

    def _get_risk_factors(
        self,
        shap_values: np.ndarray,
        row: pd.Series
    ) -> List[RiskFactor]:
        """Extract top risk factors from SHAP values"""
        feature_importance = list(zip(self.feature_names, shap_values))
        # Sort by absolute impact (negative SHAP = reduces churn)
        sorted_factors = sorted(
            feature_importance,
            key=lambda x: abs(x[1]),
            reverse=True
        )[:5]
        risk_factors = []
        for feature, impact in sorted_factors:
            risk_factors.append(RiskFactor(
                factor=feature,
                impact=float(impact),
                description=self._get_factor_description(feature, impact),
                current_value=float(row.get(feature, 0)),
                risk_threshold=self._get_risk_threshold(feature)
            ))
        return risk_factors

    def _get_recommended_actions(
        self,
        risk_factors: List[RiskFactor],
        probability: float
    ) -> List[RetentionAction]:
        """Generate recommended retention actions based on risk factors"""
        actions = []
        for factor in risk_factors:
            if factor.impact > 0:  # Positive impact = increases churn
                if 'login' in factor.factor.lower():
                    actions.append(RetentionAction(
                        action='re_engagement_email',
                        priority='high' if probability > 0.5 else 'medium',
                        description='Send personalized re-engagement email with feature highlights',
                        expected_impact=0.15
                    ))
                elif 'support' in factor.factor.lower():
                    actions.append(RetentionAction(
                        action='customer_success_call',
                        priority='high',
                        description='Schedule proactive customer success check-in',
                        expected_impact=0.20
                    ))
                elif 'feature' in factor.factor.lower():
                    actions.append(RetentionAction(
                        action='feature_adoption_campaign',
                        priority='medium',
                        description='Trigger in-app feature discovery tour',
                        expected_impact=0.10
                    ))
        return actions[:3]  # Top 3 actions

    def save(self, path: str):
        """Save model artifacts"""
        joblib.dump({
            'model': self.model,
            'scaler': self.scaler,
            'label_encoders': self.label_encoders,
            'feature_names': self.feature_names
        }, path)

    @classmethod
    def load(cls, path: str) -> 'ChurnPredictionModel':
        """Load model from disk"""
        instance = cls()
        artifacts = joblib.load(path)
        instance.model = artifacts['model']
        instance.scaler = artifacts['scaler']
        instance.label_encoders = artifacts['label_encoders']
        instance.feature_names = artifacts['feature_names']
        return instance

    # _get_factor_description, _get_risk_threshold, and _estimate_churn_date
    # are omitted for brevity.
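A quick aside on why the `CalibratedClassifierCV` wrapper is worth the extra training time: raw gradient-boosting scores rank users well but cannot be read directly as probabilities. A standalone sketch on synthetic data (not the feature set above):

```python
import numpy as np
from sklearn.calibration import CalibratedClassifierCV
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split

# Synthetic stand-in for the engineered churn features
X, y = make_classification(n_samples=2000, n_features=10, random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.25, random_state=42)

base = GradientBoostingClassifier(n_estimators=50, random_state=42)
calibrated = CalibratedClassifierCV(base, method='isotonic', cv=5)
calibrated.fit(X_tr, y_tr)

# Calibrated scores are valid probabilities in [0, 1]
proba = calibrated.predict_proba(X_te)[:, 1]
```

After isotonic calibration, a score of 0.8 can be read as roughly an 80% empirical churn rate, which is what the fixed risk-level thresholds (0.2 / 0.5 / 0.8) implicitly assume.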
Time Series Forecasting with Prophet
Facebook Prophet excels at forecasting business metrics with strong seasonal patterns. It's particularly useful for predicting user activity, revenue trends, and capacity planning.
# time_series_forecasting.py
from prophet import Prophet
from prophet.diagnostics import cross_validation, performance_metrics
import pandas as pd
import numpy as np
from typing import Dict, List

class UserActivityForecaster:
    """Forecast user activity metrics using Prophet"""

    def __init__(self):
        self.models: Dict[str, Prophet] = {}
        self.metrics_config = {
            'daily_active_users': {
                'seasonality_mode': 'multiplicative',
                'changepoint_prior_scale': 0.1,
                'yearly_seasonality': True,
                'weekly_seasonality': True,
                'daily_seasonality': False
            },
            'revenue': {
                'seasonality_mode': 'multiplicative',
                'changepoint_prior_scale': 0.05,
                'yearly_seasonality': True,
                'weekly_seasonality': True,
                'daily_seasonality': False
            },
            'signups': {
                'seasonality_mode': 'additive',
                'changepoint_prior_scale': 0.15,
                'yearly_seasonality': True,
                'weekly_seasonality': True,
                'daily_seasonality': False
            }
        }

    def prepare_data(self, df: pd.DataFrame, metric_col: str) -> pd.DataFrame:
        """Prepare data for Prophet (requires 'ds' and 'y' columns)"""
        prophet_df = df[['date', metric_col]].copy()
        prophet_df.columns = ['ds', 'y']
        prophet_df['ds'] = pd.to_datetime(prophet_df['ds'])
        return prophet_df

    def train(
        self,
        df: pd.DataFrame,
        metric: str,
        holidays_df: pd.DataFrame = None,
        custom_seasonalities: List[Dict] = None
    ) -> Dict:
        """Train Prophet model for a specific metric"""
        # Prepare data
        prophet_df = self.prepare_data(df, metric)
        # Get config for this metric
        config = self.metrics_config.get(metric, {
            'seasonality_mode': 'additive',
            'changepoint_prior_scale': 0.1
        })
        # Initialize Prophet with configuration; Prophet accepts
        # holidays=None, so holidays go in the same constructor call
        model = Prophet(
            holidays=holidays_df,
            seasonality_mode=config.get('seasonality_mode', 'additive'),
            changepoint_prior_scale=config.get('changepoint_prior_scale', 0.1),
            yearly_seasonality=config.get('yearly_seasonality', True),
            weekly_seasonality=config.get('weekly_seasonality', True),
            daily_seasonality=config.get('daily_seasonality', False),
            interval_width=0.95  # 95% confidence interval
        )
        # Add custom seasonalities (e.g., monthly billing cycles)
        if custom_seasonalities:
            for seasonality in custom_seasonalities:
                model.add_seasonality(**seasonality)
        # Add regressors for external factors
        # (e.g., marketing spend, product launches)
        if 'marketing_spend' in df.columns:
            prophet_df['marketing_spend'] = df['marketing_spend'].values
            model.add_regressor('marketing_spend')
        # Train model
        model.fit(prophet_df)
        self.models[metric] = model
        # Cross-validation for performance metrics
        # (these settings need more than 365 + 90 days of history)
        cv_results = cross_validation(
            model,
            initial='365 days',
            period='30 days',
            horizon='90 days'
        )
        perf_metrics = performance_metrics(cv_results)
        return {
            'mape': perf_metrics['mape'].mean(),
            'rmse': perf_metrics['rmse'].mean(),
            'coverage': perf_metrics['coverage'].mean(),
            'cv_results': cv_results
        }

    def forecast(
        self,
        metric: str,
        periods: int = 90,
        include_history: bool = False
    ) -> pd.DataFrame:
        """Generate forecast for specified periods ahead"""
        if metric not in self.models:
            raise ValueError(f"No model trained for metric: {metric}")
        model = self.models[metric]
        # Create future dataframe
        future = model.make_future_dataframe(periods=periods)
        # If the model has regressors, future values must be added here;
        # these would typically come from planned marketing spend, etc.
        # Generate forecast
        forecast = model.predict(future)
        if not include_history:
            forecast = forecast.tail(periods)
        return forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper', 'trend']]

    def get_components(self, metric: str) -> Dict:
        """Get forecast components (trend, seasonality, etc.)"""
        if metric not in self.models:
            raise ValueError(f"No model trained for metric: {metric}")
        model = self.models[metric]
        # Forecast a short horizon to extract components
        future = model.make_future_dataframe(periods=30)
        forecast = model.predict(future)
        components = {
            'trend': forecast[['ds', 'trend']],
            'weekly': forecast[['ds', 'weekly']] if 'weekly' in forecast else None,
            'yearly': forecast[['ds', 'yearly']] if 'yearly' in forecast else None
        }
        return components

    def detect_anomalies(
        self,
        df: pd.DataFrame,
        metric: str,
        threshold: float = 0.95
    ) -> pd.DataFrame:
        """Detect anomalies by comparing actual vs predicted values"""
        if metric not in self.models:
            raise ValueError(f"No model trained for metric: {metric}")
        model = self.models[metric]
        prophet_df = self.prepare_data(df, metric)
        # Predict on historical data
        forecast = model.predict(prophet_df)
        # Merge with actual values
        result = prophet_df.merge(
            forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']],
            on='ds'
        )
        # Flag anomalies (outside the confidence interval)
        result['is_anomaly'] = (
            (result['y'] < result['yhat_lower']) |
            (result['y'] > result['yhat_upper'])
        )
        # Calculate deviation
        result['deviation'] = (result['y'] - result['yhat']) / result['yhat']
        result['deviation_severity'] = np.abs(result['deviation'])
        return result[result['is_anomaly']].sort_values(
            'deviation_severity',
            ascending=False
        )

# Example: forecasting with holidays
def create_holiday_dataframe() -> pd.DataFrame:
    """Create holiday dataframe for Prophet"""
    holidays = pd.DataFrame({
        'holiday': 'black_friday',
        'ds': pd.to_datetime([
            '2023-11-24', '2024-11-29', '2025-11-28'
        ]),
        'lower_window': 0,
        'upper_window': 3  # Effect lasts through Cyber Monday
    })
    # Add more holidays
    christmas = pd.DataFrame({
        'holiday': 'christmas_season',
        'ds': pd.to_datetime([
            '2023-12-20', '2024-12-20', '2025-12-20'
        ]),
        'lower_window': -5,
        'upper_window': 5
    })
    return pd.concat([holidays, christmas])
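The anomaly rule in `detect_anomalies` reduces to "actual value outside the predicted interval". Here it is in isolation, with hand-made forecast columns standing in for Prophet's output:

```python
import pandas as pd

# Hypothetical frame of actuals merged with Prophet's prediction columns
df = pd.DataFrame({
    'ds': pd.date_range('2025-01-01', periods=5, freq='D'),
    'y':          [100, 105, 300,  98,  50],
    'yhat':       [100, 102, 104, 101,  99],
    'yhat_lower': [ 90,  92,  94,  91,  89],
    'yhat_upper': [110, 112, 114, 111, 109],
})

# A point is anomalous when the actual falls outside the 95% interval
df['is_anomaly'] = (df['y'] < df['yhat_lower']) | (df['y'] > df['yhat_upper'])
df['deviation'] = (df['y'] - df['yhat']) / df['yhat']
anomalies = df[df['is_anomaly']]
```

Day 3 (a spike to 300) and day 5 (a drop to 50) are flagged; everything inside the interval is treated as normal variation.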
Conversion Probability Forecasting
Predicting which users will convert enables targeted engagement at the right moment. Here's a complete conversion prediction system:
# conversion_prediction.py
import pandas as pd
import numpy as np
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from typing import Dict, List
import xgboost as xgb

# ConversionPrediction and Offer are assumed to be Python dataclass
# counterparts of the TypeScript interfaces defined earlier.

class ConversionPredictor:
    """Predict user conversion probability and optimal intervention timing"""

    def __init__(self):
        self.model = None
        self.feature_pipeline = None
        self.conversion_funnel_stages = [
            'anonymous',
            'registered',
            'activated',
            'engaged',
            'converted'
        ]

    def compute_funnel_features(self, user_events: pd.DataFrame) -> Dict:
        """Compute conversion funnel-specific features"""
        features = {}
        # Time in each funnel stage
        stage_times = self._compute_stage_durations(user_events)
        for stage, duration in stage_times.items():
            features[f'{stage}_duration_hours'] = duration
        # Actions per funnel stage
        stage_actions = self._compute_stage_actions(user_events)
        for stage, count in stage_actions.items():
            features[f'{stage}_action_count'] = count
        # Key conversion indicators
        features['viewed_pricing_page'] = int(
            'pricing_page_view' in user_events['event_type'].values
        )
        features['pricing_page_time_seconds'] = self._get_page_time(
            user_events, 'pricing'
        )
        features['compared_plans'] = int(
            'plan_comparison' in user_events['event_type'].values
        )
        features['started_trial'] = int(
            'trial_started' in user_events['event_type'].values
        )
        features['trial_days_remaining'] = self._get_trial_days_remaining(
            user_events
        )
        features['features_during_trial'] = self._count_trial_features(
            user_events
        )
        # Engagement quality
        features['return_visit_count'] = user_events['session_id'].nunique()
        features['avg_session_depth'] = (
            user_events.groupby('session_id').size().mean()
        )
        # Social proof interaction
        features['viewed_testimonials'] = int(
            'testimonials_view' in user_events['event_type'].values
        )
        features['viewed_case_studies'] = int(
            'case_study_view' in user_events['event_type'].values
        )
        return features

    def train(
        self,
        features_df: pd.DataFrame,
        converted: pd.Series
    ) -> Dict:
        """Train conversion prediction model"""
        # Use XGBoost for better handling of imbalanced data.
        # Note: recent XGBoost releases removed `use_label_encoder` and
        # moved `early_stopping_rounds` into the constructor.
        self.model = xgb.XGBClassifier(
            n_estimators=200,
            max_depth=6,
            learning_rate=0.1,
            scale_pos_weight=len(converted[converted == 0]) / len(converted[converted == 1]),
            eval_metric='auc',
            early_stopping_rounds=20,
            random_state=42
        )
        # Create preprocessing pipeline
        self.feature_pipeline = Pipeline([
            ('scaler', StandardScaler())
        ])
        X = self.feature_pipeline.fit_transform(features_df)
        y = converted.values
        # Hold out a validation set for early stopping
        X_train, X_val, y_train, y_val = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        self.model.fit(
            X_train, y_train,
            eval_set=[(X_val, y_val)],
            verbose=False
        )
        # Return metrics
        y_pred_proba = self.model.predict_proba(X_val)[:, 1]
        return {
            'auc_roc': roc_auc_score(y_val, y_pred_proba),
            'feature_importance': dict(zip(
                features_df.columns,
                self.model.feature_importances_
            ))
        }

    def predict_conversion(
        self,
        user_features: pd.DataFrame
    ) -> List[ConversionPrediction]:
        """Predict conversion probability for users"""
        X = self.feature_pipeline.transform(user_features)
        probabilities = self.model.predict_proba(X)[:, 1]
        predictions = []
        for idx, (_, row) in enumerate(user_features.iterrows()):
            prob = probabilities[idx]
            predictions.append(ConversionPrediction(
                user_id=row['user_id'],
                conversion_probability=float(prob),
                expected_conversion_time=self._estimate_conversion_time(row, prob),
                optimal_touchpoint=self._determine_optimal_touchpoint(row, prob),
                recommended_offer=self._recommend_offer(row, prob)
            ))
        return predictions

    def _determine_optimal_touchpoint(
        self,
        features: pd.Series,
        probability: float
    ) -> str:
        """Determine the best intervention point"""
        if features.get('trial_days_remaining', 0) <= 3:
            return 'trial_expiry_reminder'
        elif features.get('viewed_pricing_page', 0) and probability > 0.6:
            return 'sales_outreach'
        elif features.get('features_during_trial', 0) < 3:
            return 'feature_discovery_email'
        elif probability > 0.4:
            return 'personalized_demo_offer'
        else:
            return 'educational_content'

    def _recommend_offer(
        self,
        features: pd.Series,
        probability: float
    ) -> Offer:
        """Recommend the best conversion offer"""
        if probability > 0.7:
            # High probability - minimal discount needed
            return Offer(
                type='standard',
                discount_percent=0,
                message='Start your subscription today'
            )
        elif probability > 0.5:
            # Medium probability - small incentive
            return Offer(
                type='time_limited',
                discount_percent=10,
                message='Get 10% off if you subscribe this week',
                valid_days=7
            )
        else:
            # Lower probability - stronger incentive
            return Offer(
                type='extended_trial',
                discount_percent=20,
                message='Extend your trial by 14 days or get 20% off annual plan',
                valid_days=14
            )

    # Helpers such as _compute_stage_durations, _compute_stage_actions,
    # _get_page_time, _get_trial_days_remaining, _count_trial_features,
    # and _estimate_conversion_time are omitted for brevity.
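The `scale_pos_weight` setting in `train` compensates for class imbalance, and its value is simply the negative-to-positive ratio of the training labels. For a hypothetical 5% conversion rate:

```python
import pandas as pd

# Hypothetical labels: 5% of trial users convert
converted = pd.Series([0] * 95 + [1] * 5)

negatives = int((converted == 0).sum())
positives = int((converted == 1).sum())
scale_pos_weight = negatives / positives  # each positive weighs 19x in the loss
```

Without this weighting, a model on 5%-positive data can reach 95% accuracy by predicting "no conversion" for everyone; the weight forces it to pay attention to the rare converters.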
Building a Real-Time Prediction API
Production systems need real-time predictions served via APIs. Here's a complete FastAPI implementation with caching and monitoring:
# prediction_api.py
from fastapi import FastAPI, HTTPException, BackgroundTasks, Response
from pydantic import BaseModel
from typing import List, Optional
import redis
import json
import pandas as pd
from datetime import datetime
from prometheus_client import Counter, Histogram, generate_latest

app = FastAPI(title="Predictive Analytics API")

# Metrics
prediction_counter = Counter(
    'predictions_total',
    'Total predictions made',
    ['model_type', 'risk_level']
)
prediction_latency = Histogram(
    'prediction_latency_seconds',
    'Prediction latency in seconds',
    ['model_type']
)

# Redis cache
redis_client = redis.Redis(host='localhost', port=6379, db=0)
CACHE_TTL = 300  # 5 minutes

# Load models at startup (assumes ConversionPredictor has a load()
# classmethod analogous to ChurnPredictionModel.load)
churn_model = ChurnPredictionModel.load('models/churn_model.joblib')
conversion_model = ConversionPredictor.load('models/conversion_model.joblib')

class UserPredictionRequest(BaseModel):
    user_id: str
    include_explanations: bool = True

class BatchPredictionRequest(BaseModel):
    user_ids: List[str]
    model_type: str  # 'churn' or 'conversion'

class PredictionResponse(BaseModel):
    user_id: str
    prediction_type: str
    probability: float
    risk_level: str
    risk_factors: Optional[List[dict]]
    recommended_actions: Optional[List[dict]]
    cached: bool
    timestamp: str

@app.get("/health")
async def health_check():
    return {"status": "healthy", "models_loaded": True}

@app.post("/predict/churn", response_model=PredictionResponse)
async def predict_churn(request: UserPredictionRequest):
    """Get churn prediction for a single user"""
    cache_key = f"churn:{request.user_id}"
    # Check cache
    cached = redis_client.get(cache_key)
    if cached:
        response = json.loads(cached)
        response['cached'] = True
        return response
    # Get user features and predict, timing the whole path
    with prediction_latency.labels(model_type='churn').time():
        features = await get_user_features(request.user_id)
        if features is None:
            raise HTTPException(status_code=404, detail="User not found")
        prediction = churn_model.predict(features)[0]
    response = PredictionResponse(
        user_id=request.user_id,
        prediction_type='churn',
        probability=prediction.churn_probability,
        risk_level=prediction.risk_level,
        risk_factors=[rf.__dict__ for rf in prediction.key_risk_factors]
        if request.include_explanations else None,
        recommended_actions=[ra.__dict__ for ra in prediction.recommended_actions]
        if request.include_explanations else None,
        cached=False,
        timestamp=datetime.utcnow().isoformat()
    )
    # Update metrics
    prediction_counter.labels(
        model_type='churn',
        risk_level=prediction.risk_level
    ).inc()
    # Cache result (use .model_dump() instead of .dict() on Pydantic v2)
    redis_client.setex(
        cache_key,
        CACHE_TTL,
        json.dumps(response.dict())
    )
    return response

@app.post("/predict/conversion", response_model=PredictionResponse)
async def predict_conversion(request: UserPredictionRequest):
    """Get conversion prediction for a single user"""
    cache_key = f"conversion:{request.user_id}"
    # Check cache
    cached = redis_client.get(cache_key)
    if cached:
        response = json.loads(cached)
        response['cached'] = True
        return response
    with prediction_latency.labels(model_type='conversion').time():
        features = await get_user_features(request.user_id, for_conversion=True)
        if features is None:
            raise HTTPException(status_code=404, detail="User not found")
        prediction = conversion_model.predict_conversion(features)[0]
    response = PredictionResponse(
        user_id=request.user_id,
        prediction_type='conversion',
        probability=prediction.conversion_probability,
        risk_level=_get_conversion_urgency(prediction.conversion_probability),
        risk_factors=None,
        recommended_actions=[{
            'touchpoint': prediction.optimal_touchpoint,
            'offer': prediction.recommended_offer.__dict__
        }] if request.include_explanations else None,
        cached=False,
        timestamp=datetime.utcnow().isoformat()
    )
    prediction_counter.labels(
        model_type='conversion',
        risk_level=response.risk_level
    ).inc()
    redis_client.setex(cache_key, CACHE_TTL, json.dumps(response.dict()))
    return response

@app.post("/predict/batch")
async def batch_predict(
    request: BatchPredictionRequest,
    background_tasks: BackgroundTasks
):
    """Batch prediction for multiple users (async processing)"""
    job_id = f"batch_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}_{len(request.user_ids)}"
    # Store job status
    redis_client.hset(f"job:{job_id}", mapping={
        'status': 'processing',
        'total': len(request.user_ids),
        'completed': 0,
        'model_type': request.model_type
    })
    # Process in background
    background_tasks.add_task(
        process_batch_predictions,
        job_id,
        request.user_ids,
        request.model_type
    )
    return {
        "job_id": job_id,
        "status": "processing",
        "status_url": f"/jobs/{job_id}"
    }

@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str):
    """Check batch prediction job status"""
    job_data = redis_client.hgetall(f"job:{job_id}")
    if not job_data:
        raise HTTPException(status_code=404, detail="Job not found")
    return {
        "job_id": job_id,
        "status": job_data.get(b'status', b'unknown').decode(),
        "progress": {
            "completed": int(job_data.get(b'completed', 0)),
            "total": int(job_data.get(b'total', 0))
        }
    }

@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint"""
    return Response(
        generate_latest(),
        media_type="text/plain"
    )

async def process_batch_predictions(
    job_id: str,
    user_ids: List[str],
    model_type: str
):
    """Background task for batch predictions"""
    results = []
    for i, user_id in enumerate(user_ids):
        try:
            features = await get_user_features(user_id)
            if model_type == 'churn':
                prediction = churn_model.predict(features)[0]
                results.append({
                    'user_id': user_id,
                    'probability': prediction.churn_probability,
                    'risk_level': prediction.risk_level
                })
            else:
                prediction = conversion_model.predict_conversion(features)[0]
                results.append({
                    'user_id': user_id,
                    'probability': prediction.conversion_probability
                })
            # Update progress
            redis_client.hset(f"job:{job_id}", 'completed', i + 1)
        except Exception as e:
            results.append({
                'user_id': user_id,
                'error': str(e)
            })
    # Store results
    redis_client.hset(f"job:{job_id}", mapping={
        'status': 'completed',
        'results': json.dumps(results)
    })
    redis_client.expire(f"job:{job_id}", 3600)  # Keep for 1 hour

async def get_user_features(
    user_id: str,
    for_conversion: bool = False
) -> Optional[pd.DataFrame]:
    """Fetch user features from the feature store"""
    # In production, this would query your feature store
    # (e.g., Feast, Tecton, or a custom solution)
    pass
User Journey Optimization
Optimizing user journeys requires understanding the paths users take and predicting optimal next actions:
// user-journey-optimizer.ts
import * as tf from '@tensorflow/tfjs-node';
interface JourneyStep {
userId: string;
stepIndex: number;
action: string;
timestamp: number;
metadata: Record<string, unknown>;
}
interface JourneyPrediction {
userId: string;
currentStep: string;
predictedNextSteps: Array<{
action: string;
probability: number;
}>;
optimalPath: string[];
dropoffRisk: number;
recommendedIntervention?: Intervention;
}
interface Intervention {
type: 'tooltip' | 'modal' | 'email' | 'notification';
content: string;
triggerCondition: string;
expectedImpact: number;
}
class UserJourneyOptimizer {
private model: tf.LayersModel | null = null;
private actionEncoder: Map<string, number> = new Map();
private actionDecoder: Map<number, string> = new Map();
private sequenceLength: number = 10;
private embeddingDim: number = 32;
async initialize(): Promise<void> {
// Load pre-trained model or create new one
try {
this.model = await tf.loadLayersModel('file://models/journey-model/model.json');
} catch {
this.model = this.buildModel();
}
}
private buildModel(): tf.LayersModel {
// LSTM-based sequence model for next action prediction
const input = tf.input({ shape: [this.sequenceLength, 1] });
// Embedding layer for actions
let x = tf.layers.embedding({
inputDim: 100, // Max unique actions
outputDim: this.embeddingDim
}).apply(input) as tf.SymbolicTensor;
// Reshape for LSTM
x = tf.layers.reshape({
targetShape: [this.sequenceLength, this.embeddingDim]
}).apply(x) as tf.SymbolicTensor;
// Bidirectional LSTM for better context understanding
x = tf.layers.bidirectional({
layer: tf.layers.lstm({
units: 64,
returnSequences: true,
dropout: 0.2,
recurrentDropout: 0.2
})
}).apply(x) as tf.SymbolicTensor;
x = tf.layers.bidirectional({
layer: tf.layers.lstm({
units: 32,
returnSequences: false,
dropout: 0.2
})
}).apply(x) as tf.SymbolicTensor;
// Dense layers
x = tf.layers.dense({ units: 64, activation: 'relu' }).apply(x) as tf.SymbolicTensor;
x = tf.layers.dropout({ rate: 0.3 }).apply(x) as tf.SymbolicTensor;
// Output: probability distribution over next actions
const output = tf.layers.dense({
units: 100, // Number of possible actions
activation: 'softmax'
}).apply(x) as tf.SymbolicTensor;
const model = tf.model({ inputs: input, outputs: output });
model.compile({
optimizer: tf.train.adam(0.001),
loss: 'categoricalCrossentropy',
metrics: ['accuracy']
});
return model;
}
async predictNextSteps(
userId: string,
recentActions: string[]
): Promise<JourneyPrediction> {
if (!this.model) {
throw new Error('Model not initialized');
}
// Encode actions to numbers
const encodedSequence = recentActions
.slice(-this.sequenceLength)
.map(action => this.encodeAction(action));
// Pad sequence if needed
while (encodedSequence.length < this.sequenceLength) {
encodedSequence.unshift(0); // Pad with zeros
}
// Predict
const inputTensor = tf.tensor3d([encodedSequence.map(x => [x])]);
const predictions = this.model.predict(inputTensor) as tf.Tensor;
const probs = await predictions.data();
inputTensor.dispose();
predictions.dispose();
// Get top 5 predicted actions
const actionProbs = Array.from(probs)
.map((prob, idx) => ({ action: this.decodeAction(idx), probability: prob }))
.filter(ap => ap.action !== 'unknown')
.sort((a, b) => b.probability - a.probability)
.slice(0, 5);
// Calculate dropoff risk
const dropoffRisk = this.calculateDropoffRisk(recentActions, actionProbs);
// Determine optimal path
const optimalPath = await this.computeOptimalPath(
recentActions[recentActions.length - 1],
'conversion' // Target state
);
return {
userId,
currentStep: recentActions[recentActions.length - 1],
predictedNextSteps: actionProbs,
optimalPath,
dropoffRisk,
recommendedIntervention: dropoffRisk > 0.5
? this.getIntervention(recentActions, dropoffRisk)
: undefined
};
}
private calculateDropoffRisk(
recentActions: string[],
predictions: Array<{ action: string; probability: number }>
): number {
// High-risk indicators
const riskSignals = {
stagnation: this.detectStagnation(recentActions),
backtracking: this.detectBacktracking(recentActions),
exitIntent: predictions.some(p =>
['exit', 'close', 'abandon'].includes(p.action) && p.probability > 0.2
),
lowEngagement: recentActions.length < 3
};
// Weighted risk score
return (
riskSignals.stagnation * 0.3 +
riskSignals.backtracking * 0.25 +
(riskSignals.exitIntent ? 0.35 : 0) +
(riskSignals.lowEngagement ? 0.1 : 0)
);
}
private detectStagnation(actions: string[]): number {
// Check if user is repeating same actions
const uniqueActions = new Set(actions.slice(-5));
return 1 - (uniqueActions.size / Math.min(5, actions.length));
}
private detectBacktracking(actions: string[]): number {
// Check if user is going back in the funnel
const funnelOrder = ['landing', 'features', 'pricing', 'signup', 'onboarding'];
let backtrackCount = 0;
for (let i = 1; i < actions.length; i++) {
const prevIdx = funnelOrder.indexOf(actions[i - 1]);
const currIdx = funnelOrder.indexOf(actions[i]);
if (prevIdx > -1 && currIdx > -1 && currIdx < prevIdx) {
backtrackCount++;
}
}
return Math.min(1, backtrackCount / actions.length);
}
private getIntervention(
actions: string[],
dropoffRisk: number
): Intervention {
const lastAction = actions[actions.length - 1];
// Context-aware interventions
if (lastAction === 'pricing' && dropoffRisk > 0.6) {
return {
type: 'modal',
content: 'Have questions about pricing? Chat with us!',
triggerCondition: 'pricing_page_dwell > 30s',
expectedImpact: 0.15
};
}
if (this.detectStagnation(actions) > 0.5) {
return {
type: 'tooltip',
content: 'Not sure what to do next? Try our guided tour.',
triggerCondition: 'idle_time > 10s',
expectedImpact: 0.12
};
}
return {
type: 'notification',
content: 'Need help? Our support team is here for you.',
triggerCondition: 'session_time > 120s && no_conversion',
expectedImpact: 0.08
};
}
async computeOptimalPath(
currentState: string,
targetState: string
): Promise<string[]> {
// Use beam search to find optimal path
const beamWidth = 5;
const maxDepth = 10;
interface PathState {
path: string[];
probability: number;
}
let beam: PathState[] = [{
path: [currentState],
probability: 1.0
}];
for (let depth = 0; depth < maxDepth; depth++) {
const candidates: PathState[] = [];
for (const state of beam) {
if (state.path[state.path.length - 1] === targetState) {
candidates.push(state);
continue;
}
// Query the model directly instead of calling predictNextSteps,
// which itself calls computeOptimalPath and would recurse without bound
const encoded = state.path
  .slice(-this.sequenceLength)
  .map(a => this.encodeAction(a));
while (encoded.length < this.sequenceLength) {
  encoded.unshift(0); // pad with zeros, matching predictNextSteps
}
const inputTensor = tf.tensor3d([encoded.map(x => [x])]);
// model is non-null here: callers go through initialize() first
const predTensor = this.model!.predict(inputTensor) as tf.Tensor;
const probs = await predTensor.data();
inputTensor.dispose();
predTensor.dispose();
const nextSteps = Array.from(probs)
  .map((prob, idx) => ({ action: this.decodeAction(idx), probability: prob }))
  .filter(ap => ap.action !== 'unknown')
  .sort((a, b) => b.probability - a.probability)
  .slice(0, beamWidth);
for (const nextStep of nextSteps) {
  candidates.push({
    path: [...state.path, nextStep.action],
    probability: state.probability * nextStep.probability
  });
}
}
// Keep top beamWidth candidates
beam = candidates
.sort((a, b) => b.probability - a.probability)
.slice(0, beamWidth);
// Early exit if all paths reached target
if (beam.every(s => s.path[s.path.length - 1] === targetState)) {
break;
}
}
return beam[0]?.path || [currentState];
}
private encodeAction(action: string): number {
  const existing = this.actionEncoder.get(action);
  if (existing !== undefined) {
    return existing;
  }
  const id = this.actionEncoder.size + 1;
  if (id >= 100) {
    // Vocabulary full: the embedding layer was built with inputDim 100,
    // so actions beyond that are mapped to the padding index
    return 0;
  }
  this.actionEncoder.set(action, id);
  this.actionDecoder.set(id, action);
  return id;
}
private decodeAction(id: number): string {
return this.actionDecoder.get(id) || 'unknown';
}
}
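The beam search inside computeOptimalPath does not depend on the LSTM specifically; it works over any next-step probability function. Here is a condensed Python sketch of the same idea over a hand-written transition table (the funnel states and probabilities are invented for illustration):

```python
from typing import Dict, List, Tuple

# Invented funnel transition probabilities, standing in for model output
TRANSITIONS: Dict[str, Dict[str, float]] = {
    "landing":  {"pricing": 0.3, "features": 0.6, "exit": 0.1},
    "features": {"pricing": 0.5, "landing": 0.2, "exit": 0.3},
    "pricing":  {"signup": 0.4, "features": 0.3, "exit": 0.3},
    "signup":   {"conversion": 0.7, "exit": 0.3},
}

def optimal_path(start: str, target: str,
                 beam_width: int = 3, max_depth: int = 6) -> List[str]:
    """Beam search: keep only the beam_width most probable partial paths."""
    beam: List[Tuple[List[str], float]] = [([start], 1.0)]
    for _ in range(max_depth):
        candidates: List[Tuple[List[str], float]] = []
        for path, prob in beam:
            if path[-1] == target:
                candidates.append((path, prob))  # finished paths survive as-is
                continue
            for nxt, p in TRANSITIONS.get(path[-1], {}).items():
                candidates.append((path + [nxt], prob * p))
        beam = sorted(candidates, key=lambda c: c[1], reverse=True)[:beam_width]
        if all(p[-1] == target for p, _ in beam):
            break
    # Prefer a path that actually reaches the target
    for path, _ in beam:
        if path[-1] == target:
            return path
    return beam[0][0] if beam else [start]
```

With beam_width = 1 this degenerates to greedy search; widening the beam trades compute for a better chance of finding high-probability paths whose first step looks locally suboptimal.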
Anomaly Detection in User Behavior
Detecting unusual patterns helps identify fraud, system issues, or opportunities. Here's an anomaly detection system using Isolation Forest and statistical methods:
# anomaly_detection.py
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from typing import List, Dict
from dataclasses import dataclass
from datetime import datetime
@dataclass
class Anomaly:
timestamp: datetime
metric: str
actual_value: float
expected_value: float
deviation_score: float
anomaly_type: str # 'spike', 'drop', 'pattern_change'
severity: str # 'low', 'medium', 'high', 'critical'
context: Dict
class AnomalyDetector:
"""Multi-method anomaly detection for user behavior"""
def __init__(self):
self.isolation_forest = IsolationForest(
contamination=0.05, # Expected 5% anomalies
random_state=42,
n_estimators=200
)
self.scaler = StandardScaler()
self.baseline_stats: Dict[str, Dict] = {}
def fit(self, df: pd.DataFrame, metric_columns: List[str]):
"""Fit detector on historical data"""
# Scale features
X = self.scaler.fit_transform(df[metric_columns])
# Train Isolation Forest
self.isolation_forest.fit(X)
# Compute baseline statistics for each metric
for col in metric_columns:
self.baseline_stats[col] = {
'mean': df[col].mean(),
'std': df[col].std(),
'median': df[col].median(),
'q1': df[col].quantile(0.25),
'q3': df[col].quantile(0.75),
'iqr': df[col].quantile(0.75) - df[col].quantile(0.25),
'hourly_pattern': df.groupby(df['timestamp'].dt.hour)[col].mean().to_dict(),
'daily_pattern': df.groupby(df['timestamp'].dt.dayofweek)[col].mean().to_dict()
}
def detect(
self,
df: pd.DataFrame,
metric_columns: List[str]
) -> List[Anomaly]:
"""Detect anomalies using multiple methods"""
anomalies = []
# Method 1: Isolation Forest for multivariate anomalies
X = self.scaler.transform(df[metric_columns])
if_predictions = self.isolation_forest.predict(X)
if_scores = self.isolation_forest.decision_function(X)
for idx, (prediction, score) in enumerate(zip(if_predictions, if_scores)):
if prediction == -1: # Anomaly
row = df.iloc[idx]
# Find which metric is most anomalous
most_anomalous = self._find_most_anomalous_metric(
row, metric_columns
)
anomalies.append(Anomaly(
timestamp=row['timestamp'],
metric=most_anomalous['metric'],
actual_value=most_anomalous['value'],
expected_value=most_anomalous['expected'],
deviation_score=abs(score),
anomaly_type=most_anomalous['type'],
severity=self._get_severity(abs(score)),
context={'isolation_forest_score': float(score)}
))
# Method 2: Statistical anomalies (z-score based)
for col in metric_columns:
stats_anomalies = self._detect_statistical_anomalies(df, col)
anomalies.extend(stats_anomalies)
# Method 3: Pattern-based anomalies
for col in metric_columns:
pattern_anomalies = self._detect_pattern_anomalies(df, col)
anomalies.extend(pattern_anomalies)
# Deduplicate and rank
return self._deduplicate_anomalies(anomalies)
def _detect_statistical_anomalies(
self,
df: pd.DataFrame,
metric: str
) -> List[Anomaly]:
"""Detect anomalies using z-scores and IQR"""
anomalies = []
stats = self.baseline_stats[metric]
for idx, row in df.iterrows():
value = row[metric]
# Z-score method
z_score = (value - stats['mean']) / stats['std'] if stats['std'] > 0 else 0
# IQR method
lower_bound = stats['q1'] - 1.5 * stats['iqr']
upper_bound = stats['q3'] + 1.5 * stats['iqr']
if abs(z_score) > 3 or value < lower_bound or value > upper_bound:
anomaly_type = 'spike' if value > stats['mean'] else 'drop'
anomalies.append(Anomaly(
timestamp=row['timestamp'],
metric=metric,
actual_value=value,
expected_value=stats['mean'],
deviation_score=abs(z_score),
anomaly_type=anomaly_type,
severity=self._get_severity(abs(z_score)),
context={
'z_score': float(z_score),
'lower_bound': float(lower_bound),
'upper_bound': float(upper_bound)
}
))
return anomalies
def _detect_pattern_anomalies(
self,
df: pd.DataFrame,
metric: str
) -> List[Anomaly]:
"""Detect anomalies based on hourly/daily patterns"""
anomalies = []
stats = self.baseline_stats[metric]
for idx, row in df.iterrows():
value = row[metric]
hour = row['timestamp'].hour
day = row['timestamp'].dayofweek
# Expected value based on time patterns
hourly_expected = stats['hourly_pattern'].get(hour, stats['mean'])
daily_expected = stats['daily_pattern'].get(day, stats['mean'])
expected = (hourly_expected + daily_expected) / 2
# Check deviation from expected pattern
if stats['std'] > 0:
pattern_z = abs(value - expected) / stats['std']
if pattern_z > 2.5: # Significant deviation from pattern
anomalies.append(Anomaly(
timestamp=row['timestamp'],
metric=metric,
actual_value=value,
expected_value=expected,
deviation_score=pattern_z,
anomaly_type='pattern_change',
severity=self._get_severity(pattern_z),
context={
'hour': hour,
'day': day,
'hourly_expected': float(hourly_expected),
'daily_expected': float(daily_expected)
}
))
return anomalies
def _get_severity(self, score: float) -> str:
"""Map deviation score to severity level"""
if score > 5:
return 'critical'
elif score > 3:
return 'high'
elif score > 2:
return 'medium'
return 'low'
def _find_most_anomalous_metric(
self,
row: pd.Series,
metrics: List[str]
) -> Dict:
"""Find which metric contributed most to anomaly"""
max_deviation = 0
result = None
for metric in metrics:
stats = self.baseline_stats[metric]
value = row[metric]
expected = stats['mean']
if stats['std'] > 0:
deviation = abs(value - expected) / stats['std']
if deviation > max_deviation:
max_deviation = deviation
result = {
'metric': metric,
'value': value,
'expected': expected,
'type': 'spike' if value > expected else 'drop'
}
return result or {'metric': metrics[0], 'value': 0, 'expected': 0, 'type': 'unknown'}
def _deduplicate_anomalies(
self,
anomalies: List[Anomaly]
) -> List[Anomaly]:
"""Remove duplicate anomalies, keeping highest severity"""
seen = {}
for anomaly in anomalies:
key = (anomaly.timestamp, anomaly.metric)
if key not in seen or self._severity_rank(anomaly.severity) > \
self._severity_rank(seen[key].severity):
seen[key] = anomaly
return sorted(
seen.values(),
key=lambda a: (self._severity_rank(a.severity), a.deviation_score),
reverse=True
)
def _severity_rank(self, severity: str) -> int:
return {'low': 1, 'medium': 2, 'high': 3, 'critical': 4}.get(severity, 0)
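To see the first two methods agree on an obvious outlier, here is a condensed, standalone version of the detection logic on synthetic data: a flat daily-active-users series with one injected spike (the numbers are invented for the demo):

```python
import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(42)
values = rng.normal(loc=1000, scale=20, size=200)  # normal traffic
values[150] = 2500.0                               # injected spike

# Method 1: Isolation Forest labels outliers as -1
forest = IsolationForest(contamination=0.01, random_state=42)
labels = forest.fit_predict(values.reshape(-1, 1))

# Method 2: z-score against the series' own mean/std
z = np.abs((values - values.mean()) / values.std())

forest_hits = set(np.where(labels == -1)[0])
zscore_hits = set(np.where(z > 3)[0])
agreed = forest_hits & zscore_hits  # indices both methods flag
```

Each method alone has blind spots (z-scores assume roughly unimodal data and a single metric; Isolation Forest gives no per-metric explanation), which is why the detector above runs both and then deduplicates.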
Key Takeaways
Remember These Points
- Feature engineering is critical: The quality of your predictive models depends heavily on well-crafted features that capture user behavior patterns
- Use calibrated probabilities: Ensure your churn and conversion scores are well-calibrated for accurate risk assessment and intervention planning
- Prophet excels at time series: Use Facebook Prophet for forecasting metrics with seasonal patterns, holidays, and external regressors
- Explain predictions with SHAP: Provide interpretable risk factors using SHAP values to enable actionable interventions
- Build real-time prediction APIs: Use FastAPI with Redis caching for low-latency predictions in production
- Optimize user journeys proactively: Predict next actions and identify dropoff risk to intervene before users leave
- Detect anomalies with multiple methods: Combine Isolation Forest, statistical methods, and pattern analysis for comprehensive anomaly detection
- Monitor model performance continuously: Track prediction accuracy and retrain models as user behavior evolves
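The calibration point deserves a concrete check. This standalone sketch (synthetic data standing in for a churn dataset) wraps a random forest in scikit-learn's CalibratedClassifierCV and compares Brier scores, a direct measure of probability quality:

```python
from sklearn.calibration import CalibratedClassifierCV
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import brier_score_loss
from sklearn.model_selection import train_test_split

# Synthetic imbalanced dataset standing in for churn labels
X, y = make_classification(n_samples=4000, n_features=12,
                           weights=[0.8], random_state=0)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

raw = RandomForestClassifier(n_estimators=100, random_state=0).fit(X_train, y_train)

# Isotonic calibration fits a monotone map from raw scores to probabilities
calibrated = CalibratedClassifierCV(
    RandomForestClassifier(n_estimators=100, random_state=0),
    method="isotonic", cv=3
).fit(X_train, y_train)

# Brier score: mean squared error of predicted probabilities (lower is better)
raw_brier = brier_score_loss(y_test, raw.predict_proba(X_test)[:, 1])
cal_brier = brier_score_loss(y_test, calibrated.predict_proba(X_test)[:, 1])
```

If the calibrated Brier score is not meaningfully lower, sigmoid (Platt) calibration via method='sigmoid' is worth trying; isotonic regression can overfit on small datasets.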
Conclusion
Predictive analytics transforms web applications from reactive to proactive systems. By implementing churn prediction, conversion forecasting, and user journey optimization, SaaS companies have reported churn reductions of around 25% and conversion improvements of up to 40%. The combination of Prophet for time series forecasting and scikit-learn for classification provides a robust foundation for production-grade predictive systems.
The key to success is starting with high-quality feature engineering, building explainable models that enable action, and deploying real-time prediction APIs that integrate seamlessly with your product. For deeper learning, explore Andrew Ng's Machine Learning course and Google's ML Crash Course.
Remember that predictive models require continuous monitoring and retraining as user behavior evolves. Implement drift detection, track prediction accuracy over time, and establish automated retraining pipelines. The investment in predictive analytics infrastructure pays dividends in customer retention, conversion optimization, and data-driven decision making.
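As a starting point for the drift detection mentioned above, the Population Stability Index is a common first check. This sketch compares a live feature distribution against its training-time counterpart (the thresholds are conventional rules of thumb, not universal constants):

```python
import numpy as np

def psi(expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float:
    """Population Stability Index between a training-time feature
    distribution (expected) and a live one (actual). Rule of thumb:
    < 0.1 stable, 0.1-0.25 moderate shift, > 0.25 retrain-worthy drift."""
    # Interior cut points taken from the training distribution's quantiles
    edges = np.quantile(expected, np.linspace(0, 1, bins + 1))[1:-1]
    e_counts = np.bincount(np.searchsorted(edges, expected), minlength=bins)
    a_counts = np.bincount(np.searchsorted(edges, actual), minlength=bins)
    # Clip to avoid log(0) on empty bins
    e_pct = np.clip(e_counts / len(expected), 1e-6, None)
    a_pct = np.clip(a_counts / len(actual), 1e-6, None)
    return float(np.sum((a_pct - e_pct) * np.log(a_pct / e_pct)))

rng = np.random.default_rng(0)
train_feature = rng.normal(0.0, 1.0, 10_000)   # distribution at training time
same_dist = rng.normal(0.0, 1.0, 10_000)       # live data, no drift
shifted = rng.normal(0.5, 1.0, 10_000)         # live data after a mean shift
```

Run a check like this per feature on a schedule; a PSI alarm on a top SHAP feature is a stronger retraining signal than a dip in aggregate accuracy, because accuracy labels (actual churn) arrive weeks late.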