WEEK 4 OF 6

AI Agents & Intelligent Automation

From Passive AI to Active Network Intelligence

The Future: Autonomous Network Operations

This week marks a paradigm shift. We move from AI that answers questions to AI that takes action. Your students won't just learn about networks - they'll have AI agents that can configure, troubleshoot, and optimize networks autonomously.

🤖 Autonomous Agents

Build agents that reason, plan, and execute network tasks

🔄 Multi-Agent Systems

Coordinate multiple agents for complex network operations

👤 Human-in-the-Loop

Implement approval workflows for critical operations

Part 1: Building Intelligent Network Agents

The ReAct Framework: Reasoning and Acting

Agent Cognitive Loop

1. Observe: Gather information about current network state
2. Think: Reason about the situation and plan next steps
3. Act: Execute commands or gather more information
4. Reflect: Evaluate results and adjust strategy

from langchain.agents import Tool, AgentExecutor, LLMSingleActionAgent
from langchain.prompts import StringPromptTemplate
from langchain.chains import LLMChain
from langchain.schema import AgentAction, AgentFinish
from typing import List, Union, Optional, Dict, Any
import paramiko
import re
import json
from datetime import datetime

class NetworkTool:
    """Base class for network automation tools"""
    
    def __init__(self, device_ip: str, username: str, password: str):
        self.device_ip = device_ip
        self.username = username
        self.password = password
        self.ssh_client = None
        
    def connect(self):
        """Establish SSH connection to network device"""
        self.ssh_client = paramiko.SSHClient()
        self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_client.connect(
            self.device_ip,
            username=self.username,
            password=self.password,
            look_for_keys=False
        )
        
    def execute_command(self, command: str) -> str:
        """Execute command on network device"""
        if not self.ssh_client:
            self.connect()
            
        stdin, stdout, stderr = self.ssh_client.exec_command(command)
        return stdout.read().decode()
    
    def close(self):
        """Close SSH connection"""
        if self.ssh_client:
            self.ssh_client.close()

class ShowCommandTool(NetworkTool):
    """Tool for executing show commands"""
    
    def run(self, command: str) -> str:
        """Execute a show command safely"""
        # Validate it's a safe show command
        if not command.strip().startswith('show'):
            return "Error: Only 'show' commands are allowed with this tool"
        
        try:
            output = self.execute_command(command)
            return output
        except Exception as e:
            return f"Error executing command: {str(e)}"

class ConfigurationTool(NetworkTool):
    """Tool for configuration changes with rollback capability"""
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.config_history = []
        
    def run(self, config_commands: List[str]) -> str:
        """Apply configuration with automatic backup"""
        try:
            # Backup current config
            current_config = self.execute_command("show running-config")
            self.config_history.append({
                'timestamp': datetime.now().isoformat(),
                'config': current_config
            })
            
            # Apply new configuration
            config_mode_commands = ['configure terminal'] + config_commands + ['end']
            results = []
            
            for cmd in config_mode_commands:
                result = self.execute_command(cmd)
                results.append(f"{cmd}: {result}")
            
            # Verify configuration
            verify_output = self.execute_command("show running-config | include " + config_commands[0].split()[0])
            
            return f"Configuration applied successfully:\n" + "\n".join(results) + f"\n\nVerification:\n{verify_output}"
            
        except Exception as e:
            # Rollback on error
            self.rollback()
            return f"Configuration failed and rolled back: {str(e)}"
    
    def rollback(self):
        """Rollback to previous configuration"""
        if self.config_history:
            last_config = self.config_history[-1]['config']
            # In production, implement proper rollback mechanism
            print(f"Rolling back to configuration from {self.config_history[-1]['timestamp']}")

