WEEK 2 OF 6

Machine Learning Engineering & MLOps

From Prototype to Production: Building Scalable AI Systems

Engineering Excellence for AI Systems

This Week's Mission

Transform from ML experimenter to ML engineer. We'll build production-grade systems with monitoring, versioning, and automated deployment pipelines.

10x
Content Generation Speed
CI/CD
Automated ML Pipeline
PyTorch
Deep Learning Framework
K8s
Container Orchestration

Intensive Schedule

9:00 - 10:30 AM

Deep Dive: PyTorch & Neural Networks

Build custom neural networks from scratch and understand automatic differentiation.

10:45 AM - 12:00 PM

Content Generation Engine

Implement multi-step LLM chains for automated course creation with LangChain.

1:00 - 2:30 PM

MLOps Fundamentals

Set up experiment tracking, model versioning, and automated training pipelines.

2:45 - 4:00 PM

Predictive Maintenance System

Build a time-series model for network equipment failure prediction.

4:15 - 5:30 PM

Kubernetes Deployment

Deploy ML models with auto-scaling and load balancing on K8s.

Part 1: PyTorch & Deep Learning Fundamentals

Building Neural Networks from Scratch


import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import numpy as np

class NetworkTrafficDataset(Dataset):
    """Synthetic, in-memory dataset of network-traffic samples.

    Each sample is a 10-dimensional feature vector drawn from a standard
    normal distribution, paired with a random binary label
    (0 = normal, 1 = anomalous).
    """

    def __init__(self, num_samples=10000):
        feature_shape = (num_samples, 10)
        label_shape = (num_samples,)

        # Draw order (features, then labels) fixes the values produced
        # under a given random seed.
        self.features = torch.randn(*feature_shape)
        self.labels = torch.randint(low=0, high=2, size=label_shape)

    def __len__(self):
        """Number of samples held by the dataset."""
        return self.features.shape[0]

    def __getitem__(self, idx):
        """Return the ``(features, label)`` pair stored at ``idx``."""
        sample = self.features[idx]
        target = self.labels[idx]
        return sample, target

class NetworkClassifier(nn.Module):
    """Fully connected classifier for network traffic (normal vs. anomalous).

    Every hidden stage is ``Linear -> BatchNorm1d -> ReLU -> Dropout(0.2)``;
    a final ``Linear`` layer emits two raw logits per sample (no softmax).

    Args:
        input_dim: Number of input features per sample.
        hidden_dims: Width of each hidden layer, in order. Any iterable of
            ints works; the default is a tuple so the default value cannot
            be mutated and shared between instances.
    """

    def __init__(self, input_dim=10, hidden_dims=(64, 32, 16)):
        super().__init__()

        layers = []
        prev_dim = input_dim

        # Build hidden layers dynamically
        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.BatchNorm1d(hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.2)
            ])
            prev_dim = hidden_dim

        # Output layer: one logit per class
        layers.append(nn.Linear(prev_dim, 2))

        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """Return the raw (pre-softmax) logits for ``x``."""
        return self.network(x)

    def predict_proba(self, x):
        """Return softmax class probabilities for ``x`` without tracking gradients.

        The model is switched to eval mode for the call so Dropout is
        disabled and BatchNorm1d uses its running statistics (train-mode
        BatchNorm1d also rejects a batch of a single sample). The previous
        train/eval mode is restored afterwards, even if the forward pass
        raises.
        """
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                logits = self.forward(x)
                return F.softmax(logits, dim=1)
        finally:
            self.train(was_training)

