WEEK 5 OF 6

AI Security & Advanced Fine-tuning

Bulletproof Security and Domain Expertise Through Fine-tuning

Security-First AI Deployment

This week is critical for production readiness. We'll implement comprehensive security measures, master advanced fine-tuning techniques, and ensure regulatory compliance. Your AI systems will be both powerful and bulletproof.

Part 1: Comprehensive AI Security Framework

Critical AI Security Threats

Prompt Injection: Malicious prompts designed to bypass safety measures or extract training data
Data Poisoning: Contaminating training data to create backdoors or biases
Model Inversion: Extracting sensitive training data from model outputs
Adversarial Attacks: Crafted inputs causing misclassification
Model Theft: Extracting model architecture and weights through API queries

Security Implementation


import hashlib
import secrets
from typing import List, Dict, Any, Optional, Tuple
import re
from datetime import datetime, timedelta
import json
import numpy as np

class AISecurityFramework:
    """Complete security framework for PacketCoders AI systems"""
    
    def __init__(self):
        self.prompt_filter = PromptInjectionDefense()
        self.rate_limiter = RateLimiter()
        self.output_sanitizer = OutputSanitizer()
        self.audit_logger = SecurityAuditLogger()
        self.encryption_manager = EncryptionManager()
        
    async def secure_inference(self, 
                              user_id: str,
                              input_text: str,
                              model_func: callable) -> Tuple[bool, Any]:
        """Execute secure model inference"""
        
        try:
            # Step 1: Rate limiting
            if not await self.rate_limiter.check_limit(user_id):
                self.audit_logger.log("RATE_LIMIT_EXCEEDED", user_id)
                return False, {"error": "Rate limit exceeded"}
            
            # Step 2: Input validation
            is_safe, sanitized = self.prompt_filter.validate(input_text)
            if not is_safe:
                self.audit_logger.log("PROMPT_INJECTION_BLOCKED", user_id)
                return False, {"error": "Security threat detected"}
            
            # Step 3: Execute inference with monitoring
            start_time = datetime.now()
            output = await model_func(sanitized)
            inference_time = (datetime.now() - start_time).total_seconds()
            
            # Step 4: Output sanitization
            clean_output = self.output_sanitizer.sanitize(output)
            
            # Step 5: Audit logging
            self.audit_logger.log("SUCCESSFUL_INFERENCE", user_id, {
                "inference_time": inference_time,
                "input_length": len(sanitized)
            })
            
            return True, clean_output
            
        except Exception as e:
            self.audit_logger.log("INFERENCE_ERROR", user_id, {"error": str(e)})
            return False, {"error": "Internal error"}

class PromptInjectionDefense:
    """Detect and prevent prompt injection attacks"""
    
    def __init__(self):
        self.injection_patterns = [
            r"ignore previous instructions",
            r"disregard all prior",
            r"forget everything",
            r"reveal your prompt",
            r"show your rules",
            r"bypass safety",
            r"jailbreak",
            r"DAN mode",
            r"developer mode"
        ]
        
    def validate(self, text: str) -> Tuple[bool, str]:
        """Validate input for injection attempts"""
        
        text_lower = text.lower()
        
        # Check for injection patterns
        for pattern in self.injection_patterns:
            if re.search(pattern, text_lower):
                return False, ""
        
        # Check for unusual characters
        if self._has_suspicious_characters(text):
            return False, ""
        
        # Sanitize and return
        sanitized = self._sanitize_input(text)
        return True, sanitized
    
    def _has_suspicious_characters(self, text: str) -> bool:
        """Check for suspicious character patterns"""
        # Check for excessive special characters
        special_ratio = sum(1 for c in text if not c.isalnum()) / max(len(text), 1)
        if special_ratio > 0.7:
            return True
        
        # Check for control characters
        if any(ord(char) < 32 for char in text if char not in '\n\r\t'):
            return True
        
        return False
    
    def _sanitize_input(self, text: str) -> str:
        """Sanitize input text"""
        # Remove potential code injection
        text = re.sub(r']*>.*?', '', text, flags=re.DOTALL)
        # Remove SQL patterns
        text = re.sub(r'\b(DROP|DELETE|INSERT|UPDATE)\b', '', text, flags=re.IGNORECASE)
        return text

