Engineering Excellence for AI Systems
This Week's Mission
Make the transition from ML experimenter to ML engineer. We'll build production-grade systems with monitoring, versioning, and automated deployment pipelines.
10x
Content Generation Speed
CI/CD
Automated ML Pipeline
PyTorch
Deep Learning Framework
K8s
Container Orchestration
Intensive Schedule
9:00 - 10:30 AM
Deep Dive: PyTorch & Neural Networks
Build custom neural networks from scratch and understand automatic differentiation.
10:45 - 12:00 PM
Content Generation Engine
Implement multi-step LLM chains for automated course creation with LangChain.
1:00 - 2:30 PM
MLOps Fundamentals
Set up experiment tracking, model versioning, and automated training pipelines.
2:45 - 4:00 PM
Predictive Maintenance System
Build a time-series model for network equipment failure prediction.
4:15 - 5:30 PM
Kubernetes Deployment
Deploy ML models with auto-scaling and load balancing on K8s.
Part 1: PyTorch & Deep Learning Fundamentals
Building Neural Networks from Scratch
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import numpy as np
class NetworkTrafficDataset(Dataset):
    """Synthetic dataset for binary network-traffic classification.

    Features are sampled from a standard normal distribution and labels are
    sampled uniformly from {0, 1} (0 = normal, 1 = anomalous). Features and
    labels are drawn independently of each other, so the data carries no
    learnable signal; it exists to exercise the training pipeline.

    Args:
        num_samples: Number of (features, label) pairs to generate.
        num_features: Width of each feature vector.
        seed: Optional seed. When given, sampling uses a private
            ``torch.Generator`` so the dataset is reproducible without
            touching the global RNG state.
    """

    def __init__(self, num_samples=10000, num_features=10, seed=None):
        generator = None
        if seed is not None:
            generator = torch.Generator().manual_seed(seed)
        # Simulated traffic features, shape (num_samples, num_features)
        self.features = torch.randn(
            num_samples, num_features, generator=generator
        )
        # Binary class labels, shape (num_samples,)
        self.labels = torch.randint(0, 2, (num_samples,), generator=generator)

    def __len__(self):
        """Return the number of samples."""
        return len(self.features)

    def __getitem__(self, idx):
        """Return the ``(features, label)`` pair stored at ``idx``."""
        return self.features[idx], self.labels[idx]
class NetworkClassifier(nn.Module):
    """Feed-forward classifier for network traffic.

    Each hidden stage is ``Linear -> BatchNorm1d -> ReLU -> Dropout``; a
    final ``Linear`` layer produces one logit per class.

    Args:
        input_dim: Number of input features.
        hidden_dims: Sizes of the hidden layers, in order. Defaults to
            ``[64, 32, 16]``.
        num_classes: Number of output logits.
        dropout: Dropout probability applied after every hidden layer.
    """

    def __init__(self, input_dim=10, hidden_dims=None, num_classes=2, dropout=0.2):
        super().__init__()
        # ``None`` sentinel instead of a list literal: a mutable default
        # would be shared by every instance.
        if hidden_dims is None:
            hidden_dims = [64, 32, 16]

        layers = []
        prev_dim = input_dim
        # Build hidden layers dynamically
        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.BatchNorm1d(hidden_dim),
                nn.ReLU(),
                nn.Dropout(dropout),
            ])
            prev_dim = hidden_dim
        # Output layer
        layers.append(nn.Linear(prev_dim, num_classes))
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """Return raw class logits for a batch ``x``."""
        return self.network(x)

    def predict_proba(self, x):
        """Return softmax class probabilities for ``x`` without tracking gradients.

        Inference runs in eval mode so Dropout is disabled and BatchNorm uses
        its running statistics; otherwise the output is random and a
        single-sample batch raises inside BatchNorm. The module's previous
        train/eval mode is restored before returning.
        """
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                logits = self(x)
                return F.softmax(logits, dim=1)
        finally:
            self.train(was_training)