class ModelTrainer:
    """Training/validation loop for a classifier.

    Uses cross-entropy loss, Adam (lr=0.001), a ReduceLROnPlateau scheduler
    driven by the validation loss, gradient-norm clipping, and early
    stopping with a checkpoint of the best-scoring weights.

    Attributes:
        history: Per-epoch lists under 'train_loss', 'val_loss', 'val_acc'.
    """

    def __init__(self, model, device='cuda' if torch.cuda.is_available() else 'cpu'):
        self.model = model.to(device)
        self.device = device
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            self.optimizer, mode='min', patience=5
        )
        self.history = {'train_loss': [], 'val_loss': [], 'val_acc': []}

    def train_epoch(self, dataloader):
        """Run one optimisation pass over ``dataloader``.

        Returns:
            Mean of the per-batch losses.

        Raises:
            ValueError: If ``dataloader`` yields no batches.
        """
        num_batches = len(dataloader)
        if num_batches == 0:
            raise ValueError('train_epoch() received an empty dataloader')

        self.model.train()
        total_loss = 0.0

        for data, targets in dataloader:
            data, targets = data.to(self.device), targets.to(self.device)

            # Forward pass
            outputs = self.model(data)
            loss = self.criterion(outputs, targets)

            # Backward pass
            self.optimizer.zero_grad()
            loss.backward()

            # Gradient clipping to prevent exploding gradients
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)

            self.optimizer.step()
            total_loss += loss.item()

        return total_loss / num_batches

    def validate(self, dataloader):
        """Evaluate the model on ``dataloader`` without updating weights.

        Returns:
            ``(avg_loss, accuracy)`` where ``avg_loss`` is the mean per-batch
            loss and ``accuracy`` is a percentage in [0, 100].

        Raises:
            ValueError: If ``dataloader`` yields no batches.
        """
        num_batches = len(dataloader)
        if num_batches == 0:
            raise ValueError('validate() received an empty dataloader')

        self.model.eval()
        total_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            for data, targets in dataloader:
                data, targets = data.to(self.device), targets.to(self.device)
                outputs = self.model(data)
                loss = self.criterion(outputs, targets)

                total_loss += loss.item()
                # Predicted class = index of the largest logit
                _, predicted = outputs.max(1)
                total += targets.size(0)
                correct += predicted.eq(targets).sum().item()

        # Guard against batches that contain zero samples
        accuracy = 100. * correct / total if total else 0.0
        avg_loss = total_loss / num_batches

        return avg_loss, accuracy

    def fit(self, train_loader, val_loader, epochs=50, early_stopping_patience=10,
            checkpoint_path='best_model.pth'):
        """Train for up to ``epochs`` epochs with early stopping.

        Args:
            train_loader: Batches used for optimisation.
            val_loader: Batches used for validation and LR scheduling.
            epochs: Maximum number of epochs.
            early_stopping_patience: Stop after this many consecutive epochs
                without a new best validation loss.
            checkpoint_path: Where the best model's ``state_dict`` is saved
                each time the validation loss improves.

        Returns:
            The ``history`` dict with one entry per completed epoch.
        """
        best_val_loss = float('inf')
        patience_counter = 0

        for epoch in range(epochs):
            # Training
            train_loss = self.train_epoch(train_loader)

            # Validation
            val_loss, val_acc = self.validate(val_loader)

            # Learning rate scheduling
            self.scheduler.step(val_loss)

            # Logging
            self.history['train_loss'].append(train_loss)
            self.history['val_loss'].append(val_loss)
            self.history['val_acc'].append(val_acc)

            print(f'Epoch {epoch+1}/{epochs}:')
            print(f'  Train Loss: {train_loss:.4f}')
            print(f'  Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')

            # Early stopping
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                # Save best model
                torch.save(self.model.state_dict(), checkpoint_path)
            else:
                patience_counter += 1
                if patience_counter >= early_stopping_patience:
                    print(f'Early stopping triggered after {epoch+1} epochs')
                    break

        return self.history

# Usage example: 80/20 train/validation split followed by a 30-epoch run.
dataset = NetworkTrafficDataset()
train_size = int(len(dataset) * 0.8)
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(
    dataset, lengths=[train_size, val_size]
)

# Only the training batches are shuffled; validation order stays fixed.
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=32, shuffle=False)

model = NetworkClassifier()
trainer = ModelTrainer(model)
history = trainer.fit(train_loader, val_loader, epochs=30)
                

Part 2: AI-Powered Content Generation Pipeline

Building the 10x Content Factory


from langchain import LLMChain, PromptTemplate
from langchain.chat_models import ChatOpenAI
from langchain.chains import SequentialChain
import asyncio
from typing import Dict, List
import json

