The Future: Autonomous Network Operations
This week marks a paradigm shift: we move from AI that answers questions to AI that takes action. Your students won't just learn about networks — they'll have AI agents that can configure, troubleshoot, and optimize networks autonomously.
🤖 Autonomous Agents
Build agents that reason, plan, and execute network tasks
🔄 Multi-Agent Systems
Coordinate multiple agents for complex network operations
👤 Human-in-the-Loop
Implement approval workflows for critical operations
Part 1: Building Intelligent Network Agents
The ReAct Framework: Reasoning and Acting
Agent Cognitive Loop
1. Observe: Gather information about current network state
2. Think: Reason about the situation and plan next steps
3. Act: Execute commands or gather more information
4. Reflect: Evaluate results and adjust strategy
from langchain.agents import Tool, AgentExecutor, LLMSingleActionAgent
from langchain.prompts import StringPromptTemplate
from langchain.chains import LLMChain
from langchain.schema import AgentAction, AgentFinish
from typing import List, Union, Optional, Dict, Any
import paramiko
import re
import json
from datetime import datetime
class NetworkTool:
    """Base class for network automation tools that reach a device over SSH.

    Stores the device address and credentials. The SSH session is opened
    lazily by ``execute_command`` and released by ``close``.
    """

    def __init__(self, device_ip: str, username: str, password: str):
        """Remember connection details; no network traffic happens here."""
        self.device_ip = device_ip
        self.username = username
        self.password = password
        # None means "not connected": connect() sets it, close() resets it.
        self.ssh_client = None

    def connect(self):
        """Establish SSH connection to network device.

        Raises:
            Whatever ``paramiko.SSHClient.connect`` raises on failure. In that
            case ``ssh_client`` stays ``None`` so a later call can retry.
        """
        client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification. Acceptable for an isolated lab; use known_hosts
        # checking before pointing this at real infrastructure.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(
            self.device_ip,
            username=self.username,
            password=self.password,
            look_for_keys=False,
        )
        # Publish the client only once the connection succeeded; otherwise a
        # failed connect would leave a dead client that skips reconnection.
        self.ssh_client = client

    def execute_command(self, command: str) -> str:
        """Execute command on network device and return its decoded stdout.

        Connects first when no session is open.
        """
        if not self.ssh_client:
            self.connect()
        _stdin, stdout, _stderr = self.ssh_client.exec_command(command)
        return stdout.read().decode()

    def close(self):
        """Close SSH connection (no-op when not connected)."""
        if self.ssh_client:
            try:
                self.ssh_client.close()
            finally:
                # Forget the closed client so the next execute_command
                # reconnects instead of using a dead session.
                self.ssh_client = None
class ShowCommandTool(NetworkTool):
    """Read-only tool that runs a single ``show`` command on the device."""

    def run(self, command: str) -> str:
        """Execute a show command safely.

        Args:
            command: the command text, typically produced by the agent.

        Returns:
            The device output, or an ``Error: ...`` string when the command is
            rejected or execution fails (this method does not raise).
        """
        cleaned = command.strip()
        words = cleaned.split()
        # The first word must be exactly 'show' (any case). A plain prefix
        # test would also accept things like 'showfoo'.
        is_show = bool(words) and words[0].lower() == 'show'
        # Refuse separators that could smuggle a second command behind the
        # show command on platforms that support command chaining.
        has_separator = any(sep in cleaned for sep in ('\n', '\r', ';'))
        if not is_show or has_separator:
            return "Error: Only 'show' commands are allowed with this tool"
        try:
            return self.execute_command(cleaned)
        except Exception as e:
            # Errors are reported as text so the agent can read them as an
            # observation instead of the whole run aborting.
            return f"Error executing command: {str(e)}"
