Data Infrastructure
Data Platform
Modern data stack with a lakehouse architecture, real-time ETL pipelines, and self-service analytics, processing 50TB+ of fashion market intelligence daily.
2.8B
Events Ingested Daily
<3s
Query Latency P95
5min
Data Freshness
127
Active Dashboards
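The ingestion volume and five-minute freshness above are delivered by near-real-time pipelines that the batch-oriented code below does not show. The following is a minimal, illustrative sketch only: the Kafka brokers, topic name, trigger interval, and checkpoint path are assumptions, not the production configuration.
# data_platform/streaming_ingest.py
# Illustrative sketch; broker address, topic, and paths are assumed.
# Requires the spark-sql-kafka connector on the classpath.
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

# Build a Delta-enabled session (extensions must be set at build time).
builder = (
    SparkSession.builder.appName("bronze-event-stream")
    .config("spark.sql.extensions",
            "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Read the raw event stream (hypothetical Kafka topic).
events = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "fashion.market.events")
    .load()
)

# Land raw payloads in the bronze zone as Delta, micro-batched every
# 5 minutes to match the freshness target above.
query = (
    events.selectExpr("CAST(value AS STRING) AS payload", "timestamp")
    .writeStream.format("delta")
    .option("checkpointLocation",
            "s3://justkalm-data-lake/_checkpoints/market_events")
    .trigger(processingTime="5 minutes")
    .outputMode("append")
    .start("s3://justkalm-data-lake/bronze/market_events")
)
query.awaitTermination()
A short processing trigger like this is one simple way to hit a minutes-level freshness target without paying for continuous processing.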
Lakehouse Architecture
Delta Lake on S3 with ACID transactions, schema evolution, and time travel for reliable data management at scale.
# data_platform/lakehouse.py
from dataclasses import dataclass
from enum import Enum
from typing import Optional

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, input_file_name


class DataZone(Enum):
    BRONZE = "bronze"  # Raw ingested data
    SILVER = "silver"  # Cleaned & validated
    GOLD = "gold"      # Business aggregates


@dataclass
class LakehouseConfig:
    bucket: str = "s3://justkalm-data-lake"
    catalog: str = "justkalm_catalog"

    def path(self, zone: DataZone, table: str) -> str:
        return f"{self.bucket}/{zone.value}/{table}"


class LakehouseManager:
    """
    Manages Delta Lake tables with ACID transactions,
    schema evolution, and time travel capabilities.

    Expects a SparkSession built with Delta enabled, i.e. with
    spark.sql.extensions = io.delta.sql.DeltaSparkSessionExtension and
    spark.sql.catalog.spark_catalog =
    org.apache.spark.sql.delta.catalog.DeltaCatalog set at session build
    time (these are static configs and cannot be changed on a live session).
    """

    def __init__(self, spark: SparkSession):
        self.spark = spark
        self.config = LakehouseConfig()

    def write_bronze(
        self,
        df,
        table: str,
        partition_cols: Optional[list] = None,
        mode: str = "append",
    ):
        """Write raw data to the bronze zone with ingestion metadata."""
        # Add ingestion metadata
        df = df.withColumn("_ingested_at", current_timestamp())
        df = df.withColumn("_source_file", input_file_name())

        path = self.config.path(DataZone.BRONZE, table)
        writer = df.write.format("delta").mode(mode)
        if partition_cols:
            writer = writer.partitionBy(*partition_cols)
        writer.option("mergeSchema", "true").save(path)

        # Optimize for query performance
        self._optimize_table(path)

    def _optimize_table(self, path: str):
        """Compact small files for better scan performance (Delta 1.2+)."""
        self.spark.sql(f"OPTIMIZE delta.`{path}`")

    def upsert_silver(
        self,
        df,
        table: str,
        merge_keys: list,
        update_condition: Optional[str] = None,
    ):
        """Upsert validated data into the silver zone."""
        path = self.config.path(DataZone.SILVER, table)

        # Merge into the existing Delta table, or create it on first write
        if DeltaTable.isDeltaTable(self.spark, path):
            delta_table = DeltaTable.forPath(self.spark, path)

            # Build the merge condition from the key columns
            merge_condition = " AND ".join(
                f"target.{k} = source.{k}" for k in merge_keys
            )
            merge_builder = delta_table.alias("target").merge(
                df.alias("source"),
                merge_condition,
            )
            if update_condition:
                merge_builder = merge_builder.whenMatchedUpdate(
                    condition=update_condition,
                    set={c: f"source.{c}" for c in df.columns},
                )
            else:
                merge_builder = merge_builder.whenMatchedUpdateAll()
            merge_builder.whenNotMatchedInsertAll().execute()
        else:
            df.write.format("delta").save(path)

    def time_travel(
        self,
        table: str,
        zone: DataZone,
        version: Optional[int] = None,
        timestamp: Optional[str] = None,
    ):
        """Query a historical version of a table."""
        path = self.config.path(zone, table)
        reader = self.spark.read.format("delta")
        if version is not None:
            reader = reader.option("versionAsOf", version)
        elif timestamp:
            reader = reader.option("timestampAsOf", timestamp)
        return reader.load(path)
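A minimal driver for the manager above. It assumes a Delta-enabled session built with delta-spark's pip helper; the landing path, table names, merge key, and version number are illustrative placeholders, not the production values.
# data_platform/examples/lakehouse_usage.py
# Illustrative only: paths, table names, merge key, and version are placeholders.
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

from data_platform.lakehouse import DataZone, LakehouseManager

# Build a Delta-enabled session (extensions must be set at build time).
builder = (
    SparkSession.builder.appName("lakehouse-demo")
    .config("spark.sql.extensions",
            "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
lake = LakehouseManager(spark)

# Land a raw batch in the bronze zone.
raw_df = spark.read.json("s3://justkalm-data-lake/landing/products/")
lake.write_bronze(raw_df, table="products_raw")

# Upsert a deduplicated batch into the silver zone, keyed on product_id.
clean_df = raw_df.dropDuplicates(["product_id"])
lake.upsert_silver(clean_df, table="products", merge_keys=["product_id"])

# Time travel: read the silver table as of an earlier version.
snapshot = lake.time_travel("products", DataZone.SILVER, version=42)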
Data Zones
Bronze Zone
Raw ingested data with minimal transformation
Append-only · Full history · Source metadata
Silver Zone
Cleaned, validated, and deduplicated data
Schema enforced · Deduped · Quality validated
Gold Zone
Business-ready aggregates and metrics
Pre-aggregated · Optimized · SLA-backed
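The manager above covers bronze ingestion and silver upserts; gold tables are built on top of silver. The sketch below is illustrative only: the silver and gold table names and the brand, price, product_id, and observed_date columns are assumptions, not the production schema.
# data_platform/gold/brand_daily_metrics.py
# Illustrative sketch; table names and columns are assumed.
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Build a Delta-enabled session (extensions must be set at build time).
builder = (
    SparkSession.builder.appName("gold-brand-daily")
    .config("spark.sql.extensions",
            "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

silver = spark.read.format("delta").load(
    "s3://justkalm-data-lake/silver/products"
)

# Pre-aggregate business-ready metrics per brand and day.
gold = (
    silver.groupBy("brand", "observed_date")
    .agg(
        F.countDistinct("product_id").alias("active_skus"),
        F.avg("price").alias("avg_price"),
        F.min("price").alias("min_price"),
        F.max("price").alias("max_price"),
    )
)

# Rebuild the gold table, partitioned by day for dashboard queries.
(
    gold.write.format("delta")
    .mode("overwrite")
    .partitionBy("observed_date")
    .save("s3://justkalm-data-lake/gold/brand_daily_metrics")
)
Keeping gold tables pre-aggregated and partitioned by day is part of what helps dashboard queries stay within the sub-3-second P95 target quoted above.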
Data Catalog
# data_platform/catalog.py
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime
from typing import Dict, List, Optional


@dataclass
class ColumnMetadata:
    name: str
    data_type: str
    description: str
    is_nullable: bool = True
    is_pii: bool = False
    sample_values: List[str] = field(default_factory=list)


@dataclass
class TableMetadata:
    name: str
    zone: str
    description: str
    owner: str
    columns: List[ColumnMetadata]
    tags: List[str] = field(default_factory=list)
    lineage: Dict[str, List[str]] = field(default_factory=dict)
    freshness_sla: Optional[str] = None
    row_count: int = 0
    size_bytes: int = 0
    last_updated: Optional[datetime] = None

    def to_catalog_row(self) -> tuple:
        """Flatten the metadata into the parameter tuple used by register_table."""
        return (
            self.name,
            self.zone,
            self.description,
            self.owner,
            json.dumps([asdict(c) for c in self.columns]),  # schema_json
            self.tags,                                      # array column
            json.dumps(self.lineage),
            self.freshness_sla,
        )


class DataCatalog:
    """
    Centralized metadata catalog with search,
    lineage tracking, and data quality scores.
    """

    def __init__(self, db_session):
        self.db = db_session

    def register_table(self, metadata: TableMetadata):
        """Register or update table metadata."""
        # Store in the catalog database (upsert on name + zone)
        self.db.execute(
            """
            INSERT INTO data_catalog (
                name, zone, description, owner, schema_json,
                tags, lineage, freshness_sla
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (name, zone) DO UPDATE SET
                description = EXCLUDED.description,
                schema_json = EXCLUDED.schema_json,
                tags = EXCLUDED.tags,
                updated_at = NOW()
            """,
            metadata.to_catalog_row()
        )
        # Index for search
        self._update_search_index(metadata)

    def search(
        self,
        query: str,
        zone: Optional[str] = None,
        tags: Optional[List[str]] = None,
        owner: Optional[str] = None
    ) -> List[TableMetadata]:
        """Search the catalog with optional filters."""
        filters = []
        params = {"query": f"%{query}%"}
        if zone:
            filters.append("zone = :zone")
            params["zone"] = zone
        if tags:
            filters.append("tags && :tags")
            params["tags"] = tags
        if owner:
            filters.append("owner = :owner")
            params["owner"] = owner

        where_clause = " AND ".join(filters) if filters else "1=1"
        return self.db.query(
            f"""
            SELECT * FROM data_catalog
            WHERE (name ILIKE :query OR description ILIKE :query)
              AND {where_clause}
            ORDER BY
                CASE WHEN name ILIKE :query THEN 0 ELSE 1 END,
                last_updated DESC
            LIMIT 50
            """,
            params
        )

    def get_lineage(self, table: str) -> Dict:
        """Get upstream and downstream lineage plus freshness."""
        return {
            "upstream": self._trace_upstream(table),
            "downstream": self._trace_downstream(table),
            "freshness": self._calculate_freshness(table)
        }
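A short usage sketch of the catalog above, assuming the module layout implied by the file headers. The metadata values and tags are hypothetical, and connect_catalog_db() is a placeholder for however the catalog database session is actually obtained.
# data_platform/examples/catalog_usage.py
# Illustrative only; metadata values and the DB handle are placeholders.
from data_platform.catalog import ColumnMetadata, DataCatalog, TableMetadata

# connect_catalog_db() is a hypothetical helper returning a DB session
# that exposes execute()/query(), as DataCatalog expects.
catalog = DataCatalog(db_session=connect_catalog_db())

catalog.register_table(TableMetadata(
    name="brand_daily_metrics",
    zone="gold",
    description="Daily price and assortment metrics per brand",
    owner="data-platform",
    columns=[
        ColumnMetadata("brand", "string", "Normalized brand name"),
        ColumnMetadata("avg_price", "double", "Average listed price"),
    ],
    tags=["pricing", "gold"],
    freshness_sla="daily by 06:00 UTC",
))

# Find pricing tables in the gold zone, then inspect their lineage.
for table in catalog.search("price", zone="gold", tags=["pricing"]):
    print(table.name, catalog.get_lineage(table.name))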
Data-Driven Fashion Intelligence
From raw data to actionable insights in real-time.
2.8B Events/Day · 5min Data Freshness · 127 Active Dashboards