class CourseContentGenerator:
    """Automated course content generation system.

    A course is produced in four steps: an LLM-written outline, a parse of
    that outline into modules, per-module content (script, code examples,
    lab setup) generated concurrently across modules, and a final set of
    assessments.
    """

    def __init__(self, api_key: str):
        """Create the chat model and the three reusable prompt templates."""
        self.llm = ChatOpenAI(
            temperature=0.7,
            model="gpt-4",
            openai_api_key=api_key
        )

        # Chain 1: Generate course outline
        self.outline_prompt = PromptTemplate(
            input_variables=["topic", "level", "duration"],
            template="""
            Create a comprehensive course outline for:
            Topic: {topic}
            Level: {level}
            Duration: {duration} hours
            
            Format the outline with:
            - Clear learning objectives
            - Module breakdown
            - Practical labs for each module
            - Time allocations
            
            Focus on network automation and practical skills.
            """
        )

        # Chain 2: Generate detailed scripts
        self.script_prompt = PromptTemplate(
            input_variables=["module_title", "outline", "duration"],
            template="""
            Write a detailed script for a video lesson on:
            Module: {module_title}
            Context: {outline}
            Duration: {duration} minutes
            
            Include:
            - Engaging introduction
            - Clear explanations with network examples
            - Code demonstrations
            - Key takeaways
            
            Make it conversational and practical.
            """
        )

        # Chain 3: Generate code examples
        self.code_prompt = PromptTemplate(
            input_variables=["module_title", "concepts"],
            template="""
            Create Python code examples for:
            Module: {module_title}
            Concepts: {concepts}
            
            Requirements:
            - Production-quality code with comments
            - Network automation focus
            - Error handling
            - Best practices
            
            Include 2-3 practical examples.
            """
        )

    async def generate_course(self, topic: str, level: str = "intermediate",
                             duration: int = 4) -> Dict:
        """Generate complete course content.

        Args:
            topic: Subject of the course.
            level: Audience level inserted into the outline prompt.
            duration: Course length in hours (as stated in the outline prompt).

        Returns:
            Dict with keys 'topic', 'level', 'duration', 'outline',
            'modules' (one content dict per parsed module) and 'assessments'.
        """
        # Step 1: Generate outline
        outline_chain = LLMChain(llm=self.llm, prompt=self.outline_prompt)
        outline = await outline_chain.arun(
            topic=topic,
            level=level,
            duration=duration
        )

        # Step 2: Parse outline into modules
        modules = self._parse_outline(outline)

        # Step 3: Generate content for each module in parallel
        module_contents = await asyncio.gather(
            *(self._generate_module_content(module, outline) for module in modules)
        )

        # Step 4: Generate assessments
        assessments = await self._generate_assessments(topic, modules)

        return {
            "topic": topic,
            "level": level,
            "duration": duration,
            "outline": outline,
            "modules": module_contents,
            "assessments": assessments
        }

    async def _generate_module_content(self, module: Dict, outline: str) -> Dict:
        """Generate the script, code examples and lab setup for one module.

        ``module`` must provide 'title', 'duration' and 'concepts' (a list of
        strings), which is the shape produced by ``_parse_outline``.
        """
        # Generate script
        script_chain = LLMChain(llm=self.llm, prompt=self.script_prompt)
        script = await script_chain.arun(
            module_title=module['title'],
            outline=outline,
            duration=module['duration']
        )

        # Generate code examples. Concepts are joined into readable text so
        # the prompt does not contain a Python list repr.
        code_chain = LLMChain(llm=self.llm, prompt=self.code_prompt)
        code_examples = await code_chain.arun(
            module_title=module['title'],
            concepts=', '.join(module['concepts'])
        )

        return {
            "title": module['title'],
            "duration": module['duration'],
            "script": script,
            "code_examples": code_examples,
            "lab_setup": await self._generate_lab_setup(module['title'])
        }

    async def _generate_lab_setup(self, module_title: str) -> Dict:
        """Ask the LLM for a docker-compose lab and wrap it with fixed run/cleanup hints."""
        lab_prompt = PromptTemplate(
            input_variables=["module"],
            template="""
            Create a Docker Compose setup for a hands-on lab:
            Module: {module}
            
            Include:
            - Network devices (use containerized routers/switches)
            - Python environment with necessary libraries
            - Sample configurations
            - Testing scripts
            
            Format as docker-compose.yml
            """
        )

        lab_chain = LLMChain(llm=self.llm, prompt=lab_prompt)
        lab_config = await lab_chain.arun(module=module_title)

        return {
            "docker_compose": lab_config,
            "setup_instructions": "Run: docker-compose up -d",
            "cleanup": "Run: docker-compose down -v"
        }

    @staticmethod
    def _is_numbered_header(text: str) -> bool:
        """Return True if ``text`` starts like 'Module 3' / 'Section 2' (markdown markers ignored)."""
        words = text.lstrip('*#_ ').split()
        return (
            len(words) >= 2
            and words[0] in ('Module', 'Section')
            and words[1][:1].isdigit()
        )

    def _parse_outline(self, outline: str) -> List[Dict]:
        """Parse outline text into a list of module dicts.

        A line mentioning 'Module' or 'Section' starts a new module, and
        '-' bullets that follow become that module's concepts. A bullet
        inside a module that merely mentions those words (for example
        '- Module breakdown') is kept as a concept; it only starts a new
        module when it looks like a numbered header ('- Module 2: ...').

        Returns:
            One dict per module with 'title' (the stripped header line),
            'duration' (always 30) and 'concepts' (list of bullet texts).
        """
        # Simplified parsing - in production, use more robust parsing
        modules = []
        current_module = None

        for raw_line in outline.splitlines():
            line = raw_line.strip()
            mentions_header = 'Module' in line or 'Section' in line

            if current_module is not None and line.startswith('-'):
                bullet_text = line[1:].strip()
                if not (mentions_header and self._is_numbered_header(bullet_text)):
                    current_module['concepts'].append(bullet_text)
                    continue

            if mentions_header:
                if current_module:
                    modules.append(current_module)
                current_module = {
                    'title': line,
                    'duration': 30,  # Default 30 minutes
                    'concepts': []
                }

        if current_module:
            modules.append(current_module)

        return modules

    async def _generate_assessments(self, topic: str, modules: List[Dict]) -> List[Dict]:
        """Generate quiz questions and practical assessments for the whole course."""
        assessment_prompt = PromptTemplate(
            input_variables=["topic", "modules"],
            template="""
            Create assessments for the course:
            Topic: {topic}
            Modules: {modules}
            
            Generate:
            1. 5 multiple-choice questions per module
            2. 2 practical coding challenges
            3. 1 real-world project
            
            Focus on testing practical skills, not memorization.
            """
        )

        assessment_chain = LLMChain(llm=self.llm, prompt=assessment_prompt)
        assessments = await assessment_chain.arun(
            topic=topic,
            modules=json.dumps([m['title'] for m in modules])
        )

        return self._parse_assessments(assessments)

    def _parse_assessments(self, assessments_text: str) -> List[Dict]:
        """Wrap the raw LLM text in a single 'quiz' entry (no real parsing yet)."""
        # Simplified - implement proper parsing
        return [{
            "type": "quiz",
            "content": assessments_text
        }]