class TroubleshootingTool(NetworkTool):
    """Advanced troubleshooting tool with diagnostic capabilities"""
    
    def run(self, issue_description: str) -> Dict[str, Any]:
        """Diagnose network issues intelligently"""
        diagnostics = {
            'issue': issue_description,
            'checks_performed': [],
            'findings': [],
            'recommendations': []
        }
        
        # Common diagnostic commands based on issue type
        if 'connectivity' in issue_description.lower():
            commands = [
                'show ip interface brief',
                'show ip route',
                'show arp',
                'show interfaces status'
            ]
        elif 'ospf' in issue_description.lower():
            commands = [
                'show ip ospf neighbor',
                'show ip ospf interface',
                'show ip ospf database'
            ]
        else:
            commands = ['show version', 'show interfaces status']
        
        for cmd in commands:
            try:
                output = self.execute_command(cmd)
                diagnostics['checks_performed'].append(cmd)
                
                # Analyze output for issues
                issues = self._analyze_output(cmd, output)
                if issues:
                    diagnostics['findings'].extend(issues)
                    
            except Exception as e:
                diagnostics['findings'].append(f"Error running {cmd}: {str(e)}")
        
        # Generate recommendations based on findings
        diagnostics['recommendations'] = self._generate_recommendations(diagnostics['findings'])
        
        return diagnostics
    
    def _analyze_output(self, command: str, output: str) -> List[str]:
        """Analyze command output for issues"""
        issues = []
        
        if 'show interfaces' in command:
            # Check for errors, drops
            if 'error' in output.lower() or 'drop' in output.lower():
                issues.append("Interface errors or drops detected")
                
        elif 'show ip ospf neighbor' in command:
            # Check for OSPF adjacencies
            if 'FULL' not in output:
                issues.append("OSPF adjacencies not fully established")
        
        return issues
    
    def _generate_recommendations(self, findings: List[str]) -> List[str]:
        """Generate recommendations based on findings"""
        recommendations = []
        
        for finding in findings:
            if 'Interface errors' in finding:
                recommendations.append("Check physical connections and cable quality")
                recommendations.append("Verify duplex and speed settings")
            elif 'OSPF adjacencies' in finding:
                recommendations.append("Verify OSPF network statements")
                recommendations.append("Check MTU consistency across links")
                
        return recommendations

class NetworkAutomationAgent:
    """Intelligent agent for network automation tasks"""
    
    def __init__(self, llm, device_credentials: Dict[str, str]):
        self.llm = llm
        self.device_credentials = device_credentials
        
        # Initialize tools
        self.show_tool = ShowCommandTool(**device_credentials)
        self.config_tool = ConfigurationTool(**device_credentials)
        self.troubleshoot_tool = TroubleshootingTool(**device_credentials)
        
        # Define available tools for the agent
        self.tools = [
            Tool(
                name="show_command",
                func=self.show_tool.run,
                description="Execute show commands to gather network information"
            ),
            Tool(
                name="configure",
                func=self.config_tool.run,
                description="Apply configuration changes to network devices"
            ),
            Tool(
                name="troubleshoot",
                func=self.troubleshoot_tool.run,
                description="Perform automated troubleshooting diagnostics"
            )
        ]
        
        # Agent configuration
        self.agent_executor = self._create_agent_executor()
        
    def _create_agent_executor(self) -> AgentExecutor:
        """Create the agent executor with ReAct framework"""
        
        # Custom prompt template for network automation
        template = """You are an expert network automation engineer. Your goal is to help configure, monitor, and troubleshoot network devices.

You have access to the following tools:
{tools}

Use the following format:
Thought: Think about what you need to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now have enough information to provide a solution
Final Answer: the final answer to the original problem

Current task: {input}

Begin!

{agent_scratchpad}"""

        prompt = StringPromptTemplate(
            template=template,
            input_variables=["input", "agent_scratchpad"],
            partial_variables={
                "tools": "\n".join([f"{t.name}: {t.description}" for t in self.tools]),
                "tool_names": ", ".join([t.name for t in self.tools])
            }
        )
        
        # Create LLM chain
        llm_chain = LLMChain(llm=self.llm, prompt=prompt)
        
        # Create agent
        agent = LLMSingleActionAgent(
            llm_chain=llm_chain,
            output_parser=self._create_output_parser(),
            stop=["\nObservation:"],
            allowed_tools=[t.name for t in self.tools]
        )
        
        # Create executor
        return AgentExecutor.from_agent_and_tools(
            agent=agent,
            tools=self.tools,
            verbose=True,
            max_iterations=5,
            early_stopping_method="generate"
        )
    
    def _create_output_parser(self):
        """Create custom output parser for agent responses"""
        class NetworkAgentOutputParser:
            def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]:
                # Parse action
                if "Action:" in llm_output:
                    action_match = re.search(r"Action:\s*(.*?)\n", llm_output)
                    action_input_match = re.search(r"Action Input:\s*(.*?)(?:\n|$)", llm_output, re.DOTALL)
                    
                    if action_match and action_input_match:
                        action = action_match.group(1).strip()
                        action_input = action_input_match.group(1).strip()
                        
                        return AgentAction(
                            tool=action,
                            tool_input=action_input,
                            log=llm_output
                        )
                
                # Parse final answer
                if "Final Answer:" in llm_output:
                    final_answer = llm_output.split("Final Answer:")[-1].strip()
                    return AgentFinish(
                        return_values={"output": final_answer},
                        log=llm_output
                    )
                
                # Default to continuing
                return AgentAction(
                    tool="show_command",
                    tool_input="show version",
                    log=llm_output
                )
        
        return NetworkAgentOutputParser()
    
    def execute_task(self, task: str) -> str:
        """Execute a network automation task"""
        try:
            result = self.agent_executor.run(task)
            return result
        except Exception as e:
            return f"Task execution failed: {str(e)}"
        finally:
            # Clean up connections
            self.show_tool.close()
            self.config_tool.close()
            self.troubleshoot_tool.close()
                

