JK
JustKalm
Security & Compliance

Compliance Automation

Continuous compliance monitoring with automated SOC2 controls, policy-as-code enforcement, and real-time evidence collection.

94/97

Controls Passing

100%

Audit Ready

12 min ago

Last Scan

0

Policy Violations

SOC2 Type II Controls

Continuous monitoring of security controls with automated testing and evidence generation.

# compliance/soc2_controls.py
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import List, Dict, Optional

class ControlStatus(Enum):
    """Possible outcomes of a control test; the string value is what gets written into evidence records."""
    # Assigned when a control's test query returns no rows.
    PASSING = "passing"
    # Assigned when a control's test query returns at least one row.
    FAILING = "failing"
    # NOTE(review): WARNING and NOT_APPLICABLE are never assigned by the code
    # visible in this file — confirm whether other modules produce them.
    WARNING = "warning"
    NOT_APPLICABLE = "not_applicable"

class TrustPrinciple(Enum):
    """Category a SOC2 control is filed under; the string value is emitted as the "principle" field of evidence records."""
    SECURITY = "security"
    AVAILABILITY = "availability"
    PROCESSING_INTEGRITY = "processing_integrity"
    CONFIDENTIALITY = "confidentiality"
    PRIVACY = "privacy"

@dataclass
class SOC2Control:
    """Definition of a single automated SOC2 control check."""
    # Control identifier (e.g. "CC6.1"); used as the evidence record's control_id.
    id: str
    # Short human-readable title.
    name: str
    # Longer explanation of what the control covers.
    description: str
    # Category this control is reported under.
    principle: TrustPrinciple
    # Query run against the database client; every row it returns is counted
    # as a violation, so it should select only non-compliant records.
    test_query: str
    # Label for the kind of evidence this control yields.
    # NOTE(review): not read by the engine code visible in this file.
    evidence_type: str
    # Intended test cadence.
    # NOTE(review): not read by the engine code visible in this file.
    frequency: str  # "continuous", "daily", "weekly"

# Registry of SOC2 control definitions, keyed by control ID.
# The key is taken from each definition's own ``id`` so the two cannot drift.
# Every test_query selects the non-compliant rows for its control.
SOC2_CONTROLS = {
    definition.id: definition
    for definition in (
        SOC2Control(
            id="CC6.1",
            name="Logical Access Controls",
            description="Logical access security software, infrastructure, and architectures",
            principle=TrustPrinciple.SECURITY,
            test_query="SELECT * FROM access_logs WHERE unauthorized_attempt = true",
            evidence_type="access_logs",
            frequency="continuous",
        ),
        SOC2Control(
            id="CC6.2",
            name="Authentication Mechanisms",
            description="User authentication prior to system access",
            principle=TrustPrinciple.SECURITY,
            test_query="SELECT * FROM auth_config WHERE mfa_enabled = false",
            evidence_type="auth_config",
            frequency="daily",
        ),
        SOC2Control(
            id="CC6.6",
            name="Encryption in Transit",
            description="Data transmission encryption",
            principle=TrustPrinciple.CONFIDENTIALITY,
            test_query="SELECT * FROM endpoints WHERE tls_version < '1.2'",
            evidence_type="tls_config",
            frequency="continuous",
        ),
        SOC2Control(
            id="CC6.7",
            name="Encryption at Rest",
            description="Data storage encryption",
            principle=TrustPrinciple.CONFIDENTIALITY,
            test_query="SELECT * FROM storage WHERE encrypted = false",
            evidence_type="encryption_config",
            frequency="daily",
        ),
        SOC2Control(
            id="CC7.2",
            name="Security Monitoring",
            description="Anomaly detection and monitoring",
            principle=TrustPrinciple.SECURITY,
            test_query="SELECT * FROM monitoring WHERE active = false",
            evidence_type="monitoring_config",
            frequency="continuous",
        ),
    )
}

class SOC2ComplianceEngine:
    """
    Automated SOC2 compliance monitoring
    with continuous control testing.

    Each control's ``test_query`` is run through the injected database
    client; any row it returns counts as a violation. A per-run evidence
    record is persisted through the injected evidence store.
    """

    # Maximum number of violation rows embedded in one evidence record.
    _VIOLATION_SAMPLE_SIZE = 10

    def __init__(self, db_client, evidence_store):
        """
        Args:
            db_client: Object exposing an awaitable ``execute(query)`` whose
                result provides ``fetchall()``.
            evidence_store: Object exposing an awaitable
                ``store(control_id=..., evidence=...)``.
        """
        self.db = db_client
        self.evidence = evidence_store

    async def run_control_test(
        self,
        control: SOC2Control
    ) -> Dict:
        """Execute control test and generate evidence.

        Args:
            control: The control definition whose ``test_query`` is executed.

        Returns:
            The evidence record that was stored: control id, timezone-aware
            UTC ISO-8601 timestamp, status value, the query that was run,
            the total violation count, a sample of violation rows, and the
            control's principle value.
        """
        # Run test query; every returned row is a violation.
        result = await self.db.execute(control.test_query)
        violations = result.fetchall()
        violation_count = len(violations)

        status = (
            ControlStatus.PASSING if violation_count == 0
            else ControlStatus.FAILING
        )

        # Generate evidence. The timestamp carries an explicit UTC offset so
        # it stays unambiguous once the record leaves this process.
        evidence = {
            "control_id": control.id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "status": status.value,
            "test_query": control.test_query,
            "violations_count": violation_count,
            # Only a sample is embedded; violations_count holds the full total.
            "violations": violations[:self._VIOLATION_SAMPLE_SIZE],
            "principle": control.principle.value
        }

        # Store evidence
        await self.evidence.store(
            control_id=control.id,
            evidence=evidence
        )

        return evidence

    async def get_compliance_summary(self) -> Dict:
        """Get overall compliance posture.

        Runs every control in ``SOC2_CONTROLS`` one after another, which
        also stores fresh evidence for each of them.

        Returns:
            Dict with the number of passing controls, the total number of
            controls, the passing percentage rounded to one decimal place,
            a timezone-aware UTC ISO-8601 scan timestamp, and the
            per-control evidence keyed by control id. With an empty
            registry the percentage is reported as 0.0.
        """
        results = {}
        for control_id, control in SOC2_CONTROLS.items():
            results[control_id] = await self.run_control_test(control)

        passing_value = ControlStatus.PASSING.value
        passing = sum(
            1 for r in results.values() if r["status"] == passing_value
        )
        total = len(results)

        # Guard against an empty registry: dividing by zero would crash the
        # summary, and 0.0 avoids claiming compliance without any evidence.
        compliance_percentage = (
            round(passing / total * 100, 1) if total else 0.0
        )

        return {
            "passing_controls": passing,
            "total_controls": total,
            "compliance_percentage": compliance_percentage,
            "last_scan": datetime.now(timezone.utc).isoformat(),
            "controls": results
        }

Control Status Dashboard

Security: 38/39 controls
Availability: 22/22 controls
Processing Integrity: 15/15 controls
Confidentiality: 12/13 controls
Privacy: 8/8 controls

SOC2 Type II Certified

Last audit: November 2024

Always Audit Ready

Continuous compliance with automated evidence collection.

SOC2 Type II Certified | 97% Controls Passing | Zero Policy Violations