# Integration with video generation
class VideoProductionPipeline:
    """Async client that submits a lesson script to the Synthesia video API."""

    def __init__(self, synthesia_api_key: str):
        self.synthesia_key = synthesia_api_key

    async def create_video(self, script: str, avatar: str = "anna_costume1_cameraA") -> str:
        """Submit ``script`` for rendering and return the share URL.

        Raises:
            Exception: If the API answers with anything other than HTTP 201.
        """
        # Imported lazily, so httpx is only needed when a video is requested.
        import httpx

        auth_headers = {"Authorization": f"Bearer {self.synthesia_key}"}
        scene = {
            "scriptText": script,
            "avatar": avatar,
            "background": "green_screen"
        }
        payload = {
            "test": False,
            "title": "Course Video",
            "description": "AI-generated course content",
            "visibility": "private",
            "input": [scene]
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://api.synthesia.io/v2/videos",
                headers=auth_headers,
                json=payload
            )

            # Guard clause: anything but "201 Created" is treated as failure.
            if response.status_code != 201:
                raise Exception(f"Video generation failed: {response.text}")

            video_id = response.json()["id"]
            return f"https://share.synthesia.io/{video_id}"
                

Part 3: MLOps & Production Pipeline

Complete MLOps Architecture

1. Experiment Tracking with MLflow


import mlflow
import mlflow.pytorch
from mlflow.tracking import MlflowClient

class ExperimentManager:
    """Manage ML experiments with tracking and versioning.

    Thin wrapper around the module-level MLflow tracking API plus an
    ``MlflowClient`` used for run queries.
    """

    def __init__(self, experiment_name: str, tracking_uri: str = "http://localhost:5000"):
        """Point MLflow at ``tracking_uri`` and select ``experiment_name``."""
        # Stored because get_best_model() looks the experiment up by name.
        self.experiment_name = experiment_name
        mlflow.set_tracking_uri(tracking_uri)
        mlflow.set_experiment(experiment_name)
        self.client = MlflowClient()

    def start_run(self, run_name: str, tags: Dict = None):
        """Start a new experiment run and attach ``tags`` (if any) to it."""
        mlflow.start_run(run_name=run_name)
        if tags:
            mlflow.set_tags(tags)

    def log_params(self, params: Dict):
        """Log hyperparameters to the active run."""
        mlflow.log_params(params)

    def log_metrics(self, metrics: Dict, step: int = None):
        """Log performance metrics to the active run, optionally at ``step``."""
        mlflow.log_metrics(metrics, step=step)

    def log_model(self, model, artifact_path: str = "model"):
        """Log a trained PyTorch model under ``artifact_path``."""
        mlflow.pytorch.log_model(model, artifact_path)

    def end_run(self):
        """End current run."""
        mlflow.end_run()

    def get_best_model(self, metric: str = "val_accuracy", mode: str = "max"):
        """Load the model from the best run of this experiment.

        Args:
            metric: Metric name used to rank runs.
            mode: 'max' ranks runs in descending order of ``metric``; any
                other value ranks them in ascending order.

        Returns:
            The loaded PyTorch model, or None when the experiment does not
            exist or has no runs. The model is read from the run's 'model'
            artifact path, which is ``log_model``'s default.
        """
        experiment = mlflow.get_experiment_by_name(self.experiment_name)
        if experiment is None:
            return None

        direction = 'DESC' if mode == 'max' else 'ASC'
        runs = self.client.search_runs(
            experiment_ids=[experiment.experiment_id],
            order_by=[f"metrics.{metric} {direction}"],
            max_results=1
        )

        if runs:
            best_run = runs[0]
            model_uri = f"runs:/{best_run.info.run_id}/model"
            return mlflow.pytorch.load_model(model_uri)
        return None
                    

2. Data Versioning with DVC