Part 2: Multi-Agent Network Provisioning

Orchestrating Multiple Agents for Complex Tasks


from enum import Enum
from typing import List, Dict, Any, Optional
import asyncio
from dataclasses import dataclass
from langchain.memory import ConversationBufferMemory

class AgentRole(Enum):
    """Define specialized agent roles"""
    PLANNER = "planner"
    CONFIGURATOR = "configurator"
    VALIDATOR = "validator"
    DOCUMENTER = "documenter"

@dataclass
class NetworkTask:
    """Represents a network automation task"""
    task_id: str
    description: str
    priority: int
    requirements: List[str]
    status: str = "pending"
    result: Optional[str] = None

class SpecializedAgent:
    """Base class for specialized agents"""
    
    def __init__(self, role: AgentRole, llm):
        self.role = role
        self.llm = llm
        self.memory = ConversationBufferMemory()
        
    async def process(self, task: NetworkTask, context: Dict[str, Any]) -> Dict[str, Any]:
        """Process task based on agent role"""
        raise NotImplementedError

class PlannerAgent(SpecializedAgent):
    """Agent responsible for planning network changes"""
    
    def __init__(self, llm):
        super().__init__(AgentRole.PLANNER, llm)
        
    async def process(self, task: NetworkTask, context: Dict[str, Any]) -> Dict[str, Any]:
        """Create detailed plan for network task"""
        
        prompt = f"""
        As a network planning specialist, create a detailed plan for:
        Task: {task.description}
        Requirements: {task.requirements}
        Current Network State: {context.get('network_state', 'Unknown')}
        
        Provide:
        1. Step-by-step configuration plan
        2. Required prerequisites
        3. Risk assessment
        4. Rollback strategy
        """
        
        plan = await self.llm.agenerate([prompt])
        
        return {
            'plan': plan.generations[0][0].text,
            'risks': self._identify_risks(task),
            'prerequisites': self._check_prerequisites(task, context)
        }
    
    def _identify_risks(self, task: NetworkTask) -> List[str]:
        """Identify potential risks in the task"""
        risks = []
        
        if 'production' in task.description.lower():
            risks.append("Production environment change - requires change window")
        if 'routing' in task.description.lower():
            risks.append("Routing changes may affect network connectivity")
        if 'firewall' in task.description.lower():
            risks.append("Firewall changes may block legitimate traffic")
            
        return risks
    
    def _check_prerequisites(self, task: NetworkTask, context: Dict) -> List[str]:
        """Check task prerequisites"""
        prerequisites = []
        
        if 'vlan' in task.description.lower():
            prerequisites.append("Verify VLAN availability")
        if 'ip' in task.description.lower():
            prerequisites.append("Verify IP address availability")
            
        return prerequisites