class ConfigurationTool(NetworkTool):
    """Tool for configuration changes with rollback capability"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # One {'timestamp', 'config'} snapshot per run(), newest last.
        self.config_history = []

    def run(self, config_commands: Union[List[str], str]) -> str:
        """Apply configuration with automatic backup.

        Args:
            config_commands: configuration lines, either as a list or as a
                single newline-separated string (the form an agent passes).

        Returns:
            A report of the applied commands plus a verification excerpt, or
            an error string. This method does not raise.
        """
        # Agents hand tools a plain string; accept it as well as a list.
        if isinstance(config_commands, str):
            config_commands = config_commands.splitlines()
        commands = [cmd.strip() for cmd in config_commands if cmd and cmd.strip()]
        if not commands:
            # Nothing to apply: do not enter config mode or take a backup.
            return "Error: No configuration commands provided"
        try:
            # Backup current config
            current_config = self.execute_command("show running-config")
            self.config_history.append({
                'timestamp': datetime.now().isoformat(),
                'config': current_config
            })
            # Apply new configuration
            # NOTE(review): every command goes through a separate
            # execute_command call; confirm the target platform keeps
            # configuration mode across those calls.
            config_mode_commands = ['configure terminal'] + commands + ['end']
            results = []
            for cmd in config_mode_commands:
                result = self.execute_command(cmd)
                results.append(f"{cmd}: {result}")
            # Verify configuration: filter the running config on the first
            # keyword of the first command (commands is non-empty here).
            keyword = commands[0].split()[0]
            verify_output = self.execute_command("show running-config | include " + keyword)
            return "Configuration applied successfully:\n" + "\n".join(results) + f"\n\nVerification:\n{verify_output}"
        except Exception as e:
            # Rollback on error
            self.rollback()
            return f"Configuration failed and rolled back: {str(e)}"

    def rollback(self):
        """Rollback to previous configuration.

        Currently only announces which snapshot would be restored; nothing is
        pushed back to the device.
        """
        if self.config_history:
            # In production, implement proper rollback mechanism
            print(f"Rolling back to configuration from {self.config_history[-1]['timestamp']}")
class TroubleshootingTool(NetworkTool):
    """Advanced troubleshooting tool with diagnostic capabilities"""

    def run(self, issue_description: str) -> Dict[str, Any]:
        """Diagnose network issues intelligently.

        Picks show commands from keywords in the description, runs them, and
        derives findings and recommendations from their output.

        Returns:
            Dict with keys 'issue', 'checks_performed', 'findings' and
            'recommendations'.
        """
        diagnostics = {
            'issue': issue_description,
            'checks_performed': [],
            'findings': [],
            'recommendations': []
        }
        description = issue_description.lower()
        # Collect the checks for every matching category so a description
        # mentioning both connectivity and OSPF gets both sets.
        commands = []
        if 'connectivity' in description:
            commands += [
                'show ip interface brief',
                'show ip route',
                'show arp',
                'show interfaces status'
            ]
        if 'ospf' in description:
            commands += [
                'show ip ospf neighbor',
                'show ip ospf interface',
                'show ip ospf database'
            ]
        if not commands:
            # No known keyword: fall back to a generic health check.
            commands = ['show version', 'show interfaces status']
        for cmd in commands:
            try:
                output = self.execute_command(cmd)
                diagnostics['checks_performed'].append(cmd)
                # Analyze output for issues
                diagnostics['findings'].extend(self._analyze_output(cmd, output))
            except Exception as e:
                # A failing command becomes a finding; remaining checks still run.
                diagnostics['findings'].append(f"Error running {cmd}: {str(e)}")
        # Generate recommendations based on findings
        diagnostics['recommendations'] = self._generate_recommendations(diagnostics['findings'])
        return diagnostics

    def _analyze_output(self, command: str, output: str) -> List[str]:
        """Analyze command output for issues and return them as messages."""
        issues = []
        if 'show interfaces' in command:
            # Check for errors, drops
            lowered = output.lower()
            if 'error' in lowered or 'drop' in lowered:
                issues.append("Interface errors or drops detected")
        elif 'show ip ospf neighbor' in command:
            # Check for OSPF adjacencies
            if 'FULL' not in output:
                issues.append("OSPF adjacencies not fully established")
        return issues

    def _generate_recommendations(self, findings: List[str]) -> List[str]:
        """Generate recommendations based on findings, without duplicates."""
        recommendations = []
        for finding in findings:
            if 'Interface errors' in finding:
                suggested = [
                    "Check physical connections and cable quality",
                    "Verify duplex and speed settings",
                ]
            elif 'OSPF adjacencies' in finding:
                suggested = [
                    "Verify OSPF network statements",
                    "Check MTU consistency across links",
                ]
            else:
                suggested = []
            for item in suggested:
                # Repeated findings must not repeat the same advice.
                if item not in recommendations:
                    recommendations.append(item)
        return recommendations
class NetworkAutomationAgent:
    """Intelligent agent for network automation tasks.

    Wires three device tools (show, configure, troubleshoot) into a LangChain
    ``LLMSingleActionAgent`` driven by a ReAct-style prompt.
    """

    def __init__(self, llm, device_credentials: Dict[str, str]):
        """Build the tools and the agent executor.

        Args:
            llm: language model handed to ``LLMChain``.
            device_credentials: keyword arguments forwarded unchanged to each
                tool constructor.
        """
        self.llm = llm
        self.device_credentials = device_credentials
        # Initialize tools
        self.show_tool = ShowCommandTool(**device_credentials)
        self.config_tool = ConfigurationTool(**device_credentials)
        self.troubleshoot_tool = TroubleshootingTool(**device_credentials)
        # Define available tools for the agent
        self.tools = [
            Tool(
                name="show_command",
                func=self.show_tool.run,
                description="Execute show commands to gather network information"
            ),
            Tool(
                name="configure",
                # The agent supplies one string; adapt it to the list that
                # ConfigurationTool.run expects.
                func=self._run_configuration,
                description="Apply configuration changes to network devices"
            ),
            Tool(
                name="troubleshoot",
                func=self.troubleshoot_tool.run,
                description="Perform automated troubleshooting diagnostics"
            )
        ]
        # Agent configuration
        self.agent_executor = self._create_agent_executor()

    def _run_configuration(self, tool_input) -> str:
        """Split the agent's text input into commands and apply them.

        Commands may be separated by newlines or semicolons; a list is passed
        through unchanged.
        """
        if isinstance(tool_input, str):
            commands = [part.strip() for part in re.split(r"[\n;]", tool_input) if part.strip()]
        else:
            commands = list(tool_input)
        return self.config_tool.run(commands)

    def _create_agent_executor(self) -> AgentExecutor:
        """Create the agent executor with ReAct framework"""
        # Custom prompt template for network automation
        template = (
            "You are an expert network automation engineer. Your goal is to help configure, monitor, and troubleshoot network devices.\n"
            "You have access to the following tools:\n"
            "{tools}\n"
            "Use the following format:\n"
            "Thought: Think about what you need to do\n"
            "Action: the action to take, should be one of [{tool_names}]\n"
            "Action Input: the input to the action\n"
            "Observation: the result of the action\n"
            "... (this Thought/Action/Action Input/Observation can repeat N times)\n"
            "Thought: I now have enough information to provide a solution\n"
            "Final Answer: the final answer to the original problem\n"
            "Current task: {input}\n"
            "Begin!\n"
            "{agent_scratchpad}"
        )

        class NetworkAgentPrompt(StringPromptTemplate):
            """Prompt that renders the scratchpad from the agent's steps.

            StringPromptTemplate is abstract, so ``format`` is implemented
            here. The agent passes ``intermediate_steps``, which are turned
            into the ``{agent_scratchpad}`` text.
            """

            template: str
            tool_listing: str
            tool_names: str

            def format(self, **kwargs) -> str:
                steps = kwargs.pop("intermediate_steps", [])
                scratchpad = "".join(
                    f"{action.log}\nObservation: {observation}\nThought: "
                    for action, observation in steps
                )
                return self.template.format(
                    tools=self.tool_listing,
                    tool_names=self.tool_names,
                    agent_scratchpad=scratchpad,
                    **kwargs,
                )

        prompt = NetworkAgentPrompt(
            template=template,
            tool_listing="\n".join(f"{t.name}: {t.description}" for t in self.tools),
            tool_names=", ".join(t.name for t in self.tools),
            input_variables=["input", "intermediate_steps"],
        )
        # Create LLM chain
        llm_chain = LLMChain(llm=self.llm, prompt=prompt)
        # Create agent
        agent = LLMSingleActionAgent(
            llm_chain=llm_chain,
            output_parser=self._create_output_parser(),
            stop=["\nObservation:"],
            allowed_tools=[t.name for t in self.tools]
        )
        # Create executor
        return AgentExecutor.from_agent_and_tools(
            agent=agent,
            tools=self.tools,
            verbose=True,
            max_iterations=5,
            # Single-action agents only support "force" when the iteration
            # limit is hit; "generate" raises at that point.
            early_stopping_method="force"
        )

    def _create_output_parser(self):
        """Create custom output parser for agent responses"""
        # Imported here because the file's import block does not provide it;
        # the agent requires a parser derived from this base class.
        from langchain.agents import AgentOutputParser

        class NetworkAgentOutputParser(AgentOutputParser):
            """Turn raw LLM text into an AgentAction or AgentFinish."""

            def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]:
                # Parse action
                if "Action:" in llm_output:
                    action_match = re.search(r"Action:\s*(.*?)\n", llm_output)
                    action_input_match = re.search(r"Action Input:\s*(.*?)(?:\n|$)", llm_output, re.DOTALL)
                    if action_match and action_input_match:
                        return AgentAction(
                            tool=action_match.group(1).strip(),
                            tool_input=action_input_match.group(1).strip(),
                            log=llm_output
                        )
                # Parse final answer
                if "Final Answer:" in llm_output:
                    final_answer = llm_output.split("Final Answer:")[-1].strip()
                    return AgentFinish(
                        return_values={"output": final_answer},
                        log=llm_output
                    )
                # Default to continuing: unparseable output falls back to a
                # harmless read-only command.
                return AgentAction(
                    tool="show_command",
                    tool_input="show version",
                    log=llm_output
                )

        return NetworkAgentOutputParser()

    def execute_task(self, task: str) -> str:
        """Execute a network automation task.

        Returns the agent's answer, or a ``Task execution failed: ...`` string
        when the run raises. Device connections are closed afterwards.
        """
        try:
            return self.agent_executor.run(task)
        except Exception as e:
            return f"Task execution failed: {str(e)}"
        finally:
            # Clean up connections
            for tool in (self.show_tool, self.config_tool, self.troubleshoot_tool):
                tool.close()
                # Forget the closed session so the tool reconnects when the
                # agent is used for another task.
                tool.ssh_client = None
Part 2: Multi-Agent Network Provisioning
Orchestrating Multiple Agents for Complex Tasks
from enum import Enum
from typing import List, Dict, Any, Optional
import asyncio
from dataclasses import dataclass
from langchain.memory import ConversationBufferMemory
class AgentRole(Enum):
    """Roles that a specialized agent can take in the multi-agent workflow."""

    # Each value is the lowercase form of the member name.
    PLANNER = "planner"
    CONFIGURATOR = "configurator"
    VALIDATOR = "validator"
    DOCUMENTER = "documenter"
@dataclass
class NetworkTask:
    """A unit of network automation work passed between agents."""

    # Caller-supplied fields
    task_id: str
    description: str
    priority: int
    requirements: List[str]
    # Progress fields: a new task starts as "pending" with no result
    status: str = "pending"
    result: Optional[str] = None
class SpecializedAgent:
    """Common base for role-specific agents.

    Keeps the agent's role, its language model, and a per-agent
    conversation memory. Subclasses supply ``process``.
    """

    def __init__(self, role: AgentRole, llm):
        self.role = role
        self.llm = llm
        self.memory = ConversationBufferMemory()

    async def process(self, task: NetworkTask, context: Dict[str, Any]) -> Dict[str, Any]:
        """Handle ``task`` given the workflow ``context``.

        The base implementation always raises NotImplementedError.
        """
        raise NotImplementedError
class PlannerAgent(SpecializedAgent):
    """Agent responsible for planning network changes"""

    def __init__(self, llm):
        super().__init__(AgentRole.PLANNER, llm)

    async def process(self, task: NetworkTask, context: Dict[str, Any]) -> Dict[str, Any]:
        """Create detailed plan for network task.

        Returns:
            Dict with the LLM-written 'plan' text plus keyword-derived
            'risks' and 'prerequisites' lists.
        """
        prompt = (
            "\n"
            "As a network planning specialist, create a detailed plan for:\n"
            f"Task: {task.description}\n"
            f"Requirements: {task.requirements}\n"
            f"Current Network State: {context.get('network_state', 'Unknown')}\n"
            "Provide:\n"
            "1. Step-by-step configuration plan\n"
            "2. Required prerequisites\n"
            "3. Risk assessment\n"
            "4. Rollback strategy\n"
        )
        plan = await self.llm.agenerate([prompt])
        return {
            'plan': plan.generations[0][0].text,
            'risks': self._identify_risks(task),
            'prerequisites': self._check_prerequisites(task, context)
        }

    def _identify_risks(self, task: NetworkTask) -> List[str]:
        """Identify potential risks in the task from keywords in its description."""
        description = task.description.lower()
        risks = []
        if 'production' in description:
            risks.append("Production environment change - requires change window")
        if 'routing' in description:
            risks.append("Routing changes may affect network connectivity")
        if 'firewall' in description:
            risks.append("Firewall changes may block legitimate traffic")
        return risks

    def _check_prerequisites(self, task: NetworkTask, context: Dict) -> List[str]:
        """Check task prerequisites from keywords in its description."""
        description = task.description.lower()
        prerequisites = []
        if 'vlan' in description:
            prerequisites.append("Verify VLAN availability")
        # Match 'ip' as a word (also 'ips', 'ipv4', 'ipv6'). A substring test
        # would fire on unrelated words such as 'description' or 'equipment'.
        if re.search(r"\bip(?:s|v4|v6)?\b", description):
            prerequisites.append("Verify IP address availability")
        return prerequisites
class ConfiguratorAgent(SpecializedAgent):
    """Agent that turns a plan into per-device configuration and applies it."""

    def __init__(self, llm, network_tools):
        super().__init__(AgentRole.CONFIGURATOR, llm)
        self.network_tools = network_tools

    async def process(self, task: NetworkTask, context: Dict[str, Any]) -> Dict[str, Any]:
        """Generate, safety-check and apply configurations for ``task``.

        Returns an ``{'error': ...}`` dict when the context has no plan,
        otherwise the per-device outcomes plus a timestamp.
        """
        plan = context.get('plan', {})
        if not plan:
            return {'error': 'No plan provided for configuration'}

        per_device = await self._generate_configs(plan, task)
        outcomes = []
        for device, commands in per_device.items():
            # Devices whose commands fail the safety check are reported as
            # blocked and nothing is applied to them.
            if not await self._safety_check(commands):
                outcomes.append({
                    'device': device,
                    'status': 'blocked',
                    'reason': 'Failed safety check'
                })
                continue
            outcomes.append(await self._apply_config(device, commands))

        return {
            'configurations_applied': outcomes,
            'timestamp': datetime.now().isoformat()
        }

    async def _generate_configs(self, plan: Dict, task: NetworkTask) -> Dict[str, List[str]]:
        """Return the command list for each device (fixed sample data for now)."""
        # In production, this would parse the plan and generate actual configs
        router1_commands = [
            'interface GigabitEthernet0/1',
            'description Configured by AI Agent',
            'ip address 192.168.1.1 255.255.255.0',
            'no shutdown'
        ]
        return {'router1': router1_commands}

    async def _safety_check(self, commands: List[str]) -> bool:
        """Return False when any command contains a known dangerous fragment."""
        dangerous_commands = ['reload', 'write erase', 'no router']
        lowered = [cmd.lower() for cmd in commands]
        return not any(
            danger in cmd
            for cmd in lowered
            for danger in dangerous_commands
        )

    async def _apply_config(self, device: str, commands: List[str]) -> Dict:
        """Report a successful application for ``device`` (simulated)."""
        # Simulate configuration application
        outcome = {'device': device, 'status': 'success'}
        outcome['commands_applied'] = len(commands)
        return outcome
class ValidatorAgent(SpecializedAgent):
    """Agent responsible for validating configurations"""

    def __init__(self, llm):
        super().__init__(AgentRole.VALIDATOR, llm)

    async def process(self, task: NetworkTask, context: Dict[str, Any]) -> Dict[str, Any]:
        """Validate that configurations meet requirements.

        ``context['configurations_applied']`` may be the list of per-device
        outcomes or the configurator's whole result dict, which carries that
        list under the same key.

        Returns:
            Dict with 'validation_status' ('passed' or 'failed'),
            'validation_details' and 'recommendations'.
        """
        configurations = context.get('configurations_applied', [])
        if isinstance(configurations, dict):
            # Unwrap the configurator's result dict. Iterating it directly
            # would yield its keys instead of the per-device outcomes.
            configurations = configurations.get('configurations_applied', [])
        validation_results = []
        for config in configurations:
            result = await self._validate_config(config, task.requirements)
            validation_results.append(result)
        # all() is True for an empty list. Nothing validated must not count
        # as a pass.
        passed = bool(validation_results) and all(r['valid'] for r in validation_results)
        recommendations = self._generate_recommendations(validation_results)
        if not validation_results:
            recommendations.append("No configurations were applied; nothing was validated")
        return {
            'validation_status': 'passed' if passed else 'failed',
            'validation_details': validation_results,
            'recommendations': recommendations
        }

    async def _validate_config(self, config: Dict, requirements: List[str]) -> Dict:
        """Validate individual configuration.

        A configuration counts as valid when its status is 'success'.
        """
        return {
            'device': config.get('device'),
            'valid': config.get('status') == 'success',
            'checks_performed': [
                'Syntax validation',
                'Requirements compliance',
                'Best practices check'
            ]
        }

    def _generate_recommendations(self, results: List[Dict]) -> List[str]:
        """Generate one review recommendation per invalid result."""
        return [
            f"Review configuration on {result['device']}"
            for result in results
            if not result['valid']
        ]
class MultiAgentOrchestrator:
    """Orchestrate multiple agents for complex network tasks"""

    def __init__(self, llm):
        self.llm = llm
        self.agents = {
            AgentRole.PLANNER: PlannerAgent(llm),
            AgentRole.CONFIGURATOR: ConfiguratorAgent(llm, {}),
            AgentRole.VALIDATOR: ValidatorAgent(llm)
        }
        self.task_queue = asyncio.Queue()
        self.results = {}

    async def execute_workflow(self, task: NetworkTask) -> Dict[str, Any]:
        """Execute multi-agent workflow: plan, optional approval, configure, validate.

        Returns:
            A dict with 'status' set to 'completed', 'rejected' (approval
            denied) or 'failed' (the configurator reported an error).
        """
        workflow_context = {
            'task_id': task.task_id,
            'network_state': await self._get_network_state()
        }
        # Step 1: Planning
        print(f"🎯 Planning phase for task: {task.task_id}")
        plan_result = await self.agents[AgentRole.PLANNER].process(task, workflow_context)
        workflow_context['plan'] = plan_result
        # Step 2: Human approval (if required)
        if await self._requires_approval(plan_result):
            approval = await self._get_human_approval(plan_result)
            if not approval:
                return {'status': 'rejected', 'reason': 'Human approval denied'}
        # Step 3: Configuration
        print(f"⚙️ Configuration phase for task: {task.task_id}")
        config_result = await self.agents[AgentRole.CONFIGURATOR].process(task, workflow_context)
        if 'error' in config_result:
            # Nothing was applied, so there is nothing to validate.
            return {
                'task_id': task.task_id,
                'status': 'failed',
                'reason': config_result['error'],
                'plan': plan_result
            }
        # Hand the validator the per-device outcome list, not the whole
        # result dict (iterating that dict would yield its keys).
        workflow_context['configurations_applied'] = config_result.get('configurations_applied', [])
        # Step 4: Validation
        print(f"✅ Validation phase for task: {task.task_id}")
        validation_result = await self.agents[AgentRole.VALIDATOR].process(task, workflow_context)
        return {
            'task_id': task.task_id,
            'status': 'completed',
            'plan': plan_result,
            'configuration': config_result,
            'validation': validation_result
        }

    async def _get_network_state(self) -> Dict:
        """Get current network state (fixed sample data for now)."""
        # In production, gather actual network state
        return {
            'devices': ['router1', 'switch1'],
            'vlans': [1, 10, 20],
            'interfaces_available': 5
        }

    async def _requires_approval(self, plan: Dict) -> bool:
        """Check if human approval is required.

        True when the plan lists any risk or mentions 'production' anywhere.
        """
        risks = plan.get('risks', [])
        return len(risks) > 0 or 'production' in str(plan).lower()

    async def _get_human_approval(self, plan: Dict) -> bool:
        """Get human approval for plan (simulated: always approves)."""
        # In production, send notification and wait for approval
        print(f"⚠️ Human approval required for plan with risks: {plan.get('risks', [])}")
        # Simulate approval
        return True
# Create workflow execution function
async def execute_network_automation_workflow(
    task_description: str,
    model_path: str = "./models/llama-3-8b.gguf",
    requirements: Optional[List[str]] = None,
):
    """Execute complete network automation workflow.

    Args:
        task_description: free-text description of the change to make.
        model_path: path of the local Llama model file to load.
        requirements: requirements attached to the task; defaults to the
            sample VLAN requirements below.

    Returns:
        The result dict produced by ``MultiAgentOrchestrator.execute_workflow``.
    """
    # Initialize LLM (using local Llama model)
    from langchain.llms import LlamaCpp
    llm = LlamaCpp(
        model_path=model_path,
        temperature=0.3,
        max_tokens=1000
    )
    # Create orchestrator
    orchestrator = MultiAgentOrchestrator(llm)
    if requirements is None:
        requirements = [
            "Configure VLAN 100",
            "Set up inter-VLAN routing",
            "Apply security policies"
        ]
    # Create task
    task = NetworkTask(
        task_id="TASK-001",
        description=task_description,
        priority=1,
        requirements=requirements
    )
    # Execute workflow
    return await orchestrator.execute_workflow(task)
# Usage example
# asyncio.run(execute_network_automation_workflow("Configure new VLAN for engineering department"))
Part 3: Human-in-the-Loop Approval Workflows
Safe AI Operations with Human Oversight
Approval Workflow Architecture
1. AI Proposes Action: The agent generates a configuration plan with a risk assessment.
2. Risk Evaluation: The system evaluates the risk level and determines whether approval is needed.
3. Human Review: A network engineer reviews the plan and approves, rejects, or modifies it.
4. Execution: Approved actions are executed with monitoring.
import asyncio
import uuid
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional, List

from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
# FastAPI application object; the @app.post / @app.get handlers in this file register on it.
app = FastAPI(title="Human-in-the-Loop Network Automation")
class ApprovalStatus(Enum):
    """Lifecycle states of an approval request."""

    # New requests start as PENDING; the other members record a decision.
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"
    MODIFIED = "modified"
class RiskLevel(Enum):
    """Risk classification attached to a proposed network change."""

    # Listed from least to most severe.
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
class NetworkChange(BaseModel):
    """A proposed change to one device, with its risk and rollback details."""

    change_id: str
    description: str
    device: str
    commands: List[str]
    risk_level: RiskLevel
    impact_assessment: str
    rollback_plan: str
    created_by: str = "AI Agent"
    # default_factory runs per instance. A plain `= datetime.now()` default
    # is evaluated once at class definition, stamping every change with the
    # module load time.
    created_at: datetime = Field(default_factory=datetime.now)
class ApprovalRequest(BaseModel):
    """An approval ticket wrapping a NetworkChange and the decision on it."""

    # Identity and payload
    request_id: str
    change: NetworkChange
    # Decision fields: unset until someone (or the auto-approver) decides
    status: ApprovalStatus = ApprovalStatus.PENDING
    approver: Optional[str] = None
    approval_time: Optional[datetime] = None
    comments: Optional[str] = None
    modifications: Optional[List[str]] = None
    # Required: the moment after which the request can no longer be decided
    expiry_time: datetime
class HumanInTheLoopSystem:
    """Manage human approval workflows for network changes.

    Requests awaiting a decision live in ``approval_queue`` (keyed by request
    id). Decided, auto-approved and expired requests are kept in
    ``approval_history``.
    """

    # Command fragments that force a human review regardless of risk level.
    _SENSITIVE_COMMANDS = (
        'router bgp', 'router ospf', 'access-list',
        'crypto', 'aaa', 'username', 'enable secret'
    )

    def __init__(self):
        self.approval_queue = {}
        self.approval_history = []
        self.notification_channels = []

    async def submit_for_approval(self, change: NetworkChange) -> ApprovalRequest:
        """Submit change for human approval.

        Changes that need no approval are auto-approved and returned at once.
        Otherwise a pending request with a one-hour expiry is queued and
        approvers are notified.
        """
        # Determine if approval is needed
        if not self._requires_approval(change):
            # Auto-approve low-risk changes
            # NOTE(review): auto-approved requests are recorded but never
            # passed to _execute_change. Confirm the caller applies them.
            return self._auto_approve(change)
        # Create approval request
        request = ApprovalRequest(
            request_id=str(uuid.uuid4()),
            change=change,
            expiry_time=datetime.now() + timedelta(hours=1)
        )
        # Add to queue
        self.approval_queue[request.request_id] = request
        # Send notifications
        await self._send_notifications(request)
        return request

    def _requires_approval(self, change: NetworkChange) -> bool:
        """Determine if change requires human approval.

        True for high or critical risk, for any sensitive command, and for
        devices whose name contains 'prod'.
        """
        # Always require approval for high/critical risk
        if change.risk_level in [RiskLevel.HIGH, RiskLevel.CRITICAL]:
            return True
        # Check for sensitive commands
        for cmd in change.commands:
            lowered = cmd.lower()
            if any(sensitive in lowered for sensitive in self._SENSITIVE_COMMANDS):
                return True
        # Production devices always need approval
        return 'prod' in change.device.lower()

    def _auto_approve(self, change: NetworkChange) -> ApprovalRequest:
        """Auto-approve low-risk changes and record them in the history."""
        now = datetime.now()
        request = ApprovalRequest(
            request_id=str(uuid.uuid4()),
            change=change,
            status=ApprovalStatus.APPROVED,
            approver="AUTO",
            approval_time=now,
            comments="Auto-approved: Low risk change",
            expiry_time=now
        )
        self.approval_history.append(request)
        return request

    async def _send_notifications(self, request: ApprovalRequest):
        """Send notifications to approvers over every applicable channel."""
        # Send to multiple channels
        notifications = [
            # Slack notification
            self._send_slack_notification(request),
            # Email notification
            self._send_email_notification(request),
        ]
        # SMS for critical changes
        if request.change.risk_level == RiskLevel.CRITICAL:
            notifications.append(self._send_sms_notification(request))
        # One broken channel must not abort the submission: the request is
        # already queued and the other channels should still deliver.
        outcomes = await asyncio.gather(*notifications, return_exceptions=True)
        for outcome in outcomes:
            if isinstance(outcome, Exception):
                print(f"Notification delivery failed for request {request.request_id}: {outcome}")

    async def _send_slack_notification(self, request: ApprovalRequest):
        """Send Slack notification (currently prints the message payload)."""
        # Implement Slack webhook integration
        message = {
            "text": "Network Change Approval Required",
            "blocks": [
                {
                    "type": "header",
                    "text": {
                        "type": "plain_text",
                        "text": f"🔔 Approval Required: {request.change.description}"
                    }
                },
                {
                    "type": "section",
                    "fields": [
                        {"type": "mrkdwn", "text": f"*Device:* {request.change.device}"},
                        {"type": "mrkdwn", "text": f"*Risk Level:* {request.change.risk_level.value}"},
                        {"type": "mrkdwn", "text": f"*Impact:* {request.change.impact_assessment}"}
                    ]
                },
                {
                    "type": "actions",
                    "elements": [
                        {
                            "type": "button",
                            "text": {"type": "plain_text", "text": "Approve"},
                            "style": "primary",
                            "action_id": f"approve_{request.request_id}"
                        },
                        {
                            "type": "button",
                            "text": {"type": "plain_text", "text": "Reject"},
                            "style": "danger",
                            "action_id": f"reject_{request.request_id}"
                        },
                        {
                            "type": "button",
                            "text": {"type": "plain_text", "text": "View Details"},
                            "action_id": f"view_{request.request_id}"
                        }
                    ]
                }
            ]
        }
        # Send to Slack
        print(f"Slack notification sent: {message}")

    async def _send_email_notification(self, request: ApprovalRequest):
        """Send email notification (currently prints a confirmation)."""
        # Implement email sending
        print(f"Email notification sent for request {request.request_id}")

    async def _send_sms_notification(self, request: ApprovalRequest):
        """Send SMS for critical changes (currently prints a confirmation)."""
        # Implement SMS sending (Twilio, etc.)
        print(f"SMS alert sent for critical change {request.request_id}")

    async def process_approval(self, request_id: str, decision: ApprovalStatus,
                               approver: str, comments: Optional[str] = None,
                               modifications: Optional[List[str]] = None):
        """Process approval decision.

        Args:
            request_id: id of a request currently in the queue.
            decision: APPROVED, REJECTED or MODIFIED.
            approver: who made the decision.
            comments: optional free-text remarks.
            modifications: optional replacement command list.

        Returns:
            The updated request, executed first when approved.

        Raises:
            ValueError: unknown request id, a PENDING "decision", or an
                expired request.
        """
        if request_id not in self.approval_queue:
            raise ValueError(f"Request {request_id} not found")
        if decision == ApprovalStatus.PENDING:
            # PENDING is the undecided state, not a decision. Accepting it
            # would silently drop the request from the queue.
            raise ValueError("Decision must be approved, rejected, or modified")
        request = self.approval_queue[request_id]
        # Check if request expired
        if datetime.now() > request.expiry_time:
            # Retire the expired request so it stops appearing as pending.
            del self.approval_queue[request_id]
            self.approval_history.append(request)
            raise ValueError("Approval request has expired")
        # Update request
        request.status = decision
        request.approver = approver
        request.approval_time = datetime.now()
        request.comments = comments
        request.modifications = modifications
        # Move to history
        del self.approval_queue[request_id]
        self.approval_history.append(request)
        # Execute if approved
        # NOTE(review): a MODIFIED decision stores the modifications but does
        # not execute them. Confirm whether that is intended.
        if decision == ApprovalStatus.APPROVED:
            await self._execute_change(request)
        return request

    async def _execute_change(self, request: ApprovalRequest):
        """Execute approved network change (placeholder: only prints)."""
        print(f"Executing approved change: {request.change.description}")
        # Apply modifications if any. The approver's command list, when
        # given, replaces the originally proposed one.
        commands = request.modifications or request.change.commands
        # Execute commands (integrate with network tools)
        # ... implementation ...
        # Log execution
        print(f"Change {request.request_id} executed successfully")
# API Endpoints
# Module-level approval system instance used by the route handlers in this file.
approval_system = HumanInTheLoopSystem()
@app.post("/submit-change")
async def submit_change(change: NetworkChange):
    """Queue a proposed change for approval and report its id and status."""
    approval_request = await approval_system.submit_for_approval(change)
    response = {
        "request_id": approval_request.request_id,
        "status": approval_request.status.value,
    }
    return response
@app.post("/approve/{request_id}")
async def approve_change(request_id: str, approver: str, comments: Optional[str] = None):
    """Approve a network change.

    Responds 404 when the request id is not pending and 400 when the approval
    system rejects the decision (for example, the request has expired).
    """
    # An unknown id is a missing resource, not a malformed request.
    if request_id not in approval_system.approval_queue:
        raise HTTPException(status_code=404, detail=f"Request {request_id} not found")
    try:
        request = await approval_system.process_approval(
            request_id, ApprovalStatus.APPROVED, approver, comments
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    # Execution runs as part of processing an approved request, so report it
    # from the resulting status rather than a constant.
    return {"status": "approved", "executed": request.status == ApprovalStatus.APPROVED}
@app.get("/pending-approvals")
async def get_pending_approvals():
    """Get all pending approval requests that can still be decided.

    Requests past their expiry time are left out, because an approval
    attempt on them is rejected as expired.
    """
    now = datetime.now()
    return [
        request
        for request in approval_system.approval_queue.values()
        if request.expiry_time >= now
    ]
Week 4 Deliverables
- ✓ Network Automation Agent: ReAct-based agent that can configure and troubleshoot
- ✓ Multi-Agent System: Orchestrated workflow with specialized agents
- ✓ Human-in-the-Loop: Approval system with Slack/email notifications
- ✓ Safety Framework: Risk assessment and rollback capabilities
- ✓ Lab Integration: Working agents in student lab environments
🤖 Week 4 Achievements
You've entered the future of network automation with AI agents!
- Built autonomous agents that reason and act on network devices
- Created multi-agent orchestration for complex workflows
- Implemented human-in-the-loop approval systems
- Established safety frameworks with risk assessment
- Prepared revolutionary lab experiences for students