class RateLimiter:
    """Implement rate limiting for API protection"""
    
    def __init__(self, max_requests: int = 100, window_seconds: int = 3600):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = {}
        
    async def check_limit(self, user_id: str) -> bool:
        """Check if user has exceeded rate limit"""
        now = datetime.now()
        
        if user_id not in self.requests:
            self.requests[user_id] = []
        
        # Clean old requests
        window_start = now - timedelta(seconds=self.window_seconds)
        self.requests[user_id] = [
            req for req in self.requests[user_id] if req > window_start
        ]
        
        # Check limit
        if len(self.requests[user_id]) >= self.max_requests:
            return False
        
        # Add current request
        self.requests[user_id].append(now)
        return True

class OutputSanitizer:
    """Sanitize model outputs for safety"""
    
    def __init__(self):
        self.pii_patterns = {
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            'credit_card': r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b'
        }
        
    def sanitize(self, text: str) -> str:
        """Remove sensitive information from output"""
        
        # Remove PII
        for pii_type, pattern in self.pii_patterns.items():
            text = re.sub(pattern, f'[{pii_type.upper()}_REDACTED]', text)
        
        # Remove potential secrets
        text = re.sub(r'[A-Za-z0-9]{32,}', '[SECRET_REDACTED]', text)
        
        # Remove passwords
        text = re.sub(
            r'(password|api_key|secret)\s*[:=]\s*["\']?[^\s"\']+',
            r'\1: [REDACTED]',
            text,
            flags=re.IGNORECASE
        )
        
        return text

class SecurityAuditLogger:
    """Comprehensive audit logging for security events"""
    
    def __init__(self, log_file: str = "security_audit.log"):
        self.log_file = log_file
        
    def log(self, event_type: str, user_id: str, details: Dict = None):
        """Log security event"""
        event = {
            "timestamp": datetime.now().isoformat(),
            "event_type": event_type,
            "user_id": user_id,
            "details": details or {}
        }
        
        with open(self.log_file, 'a') as f:
            f.write(json.dumps(event) + '\n')
        
        # Alert on critical events
        if event_type in ["PROMPT_INJECTION_BLOCKED", "DATA_BREACH_ATTEMPT"]:
            self._send_alert(event)
    
    def _send_alert(self, event: Dict):
        """Send security alert"""
        print(f"🚨 SECURITY ALERT: {event['event_type']} from user {event['user_id']}")
                

Part 2: Secure Fine-tuning Pipeline

Fine-tuning with Security & Privacy


from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import LoraConfig, get_peft_model
import torch
from datasets import Dataset
import hashlib

class SecureFineTuningPipeline:
    """Secure fine-tuning with data validation and privacy"""
    
    def __init__(self, base_model: str = "meta-llama/Llama-2-7b-hf"):
        self.base_model = base_model
        self.data_validator = DataValidator()
        self.privacy_guard = PrivacyGuard()
        
    def prepare_secure_dataset(self, raw_data: List[Dict]) -> Dataset:
        """Prepare dataset with security checks"""
        
        clean_data = []
        
        for item in raw_data:
            # Validate for quality and safety
            if not self.data_validator.is_safe(item):
                continue
            
            # Remove PII
            sanitized = self.privacy_guard.remove_pii(item)
            
            # Add data hash for tracking
            sanitized['data_hash'] = hashlib.sha256(
                json.dumps(sanitized).encode()
            ).hexdigest()[:16]
            
            clean_data.append(sanitized)
        
        return Dataset.from_list(clean_data)
    
    def create_secure_lora_config(self) -> LoraConfig:
        """Create LoRA config with security considerations"""
        return LoraConfig(
            r=8,  # Lower rank to prevent overfitting
            lora_alpha=16,
            target_modules=["q_proj", "v_proj", "k_proj", "o_proj"],
            lora_dropout=0.1,
            bias="none",
            task_type="CAUSAL_LM"
        )
    
    def add_safety_layers(self, model):
        """Add safety layers to fine-tuned model"""
        # Implement output filtering layer
        class SafetyWrapper(torch.nn.Module):
            def __init__(self, base_model):
                super().__init__()
                self.model = base_model
                self.safety_filter = OutputSafetyFilter()
                
            def forward(self, *args, **kwargs):
                output = self.model(*args, **kwargs)
                # Apply safety filtering
                return self.safety_filter(output)
        
        return SafetyWrapper(model)