class ConfiguratorAgent(SpecializedAgent):
    """Agent responsible for implementing configurations"""
    
    def __init__(self, llm, network_tools):
        super().__init__(AgentRole.CONFIGURATOR, llm)
        self.network_tools = network_tools
        
    async def process(self, task: NetworkTask, context: Dict[str, Any]) -> Dict[str, Any]:
        """Implement network configurations based on plan"""
        
        plan = context.get('plan', {})
        
        if not plan:
            return {'error': 'No plan provided for configuration'}
        
        # Generate configuration commands
        config_commands = await self._generate_configs(plan, task)
        
        # Apply configurations with safety checks
        results = []
        for device, commands in config_commands.items():
            if await self._safety_check(commands):
                result = await self._apply_config(device, commands)
                results.append(result)
            else:
                results.append({
                    'device': device,
                    'status': 'blocked',
                    'reason': 'Failed safety check'
                })
        
        return {
            'configurations_applied': results,
            'timestamp': datetime.now().isoformat()
        }
    
    async def _generate_configs(self, plan: Dict, task: NetworkTask) -> Dict[str, List[str]]:
        """Generate device-specific configurations"""
        # In production, this would parse the plan and generate actual configs
        return {
            'router1': [
                'interface GigabitEthernet0/1',
                'description Configured by AI Agent',
                'ip address 192.168.1.1 255.255.255.0',
                'no shutdown'
            ]
        }
    
    async def _safety_check(self, commands: List[str]) -> bool:
        """Validate configuration safety"""
        dangerous_commands = ['reload', 'write erase', 'no router']
        
        for cmd in commands:
            if any(danger in cmd.lower() for danger in dangerous_commands):
                return False
        return True
    
    async def _apply_config(self, device: str, commands: List[str]) -> Dict:
        """Apply configuration to device"""
        # Simulate configuration application
        return {
            'device': device,
            'status': 'success',
            'commands_applied': len(commands)
        }

class ValidatorAgent(SpecializedAgent):
    """Agent responsible for validating configurations"""
    
    def __init__(self, llm):
        super().__init__(AgentRole.VALIDATOR, llm)
        
    async def process(self, task: NetworkTask, context: Dict[str, Any]) -> Dict[str, Any]:
        """Validate that configurations meet requirements"""
        
        configurations = context.get('configurations_applied', [])
        
        validation_results = []
        for config in configurations:
            result = await self._validate_config(config, task.requirements)
            validation_results.append(result)
        
        return {
            'validation_status': 'passed' if all(r['valid'] for r in validation_results) else 'failed',
            'validation_details': validation_results,
            'recommendations': self._generate_recommendations(validation_results)
        }
    
    async def _validate_config(self, config: Dict, requirements: List[str]) -> Dict:
        """Validate individual configuration"""
        return {
            'device': config.get('device'),
            'valid': config.get('status') == 'success',
            'checks_performed': [
                'Syntax validation',
                'Requirements compliance',
                'Best practices check'
            ]
        }
    
    def _generate_recommendations(self, results: List[Dict]) -> List[str]:
        """Generate recommendations based on validation"""
        recommendations = []
        
        for result in results:
            if not result['valid']:
                recommendations.append(f"Review configuration on {result['device']}")
        
        return recommendations

class MultiAgentOrchestrator:
    """Orchestrate multiple agents for complex network tasks"""
    
    def __init__(self, llm):
        self.llm = llm
        self.agents = {
            AgentRole.PLANNER: PlannerAgent(llm),
            AgentRole.CONFIGURATOR: ConfiguratorAgent(llm, {}),
            AgentRole.VALIDATOR: ValidatorAgent(llm)
        }
        self.task_queue = asyncio.Queue()
        self.results = {}
        
    async def execute_workflow(self, task: NetworkTask) -> Dict[str, Any]:
        """Execute multi-agent workflow"""
        
        workflow_context = {
            'task_id': task.task_id,
            'network_state': await self._get_network_state()
        }
        
        # Step 1: Planning
        print(f"🎯 Planning phase for task: {task.task_id}")
        plan_result = await self.agents[AgentRole.PLANNER].process(task, workflow_context)
        workflow_context['plan'] = plan_result
        
        # Step 2: Human approval (if required)
        if await self._requires_approval(plan_result):
            approval = await self._get_human_approval(plan_result)
            if not approval:
                return {'status': 'rejected', 'reason': 'Human approval denied'}
        
        # Step 3: Configuration
        print(f"⚙️ Configuration phase for task: {task.task_id}")
        config_result = await self.agents[AgentRole.CONFIGURATOR].process(task, workflow_context)
        workflow_context['configurations_applied'] = config_result
        
        # Step 4: Validation
        print(f"✅ Validation phase for task: {task.task_id}")
        validation_result = await self.agents[AgentRole.VALIDATOR].process(task, workflow_context)
        
        return {
            'task_id': task.task_id,
            'status': 'completed',
            'plan': plan_result,
            'configuration': config_result,
            'validation': validation_result
        }
    
    async def _get_network_state(self) -> Dict:
        """Get current network state"""
        # In production, gather actual network state
        return {
            'devices': ['router1', 'switch1'],
            'vlans': [1, 10, 20],
            'interfaces_available': 5
        }
    
    async def _requires_approval(self, plan: Dict) -> bool:
        """Check if human approval is required"""
        risks = plan.get('risks', [])
        return len(risks) > 0 or 'production' in str(plan).lower()
    
    async def _get_human_approval(self, plan: Dict) -> bool:
        """Get human approval for plan"""
        # In production, send notification and wait for approval
        print(f"⚠️ Human approval required for plan with risks: {plan.get('risks', [])}")
        # Simulate approval
        return True