# dvc.yaml - Pipeline configuration
# Three stages chained through their files: prepare_data writes data/processed,
# train_model consumes it and writes the model, evaluate consumes the model.
# Per stage: cmd = command to run, deps = inputs, outs = outputs,
# params = parameter keys the stage depends on.
stages:
  # Stage 1: build data/processed from data/raw.
  prepare_data:
    cmd: python src/prepare_data.py
    deps:
      - src/prepare_data.py
      - data/raw
    outs:
      - data/processed
    params:
      - prepare.split_ratio
      - prepare.seed
  
  # Stage 2: train on the processed data and write the model file.
  train_model:
    cmd: python src/train_model.py
    deps:
      - src/train_model.py
      - data/processed
    outs:
      - models/network_classifier.pkl
    params:
      - train.epochs
      - train.learning_rate
    metrics:
      # cache: false keeps this metrics file out of the DVC cache.
      - metrics/train_metrics.json:
          cache: false
  
  # Stage 3: score the trained model on the test split; emits metrics and plots.
  evaluate:
    cmd: python src/evaluate.py
    deps:
      - src/evaluate.py
      - models/network_classifier.pkl
      - data/processed/test.csv
    metrics:
      - metrics/eval_metrics.json:
          cache: false
    plots:
      - plots/confusion_matrix.png
      - plots/roc_curve.png
                    

3. Model Monitoring & Drift Detection


from datetime import datetime
from typing import Any, Dict

import numpy as np
import prometheus_client as prom
from scipy import stats

class ModelMonitor:
    """Monitor model performance and detect drift.

    Keeps a baseline sample of the input features, compares new data to it
    with a per-feature two-sample Kolmogorov-Smirnov test, and exports
    prediction count, latency and a drift score as Prometheus metrics.
    """

    def __init__(self, baseline_data: np.ndarray):
        """Store the baseline and create the Prometheus metrics.

        Args:
            baseline_data: Reference samples; indexed as ``[:, i]`` per
                feature, so it must be 2-D (samples x features).
        """
        self.baseline_data = baseline_data
        self.baseline_stats = self._calculate_stats(baseline_data)

        # Prometheus metrics
        # NOTE(review): metric names are fixed, so a second ModelMonitor in
        # the same process may collide in the default registry - confirm.
        self.prediction_counter = prom.Counter(
            'ml_predictions_total',
            'Total number of predictions'
        )
        self.drift_gauge = prom.Gauge(
            'ml_drift_score',
            'Current drift score'
        )
        self.latency_histogram = prom.Histogram(
            'ml_prediction_latency_seconds',
            'Prediction latency'
        )

    def _calculate_stats(self, data: np.ndarray) -> Dict:
        """Return per-feature mean, std, min and max (reduced over axis 0)."""
        return {
            'mean': np.mean(data, axis=0),
            'std': np.std(data, axis=0),
            'min': np.min(data, axis=0),
            'max': np.max(data, axis=0)
        }

    def detect_drift(self, new_data: np.ndarray, threshold: float = 0.05) -> Dict:
        """Detect data drift using the Kolmogorov-Smirnov test.

        Each feature column of ``new_data`` is compared with the same
        baseline column; a feature counts as drifted when its p-value is
        below ``threshold``.

        Returns:
            Dict with 'drift_detected' (any feature drifted),
            'avg_drift_score' (mean p-value across features, so LOWER values
            mean stronger evidence of drift), 'drifted_features' (column
            indices) and 'timestamp' (local time, ISO format).
        """
        drift_scores = []

        for i in range(new_data.shape[1]):
            _, p_value = stats.ks_2samp(
                self.baseline_data[:, i],
                new_data[:, i]
            )
            drift_scores.append(p_value)

        drifted_features = [
            i for i, p_value in enumerate(drift_scores) if p_value < threshold
        ]
        drift_detected = len(drifted_features) > 0
        avg_drift_score = np.mean(drift_scores)

        # Update Prometheus metrics (the gauge holds the mean p-value)
        self.drift_gauge.set(avg_drift_score)

        return {
            'drift_detected': drift_detected,
            'avg_drift_score': avg_drift_score,
            'drifted_features': drifted_features,
            'timestamp': datetime.now().isoformat()
        }

    def log_prediction(self, input_data: np.ndarray, prediction: Any,
                      latency: float, confidence: float = None):
        """Record one prediction and raise alerts where appropriate.

        Args:
            input_data: Feature values the prediction was made from.
            prediction: The model output (only used in alert text).
            latency: Observed prediction latency, fed to the histogram.
            confidence: Optional confidence score; values below 0.5 trigger
                a low-confidence alert, including a confidence of exactly 0.
        """
        self.prediction_counter.inc()
        self.latency_histogram.observe(latency)

        # Check for anomalous inputs
        if self._is_anomalous_input(input_data):
            self._alert_anomalous_input(input_data)

        # Log low confidence predictions. Compare against None explicitly:
        # a confidence of 0.0 is falsy but is the strongest case for an alert.
        if confidence is not None and confidence < 0.5:
            self._alert_low_confidence(prediction, confidence)

    def _is_anomalous_input(self, data: np.ndarray) -> bool:
        """Return True if any feature lies more than 3 baseline std devs from its mean."""
        std = self.baseline_stats['std']
        # A constant baseline feature has std == 0. Substituting machine
        # epsilon avoids 0/0 and x/0 warnings while still flagging any
        # deviation from that constant value.
        safe_std = np.where(std > 0, std, np.finfo(float).eps)
        z_scores = np.abs((data - self.baseline_stats['mean']) / safe_std)
        return bool(np.any(z_scores > 3))

    def _alert_anomalous_input(self, data: np.ndarray):
        """Send alert for anomalous input (currently just prints)."""
        print(f"ALERT: Anomalous input detected: {data}")
        # In production, integrate with alerting system (PagerDuty, Slack, etc.)

    def _alert_low_confidence(self, prediction: Any, confidence: float):
        """Alert on low confidence predictions (currently just prints)."""
        print(f"WARNING: Low confidence prediction: {prediction} (conf: {confidence})")
                    