class ModelTrainer:
    """Training/validation loop for a classification model.

    Uses cross-entropy loss, Adam (lr=0.001), a ``ReduceLROnPlateau``
    scheduler driven by the validation loss, gradient-norm clipping and
    early stopping.

    Args:
        model: The ``nn.Module`` to train; it is moved to ``device``.
        device: Target device. Defaults to ``'cuda'`` when available at
            import time, otherwise ``'cpu'``.
    """

    def __init__(self, model, device='cuda' if torch.cuda.is_available() else 'cpu'):
        self.model = model.to(device)
        self.device = device
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            self.optimizer, mode='min', patience=5
        )
        self.history = {'train_loss': [], 'val_loss': [], 'val_acc': []}

    def train_epoch(self, dataloader):
        """Train for one epoch and return the mean per-batch loss.

        Raises:
            ValueError: If ``dataloader`` yields no batches.
        """
        self.model.train()
        total_loss = 0.0
        num_batches = 0
        for data, targets in dataloader:
            data, targets = data.to(self.device), targets.to(self.device)

            # Forward pass
            outputs = self.model(data)
            loss = self.criterion(outputs, targets)

            # Backward pass
            self.optimizer.zero_grad()
            loss.backward()
            # Gradient clipping to prevent exploding gradients
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.optimizer.step()

            total_loss += loss.item()
            num_batches += 1

        if num_batches == 0:
            raise ValueError('train_epoch received an empty dataloader')
        return total_loss / num_batches

    def validate(self, dataloader):
        """Evaluate the model and return ``(mean_batch_loss, accuracy_percent)``.

        Raises:
            ValueError: If ``dataloader`` yields no batches.
        """
        self.model.eval()
        total_loss = 0.0
        num_batches = 0
        correct = 0
        total = 0
        with torch.no_grad():
            for data, targets in dataloader:
                data, targets = data.to(self.device), targets.to(self.device)
                outputs = self.model(data)
                total_loss += self.criterion(outputs, targets).item()
                num_batches += 1

                # Predicted class = index of the largest logit
                _, predicted = outputs.max(1)
                total += targets.size(0)
                correct += predicted.eq(targets).sum().item()

        if num_batches == 0 or total == 0:
            raise ValueError('validate received an empty dataloader')
        accuracy = 100. * correct / total
        avg_loss = total_loss / num_batches
        return avg_loss, accuracy

    def fit(self, train_loader, val_loader, epochs=50, early_stopping_patience=10,
            checkpoint_path='best_model.pth'):
        """Run the full training loop with early stopping.

        Args:
            train_loader: Batches of ``(data, targets)`` for training.
            val_loader: Batches of ``(data, targets)`` for validation.
            epochs: Maximum number of epochs.
            early_stopping_patience: Stop after this many consecutive epochs
                without an improvement in validation loss.
            checkpoint_path: Where the best ``state_dict`` is written each
                time the validation loss improves. Pass ``None`` to skip
                checkpointing.

        Returns:
            The history dict with ``train_loss``, ``val_loss`` and
            ``val_acc`` lists (one entry per completed epoch).
        """
        best_val_loss = float('inf')
        patience_counter = 0

        for epoch in range(epochs):
            # Training
            train_loss = self.train_epoch(train_loader)
            # Validation
            val_loss, val_acc = self.validate(val_loader)
            # Learning rate scheduling
            self.scheduler.step(val_loss)

            # Logging
            self.history['train_loss'].append(train_loss)
            self.history['val_loss'].append(val_loss)
            self.history['val_acc'].append(val_acc)
            print(f'Epoch {epoch+1}/{epochs}:')
            print(f' Train Loss: {train_loss:.4f}')
            print(f' Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')

            # Early stopping
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                # Save best model
                if checkpoint_path is not None:
                    torch.save(self.model.state_dict(), checkpoint_path)
            else:
                patience_counter += 1
                if patience_counter >= early_stopping_patience:
                    print(f'Early stopping triggered after {epoch+1} epochs')
                    break

        return self.history
# Usage example: 80/20 train/validation split followed by a 30-epoch run
dataset = NetworkTrafficDataset()
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(
    dataset,
    lengths=[train_size, val_size],
)

# Only the training split is reshuffled every epoch
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=32, shuffle=False)

model = NetworkClassifier()
trainer = ModelTrainer(model)
history = trainer.fit(train_loader, val_loader, epochs=30)
Part 2: AI-Powered Content Generation Pipeline
Building the 10x Content Factory
import asyncio
import json
from typing import Any, Dict, List, Optional