class DataValidator:
    """Validate training data for safety"""
    
    def is_safe(self, sample: Dict) -> bool:
        """Check if training sample is safe"""
        
        text = f"{sample.get('instruction', '')} {sample.get('output', '')}"
        
        # Check for toxic content
        toxic_patterns = [
            r'\b(attack|exploit|hack)\b',
            r'password\s*[:=]',
            r'api[_-]key'
        ]
        
        for pattern in toxic_patterns:
            if re.search(pattern, text, re.IGNORECASE):
                return False
        
        # Check for backdoor triggers
        if '' in text or '[INST]' in text:
            return False
        
        return True

class PrivacyGuard:
    """Protect privacy in training data"""
    
    def remove_pii(self, sample: Dict) -> Dict:
        """Remove PII from training sample"""
        
        clean_sample = sample.copy()
        
        # PII patterns
        patterns = {
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b'
        }
        
        for field in ['instruction', 'input', 'output']:
            if field in clean_sample:
                text = clean_sample[field]
                for pii_type, pattern in patterns.items():
                    text = re.sub(pattern, f'[{pii_type.upper()}]', text)
                clean_sample[field] = text
        
        return clean_sample

# Model watermarking for ownership
class ModelWatermark:
    """Add watermarks to models for ownership verification"""
    
    def __init__(self, secret_key: str):
        self.secret = hashlib.sha256(secret_key.encode()).digest()
        
    def embed_watermark(self, model: torch.nn.Module):
        """Embed watermark in model"""
        
        # Generate watermark pattern
        torch.manual_seed(int.from_bytes(self.secret[:4], 'big'))
        watermark = torch.randn(100) * 1e-7
        
        # Embed in bias terms
        for name, param in model.named_parameters():
            if 'bias' in name and param.requires_grad:
                param.data[:min(100, len(param.data))] += watermark[:min(100, len(param.data))]
        
        return model
    
    def verify_watermark(self, model: torch.nn.Module) -> bool:
        """Verify model contains watermark"""
        
        torch.manual_seed(int.from_bytes(self.secret[:4], 'big'))
        watermark = torch.randn(100) * 1e-7
        
        for name, param in model.named_parameters():
            if 'bias' in name:
                subset = param.data[:min(100, len(param.data))]
                correlation = torch.corrcoef(
                    torch.stack([subset, watermark[:len(subset)]])
                )[0, 1]
                
                if correlation > 0.7:
                    return True
        
        return False
                

Part 3: Regulatory Compliance

AI Governance & Compliance Framework


class ComplianceFramework:
    """Ensure regulatory compliance for AI systems"""
    
    def __init__(self):
        self.gdpr_handler = GDPRCompliance()
        self.ccpa_handler = CCPACompliance()
        self.audit_trail = AuditTrail()
        
    def handle_data_request(self, request_type: str, user_id: str) -> Dict:
        """Handle user data requests per regulations"""
        
        if request_type == "access":
            # GDPR: Right to access
            return self.gdpr_handler.export_user_data(user_id)
        
        elif request_type == "deletion":
            # GDPR: Right to be forgotten
            return self.gdpr_handler.delete_user_data(user_id)
        
        elif request_type == "portability":
            # GDPR: Data portability
            return self.gdpr_handler.export_portable_data(user_id)
        
        elif request_type == "opt_out":
            # CCPA: Opt-out of sale
            return self.ccpa_handler.opt_out_user(user_id)

class ModelGovernance:
    """Governance for AI model deployment"""
    
    def __init__(self):
        self.model_registry = {}
        self.risk_assessments = {}
        
    def register_model(self, model_id: str, metadata: Dict) -> str:
        """Register model with governance framework"""
        
        # Assess risk
        risk_score = self._assess_risk(metadata)
        
        # Store registration
        self.model_registry[model_id] = {
            'metadata': metadata,
            'risk_score': risk_score,
            'registered_at': datetime.now().isoformat(),
            'status': 'pending_approval' if risk_score > 0.5 else 'approved'
        }
        
        return model_id
    
    def _assess_risk(self, metadata: Dict) -> float:
        """Assess model risk level"""
        
        risk = 0.0
        
        # Check training data size
        if metadata.get('training_samples', 0) < 1000:
            risk += 0.3
        
        # Check for bias testing
        if not metadata.get('bias_tested', False):
            risk += 0.3
        
        # Check for security audit
        if not metadata.get('security_audited', False):
            risk += 0.4
        
        return min(risk, 1.0)
                

Week 5 Deliverables

🔒 Week 5 Achievements

Your AI systems are now production-ready with enterprise-grade security!