Production Deployment: Kubernetes with Auto-scaling


# ml-model-deployment.yaml
# Deployment: runs the model-server container behind the label
# app=network-classifier (the label the Service selector below matches).
apiVersion: apps/v1
kind: Deployment
metadata:
  name: network-classifier
  labels:
    app: network-classifier
spec:
  # spec.replicas is intentionally omitted: the HorizontalPodAutoscaler
  # (minReplicas: 3) owns the replica count, and a hard-coded value here
  # would reset it every time this manifest is re-applied.
  selector:
    matchLabels:
      app: network-classifier
  template:
    metadata:
      labels:
        app: network-classifier
    spec:
      containers:
      - name: model-server
        # NOTE(review): ":latest" is a mutable tag - confirm that the CI
        # pipeline always overrides it with an immutable tag on rollout.
        image: packetcoders/network-classifier:latest
        ports:
        - containerPort: 8000
        env:
        - name: MODEL_PATH
          value: "/models/network_classifier.pkl"
        - name: LOG_LEVEL
          value: "INFO"
        # Requests are required for the HPA's CPU/memory Utilization
        # targets, which are percentages of the requested amount.
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        # Liveness: restart the container if /health stops answering.
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        # Readiness: only route traffic once /ready answers.
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
---
# Service: exposes pods labelled app=network-classifier through a
# LoadBalancer, forwarding port 80 to the container's port 8000.
apiVersion: v1
kind: Service
metadata:
  name: network-classifier-service
spec:
  selector:
    app: network-classifier
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
---
# HorizontalPodAutoscaler: scales the network-classifier Deployment between
# 3 and 10 replicas based on CPU, memory and a per-pod prediction-rate metric.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: network-classifier-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: network-classifier
  minReplicas: 3
  maxReplicas: 10
  metrics:
  # Target 70% average CPU utilization across pods.
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  # Target 80% average memory utilization across pods.
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  # Target an average of 100 for ml_predictions_per_second per pod.
  # NOTE(review): a Pods metric presumably needs a custom-metrics adapter
  # serving this name - confirm one is installed in the cluster.
  - type: Pods
    pods:
      metric:
        name: ml_predictions_per_second
      target:
        type: AverageValue
        averageValue: "100"
            

CI/CD Pipeline with GitHub Actions


# .github/workflows/ml-pipeline.yml
# Jobs run in sequence: test -> train -> deploy (train and deploy are
# restricted to the main branch by their own `if:` conditions).
name: ML Pipeline