from langchain import LLMChain, PromptTemplate
from langchain.chains import SequentialChain
from langchain.chat_models import ChatOpenAI
class CourseContentGenerator:
    """Automated course content generation system.

    Drives a series of LLM prompts: a course outline is generated first,
    split into modules, and each module then gets a lesson script, code
    examples and a lab setup. Assessments are generated last.

    Args:
        api_key: OpenAI API key passed to ``ChatOpenAI``.
    """

    def __init__(self, api_key: str):
        self.llm = ChatOpenAI(
            temperature=0.7,
            model="gpt-4",
            openai_api_key=api_key,
        )

        # Prompt 1: course outline
        self.outline_prompt = PromptTemplate(
            input_variables=["topic", "level", "duration"],
            template="""
Create a comprehensive course outline for:
Topic: {topic}
Level: {level}
Duration: {duration} hours
Format the outline with:
- Clear learning objectives
- Module breakdown
- Practical labs for each module
- Time allocations
Focus on network automation and practical skills.
""",
        )

        # Prompt 2: detailed lesson script
        self.script_prompt = PromptTemplate(
            input_variables=["module_title", "outline", "duration"],
            template="""
Write a detailed script for a video lesson on:
Module: {module_title}
Context: {outline}
Duration: {duration} minutes
Include:
- Engaging introduction
- Clear explanations with network examples
- Code demonstrations
- Key takeaways
Make it conversational and practical.
""",
        )

        # Prompt 3: code examples
        self.code_prompt = PromptTemplate(
            input_variables=["module_title", "concepts"],
            template="""
Create Python code examples for:
Module: {module_title}
Concepts: {concepts}
Requirements:
- Production-quality code with comments
- Network automation focus
- Error handling
- Best practices
Include 2-3 practical examples.
""",
        )

    async def _run_chain(self, prompt, **inputs) -> str:
        """Run a single-prompt ``LLMChain`` on ``self.llm`` and return its output."""
        chain = LLMChain(llm=self.llm, prompt=prompt)
        return await chain.arun(**inputs)

    async def generate_course(self, topic: str, level: str = "intermediate",
                              duration: int = 4) -> Dict:
        """Generate complete course content.

        Args:
            topic: Course subject.
            level: Audience level inserted into the outline prompt.
            duration: Course length in hours.

        Returns:
            Dict with keys ``topic``, ``level``, ``duration``, ``outline``
            (raw LLM text), ``modules`` (one dict per parsed module) and
            ``assessments``.
        """
        # Step 1: Generate outline
        outline = await self._run_chain(
            self.outline_prompt,
            topic=topic,
            level=level,
            duration=duration,
        )

        # Step 2: Parse outline into modules
        modules = self._parse_outline(outline)

        # Step 3: Generate content for each module in parallel
        module_contents = await asyncio.gather(
            *(self._generate_module_content(module, outline) for module in modules)
        )

        # Step 4: Generate assessments
        assessments = await self._generate_assessments(topic, modules)

        return {
            "topic": topic,
            "level": level,
            "duration": duration,
            "outline": outline,
            "modules": module_contents,
            "assessments": assessments,
        }

    async def _generate_module_content(self, module: Dict, outline: str) -> Dict:
        """Generate script, code examples and lab setup for one module.

        The three generations do not depend on each other, so they are
        awaited concurrently rather than one after another.
        """
        script, code_examples, lab_setup = await asyncio.gather(
            self._run_chain(
                self.script_prompt,
                module_title=module['title'],
                outline=outline,
                duration=module['duration'],
            ),
            self._run_chain(
                self.code_prompt,
                module_title=module['title'],
                concepts=module['concepts'],
            ),
            self._generate_lab_setup(module['title']),
        )
        return {
            "title": module['title'],
            "duration": module['duration'],
            "script": script,
            "code_examples": code_examples,
            "lab_setup": lab_setup,
        }

    async def _generate_lab_setup(self, module_title: str) -> Dict:
        """Generate a Docker Compose based lab environment for a module.

        Returns:
            Dict with the raw LLM output under ``docker_compose`` plus fixed
            ``setup_instructions`` and ``cleanup`` command hints.
        """
        lab_prompt = PromptTemplate(
            input_variables=["module"],
            template="""
Create a Docker Compose setup for a hands-on lab:
Module: {module}
Include:
- Network devices (use containerized routers/switches)
- Python environment with necessary libraries
- Sample configurations
- Testing scripts
Format as docker-compose.yml
""",
        )
        lab_config = await self._run_chain(lab_prompt, module=module_title)
        return {
            "docker_compose": lab_config,
            "setup_instructions": "Run: docker-compose up -d",
            "cleanup": "Run: docker-compose down -v",
        }

    def _parse_outline(self, outline: str) -> List[Dict]:
        """Split outline text into module dicts.

        Any line containing ``Module`` or ``Section`` starts a new module;
        later lines that begin with ``-`` are collected as that module's
        concepts. Every module gets a default duration of 30 minutes.

        Simplified parsing - in production, use more robust parsing.
        """
        modules = []
        current_module = None
        for line in outline.split('\n'):
            stripped = line.strip()
            if 'Module' in line or 'Section' in line:
                if current_module:
                    modules.append(current_module)
                current_module = {
                    'title': stripped,
                    'duration': 30,  # Default 30 minutes
                    'concepts': [],
                }
            elif current_module and stripped.startswith('-'):
                current_module['concepts'].append(stripped[1:].strip())
        if current_module:
            modules.append(current_module)
        return modules

    async def _generate_assessments(self, topic: str, modules: List[Dict]) -> List[Dict]:
        """Generate quiz questions and practical assessments.

        Only the module titles (JSON-encoded) are sent to the LLM.
        """
        assessment_prompt = PromptTemplate(
            input_variables=["topic", "modules"],
            template="""
Create assessments for the course:
Topic: {topic}
Modules: {modules}
Generate:
1. 5 multiple-choice questions per module
2. 2 practical coding challenges
3. 1 real-world project
Focus on testing practical skills, not memorization.
""",
        )
        assessments = await self._run_chain(
            assessment_prompt,
            topic=topic,
            modules=json.dumps([m['title'] for m in modules]),
        )
        return self._parse_assessments(assessments)

    def _parse_assessments(self, assessments_text: str) -> List[Dict]:
        """Wrap the raw assessment text in a single ``quiz`` entry.

        Simplified - implement proper parsing.
        """
        return [{
            "type": "quiz",
            "content": assessments_text,
        }]
