Security-First AI Deployment
This week is critical for production readiness. We'll implement comprehensive security measures, master advanced fine-tuning techniques, and ensure regulatory compliance. Your AI systems will be both powerful and bulletproof.
Part 1: Comprehensive AI Security Framework
Critical AI Security Threats
Prompt Injection: Malicious prompts designed to bypass safety measures or extract training data
Data Poisoning: Contaminating training data to create backdoors or biases
Model Inversion: Extracting sensitive training data from model outputs
Adversarial Attacks: Crafted inputs causing misclassification
Model Theft: Extracting model architecture and weights through API queries
Security Implementation
import hashlib
import secrets
from typing import List, Dict, Any, Optional, Tuple
import re
from datetime import datetime, timedelta
import json
import numpy as np
class AISecurityFramework:
"""Complete security framework for PacketCoders AI systems"""
def __init__(self):
self.prompt_filter = PromptInjectionDefense()
self.rate_limiter = RateLimiter()
self.output_sanitizer = OutputSanitizer()
self.audit_logger = SecurityAuditLogger()
self.encryption_manager = EncryptionManager()
async def secure_inference(self,
user_id: str,
input_text: str,
model_func: callable) -> Tuple[bool, Any]:
"""Execute secure model inference"""
try:
# Step 1: Rate limiting
if not await self.rate_limiter.check_limit(user_id):
self.audit_logger.log("RATE_LIMIT_EXCEEDED", user_id)
return False, {"error": "Rate limit exceeded"}
# Step 2: Input validation
is_safe, sanitized = self.prompt_filter.validate(input_text)
if not is_safe:
self.audit_logger.log("PROMPT_INJECTION_BLOCKED", user_id)
return False, {"error": "Security threat detected"}
# Step 3: Execute inference with monitoring
start_time = datetime.now()
output = await model_func(sanitized)
inference_time = (datetime.now() - start_time).total_seconds()
# Step 4: Output sanitization
clean_output = self.output_sanitizer.sanitize(output)
# Step 5: Audit logging
self.audit_logger.log("SUCCESSFUL_INFERENCE", user_id, {
"inference_time": inference_time,
"input_length": len(sanitized)
})
return True, clean_output
except Exception as e:
self.audit_logger.log("INFERENCE_ERROR", user_id, {"error": str(e)})
return False, {"error": "Internal error"}
class PromptInjectionDefense:
"""Detect and prevent prompt injection attacks"""
def __init__(self):
self.injection_patterns = [
r"ignore previous instructions",
r"disregard all prior",
r"forget everything",
r"reveal your prompt",
r"show your rules",
r"bypass safety",
r"jailbreak",
r"DAN mode",
r"developer mode"
]
def validate(self, text: str) -> Tuple[bool, str]:
"""Validate input for injection attempts"""
text_lower = text.lower()
# Check for injection patterns
for pattern in self.injection_patterns:
if re.search(pattern, text_lower):
return False, ""
# Check for unusual characters
if self._has_suspicious_characters(text):
return False, ""
# Sanitize and return
sanitized = self._sanitize_input(text)
return True, sanitized
def _has_suspicious_characters(self, text: str) -> bool:
"""Check for suspicious character patterns"""
# Check for excessive special characters
special_ratio = sum(1 for c in text if not c.isalnum()) / max(len(text), 1)
if special_ratio > 0.7:
return True
# Check for control characters
if any(ord(char) < 32 for char in text if char not in '\n\r\t'):
return True
return False
def _sanitize_input(self, text: str) -> str:
"""Sanitize input text"""
# Remove potential code injection
text = re.sub(r'', '', text, flags=re.DOTALL)
# Remove SQL patterns
text = re.sub(r'\b(DROP|DELETE|INSERT|UPDATE)\b', '', text, flags=re.IGNORECASE)
return text
class RateLimiter:
"""Implement rate limiting for API protection"""
def __init__(self, max_requests: int = 100, window_seconds: int = 3600):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests = {}
async def check_limit(self, user_id: str) -> bool:
"""Check if user has exceeded rate limit"""
now = datetime.now()
if user_id not in self.requests:
self.requests[user_id] = []
# Clean old requests
window_start = now - timedelta(seconds=self.window_seconds)
self.requests[user_id] = [
req for req in self.requests[user_id] if req > window_start
]
# Check limit
if len(self.requests[user_id]) >= self.max_requests:
return False
# Add current request
self.requests[user_id].append(now)
return True
class OutputSanitizer:
"""Sanitize model outputs for safety"""
def __init__(self):
self.pii_patterns = {
'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
'credit_card': r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b'
}
def sanitize(self, text: str) -> str:
"""Remove sensitive information from output"""
# Remove PII
for pii_type, pattern in self.pii_patterns.items():
text = re.sub(pattern, f'[{pii_type.upper()}_REDACTED]', text)
# Remove potential secrets
text = re.sub(r'[A-Za-z0-9]{32,}', '[SECRET_REDACTED]', text)
# Remove passwords
text = re.sub(
r'(password|api_key|secret)\s*[:=]\s*["\']?[^\s"\']+',
r'\1: [REDACTED]',
text,
flags=re.IGNORECASE
)
return text
class SecurityAuditLogger:
"""Comprehensive audit logging for security events"""
def __init__(self, log_file: str = "security_audit.log"):
self.log_file = log_file
def log(self, event_type: str, user_id: str, details: Dict = None):
"""Log security event"""
event = {
"timestamp": datetime.now().isoformat(),
"event_type": event_type,
"user_id": user_id,
"details": details or {}
}
with open(self.log_file, 'a') as f:
f.write(json.dumps(event) + '\n')
# Alert on critical events
if event_type in ["PROMPT_INJECTION_BLOCKED", "DATA_BREACH_ATTEMPT"]:
self._send_alert(event)
def _send_alert(self, event: Dict):
"""Send security alert"""
print(f"🚨 SECURITY ALERT: {event['event_type']} from user {event['user_id']}")
Part 2: Secure Fine-tuning Pipeline
Fine-tuning with Security & Privacy
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import LoraConfig, get_peft_model
import torch
from datasets import Dataset
import hashlib
class SecureFineTuningPipeline:
"""Secure fine-tuning with data validation and privacy"""
def __init__(self, base_model: str = "meta-llama/Llama-2-7b-hf"):
self.base_model = base_model
self.data_validator = DataValidator()
self.privacy_guard = PrivacyGuard()
def prepare_secure_dataset(self, raw_data: List[Dict]) -> Dataset:
"""Prepare dataset with security checks"""
clean_data = []
for item in raw_data:
# Validate for quality and safety
if not self.data_validator.is_safe(item):
continue
# Remove PII
sanitized = self.privacy_guard.remove_pii(item)
# Add data hash for tracking
sanitized['data_hash'] = hashlib.sha256(
json.dumps(sanitized).encode()
).hexdigest()[:16]
clean_data.append(sanitized)
return Dataset.from_list(clean_data)
def create_secure_lora_config(self) -> LoraConfig:
"""Create LoRA config with security considerations"""
return LoraConfig(
r=8, # Lower rank to prevent overfitting
lora_alpha=16,
target_modules=["q_proj", "v_proj", "k_proj", "o_proj"],
lora_dropout=0.1,
bias="none",
task_type="CAUSAL_LM"
)
def add_safety_layers(self, model):
"""Add safety layers to fine-tuned model"""
# Implement output filtering layer
class SafetyWrapper(torch.nn.Module):
def __init__(self, base_model):
super().__init__()
self.model = base_model
self.safety_filter = OutputSafetyFilter()
def forward(self, *args, **kwargs):
output = self.model(*args, **kwargs)
# Apply safety filtering
return self.safety_filter(output)
return SafetyWrapper(model)
class DataValidator:
"""Validate training data for safety"""
def is_safe(self, sample: Dict) -> bool:
"""Check if training sample is safe"""
text = f"{sample.get('instruction', '')} {sample.get('output', '')}"
# Check for toxic content
toxic_patterns = [
r'\b(attack|exploit|hack)\b',
r'password\s*[:=]',
r'api[_-]key'
]
for pattern in toxic_patterns:
if re.search(pattern, text, re.IGNORECASE):
return False
# Check for backdoor triggers
if '' in text or '[INST]' in text:
return False
return True
class PrivacyGuard:
"""Protect privacy in training data"""
def remove_pii(self, sample: Dict) -> Dict:
"""Remove PII from training sample"""
clean_sample = sample.copy()
# PII patterns
patterns = {
'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
'ssn': r'\b\d{3}-\d{2}-\d{4}\b'
}
for field in ['instruction', 'input', 'output']:
if field in clean_sample:
text = clean_sample[field]
for pii_type, pattern in patterns.items():
text = re.sub(pattern, f'[{pii_type.upper()}]', text)
clean_sample[field] = text
return clean_sample
# Model watermarking for ownership
class ModelWatermark:
"""Add watermarks to models for ownership verification"""
def __init__(self, secret_key: str):
self.secret = hashlib.sha256(secret_key.encode()).digest()
def embed_watermark(self, model: torch.nn.Module):
"""Embed watermark in model"""
# Generate watermark pattern
torch.manual_seed(int.from_bytes(self.secret[:4], 'big'))
watermark = torch.randn(100) * 1e-7
# Embed in bias terms
for name, param in model.named_parameters():
if 'bias' in name and param.requires_grad:
param.data[:min(100, len(param.data))] += watermark[:min(100, len(param.data))]
return model
def verify_watermark(self, model: torch.nn.Module) -> bool:
"""Verify model contains watermark"""
torch.manual_seed(int.from_bytes(self.secret[:4], 'big'))
watermark = torch.randn(100) * 1e-7
for name, param in model.named_parameters():
if 'bias' in name:
subset = param.data[:min(100, len(param.data))]
correlation = torch.corrcoef(
torch.stack([subset, watermark[:len(subset)]])
)[0, 1]
if correlation > 0.7:
return True
return False
Part 3: Regulatory Compliance
AI Governance & Compliance Framework
class ComplianceFramework:
"""Ensure regulatory compliance for AI systems"""
def __init__(self):
self.gdpr_handler = GDPRCompliance()
self.ccpa_handler = CCPACompliance()
self.audit_trail = AuditTrail()
def handle_data_request(self, request_type: str, user_id: str) -> Dict:
"""Handle user data requests per regulations"""
if request_type == "access":
# GDPR: Right to access
return self.gdpr_handler.export_user_data(user_id)
elif request_type == "deletion":
# GDPR: Right to be forgotten
return self.gdpr_handler.delete_user_data(user_id)
elif request_type == "portability":
# GDPR: Data portability
return self.gdpr_handler.export_portable_data(user_id)
elif request_type == "opt_out":
# CCPA: Opt-out of sale
return self.ccpa_handler.opt_out_user(user_id)
class ModelGovernance:
"""Governance for AI model deployment"""
def __init__(self):
self.model_registry = {}
self.risk_assessments = {}
def register_model(self, model_id: str, metadata: Dict) -> str:
"""Register model with governance framework"""
# Assess risk
risk_score = self._assess_risk(metadata)
# Store registration
self.model_registry[model_id] = {
'metadata': metadata,
'risk_score': risk_score,
'registered_at': datetime.now().isoformat(),
'status': 'pending_approval' if risk_score > 0.5 else 'approved'
}
return model_id
def _assess_risk(self, metadata: Dict) -> float:
"""Assess model risk level"""
risk = 0.0
# Check training data size
if metadata.get('training_samples', 0) < 1000:
risk += 0.3
# Check for bias testing
if not metadata.get('bias_tested', False):
risk += 0.3
# Check for security audit
if not metadata.get('security_audited', False):
risk += 0.4
return min(risk, 1.0)
Week 5 Deliverables
- ✓ Security Framework: Complete AI security system with threat detection
- ✓ Secure Fine-tuning: Pipeline with PII removal and data validation
- ✓ Model Watermarking: Ownership verification system
- ✓ Compliance Framework: GDPR, CCPA, FERPA compliance
- ✓ Audit System: Complete logging and monitoring
🔒 Week 5 Achievements
Your AI systems are now production-ready with enterprise-grade security!
- • Implemented comprehensive security against all major AI threats
- • Created secure fine-tuning pipeline with privacy protection
- • Built compliance framework for global regulations
- • Established model governance and audit trails
- • Added watermarking for IP protection