# Triggers: pushes to main or develop, and pull requests targeting main.
on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  # Job 1: install dependencies, run the test suite with coverage for src/,
  # and upload the XML coverage report.
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v2
    
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.9'
    
    - name: Install dependencies
      run: |
        pip install -r requirements.txt
        pip install pytest pytest-cov
    
    # --cov-report=xml writes coverage.xml, which the next step uploads.
    - name: Run tests
      run: |
        pytest tests/ --cov=src --cov-report=xml
    
    - name: Upload coverage
      uses: codecov/codecov-action@v2
      with:
        file: ./coverage.xml
  
  # Job 2 (main branch only, after tests pass): pull the DVC-tracked data,
  # train, evaluate, and register the resulting model.
  train:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
    - uses: actions/checkout@v2

    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.9'

    # The extra is quoted so the shell never treats [s3] as a glob pattern.
    - name: Install dependencies
      run: |
        pip install -r requirements.txt
        pip install "dvc[s3]"

    # AWS credentials are scoped to this step only; dvc pull reads them
    # from the environment.
    - name: Pull data with DVC
      env:
        AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
        AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
      run: |
        dvc pull

    # Script name matches the train_model stage declared in dvc.yaml.
    - name: Train model
      run: |
        python src/train_model.py

    - name: Evaluate model
      run: |
        python src/evaluate.py

    - name: Upload model to registry
      if: success()
      run: |
        python src/register_model.py
  
  # Job 3 (main branch only, after training): build the serving image, push
  # it to ECR tagged with the commit SHA, and roll it out to Kubernetes.
  deploy:
    needs: train
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
    - uses: actions/checkout@v2

    - name: Configure AWS credentials
      uses: aws-actions/configure-aws-credentials@v1
      with:
        aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
        aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        aws-region: us-east-1

    - name: Login to Amazon ECR
      id: login-ecr
      uses: aws-actions/amazon-ecr-login@v1

    - name: Build and push Docker image
      env:
        ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
        ECR_REPOSITORY: network-classifier
        IMAGE_TAG: ${{ github.sha }}
      run: |
        docker build -t "$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG" .
        docker push "$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG"

    # Step-level `env` is not inherited by later steps, so the image
    # coordinates are declared again here; without them the image
    # reference below would expand to "/:".
    # NOTE(review): no step configures cluster credentials for kubectl -
    # confirm how the runner obtains its kubeconfig.
    - name: Deploy to Kubernetes
      env:
        ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
        ECR_REPOSITORY: network-classifier
        IMAGE_TAG: ${{ github.sha }}
      run: |
        kubectl set image deployment/network-classifier \
          model-server="$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG"
        kubectl rollout status deployment/network-classifier
            

Part 4: Predictive Maintenance for Network Equipment

Time-Series Forecasting with LSTM


import torch
import torch.nn as nn
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import Dataset, DataLoader

class NetworkDeviceDataset(Dataset):
    """Sliding-window dataset for network device telemetry.

    Sample ``i`` pairs an input window ``data[i : i + sequence_length]``
    with the ``prediction_horizon`` rows that immediately follow it.

    Args:
        data: Time-ordered telemetry that supports ``len()`` and slicing
            and can be converted by ``torch.FloatTensor``.
        sequence_length: Number of time steps in each input window.
        prediction_horizon: Number of time steps in each target window.
    """

    def __init__(self, data, sequence_length=24, prediction_horizon=6):
        self.data = data
        self.sequence_length = sequence_length
        self.prediction_horizon = prediction_horizon

    def __len__(self):
        """Number of complete (input, target) windows; 0 if the data is too short."""
        window = self.sequence_length + self.prediction_horizon
        # +1: the window that ends exactly on the last row is valid too.
        return max(0, len(self.data) - window + 1)

    def __getitem__(self, idx):
        """Return ``(x, y)`` float tensors for window ``idx``.

        Raises:
            IndexError: If ``idx`` is outside ``[-len(self), len(self))``;
                this also lets plain iteration over the dataset terminate.
        """
        size = len(self)
        if idx < 0:
            idx += size
        if not 0 <= idx < size:
            raise IndexError('NetworkDeviceDataset index out of range')

        split = idx + self.sequence_length
        x = self.data[idx:split]
        y = self.data[split:split + self.prediction_horizon]
        return torch.FloatTensor(x), torch.FloatTensor(y)

