Machine Learning
ML Operations
End-to-end MLOps platform with automated training pipelines, model registry, feature store, and production monitoring serving 50M+ predictions daily.
12 Models in Production · 52M Daily Predictions · 23ms Avg Inference · 94.7% Model Accuracy
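The model registry, feature retrieval, and production monitoring mentioned above sit around the training pipelines shown below. A minimal sketch of what the online inference path could look like under assumed tooling (MLflow model registry plus Prometheus client metrics; the registry URI and metric names are illustrative, not the platform's actual configuration):

# serving/predict.py -- illustrative sketch of the online inference path
import time

import mlflow.pyfunc
from prometheus_client import Counter, Histogram

# Load the current production model from the registry
# (the model name matches the pipeline below; the stage-based URI is an assumption).
model = mlflow.pyfunc.load_model("models:/valuation-model/Production")

INFERENCE_LATENCY = Histogram("inference_latency_seconds", "Per-request inference latency")
PREDICTIONS_TOTAL = Counter("predictions_total", "Predictions served")


def predict(features_df):
    """Score a batch of already-assembled feature rows and record serving metrics."""
    start = time.perf_counter()
    preds = model.predict(features_df)
    INFERENCE_LATENCY.observe(time.perf_counter() - start)
    PREDICTIONS_TOTAL.inc(len(preds))
    return preds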
Kubeflow Pipeline
Automated ML pipelines with Kubeflow, featuring data validation, hyperparameter tuning, and model validation.
# pipelines/valuation_model.py
from kfp import dsl
from kfp.components import create_component_from_func
from typing import NamedTuple


@dsl.pipeline(
    name="Valuation Model Training",
    description="End-to-end training pipeline for product valuation"
)
def valuation_training_pipeline(
    training_data_path: str,
    model_name: str = "valuation-model",
    experiment_name: str = "valuation-experiments",
    target_metric: str = "mae",
    max_trials: int = 50,
):
    # Step 1: Data validation
    validate_data = data_validation_op(
        input_path=training_data_path,
        schema_path="gs://justkalm-ml/schemas/valuation.json",
        anomaly_threshold=0.01
    )

    # Step 2: Feature engineering
    features = feature_engineering_op(
        input_data=validate_data.outputs["validated_data"],
        feature_config="gs://justkalm-ml/configs/valuation_features.yaml"
    ).after(validate_data)

    # Step 3: Data split
    splits = train_test_split_op(
        features=features.outputs["features"],
        test_size=0.15,
        validation_size=0.15,
        stratify_column="brand"
    ).after(features)

    # Step 4: Hyperparameter tuning with Katib
    tuning = hyperparameter_tuning_op(
        train_data=splits.outputs["train"],
        val_data=splits.outputs["validation"],
        algorithm="bayesian",
        objective=target_metric,
        max_trials=max_trials,
        parallel_trials=5,
        param_space={
            "learning_rate": {"min": 0.001, "max": 0.1, "type": "double"},
            "n_estimators": {"min": 100, "max": 1000, "type": "int"},
            "max_depth": {"min": 3, "max": 15, "type": "int"},
            "min_samples_leaf": {"min": 1, "max": 50, "type": "int"},
        }
    ).after(splits)

    # Step 5: Train final model with best params
    training = model_training_op(
        train_data=splits.outputs["train"],
        val_data=splits.outputs["validation"],
        hyperparameters=tuning.outputs["best_params"],
        model_type="xgboost"
    ).after(tuning).set_gpu_limit(1)

    # Step 6: Model evaluation
    evaluation = model_evaluation_op(
        model=training.outputs["model"],
        test_data=splits.outputs["test"],
        metrics=["mae", "rmse", "mape", "r2"],
        baseline_model="gs://justkalm-ml/models/valuation/production/model.pkl"
    ).after(training)

    # Step 7: Conditional deployment
    with dsl.Condition(
        evaluation.outputs["improvement"] > 0.02,
        name="deploy-if-improved"
    ):
        register = model_registry_op(
            model=training.outputs["model"],
            model_name=model_name,
            metrics=evaluation.outputs["metrics"],
            experiment=experiment_name
        )
        deploy = model_deployment_op(
            model_uri=register.outputs["model_uri"],
            endpoint_name=f"{model_name}-endpoint",
            traffic_percentage=10  # Start with 10% canary
        ).after(register)
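The component ops referenced in the pipeline (data_validation_op, feature_engineering_op, and so on) are defined elsewhere in the repository. A minimal sketch of how one of them could be built as a lightweight component with create_component_from_func; the schema-check logic, base image, and package list here are illustrative assumptions, not the platform's actual component:

# components/data_validation.py -- illustrative sketch, not the platform's actual component
from typing import NamedTuple
from kfp.components import create_component_from_func


def validate_data(
    input_path: str,
    schema_path: str,
    anomaly_threshold: float,
) -> NamedTuple("ValidationOutput", [("validated_data", str)]):
    """Fail the pipeline run if too many rows violate the expected schema."""
    import json
    from collections import namedtuple

    import fsspec
    import pandas as pd

    # Load the data and schema from GCS (gcsfs backs the gs:// protocol).
    df = pd.read_parquet(input_path)
    with fsspec.open(schema_path) as f:
        schema = json.load(f)

    # Treat rows with missing required columns as anomalies.
    required = schema.get("required_columns", [])
    anomaly_rate = float(df[required].isna().any(axis=1).mean()) if required else 0.0
    if anomaly_rate > anomaly_threshold:
        raise ValueError(f"Anomaly rate {anomaly_rate:.2%} exceeds {anomaly_threshold:.2%}")

    output = namedtuple("ValidationOutput", ["validated_data"])
    return output(validated_data=input_path)


data_validation_op = create_component_from_func(
    validate_data,
    base_image="python:3.10",
    packages_to_install=["pandas", "pyarrow", "gcsfs"],
)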
Experiment Tracking
MLflow-based experiment tracking with automatic logging, artifact storage, and model comparison.

# experiments/tracking.py
import mlflow
import pandas as pd
from mlflow.tracking import MlflowClient
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class ExperimentConfig:
    name: str
    tracking_uri: str = "https://mlflow.justkalm.com"
    artifact_location: str = "s3://justkalm-ml/artifacts"


class ExperimentTracker:
    """
    MLflow-based experiment tracking with automatic
    logging, artifact storage, and model comparison.
    """

    def __init__(self, config: ExperimentConfig):
        self.config = config
        mlflow.set_tracking_uri(config.tracking_uri)
        self.client = MlflowClient()

        # Create or get experiment
        experiment = mlflow.get_experiment_by_name(config.name)
        if experiment is None:
            self.experiment_id = mlflow.create_experiment(
                config.name,
                artifact_location=config.artifact_location
            )
        else:
            self.experiment_id = experiment.experiment_id

    def start_run(
        self,
        run_name: str,
        tags: Optional[Dict[str, str]] = None
    ):
        """Start a new experiment run."""
        return mlflow.start_run(
            experiment_id=self.experiment_id,
            run_name=run_name,
            tags={
                "framework": "pytorch",
                "team": "ml-platform",
                **(tags or {})
            }
        )

    def log_training(
        self,
        model,
        params: Dict[str, Any],
        metrics: Dict[str, float],
        artifacts: Optional[Dict[str, str]] = None,
        X_sample=None,
        feature_names: Optional[List[str]] = None,
    ):
        """Log training run with all artifacts."""
        # Log parameters
        mlflow.log_params(params)

        # Log metrics
        mlflow.log_metrics(metrics)

        # Log model, inferring the input/output signature from a sample batch
        signature = None
        if X_sample is not None:
            signature = mlflow.models.infer_signature(
                X_sample,
                model.predict(X_sample)
            )
        mlflow.sklearn.log_model(
            model,
            "model",
            signature=signature,
            registered_model_name=self.config.name
        )

        # Log additional artifacts
        if artifacts:
            for name, path in artifacts.items():
                mlflow.log_artifact(path, name)

        # Log feature importance
        if feature_names is not None and hasattr(model, 'feature_importances_'):
            importance_df = pd.DataFrame({
                'feature': feature_names,
                'importance': model.feature_importances_
            }).sort_values('importance', ascending=False)
            mlflow.log_table(importance_df, "feature_importance.json")

    def compare_runs(
        self,
        metric: str = "mae",
        top_n: int = 5
    ) -> pd.DataFrame:
        """Compare top runs by metric."""
        runs = mlflow.search_runs(
            experiment_ids=[self.experiment_id],
            order_by=[f"metrics.{metric} ASC"],
            max_results=top_n
        )
        return runs[[
            "run_id", "tags.mlflow.runName",
            f"metrics.{metric}", "metrics.r2",
            "params.learning_rate", "params.n_estimators"
        ]]
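A sketch of how the tracker might be used around a single training run; load_training_data, the run name, and the hyperparameter values are hypothetical, and X_sample/feature_names feed the signature and feature-importance logging above:

# Hypothetical usage of ExperimentTracker around one training run.
import xgboost as xgb
from sklearn.metrics import mean_absolute_error

tracker = ExperimentTracker(ExperimentConfig(name="valuation-experiments"))

X_train, X_val, y_train, y_val = load_training_data()  # hypothetical helper

with tracker.start_run("xgb-baseline", tags={"model_type": "xgboost"}):
    params = {"learning_rate": 0.05, "n_estimators": 400, "max_depth": 8}
    model = xgb.XGBRegressor(**params).fit(X_train, y_train)

    tracker.log_training(
        model,
        params=params,
        metrics={"mae": mean_absolute_error(y_val, model.predict(X_val))},
        X_sample=X_train[:100],
        feature_names=list(X_train.columns),
    )

# Rank recent runs by validation MAE.
print(tracker.compare_runs(metric="mae", top_n=5))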
Pipeline Stages
Validate → Features → Split → Tune → Train → Evaluate → Deploy
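These stages correspond to one run of valuation_training_pipeline. A minimal sketch of submitting such a run with the KFP SDK; the Kubeflow host URL and dataset path are assumptions, not the platform's actual endpoints:

# Submit the training pipeline (illustrative host and data path).
import kfp

client = kfp.Client(host="https://kubeflow.justkalm.com/pipeline")  # hypothetical endpoint

client.create_run_from_pipeline_func(
    valuation_training_pipeline,
    arguments={
        "training_data_path": "gs://justkalm-ml/datasets/valuation/latest",  # hypothetical path
        "max_trials": 50,
    },
    experiment_name="valuation-experiments",
)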
Enterprise ML at Scale
From experiment to production in hours, not weeks.
12 Production Models · 52M Predictions/Day · 23ms P50 Latency