JustKalm
Data Infrastructure

Data Platform

A modern data stack with lakehouse architecture, real-time ETL pipelines, and self-service analytics, processing 50TB+ of fashion market intelligence daily.

2.8B

Events Ingested Daily

<3s

Query Latency P95

5min

Data Freshness

127

Active Dashboards

Lakehouse Architecture

Delta Lake on S3 with ACID transactions, schema evolution, and time travel for reliable data management at scale.

# data_platform/lakehouse.py
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, input_file_name
from dataclasses import dataclass
from enum import Enum
from typing import Optional

class DataZone(Enum):
    BRONZE = "bronze"    # Raw ingested data
    SILVER = "silver"    # Cleaned & validated
    GOLD = "gold"        # Business aggregates

@dataclass
class LakehouseConfig:
    bucket: str = "s3://justkalm-data-lake"
    catalog: str = "justkalm_catalog"
    
    def path(self, zone: DataZone, table: str) -> str:
        return f"{self.bucket}/{zone.value}/{table}"

class LakehouseManager:
    """
    Manages Delta Lake tables with ACID transactions,
    schema evolution, and time travel capabilities.
    """
    
    def __init__(self, spark: SparkSession):
        # The session must be built with the Delta Lake extension and
        # catalog configs (spark.sql.extensions is a static conf and
        # cannot be changed on a running session); see the setup
        # sketch after this class.
        self.spark = spark
        self.config = LakehouseConfig()
    
    def write_bronze(
        self,
        df,
        table: str,
        partition_cols: Optional[list] = None,
        mode: str = "append"
    ):
        """Write raw data to bronze zone with metadata."""
        
        # Add ingestion metadata
        df = df.withColumn("_ingested_at", current_timestamp())
        df = df.withColumn("_source_file", input_file_name())
        
        path = self.config.path(DataZone.BRONZE, table)
        
        writer = df.write.format("delta").mode(mode)
        
        if partition_cols:
            writer = writer.partitionBy(*partition_cols)
        
        writer.option("mergeSchema", "true").save(path)
        
        # Optimize for query performance
        self._optimize_table(path)
    
    def upsert_silver(
        self,
        df,
        table: str,
        merge_keys: list,
        update_condition: Optional[str] = None
    ):
        """Upsert validated data to silver zone."""
        
        path = self.config.path(DataZone.SILVER, table)
        
        # Get or create Delta table
        if DeltaTable.isDeltaTable(self.spark, path):
            delta_table = DeltaTable.forPath(self.spark, path)
            
            # Build merge condition
            merge_condition = " AND ".join(
                [f"target.{k} = source.{k}" for k in merge_keys]
            )
            
            merge_builder = delta_table.alias("target").merge(
                df.alias("source"),
                merge_condition
            )
            
            if update_condition:
                merge_builder = merge_builder.whenMatchedUpdate(
                    condition=update_condition,
                    set={c: f"source.{c}" for c in df.columns}
                )
            else:
                merge_builder = merge_builder.whenMatchedUpdateAll()
            
            merge_builder.whenNotMatchedInsertAll().execute()
        else:
            df.write.format("delta").save(path)
    
    def time_travel(
        self,
        table: str,
        zone: DataZone,
        version: Optional[int] = None,
        timestamp: Optional[str] = None
    ):
        """Query historical table version."""
        
        path = self.config.path(zone, table)
        
        reader = self.spark.read.format("delta")
        
        if version is not None:
            reader = reader.option("versionAsOf", version)
        elif timestamp:
            reader = reader.option("timestampAsOf", timestamp)
        
        return reader.load(path)
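
    def _optimize_table(self, path: str):
        """Compact small files after a write.

        Minimal sketch of the helper referenced in write_bronze,
        using the OSS Delta compaction API; a real deployment might
        also Z-ORDER on frequently filtered columns.
        """
        DeltaTable.forPath(self.spark, path) \
            .optimize().executeCompaction()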

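A minimal sketch of wiring the pieces together: building a Delta-enabled session (this is where the static extension configs belong) and landing one day of raw data. It assumes the delta-spark pip package; the landing path, table, column names, and timestamp are illustrative.

# data_platform/example_ingest.py (illustrative)
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession, functions as F

builder = (
    SparkSession.builder
    .appName("justkalm-lakehouse")
    .config("spark.sql.extensions",
            "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

manager = LakehouseManager(spark)
raw = (spark.read.json("s3://justkalm-data-lake/landing/listings/")
       .withColumn("ingest_date", F.current_date()))
manager.write_bronze(raw, "product_listings",
                     partition_cols=["ingest_date"])

# Time travel: reproduce yesterday's view of the table for debugging
snapshot = manager.time_travel("product_listings", DataZone.BRONZE,
                               timestamp="2024-01-14")
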
Data Zones

Bronze Zone

28TB · 156 tables

Raw ingested data with minimal transformation

Append-only · Full history · Source metadata

Silver Zone

12TB · 89 tables

Cleaned, validated, and deduplicated data

Schema enforced · Deduped · Quality validated

Gold Zone

4TB · 42 tables

Business-ready aggregates and metrics

Pre-aggregated · Optimized · SLA-backed
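
Continuing the sketch above, a typical promotion through the zones; the business key and cleaning rules (listing_id, price, scraped_at) are illustrative:

# data_platform/example_promote.py (illustrative)
from pyspark.sql import functions as F

bronze = spark.read.format("delta").load(
    manager.config.path(DataZone.BRONZE, "product_listings"))

# Silver: enforce basic quality rules and dedupe on the business key
silver = (bronze
          .filter(F.col("price") > 0)
          .dropDuplicates(["listing_id", "scraped_at"]))
manager.upsert_silver(silver, "product_listings",
                      merge_keys=["listing_id"])

# Gold: pre-aggregated daily metrics ready for dashboards
gold = (silver
        .groupBy("brand", F.to_date("scraped_at").alias("date"))
        .agg(F.avg("price").alias("avg_price"),
             F.count("*").alias("listings")))
gold.write.format("delta").mode("overwrite").save(
    manager.config.path(DataZone.GOLD, "daily_brand_prices"))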

Data Catalog

# data_platform/catalog.py
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime
from typing import Dict, List, Optional

@dataclass
class ColumnMetadata:
    name: str
    data_type: str
    description: str
    is_nullable: bool = True
    is_pii: bool = False
    sample_values: List[str] = field(default_factory=list)

@dataclass
class TableMetadata:
    name: str
    zone: str
    description: str
    owner: str
    columns: List[ColumnMetadata]
    tags: List[str] = field(default_factory=list)
    lineage: Dict[str, List[str]] = field(default_factory=dict)
    freshness_sla: Optional[str] = None
    row_count: int = 0
    size_bytes: int = 0
    last_updated: Optional[datetime] = None
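
    def to_catalog_row(self) -> dict:
        """Parameter dict consumed by DataCatalog.register_table.

        Minimal sketch of the helper referenced below; the
        serialization details are assumptions.
        """
        return {
            "name": self.name,
            "zone": self.zone,
            "description": self.description,
            "owner": self.owner,
            "schema_json": json.dumps([asdict(c) for c in self.columns]),
            "tags": self.tags,
            "lineage": json.dumps(self.lineage),
            "freshness_sla": self.freshness_sla,
        }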

class DataCatalog:
    """
    Centralized metadata catalog with search,
    lineage tracking, and data quality scores.
    """
    
    def __init__(self, db_session):
        self.db = db_session
    
    def register_table(self, metadata: TableMetadata):
        """Register or update table metadata."""
        
        # Store in catalog database
        self.db.execute(
            """
            INSERT INTO data_catalog (
                name, zone, description, owner, schema_json,
                tags, lineage, freshness_sla
            ) VALUES (
                :name, :zone, :description, :owner, :schema_json,
                :tags, :lineage, :freshness_sla
            )
            ON CONFLICT (name, zone) DO UPDATE SET
                description = EXCLUDED.description,
                schema_json = EXCLUDED.schema_json,
                tags = EXCLUDED.tags,
                updated_at = NOW()
            """,
            metadata.to_catalog_row()
        )
        
        # Index for search
        self._update_search_index(metadata)
    
    def search(
        self,
        query: str,
        zone: Optional[str] = None,
        tags: Optional[List[str]] = None,
        owner: Optional[str] = None
    ) -> List[TableMetadata]:
        """Search catalog with filters."""
        
        filters = []
        params = {"query": f"%{query}%"}
        
        if zone:
            filters.append("zone = :zone")
            params["zone"] = zone
        
        if tags:
            filters.append("tags && :tags")
            params["tags"] = tags
        
        if owner:
            filters.append("owner = :owner")
            params["owner"] = owner
        
        where_clause = " AND ".join(filters) if filters else "1=1"
        
        return self.db.query(
            f"""
            SELECT * FROM data_catalog
            WHERE (name ILIKE :query OR description ILIKE :query)
              AND {where_clause}
            ORDER BY 
                CASE WHEN name ILIKE :query THEN 0 ELSE 1 END,
                last_updated DESC
            LIMIT 50
            """,
            params
        )
    
    def get_lineage(self, table: str) -> Dict:
        """Get upstream and downstream lineage."""
        
        return {
            "upstream": self._trace_upstream(table),
            "downstream": self._trace_downstream(table),
            "freshness": self._calculate_freshness(table)
        }

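A hypothetical registration and lookup, reusing the gold table from the zone sketch; db_session stands in for whatever database handle the platform provides:

# data_platform/example_catalog.py (illustrative)
catalog = DataCatalog(db_session)

catalog.register_table(TableMetadata(
    name="daily_brand_prices",
    zone="gold",
    description="Average listing price per brand per day",
    owner="data-platform",
    columns=[
        ColumnMetadata("brand", "string", "Brand name"),
        ColumnMetadata("date", "date", "Aggregation day"),
        ColumnMetadata("avg_price", "double", "Mean listing price"),
    ],
    tags=["pricing", "daily"],
    freshness_sla="5min",
))

results = catalog.search("price", zone="gold", tags=["pricing"])
lineage = catalog.get_lineage("daily_brand_prices")
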
Data-Driven Fashion Intelligence

From raw data to actionable insights in real time.

2.8B Events/Day · 5min Data Freshness · 127 Active Dashboards