# Integration with video generation
class VideoProductionPipeline:
    """Integrate with Synthesia/HeyGen for video creation.

    Args:
        synthesia_api_key: API key sent as a bearer token on every request.
    """

    def __init__(self, synthesia_api_key: str):
        self.synthesia_key = synthesia_api_key

    async def create_video(self, script: str, avatar: str = "anna_costume1_cameraA",
                           title: str = "Course Video",
                           description: str = "AI-generated course content") -> str:
        """Submit a script to the Synthesia video API and return a share URL.

        Args:
            script: Text the avatar should speak.
            avatar: Avatar identifier sent in the request.
            title: Video title sent in the request.
            description: Video description sent in the request.

        Returns:
            ``https://share.synthesia.io/<id>`` built from the ``id`` field
            of the JSON response.

        Raises:
            RuntimeError: If the API answers with any status other than 201.
                ``RuntimeError`` is a subclass of ``Exception``, so callers
                that catch ``Exception`` keep working.
        """
        # Imported lazily so the module loads without httpx installed
        import httpx

        payload = {
            "test": False,
            "title": title,
            "description": description,
            "visibility": "private",
            "input": [{
                "scriptText": script,
                "avatar": avatar,
                "background": "green_screen",
            }],
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://api.synthesia.io/v2/videos",
                headers={"Authorization": f"Bearer {self.synthesia_key}"},
                json=payload,
            )

        if response.status_code != 201:
            raise RuntimeError(f"Video generation failed: {response.text}")
        video_id = response.json()["id"]
        return f"https://share.synthesia.io/{video_id}"
Part 3: MLOps & Production Pipeline
Complete MLOps Architecture
1. Experiment Tracking with MLflow
import mlflow
import mlflow.pytorch
from mlflow.tracking import MlflowClient
class ExperimentManager:
    """Manage ML experiments with MLflow tracking and versioning.

    Args:
        experiment_name: Name of the MLflow experiment to activate.
        tracking_uri: URI of the MLflow tracking server.
    """

    def __init__(self, experiment_name: str, tracking_uri: str = "http://localhost:5000"):
        # Kept on the instance because get_best_model() looks the
        # experiment up by name.
        self.experiment_name = experiment_name
        mlflow.set_tracking_uri(tracking_uri)
        mlflow.set_experiment(experiment_name)
        self.client = MlflowClient()

    def start_run(self, run_name: str, tags: Optional[Dict] = None):
        """Start a new run, apply optional tags and return the run object."""
        run = mlflow.start_run(run_name=run_name)
        if tags:
            for key, value in tags.items():
                mlflow.set_tag(key, value)
        return run

    def log_params(self, params: Dict):
        """Log hyperparameters to the active run."""
        for key, value in params.items():
            mlflow.log_param(key, value)

    def log_metrics(self, metrics: Dict, step: Optional[int] = None):
        """Log performance metrics to the active run, optionally at ``step``."""
        for key, value in metrics.items():
            mlflow.log_metric(key, value, step=step)

    def log_model(self, model, artifact_path: str = "model"):
        """Log a trained PyTorch model under ``artifact_path``."""
        mlflow.pytorch.log_model(model, artifact_path)

    def end_run(self):
        """End the current run."""
        mlflow.end_run()

    def get_best_model(self, metric: str = "val_accuracy", mode: str = "max",
                       artifact_path: str = "model"):
        """Load the model from the best run of this experiment.

        Args:
            metric: Metric name used to rank runs.
            mode: ``"max"`` ranks descending; any other value ranks ascending.
            artifact_path: Artifact path the model was logged under; must
                match the value given to ``log_model``.

        Returns:
            The loaded model, or ``None`` when the experiment does not exist
            or has no runs.
        """
        experiment = mlflow.get_experiment_by_name(self.experiment_name)
        if experiment is None:
            return None

        direction = 'DESC' if mode == 'max' else 'ASC'
        runs = self.client.search_runs(
            experiment_ids=[experiment.experiment_id],
            order_by=[f"metrics.{metric} {direction}"],
            max_results=1,
        )
        if not runs:
            return None

        best_run = runs[0]
        model_uri = f"runs:/{best_run.info.run_id}/{artifact_path}"
        return mlflow.pytorch.load_model(model_uri)
