
Vulnerability Management at Scale with Open Source Tools

Build an enterprise-grade vulnerability management program using only open source tools. From scanning to remediation tracking, here's how to do it right.


The Reality of Vulnerability Management

Managing vulnerabilities across thousands of systems isn't just about running scanners – it's about building a sustainable process that scales. After implementing vulnerability management programs for organizations ranging from startups to federal agencies, I've learned that open source tools can match or exceed commercial solutions when properly integrated.

This guide shares a production-tested approach to building a comprehensive vulnerability management program using only open source tools.

The Complete Vulnerability Management Stack

Here's the stack I've successfully deployed in production:

  • Discovery: Nmap, Masscan, Rumble (now runZero)
  • Vulnerability Scanning: OpenVAS/GVM, Nuclei, Wazuh
  • Container Scanning: Trivy, Grype, Clair
  • Web Application: OWASP ZAP, Nikto, SQLMap
  • Orchestration: Apache Airflow, n8n
  • Data Management: PostgreSQL, Elasticsearch
  • Visualization: Grafana, Kibana
  • Ticketing: GLPI, Request Tracker
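
If you want to trial the data and visualization layers before committing, a minimal Docker Compose file along these lines will stand up PostgreSQL and Grafana locally. Treat it as a lab-only sketch: pin image versions and manage credentials properly before anything production-facing.

# docker-compose.yml (lab-only sketch for the data and visualization layers)
services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: vulnerability_management
      POSTGRES_USER: vmuser
      POSTGRES_PASSWORD: change_me   # lab only; use a secrets manager in production
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    depends_on:
      - postgres

volumes:
  pgdata: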

Building the Foundation

Asset Discovery and Inventory

You can't protect what you don't know exists. Start with comprehensive asset discovery:

#!/usr/bin/env python3
"""
Automated Asset Discovery and Inventory System
Combines multiple discovery methods for comprehensive coverage
"""

import asyncio
import json
import xml.etree.ElementTree as ET
from typing import List, Dict, Any

import asyncpg

class AssetDiscoveryOrchestrator:
    def __init__(self, db_config: Dict[str, str]):
        self.db_config = db_config
        self.discovered_assets = []
        
    async def setup_database(self):
        """Initialize asset inventory database"""
        self.db = await asyncpg.create_pool(**self.db_config)
        
        await self.db.execute('''
            CREATE TABLE IF NOT EXISTS assets (
                id SERIAL PRIMARY KEY,
                ip_address INET UNIQUE NOT NULL,
                hostname TEXT,
                mac_address MACADDR,
                operating_system TEXT,
                open_ports INTEGER[],
                services JSONB,
                first_seen TIMESTAMP DEFAULT NOW(),
                last_seen TIMESTAMP DEFAULT NOW(),
                last_scanned TIMESTAMP,
                asset_type TEXT,
                business_unit TEXT,
                criticality TEXT,
                tags TEXT[],
                active BOOLEAN DEFAULT TRUE
            );
            
            CREATE INDEX IF NOT EXISTS idx_assets_ip ON assets(ip_address);
            CREATE INDEX IF NOT EXISTS idx_assets_criticality ON assets(criticality);
            CREATE INDEX IF NOT EXISTS idx_assets_last_seen ON assets(last_seen);
        ''')
    
    async def discover_with_nmap(self, network: str) -> List[Dict[str, Any]]:
        """Fast network discovery using Nmap"""
        print(f"Starting Nmap discovery for {network}")
        
        # Stage 1: Fast ping sweep
        ping_cmd = [
            'nmap', '-sn', '-n', '--max-retries', '1',
            '--max-rtt-timeout', '500ms', network,
            '-oX', '-'
        ]
        
        proc = await asyncio.create_subprocess_exec(
            *ping_cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        
        stdout, stderr = await proc.communicate()
        
        if proc.returncode != 0:
            print(f"Nmap error: {stderr.decode()}")
            return []
        
        # Parse XML output
        root = ET.fromstring(stdout.decode())
        discovered = []
        
        for host in root.findall('.//host'):
            if host.find(".//status[@state='up']") is not None:
                addr_elem = host.find(".//address[@addrtype='ipv4']")
                if addr_elem is not None:
                    ip = addr_elem.get('addr')
                    
                    # Stage 2: Detailed scan of live hosts
                    host_info = await self.scan_host_details(ip)
                    discovered.append(host_info)
        
        return discovered
    
    async def scan_host_details(self, ip: str) -> Dict[str, Any]:
        """Detailed scan of individual host"""
        scan_cmd = [
            'nmap', '-sV', '-O', '--osscan-guess',
            '-p-', '--max-retries', '2',
            '--script', 'default,vuln',
            ip, '-oX', '-'
        ]
        
        proc = await asyncio.create_subprocess_exec(
            *scan_cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        
        stdout, stderr = await proc.communicate()
        
        # Parse results
        host_info = {
            'ip_address': ip,
            'hostname': None,
            'operating_system': None,
            'open_ports': [],
            'services': {},
            'vulnerabilities': []
        }
        
        if proc.returncode == 0:
            root = ET.fromstring(stdout.decode())
            host = root.find('.//host')
            
            # Extract hostname
            hostname_elem = host.find(".//hostname")
            if hostname_elem is not None:
                host_info['hostname'] = hostname_elem.get('name')
            
            # Extract OS
            os_match = host.find(".//osmatch")
            if os_match is not None:
                host_info['operating_system'] = os_match.get('name')
            
            # Extract ports and services
            for port in host.findall(".//port"):
                port_id = int(port.get('portid'))
                state = port.find('state').get('state')
                
                if state == 'open':
                    host_info['open_ports'].append(port_id)
                    
                    service = port.find('service')
                    if service is not None:
                        service_info = {
                            'name': service.get('name', 'unknown'),
                            'product': service.get('product', ''),
                            'version': service.get('version', ''),
                            'extrainfo': service.get('extrainfo', '')
                        }
                        host_info['services'][str(port_id)] = service_info
            
            # Extract vulnerabilities from scripts
            for script in host.findall(".//script[@id]"):
                if 'vuln' in script.get('id'):
                    host_info['vulnerabilities'].append({
                        'script': script.get('id'),
                        'output': script.get('output', '')
                    })
        
        return host_info
    
    async def discover_with_masscan(self, network: str, ports: str = "1-65535") -> List[Dict[str, Any]]:
        """Ultra-fast port discovery using Masscan"""
        print(f"Starting Masscan discovery for {network}")
        
        scan_cmd = [
            'masscan', network, '-p', ports,
            '--rate', '10000',  # Adjust based on network capacity
            '--open-only', '-oJ', '-'
        ]
        
        proc = await asyncio.create_subprocess_exec(
            *scan_cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        
        stdout, stderr = await proc.communicate()
        
        discovered = []
        for line in stdout.decode().strip().split('\n'):
            if line.strip() and line.startswith('{'):
                try:
                    data = json.loads(line.rstrip(','))
                    if 'ports' in data:
                        for port_info in data['ports']:
                            discovered.append({
                                'ip_address': data['ip'],
                                'port': port_info['port'],
                                'protocol': port_info['proto'],
                                'status': port_info['status']
                            })
                except json.JSONDecodeError:
                    continue
        
        return discovered
    
    async def passive_discovery(self) -> List[Dict[str, Any]]:
        """Passive asset discovery from various sources"""
        discovered = []
        
        # DNS zone transfers (if allowed)
        # DHCP logs parsing
        # ARP cache analysis
        # NetFlow/sFlow data
        # Cloud API inventory (AWS, Azure, GCP)
        
        # Example: AWS EC2 discovery
        try:
            import boto3
            ec2 = boto3.client('ec2')
            
            response = ec2.describe_instances()
            for reservation in response['Reservations']:
                for instance in reservation['Instances']:
                    if instance['State']['Name'] == 'running':
                        discovered.append({
                            'ip_address': instance.get('PrivateIpAddress'),
                            'hostname': instance.get('PrivateDnsName'),
                            'asset_type': 'aws_ec2',
                            'instance_id': instance['InstanceId'],
                            # Flatten AWS tags into "Key=Value" strings for the TEXT[] column
                            'tags': [f"{tag['Key']}={tag['Value']}" for tag in instance.get('Tags', [])]
                        })
        except Exception as e:
            print(f"AWS discovery error: {e}")
        
        return discovered
    
    async def correlate_and_store(self, discoveries: List[List[Dict[str, Any]]]):
        """Correlate discoveries from multiple sources and store"""
        # Flatten all discoveries
        all_assets = {}
        
        for discovery_set in discoveries:
            for asset in discovery_set:
                ip = asset.get('ip_address')
                if ip:
                    if ip not in all_assets:
                        all_assets[ip] = asset
                    else:
                        # Merge information
                        all_assets[ip].update(asset)
        
        # Store in database
        async with self.db.acquire() as conn:
            for ip, asset in all_assets.items():
                await conn.execute('''
                    INSERT INTO assets (
                        ip_address, hostname, operating_system,
                        open_ports, services, asset_type, tags
                    ) VALUES ($1, $2, $3, $4, $5, $6, $7)
                    ON CONFLICT (ip_address) DO UPDATE SET
                        hostname = EXCLUDED.hostname,
                        operating_system = EXCLUDED.operating_system,
                        open_ports = EXCLUDED.open_ports,
                        services = EXCLUDED.services,
                        last_seen = NOW()
                ''', ip, asset.get('hostname'), asset.get('operating_system'),
                    asset.get('open_ports', []), json.dumps(asset.get('services', {})),
                    asset.get('asset_type'), asset.get('tags', []))
    
    async def run_discovery(self, networks: List[str]):
        """Orchestrate discovery across multiple networks"""
        await self.setup_database()
        
        discovery_tasks = []
        
        for network in networks:
            # Run the active discovery methods in parallel for each network
            discovery_tasks.extend([
                self.discover_with_nmap(network),
                self.discover_with_masscan(network, "22,80,443,3389,8080,8443")
            ])
        
        # Passive discovery is network-independent, so run it once
        discovery_tasks.append(self.passive_discovery())
        
        discoveries = await asyncio.gather(*discovery_tasks)
        await self.correlate_and_store(discoveries)
        
        # Update asset criticality based on rules
        await self.classify_assets()
    
    async def classify_assets(self):
        """Classify assets by criticality based on characteristics"""
        # Guard with "or []" / "or ''" because these columns can be NULL in the database
        classification_rules = [
            # Critical: Domain controllers, databases, key services
            {
                'condition': lambda a: any(p in (a['open_ports'] or []) for p in [88, 389, 636, 3268]),
                'criticality': 'CRITICAL',
                'reason': 'Domain Controller'
            },
            {
                'condition': lambda a: any(p in (a['open_ports'] or []) for p in [1433, 3306, 5432, 1521]),
                'criticality': 'CRITICAL',
                'reason': 'Database Server'
            },
            # High: Web servers, email servers
            {
                'condition': lambda a: any(p in (a['open_ports'] or []) for p in [80, 443, 8080, 8443]),
                'criticality': 'HIGH',
                'reason': 'Web Server'
            },
            # Medium: Workstations
            {
                'condition': lambda a: 'Windows' in (a.get('operating_system') or '') and
                                       not any(p in (a['open_ports'] or []) for p in [80, 443]),
                'criticality': 'MEDIUM',
                'reason': 'Workstation'
            }
        ]
        
        async with self.db.acquire() as conn:
            assets = await conn.fetch('SELECT * FROM assets WHERE criticality IS NULL')
            
            for asset in assets:
                asset_dict = dict(asset)
                
                for rule in classification_rules:
                    if rule['condition'](asset_dict):
                        await conn.execute(
                            'UPDATE assets SET criticality = $1 WHERE id = $2',
                            rule['criticality'], asset['id']
                        )
                        break

# Usage
async def main():
    orchestrator = AssetDiscoveryOrchestrator({
        'host': 'localhost',
        'database': 'vulnerability_management',
        'user': 'vmuser',
        'password': 'secure_password'
    })
    
    networks = [
        '192.168.1.0/24',
        '10.0.0.0/24',
        '172.16.0.0/16'
    ]
    
    await orchestrator.run_discovery(networks)

if __name__ == '__main__':
    asyncio.run(main())

Vulnerability Scanning Orchestration

Now let's integrate multiple scanners for comprehensive coverage:

#!/usr/bin/env python3
"""
Vulnerability Scanning Orchestration
Integrates OpenVAS, Nuclei, and Trivy for comprehensive scanning
"""

import asyncio
import hashlib
import json
import xml.etree.ElementTree as ET
from typing import Any, Dict, List, Optional

import asyncpg

class VulnerabilityScanner:
    def __init__(self, db_config: Dict[str, str], scanner_config: Dict[str, Any]):
        self.db_config = db_config
        self.scanner_config = scanner_config
        self.scanners = {
            'openvas': OpenVASScanner(scanner_config.get('openvas', {})),
            'nuclei': NucleiScanner(scanner_config.get('nuclei', {})),
            'trivy': TrivyScanner(scanner_config.get('trivy', {}))
        }
    
    async def setup_database(self):
        """Initialize vulnerability database"""
        self.db = await asyncpg.create_pool(**self.db_config)
        
        await self.db.execute('''
            CREATE TABLE IF NOT EXISTS vulnerabilities (
                id SERIAL PRIMARY KEY,
                asset_id INTEGER REFERENCES assets(id),
                vulnerability_id TEXT NOT NULL,
                scanner TEXT NOT NULL,
                title TEXT NOT NULL,
                severity TEXT NOT NULL,
                cvss_score NUMERIC(3,1),
                cve_ids TEXT[],
                description TEXT,
                solution TEXT,
                "references" TEXT[],  -- quoted: REFERENCES is a reserved word in PostgreSQL
                detected_at TIMESTAMP DEFAULT NOW(),
                last_seen TIMESTAMP DEFAULT NOW(),
                status TEXT DEFAULT 'OPEN',
                false_positive BOOLEAN DEFAULT FALSE,
                risk_accepted BOOLEAN DEFAULT FALSE,
                UNIQUE(asset_id, vulnerability_id, scanner)
            );
            
            CREATE INDEX IF NOT EXISTS idx_vuln_severity ON vulnerabilities(severity);
            CREATE INDEX IF NOT EXISTS idx_vuln_status ON vulnerabilities(status);
            CREATE INDEX IF NOT EXISTS idx_vuln_asset ON vulnerabilities(asset_id);
            
            CREATE TABLE IF NOT EXISTS scan_history (
                id SERIAL PRIMARY KEY,
                asset_id INTEGER REFERENCES assets(id),
                scanner TEXT NOT NULL,
                scan_type TEXT,
                started_at TIMESTAMP DEFAULT NOW(),
                completed_at TIMESTAMP,
                vulnerabilities_found INTEGER DEFAULT 0,
                status TEXT,
                error_message TEXT
            );
        ''')
    
    async def scan_assets(self, asset_filter: Dict[str, Any] = None):
        """Orchestrate scanning across all assets"""
        await self.setup_database()
        
        # Get assets to scan
        query = 'SELECT * FROM assets WHERE active = TRUE'
        params = []
        
        if asset_filter:
            if 'criticality' in asset_filter:
                query += ' AND criticality = $1'
                params.append(asset_filter['criticality'])
            if 'last_scanned_before' in asset_filter:
                query += f' AND (last_scanned < ${len(params)+1} OR last_scanned IS NULL)'
                params.append(asset_filter['last_scanned_before'])
        
        async with self.db.acquire() as conn:
            assets = await conn.fetch(query, *params)
        
        # Group assets by type for appropriate scanner selection
        scan_tasks = []
        
        for asset in assets:
            asset_dict = dict(asset)
            
            # Select appropriate scanners based on asset type
            if asset_dict.get('asset_type') == 'container':
                scan_tasks.append(self.scan_with_trivy(asset_dict))
            elif any(port in asset_dict.get('open_ports', []) for port in [80, 443, 8080, 8443]):
                scan_tasks.append(self.scan_with_nuclei(asset_dict))
            
            # Always run OpenVAS for comprehensive scanning
            scan_tasks.append(self.scan_with_openvas(asset_dict))
        
        # Execute scans with controlled concurrency
        semaphore = asyncio.Semaphore(10)  # Limit concurrent scans
        
        async def bounded_scan(scan_task):
            async with semaphore:
                return await scan_task
        
        results = await asyncio.gather(
            *[bounded_scan(task) for task in scan_tasks],
            return_exceptions=True
        )
        
        # Process results
        for result in results:
            if isinstance(result, Exception):
                print(f"Scan error: {result}")
            else:
                await self.process_scan_results(result)
    
    async def scan_with_openvas(self, asset: Dict[str, Any]) -> Dict[str, Any]:
        """Scan using OpenVAS/GVM"""
        scanner = self.scanners['openvas']
        scan_id = await scanner.create_scan(asset['ip_address'])
        
        # Record scan start
        async with self.db.acquire() as conn:
            scan_history_id = await conn.fetchval('''
                INSERT INTO scan_history (asset_id, scanner, scan_type, status)
                VALUES ($1, $2, $3, $4) RETURNING id
            ''', asset['id'], 'openvas', 'full', 'running')
        
        # Wait for scan completion
        while True:
            status = await scanner.get_scan_status(scan_id)
            if status in ['completed', 'failed']:
                break
            await asyncio.sleep(30)
        
        # Get results
        vulnerabilities = await scanner.get_scan_results(scan_id)
        
        # Update scan history
        async with self.db.acquire() as conn:
            await conn.execute('''
                UPDATE scan_history 
                SET completed_at = NOW(), 
                    vulnerabilities_found = $1,
                    status = $2
                WHERE id = $3
            ''', len(vulnerabilities), status, scan_history_id)
        
        return {
            'asset': asset,
            'scanner': 'openvas',
            'vulnerabilities': vulnerabilities
        }
    
    async def scan_with_nuclei(self, asset: Dict[str, Any]) -> Dict[str, Any]:
        """Scan web applications using Nuclei"""
        scanner = self.scanners['nuclei']
        
        # Build target URLs from open ports
        targets = []
        for port in asset.get('open_ports', []):
            if port in [80, 8080]:
                targets.append(f"http://{asset['ip_address']}:{port}")
            elif port in [443, 8443]:
                targets.append(f"https://{asset['ip_address']}:{port}")
        
        vulnerabilities = []
        for target in targets:
            vulns = await scanner.scan(target)
            vulnerabilities.extend(vulns)
        
        return {
            'asset': asset,
            'scanner': 'nuclei',
            'vulnerabilities': vulnerabilities
        }
    
    async def scan_with_trivy(self, asset: Dict[str, Any]) -> Dict[str, Any]:
        """Scan containers/images using Trivy"""
        scanner = self.scanners['trivy']
        
        # For container assets, get image info
        image_name = asset.get('container_image')
        if not image_name:
            return {'asset': asset, 'scanner': 'trivy', 'vulnerabilities': []}
        
        vulnerabilities = await scanner.scan_image(image_name)
        
        return {
            'asset': asset,
            'scanner': 'trivy',
            'vulnerabilities': vulnerabilities
        }
    
    async def process_scan_results(self, scan_result: Dict[str, Any]):
        """Process and store scan results"""
        if not scan_result or 'vulnerabilities' not in scan_result:
            return
        
        asset = scan_result['asset']
        scanner = scan_result['scanner']
        vulnerabilities = scan_result['vulnerabilities']
        
        async with self.db.acquire() as conn:
            # Mark existing vulnerabilities as potentially fixed
            await conn.execute('''
                UPDATE vulnerabilities 
                SET status = 'POTENTIALLY_FIXED'
                WHERE asset_id = $1 AND scanner = $2 AND status = 'OPEN'
            ''', asset['id'], scanner)
            
            # Insert/update vulnerabilities
            for vuln in vulnerabilities:
                vuln_id = self.generate_vuln_id(asset['id'], scanner, vuln)
                
                await conn.execute('''
                    INSERT INTO vulnerabilities (
                        asset_id, vulnerability_id, scanner, title,
                        severity, cvss_score, cve_ids, description,
                        solution, "references"
                    ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                    ON CONFLICT (asset_id, vulnerability_id, scanner) DO UPDATE SET
                        last_seen = NOW(),
                        status = CASE 
                            WHEN vulnerabilities.status = 'CLOSED' THEN 'REOPENED'
                            ELSE 'OPEN'
                        END,
                        severity = EXCLUDED.severity,
                        cvss_score = EXCLUDED.cvss_score
                ''', asset['id'], vuln_id, scanner, vuln.get('title'),
                    vuln.get('severity', 'UNKNOWN'), vuln.get('cvss_score'),
                    vuln.get('cve_ids', []), vuln.get('description'),
                    vuln.get('solution'), vuln.get('references', []))
            
            # Close vulnerabilities not seen in this scan
            await conn.execute('''
                UPDATE vulnerabilities 
                SET status = 'CLOSED'
                WHERE asset_id = $1 AND scanner = $2 
                AND status = 'POTENTIALLY_FIXED'
            ''', asset['id'], scanner)
            
            # Update asset last_scanned timestamp
            await conn.execute('''
                UPDATE assets SET last_scanned = NOW() WHERE id = $1
            ''', asset['id'])
    
    def generate_vuln_id(self, asset_id: int, scanner: str, vuln: Dict[str, Any]) -> str:
        """Generate unique vulnerability ID"""
        # Create a stable ID based on vulnerability characteristics
        id_components = [
            str(asset_id),
            scanner,
            vuln.get('title', ''),
            vuln.get('plugin_id', ''),
            vuln.get('template_id', '')
        ]
        
        id_string = '|'.join(filter(None, id_components))
        return hashlib.sha256(id_string.encode()).hexdigest()[:16]

class OpenVASScanner:
    def __init__(self, config: Dict[str, Any]):
        self.host = config.get('host', 'localhost')
        self.port = config.get('port', 9390)
        self.username = config.get('username')
        self.password = config.get('password')
        self.session = None
    
    async def create_scan(self, target: str) -> str:
        """Create and start OpenVAS scan"""
        # Implementation depends on GVM version
        # This is a simplified example
        
        # Create target
        target_id = await self.create_target(target)
        
        # Create task
        task_id = await self.create_task(target_id)
        
        # Start task
        await self.start_task(task_id)
        
        return task_id
    
    async def get_scan_results(self, scan_id: str) -> List[Dict[str, Any]]:
        """Get scan results from OpenVAS"""
        # Parse OpenVAS XML results
        results = []
        
        # Simplified - actual implementation would parse GMP response
        xml_results = await self.get_report(scan_id)
        
        return results
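
The stubbed methods above (create_target, create_task, start_task, get_report) are where GVM version differences bite. Here's a rough sketch of that plumbing using the official python-gvm library; the UUIDs are the defaults shipped with GVM and the connection details assume a local deployment, so verify both against your install:

#!/usr/bin/env python3
"""Sketch: driving GVM with python-gvm (synchronous; wrap with run_in_executor)."""

from gvm.connections import TLSConnection
from gvm.protocols.gmp import Gmp

# Default UUIDs shipped with GVM; confirm via gmp.get_scan_configs() and friends
FULL_AND_FAST = 'daba56c8-73ec-11df-a475-002264764cea'
ALL_IANA_TCP_PORTS = '33d0cd82-57c6-11e1-8ed1-406186ea4fc5'
OPENVAS_SCANNER = '08b69003-5fc2-4037-a479-93b440211c73'

def start_gvm_scan(host: str, username: str, password: str) -> str:
    """Create a target and task for the host, start the scan, return the task id."""
    connection = TLSConnection(hostname='localhost', port=9390)
    with Gmp(connection) as gmp:
        gmp.authenticate(username, password)

        # Responses are XML elements; the new object's id is an attribute
        target = gmp.create_target(
            name=f'vm-target-{host}',
            hosts=[host],
            port_list_id=ALL_IANA_TCP_PORTS
        )
        task = gmp.create_task(
            name=f'vm-scan-{host}',
            config_id=FULL_AND_FAST,
            target_id=target.get('id'),
            scanner_id=OPENVAS_SCANNER
        )
        task_id = task.get('id')
        gmp.start_task(task_id)
        return task_id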

class NucleiScanner:
    def __init__(self, config: Dict[str, Any]):
        self.templates_path = config.get('templates_path', '/opt/nuclei-templates')
        self.severity_filter = config.get('severity_filter', ['critical', 'high', 'medium'])
    
    async def scan(self, target: str) -> List[Dict[str, Any]]:
        """Run Nuclei scan against target"""
        cmd = [
            'nuclei',
            '-u', target,
            '-t', self.templates_path,
            '-severity', ','.join(self.severity_filter),
            '-json',  # newer Nuclei releases renamed this flag to -jsonl
            '-silent'
        ]
        
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        
        stdout, stderr = await proc.communicate()
        
        vulnerabilities = []
        for line in stdout.decode().strip().split('\n'):
            if line:
                try:
                    finding = json.loads(line)
                    vulnerabilities.append({
                        'title': finding.get('info', {}).get('name'),
                        'severity': finding.get('info', {}).get('severity', '').upper(),
                        'template_id': finding.get('template-id'),
                        'description': finding.get('info', {}).get('description'),
                        'matcher_name': finding.get('matcher-name'),
                        'matched_at': finding.get('matched-at'),
                        'cve_ids': finding.get('info', {}).get('cve', []),
                        'references': finding.get('info', {}).get('reference', [])
                    })
                except json.JSONDecodeError:
                    continue
        
        return vulnerabilities

class TrivyScanner:
    def __init__(self, config: Dict[str, Any]):
        self.severity_threshold = config.get('severity_threshold', 'MEDIUM')
    
    async def scan_image(self, image: str) -> List[Dict[str, Any]]:
        """Scan container image with Trivy"""
        cmd = [
            'trivy', 'image',
            '--format', 'json',
            '--severity', f'{self.severity_threshold},HIGH,CRITICAL',
            '--quiet',
            image
        ]
        
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        
        stdout, stderr = await proc.communicate()
        
        vulnerabilities = []
        if proc.returncode == 0:
            results = json.loads(stdout.decode())
            
            for target in results.get('Results', []):
                for vuln in target.get('Vulnerabilities', []):
                    vulnerabilities.append({
                        'title': vuln.get('Title', vuln.get('VulnerabilityID')),
                        'vulnerability_id': vuln.get('VulnerabilityID'),
                        'package_name': vuln.get('PkgName'),
                        'installed_version': vuln.get('InstalledVersion'),
                        'fixed_version': vuln.get('FixedVersion'),
                        'severity': vuln.get('Severity', 'UNKNOWN').upper(),
                        'cvss_score': self.extract_cvss_score(vuln),
                        'cve_ids': [vuln.get('VulnerabilityID')] if vuln.get('VulnerabilityID', '').startswith('CVE') else [],
                        'description': vuln.get('Description'),
                        'references': vuln.get('References', [])
                    })
        
        return vulnerabilities
    
    def extract_cvss_score(self, vuln: Dict[str, Any]) -> Optional[float]:
        """Extract CVSS score from vulnerability data"""
        if 'CVSS' in vuln:
            for source, scores in vuln['CVSS'].items():
                if 'V3Score' in scores:
                    return scores['V3Score']
                elif 'V2Score' in scores:
                    return scores['V2Score']
        return None

Remediation Tracking and Automation

Vulnerability management isn't just about finding issues – it's about fixing them:

#!/usr/bin/env python3
"""
Automated Remediation and Patch Management
"""

import asyncio
import functools
import asyncpg
from datetime import datetime, timedelta
import ansible_runner
import aiohttp
from typing import List, Dict, Any
import yaml

class RemediationOrchestrator:
    def __init__(self, db_config: Dict[str, str], ansible_config: Dict[str, str]):
        self.db_config = db_config
        self.ansible_config = ansible_config
        self.remediation_playbooks = self.load_playbooks()
    
    def load_playbooks(self) -> Dict[str, str]:
        """Load remediation playbooks mapping"""
        with open('remediation_playbooks.yaml', 'r') as f:
            return yaml.safe_load(f)
    
    async def setup_database(self):
        """Initialize remediation tracking tables"""
        self.db = await asyncpg.create_pool(**self.db_config)
        
        await self.db.execute('''
            CREATE TABLE IF NOT EXISTS remediation_tasks (
                id SERIAL PRIMARY KEY,
                vulnerability_id INTEGER REFERENCES vulnerabilities(id),
                asset_id INTEGER REFERENCES assets(id),
                remediation_type TEXT NOT NULL,
                priority TEXT NOT NULL,
                assigned_to TEXT,
                created_at TIMESTAMP DEFAULT NOW(),
                scheduled_for TIMESTAMP,
                started_at TIMESTAMP,
                completed_at TIMESTAMP,
                status TEXT DEFAULT 'PENDING',
                automation_possible BOOLEAN DEFAULT FALSE,
                playbook_name TEXT,
                patch_id TEXT,
                notes TEXT,
                verification_status TEXT
            );
            
            CREATE INDEX IF NOT EXISTS idx_remediation_status ON remediation_tasks(status);
            CREATE INDEX IF NOT EXISTS idx_remediation_priority ON remediation_tasks(priority);
            
            CREATE TABLE IF NOT EXISTS patch_management (
                id SERIAL PRIMARY KEY,
                patch_id TEXT UNIQUE NOT NULL,
                vendor TEXT,
                product TEXT,
                version TEXT,
                severity TEXT,
                cve_ids TEXT[],
                release_date DATE,
                tested_date DATE,
                approved_date DATE,
                approval_status TEXT DEFAULT 'PENDING',
                test_results JSONB,
                rollback_plan TEXT
            );
        ''')
    
    async def create_remediation_tasks(self):
        """Create remediation tasks for new vulnerabilities"""
        async with self.db.acquire() as conn:
            # Get new vulnerabilities that need remediation
            new_vulns = await conn.fetch('''
                SELECT v.*, a.ip_address, a.hostname, a.operating_system, a.criticality as asset_criticality
                FROM vulnerabilities v
                JOIN assets a ON v.asset_id = a.id
                WHERE v.status = 'OPEN'
                AND v.false_positive = FALSE
                AND v.risk_accepted = FALSE
                AND NOT EXISTS (
                    SELECT 1 FROM remediation_tasks rt 
                    WHERE rt.vulnerability_id = v.id 
                    AND rt.status NOT IN ('FAILED', 'CANCELLED')
                )
            ''')
            
            for vuln in new_vulns:
                # Determine remediation priority
                priority = self.calculate_priority(vuln)
                
                # Check if automated remediation is possible
                automation_possible, playbook_name = self.check_automation_possibility(vuln)
                
                # Create remediation task
                await conn.execute('''
                    INSERT INTO remediation_tasks (
                        vulnerability_id, asset_id, remediation_type,
                        priority, automation_possible, playbook_name
                    ) VALUES ($1, $2, $3, $4, $5, $6)
                ''', vuln['id'], vuln['asset_id'], 
                    self.determine_remediation_type(vuln),
                    priority, automation_possible, playbook_name)
    
    def calculate_priority(self, vuln: Dict[str, Any]) -> str:
        """Calculate remediation priority based on multiple factors"""
        score = 0
        
        # Severity scoring
        severity_scores = {
            'CRITICAL': 40,
            'HIGH': 30,
            'MEDIUM': 20,
            'LOW': 10
        }
        score += severity_scores.get(vuln['severity'], 0)
        
        # Asset criticality scoring
        criticality_scores = {
            'CRITICAL': 30,
            'HIGH': 20,
            'MEDIUM': 10,
            'LOW': 5
        }
        score += criticality_scores.get(vuln['asset_criticality'], 0)
        
        # CVSS score factor
        if vuln['cvss_score']:
            score += int(vuln['cvss_score'] * 3)
        
        # Exploitability factor
        if self.check_active_exploitation(vuln['cve_ids']):
            score += 20
        
        # Determine priority
        if score >= 80:
            return 'CRITICAL'
        elif score >= 60:
            return 'HIGH'
        elif score >= 40:
            return 'MEDIUM'
        else:
            return 'LOW'
    
    def check_active_exploitation(self, cve_ids: List[str]) -> bool:
        """Check if CVEs are being actively exploited"""
        # Check against CISA KEV, threat intel feeds, etc.
        # Simplified example
        known_exploited = ['CVE-2021-44228', 'CVE-2021-34527', 'CVE-2023-23397']
        return any(cve in known_exploited for cve in cve_ids or [])
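
    # Sketch: in production, pull the CISA Known Exploited Vulnerabilities
    # catalog instead of relying on the hardcoded list above, then intersect
    # it with each finding's CVEs. Uses the aiohttp import from this module.
    async def fetch_cisa_kev_cves(self) -> set:
        """Return the set of CVE IDs currently in the CISA KEV catalog."""
        url = ('https://www.cisa.gov/sites/default/files/feeds/'
               'known_exploited_vulnerabilities.json')
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                resp.raise_for_status()
                data = await resp.json()
        return {entry['cveID'] for entry in data.get('vulnerabilities', [])}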
    
    def determine_remediation_type(self, vuln: Dict[str, Any]) -> str:
        """Determine the type of remediation needed"""
        if vuln.get('solution', '').lower().startswith('upgrade'):
            return 'PATCH'
        elif 'configuration' in vuln.get('solution', '').lower():
            return 'CONFIGURATION'
        elif 'disable' in vuln.get('solution', '').lower():
            return 'DISABLE_SERVICE'
        else:
            return 'MANUAL_REVIEW'
    
    def check_automation_possibility(self, vuln: Dict[str, Any]) -> tuple:
        """Check if vulnerability can be automatically remediated"""
        # Check against playbook mapping
        vuln_key = f"{vuln['scanner']}:{vuln['vulnerability_id']}"
        
        if vuln_key in self.remediation_playbooks:
            # Each playbook entry is a mapping like {'playbook': 'fix_x.yml'}
            return True, self.remediation_playbooks[vuln_key]['playbook']
        
        # Check for generic remediation patterns
        if vuln['cve_ids']:
            for cve in vuln['cve_ids']:
                if cve in self.remediation_playbooks:
                    return True, self.remediation_playbooks[cve]['playbook']
        
        # Fall back to OS-level patching when the fix is a patch
        if vuln['operating_system'] and self.determine_remediation_type(vuln) == 'PATCH':
            os_type = self.detect_os_type(vuln['operating_system'])
            entry = self.remediation_playbooks.get(f'patch_{os_type}')
            if entry:
                return True, entry['playbook']
        
        return False, None
    
    def detect_os_type(self, os_string: str) -> str:
        """Detect OS type from string"""
        os_string_lower = os_string.lower()
        if 'ubuntu' in os_string_lower:
            return 'ubuntu'
        elif 'centos' in os_string_lower or 'red hat' in os_string_lower:
            return 'centos'
        elif 'windows' in os_string_lower:
            return 'windows'
        return 'unknown'
    
    async def execute_automated_remediation(self):
        """Execute automated remediation tasks"""
        async with self.db.acquire() as conn:
            # Get pending automated tasks
            tasks = await conn.fetch('''
                SELECT rt.*, a.ip_address, a.hostname, v.cve_ids
                FROM remediation_tasks rt
                JOIN assets a ON rt.asset_id = a.id
                JOIN vulnerabilities v ON rt.vulnerability_id = v.id
                WHERE rt.status = 'PENDING'
                AND rt.automation_possible = TRUE
                AND (rt.scheduled_for IS NULL OR rt.scheduled_for <= NOW())
                ORDER BY rt.priority DESC, rt.created_at ASC
                LIMIT 10
            ''')
            
            for task in tasks:
                await self.run_remediation_playbook(task)
    
    async def run_remediation_playbook(self, task: Dict[str, Any]):
        """Execute Ansible playbook for remediation"""
        playbook_path = f"{self.ansible_config['playbook_dir']}/{task['playbook_name']}"
        
        # Update task status
        async with self.db.acquire() as conn:
            await conn.execute('''
                UPDATE remediation_tasks 
                SET status = 'IN_PROGRESS', started_at = NOW()
                WHERE id = $1
            ''', task['id'])
        
        # Prepare inventory
        inventory = {
            'all': {
                'hosts': {
                    task['hostname'] or task['ip_address']: {
                        'ansible_host': task['ip_address']
                    }
                }
            }
        }
        
        # Prepare extra vars
        extra_vars = {
            'target_host': task['hostname'] or task['ip_address'],
            'vulnerability_cves': task['cve_ids'],
            'remediation_task_id': task['id']
        }
        
        # ansible_runner.run() is blocking and takes keyword arguments,
        # so wrap it with functools.partial and hand it to a thread pool
        result = await asyncio.get_event_loop().run_in_executor(
            None,
            functools.partial(
                ansible_runner.run,
                playbook=playbook_path,
                inventory=inventory,
                extravars=extra_vars,
                quiet=True
            )
        )
        
        # Update task based on result
        async with self.db.acquire() as conn:
            if result.status == 'successful':
                await conn.execute('''
                    UPDATE remediation_tasks 
                    SET status = 'COMPLETED', 
                        completed_at = NOW(),
                        notes = 'Automated remediation successful'
                    WHERE id = $1
                ''', task['id'])
                
                # Schedule verification scan
                await self.schedule_verification_scan(task)
            else:
                await conn.execute('''
                    UPDATE remediation_tasks 
                    SET status = 'FAILED',
                        notes = $2
                    WHERE id = $1
                ''', task['id'], f"Playbook failed: {result.stderr}")
    
    async def schedule_verification_scan(self, task: Dict[str, Any]):
        """Schedule a verification scan after remediation"""
        # Wait 30 minutes before verification
        verification_time = datetime.now() + timedelta(minutes=30)
        
        async with self.db.acquire() as conn:
            await conn.execute('''
                INSERT INTO scan_history (
                    asset_id, scanner, scan_type, 
                    started_at, status
                ) VALUES ($1, $2, $3, $4, $5)
            ''', task['asset_id'], 'verification', 
                f"verify_remediation_{task['id']}", 
                verification_time, 'scheduled')

# Example remediation playbooks YAML
remediation_playbooks_yaml = """
# Specific vulnerability remediations
'openvas:1.3.6.1.4.1.25623.1.0.123456':
  playbook: 'fix_ssh_weak_ciphers.yml'

'nuclei:apache-struts-rce':
  playbook: 'patch_struts.yml'

# CVE-based remediations  
'CVE-2021-44228':
  playbook: 'fix_log4j.yml'

'CVE-2021-34527':
  playbook: 'fix_printnightmare.yml'

# Generic OS patching
'patch_ubuntu':
  playbook: 'patch_ubuntu_system.yml'

'patch_centos':
  playbook: 'patch_centos_system.yml'

'patch_windows':
  playbook: 'patch_windows_system.yml'
"""

# Example Ansible playbook for remediation
ansible_playbook_example = """
---
- name: Fix SSH Weak Ciphers
  hosts: ""
  become: yes
  vars:
    strong_ciphers: "chacha20-poly1305@openssh.com,aes256-gcm@openssh.com,aes128-gcm@openssh.com"
    strong_macs: "hmac-sha2-512-etm@openssh.com,hmac-sha2-256-etm@openssh.com"
    strong_kex: "curve25519-sha256,curve25519-sha256@libssh.org"
    
  tasks:
    - name: Backup current SSH configuration
      copy:
        src: /etc/ssh/sshd_config
        dest: /etc/ssh/sshd_config.bak
        remote_src: yes

    - name: Configure strong ciphers
      lineinfile:
        path: /etc/ssh/sshd_config
        regexp: '^Ciphers'
        line: "Ciphers "
        state: present

    - name: Configure strong MACs
      lineinfile:
        path: /etc/ssh/sshd_config
        regexp: '^MACs'
        line: "MACs "
        state: present

    - name: Configure strong KEX algorithms
      lineinfile:
        path: /etc/ssh/sshd_config
        regexp: '^KexAlgorithms'
        line: "KexAlgorithms "
        state: present

    - name: Test SSH configuration
      command: sshd -t
      register: sshd_test

    - name: Restart SSH service
      systemd:
        name: sshd
        state: restarted
      when: sshd_test.rc == 0

    - name: Verify SSH is running
      wait_for:
        port: 22
        host: ""
        timeout: 30

    - name: Report success
      uri:
        url: "http://vuln-mgmt-api:8080/api/remediation//complete"
        method: POST
        body_format: json
        body:
          status: "completed"
          message: "SSH weak ciphers remediated successfully"
"""

Dashboards and Reporting

Visualization is crucial for managing vulnerabilities at scale:

#!/usr/bin/env python3
"""
Grafana Dashboard Generator for Vulnerability Management
"""

import requests
from typing import Dict, Any

class VulnerabilityDashboard:
    def __init__(self, grafana_url: str, api_key: str):
        self.grafana_url = grafana_url
        self.headers = {
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json'
        }
    
    def create_dashboard(self) -> Dict[str, Any]:
        """Create comprehensive vulnerability management dashboard"""
        dashboard = {
            "dashboard": {
                "title": "Vulnerability Management Dashboard",
                "tags": ["security", "vulnerabilities"],
                "timezone": "browser",
                "panels": [
                    self.create_summary_panel(),
                    self.create_severity_distribution_panel(),
                    self.create_trend_panel(),
                    self.create_top_vulnerabilities_panel(),
                    self.create_asset_risk_panel(),
                    self.create_remediation_metrics_panel(),
                    self.create_scanner_performance_panel(),
                    self.create_compliance_panel()
                ],
                "time": {
                    "from": "now-30d",
                    "to": "now"
                },
                "refresh": "5m"
            },
            "overwrite": True
        }
        
        return dashboard
    
    def create_summary_panel(self) -> Dict[str, Any]:
        """Summary statistics panel"""
        return {
            "gridPos": {"h": 8, "w": 24, "x": 0, "y": 0},
            "type": "stat",
            "title": "Vulnerability Summary",
            "targets": [
                {
                    "rawSql": """
                        SELECT COUNT(*) as "Total Open Vulnerabilities"
                        FROM vulnerabilities
                        WHERE status = 'OPEN' AND NOT false_positive
                    """,
                    "format": "table"
                },
                {
                    "rawSql": """
                        SELECT COUNT(*) as "Critical"
                        FROM vulnerabilities
                        WHERE status = 'OPEN' AND severity = 'CRITICAL'
                    """,
                    "format": "table"
                },
                {
                    "rawSql": """
                        SELECT COUNT(*) as "High"
                        FROM vulnerabilities
                        WHERE status = 'OPEN' AND severity = 'HIGH'
                    """,
                    "format": "table"
                },
                {
                    "rawSql": """
                        SELECT COUNT(DISTINCT asset_id) as "Affected Assets"
                        FROM vulnerabilities
                        WHERE status = 'OPEN'
                    """,
                    "format": "table"
                }
            ],
            "options": {
                "colorMode": "background",
                "graphMode": "none",
                "orientation": "horizontal",
                "reduceOptions": {
                    "calcs": ["lastNotNull"]
                }
            }
        }
    
    def create_severity_distribution_panel(self) -> Dict[str, Any]:
        """Severity distribution pie chart"""
        return {
            "gridPos": {"h": 8, "w": 12, "x": 0, "y": 8},
            "type": "piechart",
            "title": "Vulnerability Severity Distribution",
            "targets": [{
                "rawSql": """
                    SELECT severity, COUNT(*) as count
                    FROM vulnerabilities
                    WHERE status = 'OPEN' AND NOT false_positive
                    GROUP BY severity
                """,
                "format": "table"
            }],
            "options": {
                "pieType": "donut",
                "tooltipDisplayMode": "multi",
                "legend": {
                    "displayMode": "table",
                    "placement": "right",
                    "values": ["value", "percent"]
                }
            },
            "fieldConfig": {
                "defaults": {
                    "mappings": [],
                    "thresholds": {
                        "mode": "absolute",
                        "steps": [
                            {"color": "green", "value": null},
                            {"color": "yellow", "value": 10},
                            {"color": "orange", "value": 50},
                            {"color": "red", "value": 100}
                        ]
                    }
                }
            }
        }
    
    def create_trend_panel(self) -> Dict[str, Any]:
        """Vulnerability trend over time"""
        return {
            "gridPos": {"h": 8, "w": 12, "x": 12, "y": 8},
            "type": "timeseries",
            "title": "Vulnerability Trend",
            "targets": [{
                "rawSql": """
                    SELECT 
                        date_trunc('day', detected_at) as time,
                        severity,
                        COUNT(*) as count
                    FROM vulnerabilities
                    WHERE detected_at > NOW() - INTERVAL '30 days'
                    GROUP BY time, severity
                    ORDER BY time
                """,
                "format": "time_series"
            }],
            "fieldConfig": {
                "defaults": {
                    "custom": {
                        "lineInterpolation": "smooth",
                        "fillOpacity": 10,
                        "stacking": {
                            "mode": "normal"
                        }
                    }
                }
            }
        }
    
    def create_top_vulnerabilities_panel(self) -> Dict[str, Any]:
        """Table of top vulnerabilities"""
        return {
            "gridPos": {"h": 10, "w": 24, "x": 0, "y": 16},
            "type": "table",
            "title": "Top Vulnerabilities by Risk Score",
            "targets": [{
                "rawSql": """
                    SELECT 
                        v.title,
                        v.severity,
                        v.cvss_score,
                        COUNT(DISTINCT v.asset_id) as affected_assets,
                        STRING_AGG(DISTINCT a.criticality, ', ') as asset_criticalities,
                        v.cve_ids[1] as primary_cve,
                        CASE 
                            WHEN EXISTS (
                                SELECT 1 FROM remediation_tasks rt 
                                WHERE rt.vulnerability_id = v.id 
                                AND rt.status = 'COMPLETED'
                            ) THEN 'Remediated'
                            WHEN EXISTS (
                                SELECT 1 FROM remediation_tasks rt 
                                WHERE rt.vulnerability_id = v.id 
                                AND rt.status IN ('PENDING', 'IN_PROGRESS')
                            ) THEN 'In Progress'
                            ELSE 'Open'
                        END as remediation_status
                    FROM vulnerabilities v
                    JOIN assets a ON v.asset_id = a.id
                    WHERE v.status = 'OPEN' AND NOT v.false_positive
                    GROUP BY v.title, v.severity, v.cvss_score, v.cve_ids, v.id
                    ORDER BY 
                        CASE v.severity 
                            WHEN 'CRITICAL' THEN 1 
                            WHEN 'HIGH' THEN 2 
                            WHEN 'MEDIUM' THEN 3 
                            ELSE 4 
                        END,
                        v.cvss_score DESC NULLS LAST,
                        affected_assets DESC
                    LIMIT 20
                """,
                "format": "table"
            }],
            "options": {
                "showHeader": true
            },
            "fieldConfig": {
                "overrides": [
                    {
                        "matcher": {"id": "byName", "options": "severity"},
                        "properties": [{
                            "id": "custom.displayMode",
                            "value": "color-background"
                        }]
                    }
                ]
            }
        }
    
    def create_compliance_panel(self) -> Dict[str, Any]:
        """Compliance metrics panel"""
        return {
            "gridPos": {"h": 8, "w": 12, "x": 0, "y": 26},
            "type": "gauge",
            "title": "Compliance Metrics",
            "targets": [
                {
                    "rawSql": """
                        SELECT 
                            (COUNT(*) FILTER (WHERE 
                                NOT EXISTS (
                                    SELECT 1 FROM vulnerabilities v 
                                    WHERE v.asset_id = a.id 
                                    AND v.status = 'OPEN' 
                                    AND v.severity IN ('CRITICAL', 'HIGH')
                                )
                            ) * 100.0 / COUNT(*))::INTEGER as "Critical Asset Compliance %"
                        FROM assets a
                        WHERE a.criticality = 'CRITICAL' AND a.active = TRUE
                    """,
                    "format": "table"
                }
            ],
            "options": {
                "reduceOptions": {
                    "calcs": ["lastNotNull"]
                },
                "showThresholdLabels": true,
                "showThresholdMarkers": true
            },
            "fieldConfig": {
                "defaults": {
                    "thresholds": {
                        "mode": "absolute",
                        "steps": [
                            {"color": "red", "value": null},
                            {"color": "yellow", "value": 80},
                            {"color": "green", "value": 95}
                        ]
                    },
                    "unit": "percent",
                    "min": 0,
                    "max": 100
                }
            }
        }
    
    def deploy_dashboard(self):
        """Deploy dashboard to Grafana"""
        dashboard = self.create_dashboard()
        
        response = requests.post(
            f"{self.grafana_url}/api/dashboards/db",
            headers=self.headers,
            json=dashboard
        )
        
        if response.status_code == 200:
            print(f"Dashboard created successfully: {response.json()}")
        else:
            print(f"Failed to create dashboard: {response.text}")

# Usage
dashboard_creator = VulnerabilityDashboard(
    grafana_url="http://localhost:3000",
    api_key="your-grafana-api-key"
)
dashboard_creator.deploy_dashboard()

Integration and Automation

The key to scaling vulnerability management is automation:

# Apache Airflow DAG for automated vulnerability management
from airflow import DAG
from airflow.operators.python import PythonOperator  # the old *_operator module paths are deprecated
from datetime import datetime, timedelta

# The python_callable functions below (run_asset_discovery and friends) are
# thin wrappers around the orchestrator classes shown earlier in this post.

default_args = {
    'owner': 'security-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'vulnerability_management',
    default_args=default_args,
    description='Automated vulnerability management workflow',
    schedule_interval='0 */6 * * *',  # Every 6 hours
    catchup=False
)

# Asset discovery
asset_discovery = PythonOperator(
    task_id='asset_discovery',
    python_callable=run_asset_discovery,
    dag=dag
)

# Vulnerability scanning
vulnerability_scan = PythonOperator(
    task_id='vulnerability_scanning',
    python_callable=run_vulnerability_scans,
    dag=dag
)

# Create remediation tasks
create_remediations = PythonOperator(
    task_id='create_remediation_tasks',
    python_callable=create_remediation_tasks,
    dag=dag
)

# Execute automated remediations
auto_remediate = PythonOperator(
    task_id='automated_remediation',
    python_callable=execute_automated_remediation,
    dag=dag
)

# Generate reports
generate_reports = PythonOperator(
    task_id='generate_reports',
    python_callable=generate_vulnerability_reports,
    dag=dag
)

# Define workflow
asset_discovery >> vulnerability_scan >> create_remediations >> auto_remediate >> generate_reports

Lessons Learned

After years of building and running vulnerability management programs:

1. Start with Asset Management

You can't secure what you don't know exists. Invest heavily in discovery.

2. Automate Everything Possible

Manual processes don't scale. Automate scanning, ticketing, and remediation.

3. Context is King

A critical vulnerability on a development server is not the same as a critical vulnerability on a production domain controller.
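
In practice that means letting asset context scale a finding's score, as in this minimal sketch (the dev tag convention is an assumption; use whatever your inventory actually records):

def context_weight(asset: dict) -> float:
    """Scale a finding's risk score by the context of the affected asset."""
    tags = asset.get('tags') or []
    if 'dev' in tags:
        return 0.5   # same CVE, lower urgency on a development box
    if asset.get('criticality') == 'CRITICAL':
        return 1.5   # domain controllers, databases, crown jewels
    return 1.0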

4. Measure What Matters

  • Mean Time to Detect (MTTD)
  • Mean Time to Remediate (MTTR)
  • Vulnerability aging
  • Coverage percentage
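
The remediation-side metrics fall straight out of the schema built earlier. Here's a sketch of the MTTR and aging queries, assuming the vulnerabilities and remediation_tasks tables defined above:

import asyncpg

async def remediation_metrics(db: asyncpg.Pool) -> dict:
    """Compute MTTR and open-vulnerability aging from the existing tables."""
    async with db.acquire() as conn:
        # Mean Time to Remediate: detection through completed remediation
        mttr = await conn.fetchval('''
            SELECT AVG(rt.completed_at - v.detected_at)
            FROM remediation_tasks rt
            JOIN vulnerabilities v ON rt.vulnerability_id = v.id
            WHERE rt.status = 'COMPLETED'
        ''')
        # Aging: how long currently open findings have been open, by severity
        aging = await conn.fetch('''
            SELECT severity,
                   COUNT(*) AS open_count,
                   AVG(NOW() - detected_at) AS avg_age
            FROM vulnerabilities
            WHERE status = 'OPEN' AND NOT false_positive
            GROUP BY severity
        ''')
    return {'mttr': mttr, 'aging': [dict(row) for row in aging]}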

5. Integration is Essential

Vulnerability management tools must integrate with the rest of your stack (a minimal ticketing sketch follows this list):

  • CMDB/Asset Management
  • Ticketing systems
  • CI/CD pipelines
  • SIEM/SOAR platforms
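
Here's the ticketing sketch promised above, against GLPI's REST API (apirest.php). The endpoint paths and token headers follow GLPI's documented interface, but treat the URL, tokens, and field names as assumptions to verify against your instance:

#!/usr/bin/env python3
"""Sketch: open a GLPI ticket for a vulnerability via the REST API."""

import requests

GLPI_URL = 'http://glpi.internal/apirest.php'  # assumption: point at your instance
APP_TOKEN = 'your-app-token'
USER_TOKEN = 'your-user-token'

def create_glpi_ticket(title: str, description: str) -> int:
    """Create a ticket and return its GLPI id."""
    # Open an API session with the user and application tokens
    resp = requests.get(
        f'{GLPI_URL}/initSession',
        headers={
            'Authorization': f'user_token {USER_TOKEN}',
            'App-Token': APP_TOKEN,
        },
    )
    resp.raise_for_status()
    session_token = resp.json()['session_token']

    headers = {'Session-Token': session_token, 'App-Token': APP_TOKEN}
    try:
        ticket = requests.post(
            f'{GLPI_URL}/Ticket',
            headers=headers,
            json={'input': {'name': title, 'content': description}},
        )
        ticket.raise_for_status()
        return ticket.json()['id']
    finally:
        # Always close the API session
        requests.get(f'{GLPI_URL}/killSession', headers=headers)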

Conclusion

Building an effective vulnerability management program with open source tools is not just possible – it can be superior to commercial solutions. The key is thoughtful integration, automation, and continuous improvement.

Start small with asset discovery and basic scanning, then gradually add automation, integration, and advanced features. The framework presented here has successfully managed vulnerabilities across thousands of systems in production environments.

Remember: vulnerability management is a program, not a project. Build it to be sustainable, scalable, and automated from day one.


Questions about scaling vulnerability management? Want to share your open source security stack? Let's connect and improve our collective security posture!