# Create workflow execution function
async def execute_network_automation_workflow(task_description: str):
    """Execute complete network automation workflow"""
    
    # Initialize LLM (using local Llama model)
    from langchain.llms import LlamaCpp
    llm = LlamaCpp(
        model_path="./models/llama-3-8b.gguf",
        temperature=0.3,
        max_tokens=1000
    )
    
    # Create orchestrator
    orchestrator = MultiAgentOrchestrator(llm)
    
    # Create task
    task = NetworkTask(
        task_id="TASK-001",
        description=task_description,
        priority=1,
        requirements=[
            "Configure VLAN 100",
            "Set up inter-VLAN routing",
            "Apply security policies"
        ]
    )
    
    # Execute workflow
    result = await orchestrator.execute_workflow(task)
    
    return result

# Usage example
# asyncio.run(execute_network_automation_workflow("Configure new VLAN for engineering department"))
                

Part 3: Human-in-the-Loop Approval Workflows

Safe AI Operations with Human Oversight

Approval Workflow Architecture

1
AI Proposes Action

Agent generates configuration plan with risk assessment

2
Risk Evaluation

System evaluates risk level and determines if approval needed

3
Human Review

Network engineer reviews and approves/rejects/modifies

4
Execution

Approved actions are executed with monitoring


from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional, List
import uuid
from datetime import datetime, timedelta
import asyncio
from enum import Enum

app = FastAPI(title="Human-in-the-Loop Network Automation")