2. Data Versioning with DVC
# dvc.yaml - Pipeline configuration
# Three stages: prepare_data -> train_model -> evaluate. Each stage lists the
# files it reads (deps) and writes (outs/metrics/plots); later stages depend
# on the outputs of earlier ones.
stages:
  prepare_data:
    cmd: python src/prepare_data.py
    deps:
      - src/prepare_data.py
      - data/raw
    outs:
      - data/processed
    params:
      - prepare.split_ratio
      - prepare.seed

  train_model:
    cmd: python src/train_model.py
    deps:
      - src/train_model.py
      - data/processed
    outs:
      - models/network_classifier.pkl
    params:
      - train.epochs
      - train.learning_rate
    metrics:
      # cache: false keeps the metrics file out of the DVC cache
      - metrics/train_metrics.json:
          cache: false

  evaluate:
    cmd: python src/evaluate.py
    deps:
      - src/evaluate.py
      - models/network_classifier.pkl
      - data/processed/test.csv
    metrics:
      - metrics/eval_metrics.json:
          cache: false
    plots:
      - plots/confusion_matrix.png
      - plots/roc_curve.png
3. Model Monitoring & Drift Detection
import numpy as np
from scipy import stats
from datetime import datetime
import prometheus_client as prom
class ModelMonitor:
    """Monitor model inputs and predictions and detect data drift.

    Args:
        baseline_data: 2-D array of reference inputs (rows = samples,
            columns = features) that new data is compared against.

    NOTE(review): the three Prometheus metrics are created in ``__init__``
    with fixed names; creating a second monitor in the same process will
    presumably collide on the default registry - confirm before
    instantiating more than once.
    """

    def __init__(self, baseline_data: np.ndarray):
        self.baseline_data = baseline_data
        self.baseline_stats = self._calculate_stats(baseline_data)

        # Prometheus metrics
        self.prediction_counter = prom.Counter(
            'ml_predictions_total',
            'Total number of predictions'
        )
        self.drift_gauge = prom.Gauge(
            'ml_drift_score',
            'Current drift score'
        )
        self.latency_histogram = prom.Histogram(
            'ml_prediction_latency_seconds',
            'Prediction latency'
        )

    def _calculate_stats(self, data: np.ndarray) -> Dict:
        """Return per-feature ``mean``, ``std``, ``min`` and ``max`` of ``data``."""
        return {
            'mean': np.mean(data, axis=0),
            'std': np.std(data, axis=0),
            'min': np.min(data, axis=0),
            'max': np.max(data, axis=0)
        }

    def detect_drift(self, new_data: np.ndarray, threshold: float = 0.05) -> Dict:
        """Detect data drift with a per-feature two-sample Kolmogorov-Smirnov test.

        A feature counts as drifted when its KS p-value is below
        ``threshold``.

        Returns:
            Dict with ``drift_detected`` (any feature drifted),
            ``avg_drift_score`` (the MEAN P-VALUE over all features, so lower
            values mean stronger evidence of drift), ``drifted_features``
            (column indices) and an ISO-format ``timestamp``. The same mean
            p-value is written to the ``ml_drift_score`` gauge.
        """
        drift_scores = []
        drifted_features = []
        for i in range(new_data.shape[1]):
            _, p_value = stats.ks_2samp(
                self.baseline_data[:, i],
                new_data[:, i]
            )
            drift_scores.append(p_value)
            if p_value < threshold:
                drifted_features.append(i)

        drift_detected = len(drifted_features) > 0
        avg_drift_score = float(np.mean(drift_scores))

        # Update Prometheus metrics
        self.drift_gauge.set(avg_drift_score)

        return {
            'drift_detected': drift_detected,
            'avg_drift_score': avg_drift_score,
            'drifted_features': drifted_features,
            'timestamp': datetime.now().isoformat()
        }

    def log_prediction(self, input_data: np.ndarray, prediction: Any,
                       latency: float, confidence: Optional[float] = None):
        """Record one prediction and raise alerts where appropriate.

        Increments the prediction counter, records ``latency`` in the
        histogram, alerts on anomalous input and alerts when ``confidence``
        is given and below 0.5.
        """
        self.prediction_counter.inc()
        self.latency_histogram.observe(latency)

        # Check for anomalous inputs
        if self._is_anomalous_input(input_data):
            self._alert_anomalous_input(input_data)

        # Log low confidence predictions
        if self._is_low_confidence(confidence):
            self._alert_low_confidence(prediction, confidence)

    @staticmethod
    def _is_low_confidence(confidence: Optional[float], threshold: float = 0.5) -> bool:
        """Return True when a confidence was supplied and is below ``threshold``.

        Compares against ``None`` explicitly: a plain truthiness check would
        treat a confidence of exactly 0.0 as "not provided" and skip the
        alert for the least confident prediction possible.
        """
        return confidence is not None and confidence < threshold

    def _is_anomalous_input(self, data: np.ndarray) -> bool:
        """Return True if any feature lies more than 3 baseline stds from its mean.

        Features whose baseline std is zero cannot be z-scored; for those,
        any deviation from the baseline value counts as anomalous and an
        exact match does not. This avoids divide-by-zero warnings and NaNs.
        """
        mean = self.baseline_stats['mean']
        std = self.baseline_stats['std']
        deviation = np.abs(np.asarray(data, dtype=float) - mean)

        has_spread = std > 0
        safe_std = np.where(has_spread, std, 1.0)
        exceeds = np.where(has_spread, deviation / safe_std > 3, deviation > 0)
        return bool(np.any(exceeds))

    def _alert_anomalous_input(self, data: np.ndarray):
        """Send alert for anomalous input (currently prints to stdout)."""
        print(f"ALERT: Anomalous input detected: {data}")
        # In production, integrate with alerting system (PagerDuty, Slack, etc.)

    def _alert_low_confidence(self, prediction: Any, confidence: float):
        """Alert on low confidence predictions (currently prints to stdout)."""
        print(f"WARNING: Low confidence prediction: {prediction} (conf: {confidence})")
Production Deployment: Kubernetes with Auto-scaling
# ml-model-deployment.yaml
# Deployment: runs the model server container and exposes port 8000 inside
# the pod. Pods carry the label app=network-classifier, which the selector
# below matches.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: network-classifier
  labels:
    app: network-classifier
spec:
  replicas: 3
  selector:
    matchLabels:
      app: network-classifier
  template:
    metadata:
      labels:
        app: network-classifier
    spec:
      containers:
        - name: model-server
          image: packetcoders/network-classifier:latest
          ports:
            - containerPort: 8000
          env:
            - name: MODEL_PATH
              value: "/models/network_classifier.pkl"
            - name: LOG_LEVEL
              value: "INFO"
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
          # Liveness: container is restarted when /health stops answering
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          # Readiness: pod only receives traffic while /ready answers
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
# Service: forwards TCP port 80 to port 8000 on pods labelled
# app=network-classifier.
apiVersion: v1
kind: Service
metadata:
  name: network-classifier-service
spec:
  selector:
    app: network-classifier
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
---
# HorizontalPodAutoscaler: scales the network-classifier Deployment between
# 3 and 10 replicas on CPU, memory and a per-pod prediction-rate metric.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: network-classifier-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: network-classifier
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
    # NOTE(review): a Pods metric is not a built-in resource metric; this
    # presumably needs a custom-metrics adapter exposing
    # ml_predictions_per_second - confirm the cluster provides one.
    - type: Pods
      pods:
        metric:
          name: ml_predictions_per_second
        target:
          type: AverageValue
          averageValue: "100"
CI/CD Pipeline with GitHub Actions
# .github/workflows/ml-pipeline.yml
# Three jobs chained with `needs`: test -> train -> deploy.
# `train` and `deploy` only run for pushes to main.
name: ML Pipeline

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov
      - name: Run tests
        run: |
          pytest tests/ --cov=src --cov-report=xml
      - name: Upload coverage
        uses: codecov/codecov-action@v2
        with:
          file: ./coverage.xml

  train:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.9'
      - name: Install dependencies
        # The extra is quoted so the shell cannot treat [s3] as a glob pattern
        run: |
          pip install -r requirements.txt
          pip install "dvc[s3]"
      - name: Pull data with DVC
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        run: |
          dvc pull
      - name: Train model
        # Script name matches the train_model stage in dvc.yaml
        run: |
          python src/train_model.py
      - name: Evaluate model
        run: |
          python src/evaluate.py
      - name: Upload model to registry
        if: success()
        run: |
          python src/register_model.py

  deploy:
    needs: train
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v2
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v1
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: us-east-1
      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v1
      - name: Build and push Docker image
        env:
          ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
          ECR_REPOSITORY: network-classifier
          IMAGE_TAG: ${{ github.sha }}
        run: |
          docker build -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG .
          docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG
      - name: Deploy to Kubernetes
        # Step-level env is not shared between steps, so the image
        # coordinates are declared again here; without them the image
        # reference below would expand to empty strings.
        # NOTE(review): no step in this job configures cluster access for
        # kubectl - confirm the runner has a kubeconfig.
        env:
          ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
          ECR_REPOSITORY: network-classifier
          IMAGE_TAG: ${{ github.sha }}
        run: |
          kubectl set image deployment/network-classifier \
            model-server=$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG
          kubectl rollout status deployment/network-classifier