class PredictiveMaintenanceLSTM(nn.Module):
    """Sequence model that scores device-failure risk.

    Data flow: stacked LSTM -> 4-head self-attention over the LSTM outputs
    -> two-layer MLP applied to the final time step -> sigmoid.
    """

    def __init__(self, input_dim, hidden_dim=128, num_layers=2, output_dim=1, dropout=0.2):
        super().__init__()

        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # Recurrent encoder; dropout is only passed on when layers are stacked.
        lstm_dropout = dropout if num_layers > 1 else 0
        self.lstm = nn.LSTM(
            input_dim,
            hidden_dim,
            num_layers,
            batch_first=True,
            dropout=lstm_dropout,
        )

        # Self-attention across the encoded time steps
        self.attention = nn.MultiheadAttention(hidden_dim, num_heads=4, batch_first=True)

        # Prediction head
        half_dim = hidden_dim // 2
        head_layers = [
            nn.Linear(hidden_dim, half_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(half_dim, output_dim),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return sigmoid scores computed from the last attended time step."""
        encoded, _ = self.lstm(x)

        # Query, key and value are all the LSTM output (self-attention).
        attended, _ = self.attention(encoded, encoded, encoded)

        final_step = attended[:, -1, :]
        scores = self.fc(final_step)

        return torch.sigmoid(scores)  # Probability of failure

class MaintenancePredictor:
    """Complete predictive maintenance system.

    Wraps a ``PredictiveMaintenanceLSTM`` (10 input features) together with
    a ``MinMaxScaler`` and turns the predicted failure probability into a
    risk level, a time-to-failure bucket and a maintenance recommendation.
    """

    def __init__(self, model_path=None):
        """Build the model and optionally load weights from ``model_path``."""
        self.model = PredictiveMaintenanceLSTM(
            input_dim=10,  # CPU, memory, temp, packet_loss, etc.
            hidden_dim=128,
            num_layers=2,
            output_dim=1
        )

        if model_path:
            # map_location lets a checkpoint saved on a GPU load on a
            # CPU-only host; the model itself is created on the CPU.
            self.model.load_state_dict(torch.load(model_path, map_location='cpu'))

        # The scaler starts unfitted: call fit_scaler() before predict_failure().
        self.scaler = MinMaxScaler()
        self.failure_threshold = 0.7

    def fit_scaler(self, feature_rows):
        """Fit the feature scaler on historical feature rows.

        Args:
            feature_rows: Table with the same ten columns, in the same
                order, that ``prepare_features`` produces.

        Returns:
            ``self``, so the call can be chained.
        """
        self.scaler.fit(feature_rows)
        return self

    def prepare_features(self, device_metrics):
        """Extract the ten model features from raw device metrics.

        ``uptime`` is divided by 3600 to give ``uptime_hours``; every other
        metric is copied unchanged.

        Returns:
            A single-row ``pandas.DataFrame``.
        """
        features = {
            'cpu_usage': device_metrics['cpu_usage'],
            'memory_usage': device_metrics['memory_usage'],
            'temperature': device_metrics['temperature'],
            'packet_loss': device_metrics['packet_loss'],
            'interface_errors': device_metrics['interface_errors'],
            'uptime_hours': device_metrics['uptime'] / 3600,
            'power_consumption': device_metrics['power_consumption'],
            'fan_speed': device_metrics['fan_speed'],
            'disk_usage': device_metrics['disk_usage'],
            'connection_count': device_metrics['connection_count']
        }
        return pd.DataFrame([features])

    def predict_failure(self, device_history):
        """Predict probability of device failure.

        The scaler must have been fitted first (see ``fit_scaler``);
        otherwise ``self.scaler.transform`` raises scikit-learn's
        not-fitted error.

        Returns:
            Dict with 'failure_probability', 'risk_level',
            'recommendation' and 'estimated_time_to_failure'.
        """
        # Prepare data
        features = self.prepare_features(device_history)
        scaled_features = self.scaler.transform(features)

        # Convert to a float32 tensor and add the leading batch dimension
        x = torch.as_tensor(scaled_features, dtype=torch.float32).unsqueeze(0)

        # Predict
        self.model.eval()
        with torch.no_grad():
            failure_prob = self.model(x).item()

        # Generate maintenance recommendation
        recommendation = self._generate_recommendation(failure_prob, features)

        return {
            'failure_probability': failure_prob,
            'risk_level': self._get_risk_level(failure_prob),
            'recommendation': recommendation,
            'estimated_time_to_failure': self._estimate_ttf(failure_prob)
        }

    def _get_risk_level(self, prob):
        """Map a probability to 'Low' (< 0.3), 'Medium' (< 0.7) or 'High'."""
        if prob < 0.3:
            return 'Low'
        elif prob < 0.7:
            return 'Medium'
        else:
            return 'High'

    def _estimate_ttf(self, prob):
        """Estimate time to failure in hours"""
        if prob < 0.3:
            return ">168"  # More than a week
        elif prob < 0.5:
            return "72-168"  # 3-7 days
        elif prob < 0.7:
            return "24-72"  # 1-3 days
        else:
            return "<24"  # Less than a day

    def _generate_recommendation(self, prob, features):
        """Generate maintenance recommendations.

        Boundaries are inclusive (``>=``) so they line up with
        ``_get_risk_level`` and ``_estimate_ttf``: a probability exactly at
        ``failure_threshold`` is 'High' risk with '<24' hours and therefore
        IMMEDIATE, and exactly 0.5 falls in the '24-72' hour bucket and is
        PREVENTIVE. ``features`` is accepted but not used yet.
        """
        if prob >= self.failure_threshold:
            return {
                'action': 'IMMEDIATE',
                'tasks': [
                    'Schedule immediate maintenance window',
                    'Prepare replacement hardware',
                    'Backup configuration',
                    'Notify operations team'
                ]
            }
        elif prob >= 0.5:
            return {
                'action': 'PREVENTIVE',
                'tasks': [
                    'Schedule maintenance within 72 hours',
                    'Order replacement parts',
                    'Monitor closely'
                ]
            }
        else:
            return {
                'action': 'ROUTINE',
                'tasks': [
                    'Continue regular monitoring',
                    'Plan for next scheduled maintenance'
                ]
            }
                

Week 2 Deliverables

🚀 Week 2 Achievements

You've transformed from ML experimenter to ML engineer! You now have: