JustKalm
Machine Learning

ML Operations

End-to-end MLOps platform with automated training pipelines, a model registry, a feature store, and production monitoring, serving 50M+ predictions daily.

12 Models in Production
52M Daily Predictions
23ms Avg Inference
94.7% Model Accuracy

Kubeflow Pipeline

Automated training pipelines built on Kubeflow, covering data validation, feature engineering, hyperparameter tuning with Katib, model evaluation against the production baseline, and conditional canary deployment.

# pipelines/valuation_model.py
from kfp import dsl
from kfp.components import create_component_from_func
from typing import NamedTuple

@dsl.pipeline(
    name="Valuation Model Training",
    description="End-to-end training pipeline for product valuation"
)
def valuation_training_pipeline(
    training_data_path: str,
    model_name: str = "valuation-model",
    experiment_name: str = "valuation-experiments",
    target_metric: str = "mae",
    max_trials: int = 50,
):
    # Step 1: Data validation
    validate_data = data_validation_op(
        input_path=training_data_path,
        schema_path="gs://justkalm-ml/schemas/valuation.json",
        anomaly_threshold=0.01
    )
    
    # Step 2: Feature engineering
    features = feature_engineering_op(
        input_data=validate_data.outputs["validated_data"],
        feature_config="gs://justkalm-ml/configs/valuation_features.yaml"
    ).after(validate_data)
    
    # Step 3: Data split
    splits = train_test_split_op(
        features=features.outputs["features"],
        test_size=0.15,
        validation_size=0.15,
        stratify_column="brand"
    ).after(features)
    
    # Step 4: Hyperparameter tuning with Katib
    tuning = hyperparameter_tuning_op(
        train_data=splits.outputs["train"],
        val_data=splits.outputs["validation"],
        algorithm="bayesian",
        objective=target_metric,
        max_trials=max_trials,
        parallel_trials=5,
        param_space={
            "learning_rate": {"min": 0.001, "max": 0.1, "type": "double"},
            "n_estimators": {"min": 100, "max": 1000, "type": "int"},
            "max_depth": {"min": 3, "max": 15, "type": "int"},
            "min_samples_leaf": {"min": 1, "max": 50, "type": "int"},
        }
    ).after(splits)
    
    # Step 5: Train final model with best params
    training = model_training_op(
        train_data=splits.outputs["train"],
        val_data=splits.outputs["validation"],
        hyperparameters=tuning.outputs["best_params"],
        model_type="xgboost"
    ).after(tuning).set_gpu_limit(1)
    
    # Step 6: Model evaluation
    evaluation = model_evaluation_op(
        model=training.outputs["model"],
        test_data=splits.outputs["test"],
        metrics=["mae", "rmse", "mape", "r2"],
        baseline_model="gs://justkalm-ml/models/valuation/production/model.pkl"
    ).after(training)
    
    # Step 7: Conditional deployment
    with dsl.Condition(
        evaluation.outputs["improvement"] > 0.02,
        name="deploy-if-improved"
    ):
        register = model_registry_op(
            model=training.outputs["model"],
            model_name=model_name,
            metrics=evaluation.outputs["metrics"],
            experiment=experiment_name
        )
        
        deploy = model_deployment_op(
            model_uri=register.outputs["model_uri"],
            endpoint_name=f"{model_name}-endpoint",
            traffic_percentage=10  # Start with 10% canary
        ).after(register)
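
The component ops referenced in the pipeline (data_validation_op, feature_engineering_op, and so on) are built separately and imported at compile time. As a rough illustration only, here is how a lightweight data_validation_op could be defined with create_component_from_func; the validation logic, base image, and output path handling are assumptions, not the production component.

# components/data_validation.py -- illustrative sketch only
from typing import NamedTuple
from kfp.components import create_component_from_func

def validate_data(
    input_path: str,
    schema_path: str,
    anomaly_threshold: float,
) -> NamedTuple("ValidationOutput", [("validated_data", str)]):
    """Check incoming training data before it enters the pipeline."""
    from collections import namedtuple
    import pandas as pd

    df = pd.read_parquet(input_path)

    # Placeholder check; a real component would validate against the
    # schema at schema_path and compute per-feature anomaly rates.
    null_rate = df.isnull().mean().max()
    if null_rate > anomaly_threshold:
        raise ValueError(
            f"Max null rate {null_rate:.3f} exceeds threshold {anomaly_threshold}"
        )

    # Illustrative output location next to the input data
    validated_path = input_path.replace(".parquet", "_validated.parquet")
    df.to_parquet(validated_path)

    output = namedtuple("ValidationOutput", ["validated_data"])
    return output(validated_path)

# Wrap the function as a reusable KFP component
data_validation_op = create_component_from_func(
    validate_data,
    base_image="python:3.10",
    packages_to_install=["pandas", "pyarrow", "gcsfs"],
)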

Experiment Tracking

# experiments/tracking.py
import mlflow
import pandas as pd
from mlflow.tracking import MlflowClient
from dataclasses import dataclass
from typing import Dict, Any, Optional

@dataclass
class ExperimentConfig:
    name: str
    tracking_uri: str = "https://mlflow.justkalm.com"
    artifact_location: str = "s3://justkalm-ml/artifacts"
    
class ExperimentTracker:
    """
    MLflow-based experiment tracking with automatic
    logging, artifact storage, and model comparison.
    """
    
    def __init__(self, config: ExperimentConfig):
        self.config = config
        mlflow.set_tracking_uri(config.tracking_uri)
        self.client = MlflowClient()
        
        # Create or get experiment
        experiment = mlflow.get_experiment_by_name(config.name)
        if experiment is None:
            self.experiment_id = mlflow.create_experiment(
                config.name,
                artifact_location=config.artifact_location
            )
        else:
            self.experiment_id = experiment.experiment_id
    
    def start_run(
        self,
        run_name: str,
        tags: Optional[Dict[str, str]] = None
    ):
        """Start a new experiment run."""
        
        return mlflow.start_run(
            experiment_id=self.experiment_id,
            run_name=run_name,
            tags={
                "framework": "pytorch",
                "team": "ml-platform",
                **(tags or {})
            }
        )
    
    def log_training(
        self,
        model,
        params: Dict[str, Any],
        metrics: Dict[str, float],
        X_sample=None,
        feature_names=None,
        artifacts: Optional[Dict[str, str]] = None
    ):
        """Log a training run: params, metrics, model, and artifacts."""
        
        # Log parameters and metrics
        mlflow.log_params(params)
        mlflow.log_metrics(metrics)
        
        # Log model, inferring an input/output signature from a data sample
        signature = None
        if X_sample is not None:
            signature = mlflow.models.infer_signature(
                X_sample[:100],
                model.predict(X_sample[:100])
            )
        
        mlflow.sklearn.log_model(
            model,
            "model",
            signature=signature,
            registered_model_name=self.config.name
        )
        
        # Log additional artifacts
        if artifacts:
            for name, path in artifacts.items():
                mlflow.log_artifact(path, name)
        
        # Log feature importance for models that expose it
        if feature_names is not None and hasattr(model, 'feature_importances_'):
            importance_df = pd.DataFrame({
                'feature': feature_names,
                'importance': model.feature_importances_
            }).sort_values('importance', ascending=False)
            
            mlflow.log_table(importance_df, "feature_importance.json")
    
    def compare_runs(
        self,
        metric: str = "mae",
        top_n: int = 5
    ) -> pd.DataFrame:
        """Compare top runs by metric."""
        
        runs = mlflow.search_runs(
            experiment_ids=[self.experiment_id],
            order_by=[f"metrics.{metric} ASC"],
            max_results=top_n
        )
        
        return runs[[
            "run_id", "tags.mlflow.runName",
            f"metrics.{metric}", "metrics.r2",
            "params.learning_rate", "params.n_estimators"
        ]]
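
For context, a short usage sketch of the tracker; the run name, tag, parameter values, and metric values are illustrative, and model and X_train are assumed to come from an earlier training step.

# Example usage (illustrative values)
config = ExperimentConfig(name="valuation-experiments")
tracker = ExperimentTracker(config)

with tracker.start_run("xgboost-baseline", tags={"dataset": "2024-06"}):
    tracker.log_training(
        model=model,                      # a fitted sklearn-style estimator
        params={"learning_rate": 0.05, "n_estimators": 400, "max_depth": 8},
        metrics={"mae": 142.3, "rmse": 210.8, "r2": 0.91},
        X_sample=X_train,                 # training features, for signature inference
        feature_names=list(X_train.columns),
    )

top_runs = tracker.compare_runs(metric="mae", top_n=5)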

Pipeline Stages

Validate (2m) → Features (8m) → Split (1m) → Tune (45m) → Train (12m) → Evaluate (3m) → Deploy (5m)
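
The production monitoring called out in the overview picks up where the Deploy stage ends. As a loose sketch only (the metric names, labels, and scrape port are assumptions, not the actual service), per-prediction latency and volume can be exported with prometheus_client:

# monitoring/metrics.py -- illustrative sketch, not the production service
import time
from prometheus_client import Counter, Histogram, start_http_server

PREDICTIONS = Counter(
    "valuation_predictions_total",
    "Total predictions served",
    ["model_version"],
)
LATENCY = Histogram(
    "valuation_inference_seconds",
    "Inference latency in seconds",
    buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25),
)

def predict_with_metrics(model, features, model_version: str):
    """Wrap model.predict so every call is counted and timed."""
    start = time.perf_counter()
    prediction = model.predict(features)
    LATENCY.observe(time.perf_counter() - start)
    PREDICTIONS.labels(model_version=model_version).inc()
    return prediction

# At service startup, expose /metrics for Prometheus to scrape
# (the port is an assumption): start_http_server(9100)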

Enterprise ML at Scale

From experiment to production in hours, not weeks.

12 Production Models · 52M Predictions/Day · 23ms P50 Latency