Part 4: Predictive Maintenance for Network Equipment
Time-Series Forecasting with LSTM
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import Dataset, DataLoader
class NetworkDeviceDataset(Dataset):
    """Sliding-window dataset over network device telemetry.

    Sample ``i`` is the pair ``(x, y)`` where ``x`` is
    ``data[i : i + sequence_length]`` and ``y`` is the
    ``prediction_horizon`` rows that immediately follow it.

    Args:
        data: Time-ordered telemetry, sliceable along its first axis.
        sequence_length: Number of time steps in each input window.
        prediction_horizon: Number of time steps in each target window.
    """

    def __init__(self, data, sequence_length=24, prediction_horizon=6):
        self.data = data
        self.sequence_length = sequence_length
        self.prediction_horizon = prediction_horizon

    def __len__(self):
        """Return the number of complete (input, target) windows.

        The last valid start index is ``len(data) - window``, hence the
        ``+ 1``. The result is clamped at zero because ``len()`` must not be
        negative when the series is shorter than one window.
        """
        window = self.sequence_length + self.prediction_horizon
        return max(0, len(self.data) - window + 1)

    def __getitem__(self, idx):
        """Return the float32 tensors ``(x, y)`` for window ``idx``.

        Raises:
            IndexError: If ``idx`` is outside ``[-len(self), len(self))``,
                which also lets plain iteration over the dataset terminate.
        """
        size = len(self)
        if idx < 0:
            idx += size
        if not 0 <= idx < size:
            raise IndexError('NetworkDeviceDataset index out of range')

        split = idx + self.sequence_length
        x = self.data[idx:split]
        y = self.data[split:split + self.prediction_horizon]
        return (
            torch.as_tensor(x, dtype=torch.float32),
            torch.as_tensor(y, dtype=torch.float32),
        )
class PredictiveMaintenanceLSTM(nn.Module):
    """LSTM with self-attention that outputs a failure probability.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: LSTM hidden size; also the attention embedding size, so
            it must be divisible by ``num_heads``.
        num_layers: Number of stacked LSTM layers.
        output_dim: Number of outputs per sequence.
        dropout: Dropout probability used between LSTM layers (only when
            ``num_layers > 1``) and inside the output head.
        num_heads: Number of attention heads.
    """

    def __init__(self, input_dim, hidden_dim=128, num_layers=2, output_dim=1,
                 dropout=0.2, num_heads=4):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # LSTM layers
        self.lstm = nn.LSTM(
            input_dim,
            hidden_dim,
            num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
        )

        # Attention mechanism
        self.attention = nn.MultiheadAttention(
            hidden_dim, num_heads=num_heads, batch_first=True
        )

        # Output layers
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim // 2, output_dim),
        )

    def forward(self, x):
        """Return per-sequence probabilities in [0, 1].

        ``x`` is batch-first (the LSTM and attention are both built with
        ``batch_first=True``). Only the final time step feeds the output
        head, so attention is evaluated for that single query position
        against the whole sequence instead of for every position.
        """
        # LSTM forward pass; the final hidden/cell states are not needed
        lstm_out, _ = self.lstm(x)

        # Attend from the last time step over all time steps
        last_step = lstm_out[:, -1:, :]
        attn_out, _ = self.attention(
            last_step, lstm_out, lstm_out, need_weights=False
        )

        out = self.fc(attn_out[:, -1, :])
        return torch.sigmoid(out)  # Probability of failure
class MaintenancePredictor:
    """Complete predictive maintenance system.

    Wraps a ``PredictiveMaintenanceLSTM`` (10 input features) together with a
    feature scaler and turns the predicted failure probability into a risk
    level, a time-to-failure bucket and a maintenance recommendation.

    Args:
        model_path: Optional path to a saved ``state_dict`` for the model.
        scaler: Optional already-fitted scaler. When omitted, a fresh
            ``MinMaxScaler`` is created and must be fitted with
            ``fit_scaler`` before ``predict_failure`` is called.
    """

    def __init__(self, model_path=None, scaler=None):
        self.model = PredictiveMaintenanceLSTM(
            input_dim=10,  # CPU, memory, temp, packet_loss, etc.
            hidden_dim=128,
            num_layers=2,
            output_dim=1,
        )
        if model_path:
            # map_location keeps checkpoints saved on a GPU loadable on
            # CPU-only hosts; the model itself is never moved off the CPU.
            self.model.load_state_dict(torch.load(model_path, map_location='cpu'))
        self.scaler = scaler if scaler is not None else MinMaxScaler()
        self.failure_threshold = 0.7

    def fit_scaler(self, training_metrics):
        """Fit the feature scaler on reference telemetry and return ``self``.

        ``training_metrics`` takes the same forms as ``prepare_features``.
        """
        self.scaler.fit(self.prepare_features(training_metrics))
        return self

    def prepare_features(self, device_metrics):
        """Extract model features from raw device metrics.

        Args:
            device_metrics: One metrics snapshot (a mapping keyed by metric
                name), or a list/tuple of snapshots ordered oldest to newest.

        Returns:
            A ``pandas.DataFrame`` with one row per snapshot and the ten
            feature columns. ``uptime`` is divided by 3600 and stored as
            ``uptime_hours``.

        Raises:
            ValueError: If an empty list/tuple is passed.
        """
        if isinstance(device_metrics, (list, tuple)):
            snapshots = list(device_metrics)
            if not snapshots:
                raise ValueError('device_metrics must contain at least one snapshot')
        else:
            snapshots = [device_metrics]
        return pd.DataFrame([self._extract_features(s) for s in snapshots])

    @staticmethod
    def _extract_features(snapshot):
        """Map one raw metrics snapshot to the model's feature dict."""
        return {
            'cpu_usage': snapshot['cpu_usage'],
            'memory_usage': snapshot['memory_usage'],
            'temperature': snapshot['temperature'],
            'packet_loss': snapshot['packet_loss'],
            'interface_errors': snapshot['interface_errors'],
            'uptime_hours': snapshot['uptime'] / 3600,
            'power_consumption': snapshot['power_consumption'],
            'fan_speed': snapshot['fan_speed'],
            'disk_usage': snapshot['disk_usage'],
            'connection_count': snapshot['connection_count'],
        }

    def predict_failure(self, device_history):
        """Predict the probability of device failure.

        Args:
            device_history: A single metrics snapshot or a list of snapshots
                (see ``prepare_features``); the rows form one input sequence.

        Returns:
            Dict with ``failure_probability``, ``risk_level``,
            ``recommendation`` and ``estimated_time_to_failure``.

        The scaler must already be fitted (see ``fit_scaler``); otherwise
        its ``transform`` call raises.
        """
        # Prepare data
        features = self.prepare_features(device_history)
        scaled_features = self.scaler.transform(features)

        # Convert to tensor and add the batch dimension
        x = torch.FloatTensor(scaled_features).unsqueeze(0)

        # Predict
        self.model.eval()
        with torch.no_grad():
            failure_prob = self.model(x).item()

        # Generate maintenance recommendation
        recommendation = self._generate_recommendation(failure_prob, features)
        return {
            'failure_probability': failure_prob,
            'risk_level': self._get_risk_level(failure_prob),
            'recommendation': recommendation,
            'estimated_time_to_failure': self._estimate_ttf(failure_prob),
        }

    def _get_risk_level(self, prob):
        """Return 'Low', 'Medium' or 'High'; 'High' starts at ``failure_threshold``."""
        if prob < 0.3:
            return 'Low'
        if prob < self.failure_threshold:
            return 'Medium'
        return 'High'

    def _estimate_ttf(self, prob):
        """Estimate time to failure in hours, as a range string."""
        if prob < 0.3:
            return ">168"  # More than a week
        if prob < 0.5:
            return "72-168"  # 3-7 days
        if prob < 0.7:
            return "24-72"  # 1-3 days
        return "<24"  # Less than a day

    def _generate_recommendation(self, prob, features):
        """Generate maintenance recommendations.

        Boundaries are inclusive (``>=``) so they line up with
        ``_get_risk_level`` and ``_estimate_ttf``: a probability exactly at
        the failure threshold is 'High' risk and gets the IMMEDIATE action,
        and exactly 0.5 (TTF bucket "24-72") gets PREVENTIVE.
        ``features`` is accepted for interface compatibility and not used.
        """
        if prob >= self.failure_threshold:
            return {
                'action': 'IMMEDIATE',
                'tasks': [
                    'Schedule immediate maintenance window',
                    'Prepare replacement hardware',
                    'Backup configuration',
                    'Notify operations team',
                ],
            }
        if prob >= 0.5:
            return {
                'action': 'PREVENTIVE',
                'tasks': [
                    'Schedule maintenance within 72 hours',
                    'Order replacement parts',
                    'Monitor closely',
                ],
            }
        return {
            'action': 'ROUTINE',
            'tasks': [
                'Continue regular monitoring',
                'Plan for next scheduled maintenance',
            ],
        }
Week 2 Deliverables
- ✓ Production ML Pipeline: Complete CI/CD with automated training and deployment
- ✓ Content Generation System: Automated course creation reducing time by 10x
- ✓ Predictive Maintenance Model: LSTM-based failure prediction with 85%+ accuracy
- ✓ Kubernetes Deployment: Auto-scaling ML service handling 1000+ requests/second
- ✓ Model Monitoring: Drift detection and alerting system
🚀 Week 2 Achievements
You've transformed from ML experimenter to ML engineer! You now have:
- Deep understanding of PyTorch and neural network architectures
- Production-ready MLOps pipeline with monitoring and versioning
- Automated content generation system saving hundreds of hours
- Kubernetes deployment skills for scalable ML services
- Predictive maintenance system preventing costly failures