#!/usr/bin/env python3
"""
GNS3 Network Topology Management Module
This module provides asynchronous functionality for creating and
configuring network topologies in GNS3, including device creation,
configuration, and network infrastructure setup.
"""
import asyncio
import logging
from io import BytesIO
import tarfile
from typing import List, Dict, Optional, Union, Tuple, Any
import aiohttp
import aiodocker
from aiodocker import Docker as docker
from ptovnetlab.data_classes import Switch, Connection
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
[docs]
logger = logging.getLogger(__name__)
[docs]
class GNS3WorkerError(Exception):
"""Custom exception for GNS3 Worker related errors."""
pass
[docs]
class ContainerConfigurationError(GNS3WorkerError):
"""Exception raised for errors during container configuration."""
pass
[docs]
def invoker(
servername: str,
gns3_url: str,
switches: List[Switch],
prj_id: str,
connections: List[Connection]
) -> str:
"""
Synchronous entry point for creating GNS3 project nodes and connections.
This function provides a synchronous wrapper for the asynchronous main_job,
allowing the module to be called from synchronous code.
Args:
servername (str): The name of the aiohttp.ClientSession object
gns3_url (str): The URL for the GNS3 server
switches (List[Switch]): List of Switch objects to be emulated
prj_id (str): The GNS3 project ID
connections (List[Connection]): List of connections to make between nodes
Returns:
str: Status message indicating completion or error
Raises:
GNS3WorkerError: If there are issues during the GNS3 project setup
"""
try:
logger.info('Initiating GNS3 project node and connection creation')
result = asyncio.run(
main_job(servername, gns3_url, switches, prj_id, connections)
)
logger.info('GNS3 project setup completed successfully')
return result
except Exception as e:
logger.error(f'GNS3 project setup failed: {e}')
raise GNS3WorkerError(f'Project setup failed: {e}')
[docs]
async def main_job(
servername: str,
gns3_url: str,
switches: List[Switch],
prj_id: str,
connections: List[Connection]
) -> str:
"""
Asynchronously create GNS3 nodes, configure containers, and establish connections.
Args:
servername (str): The name of the aiohttp.ClientSession object
gns3_url (str): The URL for the GNS3 server
switches (List[Switch]): List of Switch objects to be emulated
prj_id (str): The GNS3 project ID
connections (List[Connection]): List of connections to make between nodes
Returns:
str: Status message indicating completion
"""
logger.info('Creating nodes in the GNS3 project.')
# Configure session timeout
timeout_seconds = 30
session_timeout = aiohttp.ClientTimeout(
total=None,
sock_connect=timeout_seconds,
sock_read=timeout_seconds
)
async with aiohttp.ClientSession(timeout=session_timeout) as session:
# Position nodes in the project
nodex, nodey = -825, -375
# Create nodes
async with asyncio.TaskGroup() as tg1:
node_tasks = []
for switch in switches:
task = tg1.create_task(
make_a_gns3_node(
switch, session, gns3_url, nodex, nodey, prj_id
)
)
node_tasks.append(task)
# Update node positioning
nodex += 150
if nodex > 400:
nodex = -800
nodey += 200
# Update switches with node information
switches = [await task for task in node_tasks]
# Configure Docker containers
docker_client = aiodocker.Docker(url=f"http://{servername}:2375")
try:
async with asyncio.TaskGroup() as tg2:
config_tasks = []
for switch in switches:
task = tg2.create_task(
docker_api_config(switch, docker_client, servername)
)
config_tasks.append(task)
# Wait for configuration tasks
results = [await task for task in config_tasks]
logger.info("Configuration copying completed for all switches")
finally:
await docker_client.close()
# Establish connections between nodes
async with asyncio.TaskGroup() as tg3:
for connection in connections:
a_node_id, b_node_id = _find_connection_nodes(switches, connection)
if a_node_id and b_node_id:
try:
a_adapter = _parse_port(connection.port_a)
b_adapter = _parse_port(connection.port_b)
make_link_url = f"{gns3_url}projects/{prj_id}/links"
make_link_json = {
'nodes': [
{
'adapter_number': int(a_adapter),
'node_id': a_node_id,
'port_number': 0
},
{
'adapter_number': int(b_adapter),
'node_id': b_node_id,
'port_number': 0
}
]
}
tg3.create_task(
gns3_post(
session,
str(make_link_url),
'post',
jsondata=make_link_json
)
)
except ValueError as e:
logger.error(
f"Error parsing ports for connection "
f"{connection.switch_a}:{connection.port_a} -> "
f"{connection.switch_b}:{connection.port_b}: {e}"
)
return "Virtual Network Lab is ready to run."
def _find_connection_nodes(
switches: List[Switch],
connection: Connection
) -> Tuple[Optional[str], Optional[str]]:
"""
Find node IDs for a given connection.
Args:
switches (List[Switch]): List of switches
connection (Connection): Connection to find nodes for
Returns:
Tuple[Optional[str], Optional[str]]: Node IDs for switches A and B
"""
a_node_id = next(
(switch.gns3_node_id for switch in switches
if switch.lldp_system_name == connection.switch_a),
None
)
b_node_id = next(
(switch.gns3_node_id for switch in switches
if switch.lldp_system_name == connection.switch_b),
None
)
return a_node_id, b_node_id
def _parse_port(port: str) -> str:
"""
Parse port string to extract adapter number.
Args:
port (str): Port string (e.g., 'Ethernet1/1')
Returns:
str: Extracted adapter number
Raises:
ValueError: If port format is invalid
"""
if not port.lower().startswith('ethernet'):
raise ValueError(f"Invalid port format: {port}. Port must start with 'ethernet'")
# Extract numbers after 'ethernet', before any '/'
port_base = port.lower().split('/')[0]
adapter_num = ''.join(filter(str.isdigit, port_base))
if not adapter_num:
raise ValueError(f"Could not extract adapter number from port: {port}")
return adapter_num
[docs]
async def docker_api_config(
switch: Switch,
docker_client: aiodocker.Docker,
servername: str = 'localhost'
) -> str:
"""
Configure Docker container with switch startup configuration.
Args:
switch (Switch): Switch object containing configuration
docker_client (aiodocker.Docker): Docker client for API interactions
servername (str, optional): Docker daemon hostname. Defaults to 'localhost'.
Returns:
str: Configuration status
"""
try:
logger.info(f"Starting configuration for switch {switch.name}")
# Validate configuration
if not switch.initial_config:
logger.warning(f"Empty configuration for switch {switch.name}")
return 'skipped - empty config'
# Prepare configuration string
config_string = '\n'.join(switch.initial_config)
ascii_config = config_string.encode('ascii')
# Create tar archive
fh = BytesIO()
with tarfile.open(fileobj=fh, mode='w') as tarch:
info = tarfile.TarInfo('startup-config')
info.size = len(ascii_config)
bytes_to_go = BytesIO(ascii_config)
tarch.addfile(info, bytes_to_go)
fh.seek(0)
archive_content = fh.read()
logger.info(f"Configuration archive created for {switch.name}")
# Async Docker API session
async with aiohttp.ClientSession(base_url=f"http://{servername}:2375") as session:
# Start container
await _start_container(session, switch.docker_container_id)
# Copy configuration to container
await _copy_config_to_container(
session,
switch.docker_container_id,
archive_content
)
# Move configuration file
await _move_config_file(session, switch.docker_container_id)
# Stop container
await _stop_container(session, switch.docker_container_id)
logger.info(f"Successfully configured switch {switch.name}")
return 'success'
except Exception as e:
logger.error(f"Error configuring switch {switch.name}: {e}")
return f'failed - {str(e)}'
async def _start_container(session: aiohttp.ClientSession, container_id: str) -> None:
"""
Start a Docker container.
Args:
session (aiohttp.ClientSession): Async HTTP session
container_id (str): Docker container ID
Raises:
ContainerConfigurationError: If container start fails
"""
async with session.post(f"/containers/{container_id}/start") as response:
if response.status not in [204, 304]:
raise ContainerConfigurationError(
f"Could not start container. Status: {response.status}"
)
# Wait for container to be ready
start_time = asyncio.get_event_loop().time()
while asyncio.get_event_loop().time() - start_time < 20:
async with session.get(f"/containers/{container_id}/json") as response:
if response.status == 200:
container_info = await response.json()
if container_info['State']['Running']:
return
await asyncio.sleep(1)
raise ContainerConfigurationError(f"Container {container_id} did not become ready")
async def _copy_config_to_container(
session: aiohttp.ClientSession,
container_id: str,
archive_content: bytes
) -> None:
"""
Copy configuration archive to container.
Args:
session (aiohttp.ClientSession): Async HTTP session
container_id (str): Docker container ID
archive_content (bytes): Configuration archive content
Raises:
ContainerConfigurationError: If file copy fails
"""
headers = {'Content-Type': 'application/x-tar'}
async with session.put(
f"/containers/{container_id}/archive",
params={'path': '/'},
headers=headers,
data=archive_content
) as response:
if response.status != 200:
raise ContainerConfigurationError(
f"Error copying configuration. Status: {response.status}"
)
async def _move_config_file(
session: aiohttp.ClientSession,
container_id: str
) -> None:
"""
Move configuration file to correct location in container.
Args:
session (aiohttp.ClientSession): Async HTTP session
container_id (str): Docker container ID
Raises:
ContainerConfigurationError: If file move fails
"""
# Create exec instance for mv command
async with session.post(
f"/containers/{container_id}/exec",
json={
"AttachStdout": True,
"AttachStderr": True,
"Cmd": ["mv", "/startup-config", "/mnt/flash/"]
}
) as response:
if response.status != 201:
raise ContainerConfigurationError(
f"Could not create mv exec. Status: {response.status}"
)
exec_data = await response.json()
exec_id = exec_data['Id']
# Start exec instance
async with session.post(
f"/exec/{exec_id}/start",
json={"Detach": False, "Tty": False},
headers={'Content-Type': 'application/json'}
) as exec_response:
if exec_response.status != 200:
raise ContainerConfigurationError(
f"mv command exec failed. Status: {exec_response.status}"
)
async def _stop_container(
session: aiohttp.ClientSession,
container_id: str
) -> None:
"""
Stop a Docker container.
Args:
session (aiohttp.ClientSession): Async HTTP session
container_id (str): Docker container ID
Raises:
ContainerConfigurationError: If container stop fails
"""
async with session.post(f"/containers/{container_id}/stop") as response:
if response.status not in [204, 304]:
raise ContainerConfigurationError(
f"Could not stop container. Status: {response.status}"
)
[docs]
async def make_a_gns3_node(
switch: Switch,
session: aiohttp.ClientSession,
gns3_url: str,
nodex: int,
nodey: int,
prj_id: str
) -> Switch:
"""
Create a GNS3 node for a switch.
Args:
switch (Switch): Switch to create a node for
session (aiohttp.ClientSession): Async HTTP session
gns3_url (str): Base URL for GNS3 API
nodex (int): X coordinate for node placement
nodey (int): Y coordinate for node placement
prj_id (str): GNS3 project ID
Returns:
Switch: Updated switch with GNS3 node details
"""
try:
# Duplicate template
async with session.post(
f'{gns3_url}templates/{switch.gns3_template_id}/duplicate',
json=[]
) as response:
response.raise_for_status()
json_data = await response.json()
tmp_template_id = json_data['template_id']
# Update interface count
async with session.put(
f'{gns3_url}templates/{tmp_template_id}',
json={'adapters': switch.ethernet_interfaces + 1}
) as response:
response.raise_for_status()
# Create node
async with session.post(
f'{gns3_url}projects/{prj_id}/templates/{tmp_template_id}',
json={'x': nodex, 'y': nodey}
) as response:
response.raise_for_status()
json_data = await response.json()
switch.gns3_node_id = json_data['node_id']
# Delete temporary template
async with session.delete(f'{gns3_url}templates/{tmp_template_id}'):
pass
# Rename node
async with session.put(
f'{gns3_url}projects/{prj_id}/nodes/{switch.gns3_node_id}',
json={'name': switch.name}
) as response:
response.raise_for_status()
# Get container ID
async with session.get(
f'{gns3_url}projects/{prj_id}/nodes/{switch.gns3_node_id}'
) as response:
response.raise_for_status()
json_data = await response.json()
switch.docker_container_id = json_data['properties']['container_id']
return switch
except aiohttp.ClientResponseError as e:
logger.error(f"GNS3 node creation failed: {e}")
raise GNS3WorkerError(f"Failed to create GNS3 node: {e}")
[docs]
async def gns3_post(
session: aiohttp.ClientSession,
url: str,
method: str,
**kwargs
) -> None:
"""
Send an async request to GNS3 server.
Args:
session (aiohttp.ClientSession): Async HTTP session
url (str): URL to send request to
method (str): HTTP method (get, post, put)
**kwargs: Additional arguments for the request
Raises:
GNS3WorkerError: If request fails
"""
try:
jsondata = kwargs.get('jsondata', {})
if method == 'post':
async with session.post(url, json=jsondata) as response:
response.raise_for_status()
elif method == 'get':
async with session.get(url, json=jsondata) as response:
response.raise_for_status()
elif method == 'put':
async with session.put(url, json=jsondata) as response:
response.raise_for_status()
# Small delay to prevent overwhelming the server
await asyncio.sleep(0.2)
except aiohttp.ClientResponseError as e:
logger.error(f"GNS3 API request failed: {e}")
raise GNS3WorkerError(f"API request failed: {e}")