class ApprovalStatus(Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"
    MODIFIED = "modified"

class RiskLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class NetworkChange(BaseModel):
    change_id: str
    description: str
    device: str
    commands: List[str]
    risk_level: RiskLevel
    impact_assessment: str
    rollback_plan: str
    created_by: str = "AI Agent"
    created_at: datetime = datetime.now()

class ApprovalRequest(BaseModel):
    request_id: str
    change: NetworkChange
    status: ApprovalStatus = ApprovalStatus.PENDING
    approver: Optional[str] = None
    approval_time: Optional[datetime] = None
    comments: Optional[str] = None
    modifications: Optional[List[str]] = None
    expiry_time: datetime

class HumanInTheLoopSystem:
    """Manage human approval workflows for network changes"""
    
    def __init__(self):
        self.approval_queue = {}
        self.approval_history = []
        self.notification_channels = []
        
    async def submit_for_approval(self, change: NetworkChange) -> ApprovalRequest:
        """Submit change for human approval"""
        
        # Determine if approval is needed
        if not self._requires_approval(change):
            # Auto-approve low-risk changes
            return self._auto_approve(change)
        
        # Create approval request
        request = ApprovalRequest(
            request_id=str(uuid.uuid4()),
            change=change,
            expiry_time=datetime.now() + timedelta(hours=1)
        )
        
        # Add to queue
        self.approval_queue[request.request_id] = request
        
        # Send notifications
        await self._send_notifications(request)
        
        return request
    
    def _requires_approval(self, change: NetworkChange) -> bool:
        """Determine if change requires human approval"""
        
        # Always require approval for high/critical risk
        if change.risk_level in [RiskLevel.HIGH, RiskLevel.CRITICAL]:
            return True
        
        # Check for sensitive commands
        sensitive_commands = [
            'router bgp', 'router ospf', 'access-list',
            'crypto', 'aaa', 'username', 'enable secret'
        ]
        
        for cmd in change.commands:
            if any(sensitive in cmd.lower() for sensitive in sensitive_commands):
                return True
        
        # Production devices always need approval
        if 'prod' in change.device.lower():
            return True
        
        return False
    
    def _auto_approve(self, change: NetworkChange) -> ApprovalRequest:
        """Auto-approve low-risk changes"""
        request = ApprovalRequest(
            request_id=str(uuid.uuid4()),
            change=change,
            status=ApprovalStatus.APPROVED,
            approver="AUTO",
            approval_time=datetime.now(),
            comments="Auto-approved: Low risk change",
            expiry_time=datetime.now()
        )
        
        self.approval_history.append(request)
        return request
    
    async def _send_notifications(self, request: ApprovalRequest):
        """Send notifications to approvers"""
        
        # Send to multiple channels
        notifications = []
        
        # Slack notification
        notifications.append(
            self._send_slack_notification(request)
        )
        
        # Email notification
        notifications.append(
            self._send_email_notification(request)
        )
        
        # SMS for critical changes
        if request.change.risk_level == RiskLevel.CRITICAL:
            notifications.append(
                self._send_sms_notification(request)
            )
        
        await asyncio.gather(*notifications)
    
    async def _send_slack_notification(self, request: ApprovalRequest):
        """Send Slack notification"""
        # Implement Slack webhook integration
        message = {
            "text": f"Network Change Approval Required",
            "blocks": [
                {
                    "type": "header",
                    "text": {
                        "type": "plain_text",
                        "text": f"🔔 Approval Required: {request.change.description}"
                    }
                },
                {
                    "type": "section",
                    "fields": [
                        {"type": "mrkdwn", "text": f"*Device:* {request.change.device}"},
                        {"type": "mrkdwn", "text": f"*Risk Level:* {request.change.risk_level.value}"},
                        {"type": "mrkdwn", "text": f"*Impact:* {request.change.impact_assessment}"}
                    ]
                },
                {
                    "type": "actions",
                    "elements": [
                        {
                            "type": "button",
                            "text": {"type": "plain_text", "text": "Approve"},
                            "style": "primary",
                            "action_id": f"approve_{request.request_id}"
                        },
                        {
                            "type": "button",
                            "text": {"type": "plain_text", "text": "Reject"},
                            "style": "danger",
                            "action_id": f"reject_{request.request_id}"
                        },
                        {
                            "type": "button",
                            "text": {"type": "plain_text", "text": "View Details"},
                            "action_id": f"view_{request.request_id}"
                        }
                    ]
                }
            ]
        }
        # Send to Slack
        print(f"Slack notification sent: {message}")
    
    async def _send_email_notification(self, request: ApprovalRequest):
        """Send email notification"""
        # Implement email sending
        print(f"Email notification sent for request {request.request_id}")
    
    async def _send_sms_notification(self, request: ApprovalRequest):
        """Send SMS for critical changes"""
        # Implement SMS sending (Twilio, etc.)
        print(f"SMS alert sent for critical change {request.request_id}")
    
    async def process_approval(self, request_id: str, decision: ApprovalStatus, 
                             approver: str, comments: Optional[str] = None,
                             modifications: Optional[List[str]] = None):
        """Process approval decision"""
        
        if request_id not in self.approval_queue:
            raise ValueError(f"Request {request_id} not found")
        
        request = self.approval_queue[request_id]
        
        # Check if request expired
        if datetime.now() > request.expiry_time:
            raise ValueError("Approval request has expired")
        
        # Update request
        request.status = decision
        request.approver = approver
        request.approval_time = datetime.now()
        request.comments = comments
        request.modifications = modifications
        
        # Move to history
        del self.approval_queue[request_id]
        self.approval_history.append(request)
        
        # Execute if approved
        if decision == ApprovalStatus.APPROVED:
            await self._execute_change(request)
        
        return request
    
    async def _execute_change(self, request: ApprovalRequest):
        """Execute approved network change"""
        print(f"Executing approved change: {request.change.description}")
        
        # Apply modifications if any
        commands = request.modifications or request.change.commands
        
        # Execute commands (integrate with network tools)
        # ... implementation ...
        
        # Log execution
        print(f"Change {request.request_id} executed successfully")

# API Endpoints
approval_system = HumanInTheLoopSystem()

@app.post("/submit-change")
async def submit_change(change: NetworkChange):
    """Submit network change for approval"""
    request = await approval_system.submit_for_approval(change)
    return {"request_id": request.request_id, "status": request.status.value}

@app.post("/approve/{request_id}")
async def approve_change(request_id: str, approver: str, comments: Optional[str] = None):
    """Approve a network change"""
    try:
        result = await approval_system.process_approval(
            request_id, ApprovalStatus.APPROVED, approver, comments
        )
        return {"status": "approved", "executed": True}
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/pending-approvals")
async def get_pending_approvals():
    """Get all pending approval requests"""
    return list(approval_system.approval_queue.values())
                

Week 4 Deliverables

🤖 Week 4 Achievements

You've entered the future of network automation with AI agents!