Vulnerability Management at Scale with Open Source Tools
Build an enterprise-grade vulnerability management program using only open source tools. From scanning to remediation tracking, here's how to do it right.
Reading time: 11 minutes
The Reality of Vulnerability Management
Managing vulnerabilities across thousands of systems isn't just about running scanners; it's about building a sustainable process that scales. After implementing vulnerability management programs for organizations ranging from startups to federal agencies, I've learned that open source tools can match or exceed commercial solutions when properly integrated.
This guide shares a production-tested approach to building a comprehensive vulnerability management program using only open source tools.
The Complete Vulnerability Management Stack
Here's the stack I've successfully deployed in production:
- Discovery: Nmap, Masscan, Rumble (now runZero)
- Vulnerability Scanning: OpenVAS/GVM, Nuclei, Wazuh
- Container Scanning: Trivy, Grype, Clair
- Web Application: OWASP ZAP, Nikto, SQLMap
- Orchestration: Apache Airflow, n8n
- Data Management: PostgreSQL, Elasticsearch
- Visualization: Grafana, Kibana
- Ticketing: GLPI, Request Tracker
Building the Foundation
Asset Discovery and Inventory
You can't protect what you don't know exists. Start with comprehensive asset discovery:
#!/usr/bin/env python3
"""
Automated Asset Discovery and Inventory System
Combines multiple discovery methods for comprehensive coverage
"""
import asyncio
import json
import xml.etree.ElementTree as ET
from typing import List, Dict, Any
import asyncpg
class AssetDiscoveryOrchestrator:
def __init__(self, db_config: Dict[str, str]):
self.db_config = db_config
self.discovered_assets = []
async def setup_database(self):
"""Initialize asset inventory database"""
self.db = await asyncpg.create_pool(**self.db_config)
await self.db.execute('''
CREATE TABLE IF NOT EXISTS assets (
id SERIAL PRIMARY KEY,
ip_address INET UNIQUE NOT NULL,
hostname TEXT,
mac_address MACADDR,
operating_system TEXT,
open_ports INTEGER[],
services JSONB,
first_seen TIMESTAMP DEFAULT NOW(),
last_seen TIMESTAMP DEFAULT NOW(),
last_scanned TIMESTAMP,
asset_type TEXT,
business_unit TEXT,
criticality TEXT,
tags TEXT[],
active BOOLEAN DEFAULT TRUE
);
CREATE INDEX IF NOT EXISTS idx_assets_ip ON assets(ip_address);
CREATE INDEX IF NOT EXISTS idx_assets_criticality ON assets(criticality);
CREATE INDEX IF NOT EXISTS idx_assets_last_seen ON assets(last_seen);
''')
async def discover_with_nmap(self, network: str) -> List[Dict[str, Any]]:
"""Fast network discovery using Nmap"""
print(f"Starting Nmap discovery for {network}")
# Stage 1: Fast ping sweep
ping_cmd = [
'nmap', '-sn', '-n', '--max-retries', '1',
'--max-rtt-timeout', '500ms', network,
'-oX', '-'
]
proc = await asyncio.create_subprocess_exec(
*ping_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
print(f"Nmap error: {stderr.decode()}")
return []
# Parse XML output
root = ET.fromstring(stdout.decode())
discovered = []
for host in root.findall('.//host'):
if host.find(".//status[@state='up']") is not None:
addr_elem = host.find(".//address[@addrtype='ipv4']")
if addr_elem is not None:
ip = addr_elem.get('addr')
# Stage 2: Detailed scan of live hosts
host_info = await self.scan_host_details(ip)
discovered.append(host_info)
return discovered
async def scan_host_details(self, ip: str) -> Dict[str, Any]:
"""Detailed scan of individual host"""
scan_cmd = [
'nmap', '-sV', '-O', '--osscan-guess',
'-p-', '--max-retries', '2',
'--script', 'default,vuln',
ip, '-oX', '-'
]
proc = await asyncio.create_subprocess_exec(
*scan_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
# Parse results
host_info = {
'ip_address': ip,
'hostname': None,
'operating_system': None,
'open_ports': [],
'services': {},
'vulnerabilities': []
}
if proc.returncode == 0:
root = ET.fromstring(stdout.decode())
host = root.find('.//host')
# Extract hostname
hostname_elem = host.find(".//hostname")
if hostname_elem is not None:
host_info['hostname'] = hostname_elem.get('name')
# Extract OS
os_match = host.find(".//osmatch")
if os_match is not None:
host_info['operating_system'] = os_match.get('name')
# Extract ports and services
for port in host.findall(".//port"):
port_id = int(port.get('portid'))
state = port.find('state').get('state')
if state == 'open':
host_info['open_ports'].append(port_id)
service = port.find('service')
if service is not None:
service_info = {
'name': service.get('name', 'unknown'),
'product': service.get('product', ''),
'version': service.get('version', ''),
'extrainfo': service.get('extrainfo', '')
}
host_info['services'][str(port_id)] = service_info
# Extract vulnerabilities from scripts
for script in host.findall(".//script[@id]"):
if 'vuln' in script.get('id'):
host_info['vulnerabilities'].append({
'script': script.get('id'),
'output': script.get('output', '')
})
return host_info
async def discover_with_masscan(self, network: str, ports: str = "1-65535") -> List[Dict[str, Any]]:
"""Ultra-fast port discovery using Masscan"""
print(f"Starting Masscan discovery for {network}")
scan_cmd = [
'masscan', network, '-p', ports,
'--rate', '10000', # Adjust based on network capacity
'--open-only', '-oJ', '-'
]
proc = await asyncio.create_subprocess_exec(
*scan_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
discovered = []
for line in stdout.decode().strip().split('\n'):
if line.strip() and line.startswith('{'):
try:
data = json.loads(line.rstrip(','))
if 'ports' in data:
for port_info in data['ports']:
discovered.append({
'ip_address': data['ip'],
'port': port_info['port'],
'protocol': port_info['proto'],
'status': port_info['status']
})
except json.JSONDecodeError:
continue
return discovered
async def passive_discovery(self) -> List[Dict[str, Any]]:
"""Passive asset discovery from various sources"""
discovered = []
# DNS zone transfers (if allowed)
# DHCP logs parsing
# ARP cache analysis
# NetFlow/sFlow data
# Cloud API inventory (AWS, Azure, GCP)
# Example: AWS EC2 discovery
try:
import boto3
ec2 = boto3.client('ec2')
response = ec2.describe_instances()
for reservation in response['Reservations']:
for instance in reservation['Instances']:
if instance['State']['Name'] == 'running':
discovered.append({
'ip_address': instance.get('PrivateIpAddress'),
'hostname': instance.get('PrivateDnsName'),
'asset_type': 'aws_ec2',
'instance_id': instance['InstanceId'],
'tags': {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])}
})
except Exception as e:
print(f"AWS discovery error: {e}")
return discovered
async def correlate_and_store(self, discoveries: List[List[Dict[str, Any]]]):
"""Correlate discoveries from multiple sources and store"""
# Flatten all discoveries
all_assets = {}
for discovery_set in discoveries:
for asset in discovery_set:
ip = asset.get('ip_address')
if ip:
if ip not in all_assets:
all_assets[ip] = asset
else:
# Merge information
all_assets[ip].update(asset)
# Store in database
async with self.db.acquire() as conn:
for ip, asset in all_assets.items():
# Normalize tags: cloud discovery yields a dict, but the column is TEXT[]
tags = asset.get('tags') or []
if isinstance(tags, dict):
tags = [f"{k}={v}" for k, v in tags.items()]
await conn.execute('''
INSERT INTO assets (
ip_address, hostname, operating_system,
open_ports, services, asset_type, tags
) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (ip_address) DO UPDATE SET
hostname = COALESCE(EXCLUDED.hostname, assets.hostname),
operating_system = COALESCE(EXCLUDED.operating_system, assets.operating_system),
open_ports = EXCLUDED.open_ports,
services = EXCLUDED.services,
last_seen = NOW()
''', ip, asset.get('hostname'), asset.get('operating_system'),
asset.get('open_ports', []), json.dumps(asset.get('services', {})),
asset.get('asset_type'), tags)
async def run_discovery(self, networks: List[str]):
"""Orchestrate discovery across multiple networks"""
await self.setup_database()
discovery_tasks = []
for network in networks:
# Run active discovery methods in parallel for each network
discovery_tasks.extend([
self.discover_with_nmap(network),
self.discover_with_masscan(network, "22,80,443,3389,8080,8443")
])
# Passive discovery is network-independent, so run it once
discovery_tasks.append(self.passive_discovery())
discoveries = await asyncio.gather(*discovery_tasks)
await self.correlate_and_store(discoveries)
# Update asset criticality based on rules
await self.classify_assets()
async def classify_assets(self):
"""Classify assets by criticality based on characteristics"""
classification_rules = [
# Critical: Domain controllers, databases, key services
{
'condition': lambda a: any(p in (a['open_ports'] or []) for p in [88, 389, 636, 3268]),
'criticality': 'CRITICAL',
'reason': 'Domain Controller'
},
{
'condition': lambda a: any(p in (a['open_ports'] or []) for p in [1433, 3306, 5432, 1521]),
'criticality': 'CRITICAL',
'reason': 'Database Server'
},
# High: Web servers, email servers
{
'condition': lambda a: any(p in (a['open_ports'] or []) for p in [80, 443, 8080, 8443]),
'criticality': 'HIGH',
'reason': 'Web Server'
},
# Medium: Workstations
{
'condition': lambda a: 'Windows' in (a.get('operating_system') or '') and
not any(p in (a['open_ports'] or []) for p in [80, 443]),
'criticality': 'MEDIUM',
'reason': 'Workstation'
}
]
async with self.db.acquire() as conn:
assets = await conn.fetch('SELECT * FROM assets WHERE criticality IS NULL')
for asset in assets:
asset_dict = dict(asset)
for rule in classification_rules:
if rule['condition'](asset_dict):
await conn.execute(
'UPDATE assets SET criticality = $1 WHERE id = $2',
rule['criticality'], asset['id']
)
break
# Usage
async def main():
orchestrator = AssetDiscoveryOrchestrator({
'host': 'localhost',
'database': 'vulnerability_management',
'user': 'vmuser',
'password': 'secure_password'  # example only; load from a secrets manager in production
})
networks = [
'192.168.1.0/24',
'10.0.0.0/24',
'172.16.0.0/16'
]
await orchestrator.run_discovery(networks)
if __name__ == '__main__':
asyncio.run(main())
Vulnerability Scanning Orchestration
Now let's integrate multiple scanners for comprehensive coverage:
#!/usr/bin/env python3
"""
Vulnerability Scanning Orchestration
Integrates OpenVAS, Nuclei, and Trivy for comprehensive scanning
"""
import asyncio
import aiohttp
import asyncpg
from datetime import datetime
import xml.etree.ElementTree as ET
import json
from typing import List, Dict, Any, Optional
import hashlib
class VulnerabilityScanner:
def __init__(self, db_config: Dict[str, str], scanner_config: Dict[str, Any]):
self.db_config = db_config
self.scanner_config = scanner_config
self.scanners = {
'openvas': OpenVASScanner(scanner_config.get('openvas', {})),
'nuclei': NucleiScanner(scanner_config.get('nuclei', {})),
'trivy': TrivyScanner(scanner_config.get('trivy', {}))
}
async def setup_database(self):
"""Initialize vulnerability database"""
self.db = await asyncpg.create_pool(**self.db_config)
await self.db.execute('''
CREATE TABLE IF NOT EXISTS vulnerabilities (
id SERIAL PRIMARY KEY,
asset_id INTEGER REFERENCES assets(id),
vulnerability_id TEXT NOT NULL,
scanner TEXT NOT NULL,
title TEXT NOT NULL,
severity TEXT NOT NULL,
cvss_score NUMERIC(3,1),
cve_ids TEXT[],
description TEXT,
solution TEXT,
"references" TEXT[],
detected_at TIMESTAMP DEFAULT NOW(),
last_seen TIMESTAMP DEFAULT NOW(),
status TEXT DEFAULT 'OPEN',
false_positive BOOLEAN DEFAULT FALSE,
risk_accepted BOOLEAN DEFAULT FALSE,
UNIQUE(asset_id, vulnerability_id, scanner)
);
CREATE INDEX IF NOT EXISTS idx_vuln_severity ON vulnerabilities(severity);
CREATE INDEX IF NOT EXISTS idx_vuln_status ON vulnerabilities(status);
CREATE INDEX IF NOT EXISTS idx_vuln_asset ON vulnerabilities(asset_id);
CREATE TABLE IF NOT EXISTS scan_history (
id SERIAL PRIMARY KEY,
asset_id INTEGER REFERENCES assets(id),
scanner TEXT NOT NULL,
scan_type TEXT,
started_at TIMESTAMP DEFAULT NOW(),
completed_at TIMESTAMP,
vulnerabilities_found INTEGER DEFAULT 0,
status TEXT,
error_message TEXT
);
''')
async def scan_assets(self, asset_filter: Dict[str, Any] = None):
"""Orchestrate scanning across all assets"""
await self.setup_database()
# Get assets to scan
query = 'SELECT * FROM assets WHERE active = TRUE'
params = []
if asset_filter:
if 'criticality' in asset_filter:
query += ' AND criticality = $1'
params.append(asset_filter['criticality'])
if 'last_scanned_before' in asset_filter:
query += f' AND (last_scanned < ${len(params)+1} OR last_scanned IS NULL)'
params.append(asset_filter['last_scanned_before'])
async with self.db.acquire() as conn:
assets = await conn.fetch(query, *params)
# Group assets by type for appropriate scanner selection
scan_tasks = []
for asset in assets:
asset_dict = dict(asset)
# Select appropriate scanners based on asset type
if asset_dict.get('asset_type') == 'container':
scan_tasks.append(self.scan_with_trivy(asset_dict))
elif any(port in (asset_dict.get('open_ports') or []) for port in [80, 443, 8080, 8443]):
scan_tasks.append(self.scan_with_nuclei(asset_dict))
# Always run OpenVAS for comprehensive scanning
scan_tasks.append(self.scan_with_openvas(asset_dict))
# Execute scans with controlled concurrency
semaphore = asyncio.Semaphore(10) # Limit concurrent scans
async def bounded_scan(scan_task):
async with semaphore:
return await scan_task
results = await asyncio.gather(
*[bounded_scan(task) for task in scan_tasks],
return_exceptions=True
)
# Process results
for result in results:
if isinstance(result, Exception):
print(f"Scan error: {result}")
else:
await self.process_scan_results(result)
async def scan_with_openvas(self, asset: Dict[str, Any]) -> Dict[str, Any]:
"""Scan using OpenVAS/GVM"""
scanner = self.scanners['openvas']
scan_id = await scanner.create_scan(asset['ip_address'])
# Record scan start
async with self.db.acquire() as conn:
scan_history_id = await conn.fetchval('''
INSERT INTO scan_history (asset_id, scanner, scan_type, status)
VALUES ($1, $2, $3, $4) RETURNING id
''', asset['id'], 'openvas', 'full', 'running')
# Wait for scan completion (simple polling; add a timeout in production)
while True:
status = await scanner.get_scan_status(scan_id)
if status in ['completed', 'failed']:
break
await asyncio.sleep(30)
# Get results
vulnerabilities = await scanner.get_scan_results(scan_id)
# Update scan history
async with self.db.acquire() as conn:
await conn.execute('''
UPDATE scan_history
SET completed_at = NOW(),
vulnerabilities_found = $1,
status = $2
WHERE id = $3
''', len(vulnerabilities), status, scan_history_id)
return {
'asset': asset,
'scanner': 'openvas',
'vulnerabilities': vulnerabilities
}
async def scan_with_nuclei(self, asset: Dict[str, Any]) -> Dict[str, Any]:
"""Scan web applications using Nuclei"""
scanner = self.scanners['nuclei']
# Build target URLs from open ports
targets = []
for port in (asset.get('open_ports') or []):
if port in [80, 8080]:
targets.append(f"http://{asset['ip_address']}:{port}")
elif port in [443, 8443]:
targets.append(f"https://{asset['ip_address']}:{port}")
vulnerabilities = []
for target in targets:
vulns = await scanner.scan(target)
vulnerabilities.extend(vulns)
return {
'asset': asset,
'scanner': 'nuclei',
'vulnerabilities': vulnerabilities
}
async def scan_with_trivy(self, asset: Dict[str, Any]) -> Dict[str, Any]:
"""Scan containers/images using Trivy"""
scanner = self.scanners['trivy']
# For container assets, get image info
image_name = asset.get('container_image')
if not image_name:
return {'asset': asset, 'scanner': 'trivy', 'vulnerabilities': []}
vulnerabilities = await scanner.scan_image(image_name)
return {
'asset': asset,
'scanner': 'trivy',
'vulnerabilities': vulnerabilities
}
async def process_scan_results(self, scan_result: Dict[str, Any]):
"""Process and store scan results"""
if not scan_result or 'vulnerabilities' not in scan_result:
return
asset = scan_result['asset']
scanner = scan_result['scanner']
vulnerabilities = scan_result['vulnerabilities']
async with self.db.acquire() as conn:
# Mark existing vulnerabilities as potentially fixed
await conn.execute('''
UPDATE vulnerabilities
SET status = 'POTENTIALLY_FIXED'
WHERE asset_id = $1 AND scanner = $2 AND status = 'OPEN'
''', asset['id'], scanner)
# Insert/update vulnerabilities
for vuln in vulnerabilities:
vuln_id = self.generate_vuln_id(asset['id'], scanner, vuln)
await conn.execute('''
INSERT INTO vulnerabilities (
asset_id, vulnerability_id, scanner, title,
severity, cvss_score, cve_ids, description,
solution, "references"
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (asset_id, vulnerability_id, scanner) DO UPDATE SET
last_seen = NOW(),
status = CASE
WHEN vulnerabilities.status = 'CLOSED' THEN 'REOPENED'
ELSE 'OPEN'
END,
severity = EXCLUDED.severity,
cvss_score = EXCLUDED.cvss_score
''', asset['id'], vuln_id, scanner, vuln.get('title'),
vuln.get('severity', 'UNKNOWN'), vuln.get('cvss_score'),
vuln.get('cve_ids', []), vuln.get('description'),
vuln.get('solution'), vuln.get('references', []))
# Close vulnerabilities not seen in this scan
await conn.execute('''
UPDATE vulnerabilities
SET status = 'CLOSED'
WHERE asset_id = $1 AND scanner = $2
AND status = 'POTENTIALLY_FIXED'
''', asset['id'], scanner)
# Update asset last_scanned timestamp
await conn.execute('''
UPDATE assets SET last_scanned = NOW() WHERE id = $1
''', asset['id'])
def generate_vuln_id(self, asset_id: int, scanner: str, vuln: Dict[str, Any]) -> str:
"""Generate unique vulnerability ID"""
# Create a stable ID based on vulnerability characteristics
id_components = [
str(asset_id),
scanner,
vuln.get('title', ''),
vuln.get('plugin_id', ''),
vuln.get('template_id', '')
]
id_string = '|'.join(filter(None, id_components))
return hashlib.sha256(id_string.encode()).hexdigest()[:16]
class OpenVASScanner:
def __init__(self, config: Dict[str, Any]):
self.host = config.get('host', 'localhost')
self.port = config.get('port', 9390)
self.username = config.get('username')
self.password = config.get('password')
self.session = None
async def create_scan(self, target: str) -> str:
"""Create and start OpenVAS scan"""
# Implementation depends on GVM version
# This is a simplified example
# Create target
target_id = await self.create_target(target)
# Create task
task_id = await self.create_task(target_id)
# Start task
await self.start_task(task_id)
return task_id
async def get_scan_results(self, scan_id: str) -> List[Dict[str, Any]]:
"""Get scan results from OpenVAS"""
# Parse OpenVAS XML results
results = []
# Simplified - actual implementation would parse GMP response
xml_results = await self.get_report(scan_id)
return results
class NucleiScanner:
def __init__(self, config: Dict[str, Any]):
self.templates_path = config.get('templates_path', '/opt/nuclei-templates')
self.severity_filter = config.get('severity_filter', ['critical', 'high', 'medium'])
async def scan(self, target: str) -> List[Dict[str, Any]]:
"""Run Nuclei scan against target"""
cmd = [
'nuclei',
'-u', target,
'-t', self.templates_path,
'-severity', ','.join(self.severity_filter),
'-json',
'-silent'
]
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
vulnerabilities = []
for line in stdout.decode().strip().split('\n'):
if line:
try:
finding = json.loads(line)
vulnerabilities.append({
'title': finding.get('info', {}).get('name'),
'severity': finding.get('info', {}).get('severity', '').upper(),
'template_id': finding.get('template-id'),
'description': finding.get('info', {}).get('description'),
'matcher_name': finding.get('matcher-name'),
'matched_at': finding.get('matched-at'),
'cve_ids': finding.get('info', {}).get('cve', []),
'references': finding.get('info', {}).get('reference', [])
})
except json.JSONDecodeError:
continue
return vulnerabilities
class TrivyScanner:
def __init__(self, config: Dict[str, Any]):
self.severity_threshold = config.get('severity_threshold', 'MEDIUM')
async def scan_image(self, image: str) -> List[Dict[str, Any]]:
"""Scan container image with Trivy"""
# Build the severity list from the configured threshold upwards
severity_order = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']
wanted = severity_order[:severity_order.index(self.severity_threshold) + 1]
cmd = [
'trivy', 'image',
'--format', 'json',
'--severity', ','.join(wanted),
'--quiet',
image
]
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
vulnerabilities = []
if proc.returncode == 0:
results = json.loads(stdout.decode())
for target in results.get('Results', []):
for vuln in target.get('Vulnerabilities', []):
vulnerabilities.append({
'title': vuln.get('Title', vuln.get('VulnerabilityID')),
'vulnerability_id': vuln.get('VulnerabilityID'),
'package_name': vuln.get('PkgName'),
'installed_version': vuln.get('InstalledVersion'),
'fixed_version': vuln.get('FixedVersion'),
'severity': vuln.get('Severity', 'UNKNOWN').upper(),
'cvss_score': self.extract_cvss_score(vuln),
'cve_ids': [vuln.get('VulnerabilityID')] if vuln.get('VulnerabilityID', '').startswith('CVE') else [],
'description': vuln.get('Description'),
'references': vuln.get('References', [])
})
return vulnerabilities
def extract_cvss_score(self, vuln: Dict[str, Any]) -> Optional[float]:
"""Extract CVSS score from vulnerability data"""
if 'CVSS' in vuln:
for source, scores in vuln['CVSS'].items():
if 'V3Score' in scores:
return scores['V3Score']
elif 'V2Score' in scores:
return scores['V2Score']
return None
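As with the discovery script, a usage sketch helps tie the pieces together. The connection settings, scanner options, and rescan window below are placeholders to adapt to your environment:

from datetime import datetime, timedelta

async def main():
    scanner = VulnerabilityScanner(
        db_config={
            'host': 'localhost',
            'database': 'vulnerability_management',
            'user': 'vmuser',
            'password': 'secure_password'  # placeholder; load from a secrets manager
        },
        scanner_config={
            'openvas': {'host': 'localhost', 'port': 9390},
            'nuclei': {'templates_path': '/opt/nuclei-templates'},
            'trivy': {'severity_threshold': 'MEDIUM'}
        }
    )
    # Rescan critical assets that haven't been scanned in the past week
    await scanner.scan_assets({
        'criticality': 'CRITICAL',
        'last_scanned_before': datetime.now() - timedelta(days=7)
    })

if __name__ == '__main__':
    asyncio.run(main())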
Remediation Tracking and Automation
Vulnerability management isn't just about finding issues; it's about fixing them:
#!/usr/bin/env python3
"""
Automated Remediation and Patch Management
"""
import asyncio
import functools
import asyncpg
from datetime import datetime, timedelta
import ansible_runner
from typing import List, Dict, Any
import yaml
class RemediationOrchestrator:
def __init__(self, db_config: Dict[str, str], ansible_config: Dict[str, str]):
self.db_config = db_config
self.ansible_config = ansible_config
self.remediation_playbooks = self.load_playbooks()
def load_playbooks(self) -> Dict[str, str]:
"""Load remediation playbooks mapping"""
with open('remediation_playbooks.yaml', 'r') as f:
return yaml.safe_load(f)
async def setup_database(self):
"""Initialize remediation tracking tables"""
self.db = await asyncpg.create_pool(**self.db_config)
await self.db.execute('''
CREATE TABLE IF NOT EXISTS remediation_tasks (
id SERIAL PRIMARY KEY,
vulnerability_id INTEGER REFERENCES vulnerabilities(id),
asset_id INTEGER REFERENCES assets(id),
remediation_type TEXT NOT NULL,
priority TEXT NOT NULL,
assigned_to TEXT,
created_at TIMESTAMP DEFAULT NOW(),
scheduled_for TIMESTAMP,
started_at TIMESTAMP,
completed_at TIMESTAMP,
status TEXT DEFAULT 'PENDING',
automation_possible BOOLEAN DEFAULT FALSE,
playbook_name TEXT,
patch_id TEXT,
notes TEXT,
verification_status TEXT
);
CREATE INDEX IF NOT EXISTS idx_remediation_status ON remediation_tasks(status);
CREATE INDEX IF NOT EXISTS idx_remediation_priority ON remediation_tasks(priority);
CREATE TABLE IF NOT EXISTS patch_management (
id SERIAL PRIMARY KEY,
patch_id TEXT UNIQUE NOT NULL,
vendor TEXT,
product TEXT,
version TEXT,
severity TEXT,
cve_ids TEXT[],
release_date DATE,
tested_date DATE,
approved_date DATE,
approval_status TEXT DEFAULT 'PENDING',
test_results JSONB,
rollback_plan TEXT
);
''')
async def create_remediation_tasks(self):
"""Create remediation tasks for new vulnerabilities"""
async with self.db.acquire() as conn:
# Get new vulnerabilities that need remediation
new_vulns = await conn.fetch('''
SELECT v.*, a.ip_address, a.hostname, a.operating_system, a.criticality as asset_criticality
FROM vulnerabilities v
JOIN assets a ON v.asset_id = a.id
WHERE v.status = 'OPEN'
AND v.false_positive = FALSE
AND v.risk_accepted = FALSE
AND NOT EXISTS (
SELECT 1 FROM remediation_tasks rt
WHERE rt.vulnerability_id = v.id
AND rt.status NOT IN ('FAILED', 'CANCELLED')
)
''')
for vuln in new_vulns:
# Determine remediation priority
priority = self.calculate_priority(vuln)
# Check if automated remediation is possible
automation_possible, playbook_name = self.check_automation_possibility(vuln)
# Create remediation task
await conn.execute('''
INSERT INTO remediation_tasks (
vulnerability_id, asset_id, remediation_type,
priority, automation_possible, playbook_name
) VALUES ($1, $2, $3, $4, $5, $6)
''', vuln['id'], vuln['asset_id'],
self.determine_remediation_type(vuln),
priority, automation_possible, playbook_name)
def calculate_priority(self, vuln: Dict[str, Any]) -> str:
"""Calculate remediation priority based on multiple factors"""
score = 0
# Severity scoring
severity_scores = {
'CRITICAL': 40,
'HIGH': 30,
'MEDIUM': 20,
'LOW': 10
}
score += severity_scores.get(vuln['severity'], 0)
# Asset criticality scoring
criticality_scores = {
'CRITICAL': 30,
'HIGH': 20,
'MEDIUM': 10,
'LOW': 5
}
score += criticality_scores.get(vuln['asset_criticality'], 0)
# CVSS score factor
if vuln['cvss_score']:
score += int(vuln['cvss_score'] * 3)
# Exploitability factor
if self.check_active_exploitation(vuln['cve_ids']):
score += 20
# Determine priority
if score >= 80:
return 'CRITICAL'
elif score >= 60:
return 'HIGH'
elif score >= 40:
return 'MEDIUM'
else:
return 'LOW'
def check_active_exploitation(self, cve_ids: List[str]) -> bool:
"""Check if CVEs are being actively exploited"""
# Check against CISA KEV, threat intel feeds, etc.
# Simplified example
known_exploited = ['CVE-2021-44228', 'CVE-2021-34527', 'CVE-2023-23397']
return any(cve in known_exploited for cve in cve_ids or [])
def determine_remediation_type(self, vuln: Dict[str, Any]) -> str:
"""Determine the type of remediation needed"""
solution = (vuln.get('solution') or '').lower()
if solution.startswith('upgrade'):
return 'PATCH'
elif 'configuration' in solution:
return 'CONFIGURATION'
elif 'disable' in solution:
return 'DISABLE_SERVICE'
else:
return 'MANUAL_REVIEW'
def check_automation_possibility(self, vuln: Dict[str, Any]) -> tuple:
"""Check if vulnerability can be automatically remediated"""
# Check against playbook mapping
vuln_key = f"{vuln['scanner']}:{vuln['vulnerability_id']}"
if vuln_key in self.remediation_playbooks:
return True, self.remediation_playbooks[vuln_key]
# Check for generic remediation patterns
if vuln['cve_ids']:
for cve in vuln['cve_ids']:
if cve in self.remediation_playbooks:
return True, self.remediation_playbooks[cve]
# Check OS-specific patches
if vuln['operating_system'] and self.determine_remediation_type(vuln) == 'PATCH':
os_type = self.detect_os_type(vuln['operating_system'])
if os_type in ['ubuntu', 'centos', 'windows']:
return True, f"patch_{os_type}.yml"
return False, None
def detect_os_type(self, os_string: str) -> str:
"""Detect OS type from string"""
os_string_lower = os_string.lower()
if 'ubuntu' in os_string_lower:
return 'ubuntu'
elif 'centos' in os_string_lower or 'red hat' in os_string_lower:
return 'centos'
elif 'windows' in os_string_lower:
return 'windows'
return 'unknown'
async def execute_automated_remediation(self):
"""Execute automated remediation tasks"""
async with self.db.acquire() as conn:
# Get pending automated tasks
tasks = await conn.fetch('''
SELECT rt.*, a.ip_address, a.hostname, v.cve_ids
FROM remediation_tasks rt
JOIN assets a ON rt.asset_id = a.id
JOIN vulnerabilities v ON rt.vulnerability_id = v.id
WHERE rt.status = 'PENDING'
AND rt.automation_possible = TRUE
AND (rt.scheduled_for IS NULL OR rt.scheduled_for <= NOW())
ORDER BY rt.priority DESC, rt.created_at ASC
LIMIT 10
''')
for task in tasks:
await self.run_remediation_playbook(task)
async def run_remediation_playbook(self, task: Dict[str, Any]):
"""Execute Ansible playbook for remediation"""
playbook_path = f"{self.ansible_config['playbook_dir']}/{task['playbook_name']}"
# Update task status
async with self.db.acquire() as conn:
await conn.execute('''
UPDATE remediation_tasks
SET status = 'IN_PROGRESS', started_at = NOW()
WHERE id = $1
''', task['id'])
# Prepare inventory
inventory = {
'all': {
'hosts': {
task['hostname'] or task['ip_address']: {
'ansible_host': task['ip_address']
}
}
}
}
# Prepare extra vars
extra_vars = {
'target_host': task['hostname'] or task['ip_address'],
'vulnerability_cves': task['cve_ids'],
'remediation_task_id': task['id']
}
# Run the playbook in a worker thread so the event loop isn't blocked;
# ansible_runner.run takes keyword arguments, hence functools.partial
result = await asyncio.get_event_loop().run_in_executor(
None,
functools.partial(
ansible_runner.run,
playbook=playbook_path,
inventory=inventory,
extravars=extra_vars,
quiet=True
)
)
# Update task based on result
async with self.db.acquire() as conn:
if result.status == 'successful':
await conn.execute('''
UPDATE remediation_tasks
SET status = 'COMPLETED',
completed_at = NOW(),
notes = 'Automated remediation successful'
WHERE id = $1
''', task['id'])
# Schedule verification scan
await self.schedule_verification_scan(task)
else:
await conn.execute('''
UPDATE remediation_tasks
SET status = 'FAILED',
notes = $2
WHERE id = $1
''', task['id'], f"Playbook failed: {result.stderr}")
async def schedule_verification_scan(self, task: Dict[str, Any]):
"""Schedule a verification scan after remediation"""
# Wait 30 minutes before verification
verification_time = datetime.now() + timedelta(minutes=30)
async with self.db.acquire() as conn:
await conn.execute('''
INSERT INTO scan_history (
asset_id, scanner, scan_type,
started_at, status
) VALUES ($1, $2, $3, $4, $5)
''', task['asset_id'], 'verification',
f"verify_remediation_{task['id']}",
verification_time, 'scheduled')
# Example remediation playbooks YAML
remediation_playbooks_yaml = """
# Specific vulnerability remediations
'openvas:1.3.6.1.4.1.25623.1.0.123456':
playbook: 'fix_ssh_weak_ciphers.yml'
'nuclei:apache-struts-rce':
playbook: 'patch_struts.yml'
# CVE-based remediations
'CVE-2021-44228':
playbook: 'fix_log4j.yml'
'CVE-2021-34527':
playbook: 'fix_printnightmare.yml'
# Generic OS patching
'patch_ubuntu':
playbook: 'patch_ubuntu_system.yml'
'patch_centos':
playbook: 'patch_centos_system.yml'
'patch_windows':
playbook: 'patch_windows_system.yml'
"""
# Example Ansible playbook for remediation
ansible_playbook_example = """
---
- name: Fix SSH Weak Ciphers
hosts: ""
become: yes
vars:
strong_ciphers: "chacha20-poly1305@openssh.com,aes256-gcm@openssh.com,aes128-gcm@openssh.com"
strong_macs: "hmac-sha2-512-etm@openssh.com,hmac-sha2-256-etm@openssh.com"
strong_kex: "curve25519-sha256,curve25519-sha256@libssh.org"
tasks:
- name: Backup current SSH configuration
copy:
src: /etc/ssh/sshd_config
dest: "/etc/ssh/sshd_config.{{ ansible_date_time.epoch }}"
remote_src: yes
- name: Configure strong ciphers
lineinfile:
path: /etc/ssh/sshd_config
regexp: '^Ciphers'
line: "Ciphers "
state: present
- name: Configure strong MACs
lineinfile:
path: /etc/ssh/sshd_config
regexp: '^MACs'
line: "MACs "
state: present
- name: Configure strong KEX algorithms
lineinfile:
path: /etc/ssh/sshd_config
regexp: '^KexAlgorithms'
line: "KexAlgorithms "
state: present
- name: Test SSH configuration
command: sshd -t
register: sshd_test
- name: Restart SSH service
systemd:
name: sshd
state: restarted
when: sshd_test.rc == 0
- name: Verify SSH is running
wait_for:
port: 22
host: ""
timeout: 30
- name: Report success
uri:
url: "http://vuln-mgmt-api:8080/api/remediation//complete"
method: POST
body_format: json
body:
status: "completed"
message: "SSH weak ciphers remediated successfully"
"""
Dashboards and Reporting
Visualization is crucial for managing vulnerabilities at scale:
#!/usr/bin/env python3
"""
Grafana Dashboard Generator for Vulnerability Management
"""
import json
import requests
from datetime import datetime, timedelta
from typing import Dict, Any, List
class VulnerabilityDashboard:
def __init__(self, grafana_url: str, api_key: str):
self.grafana_url = grafana_url
self.headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
}
def create_dashboard(self) -> Dict[str, Any]:
"""Create comprehensive vulnerability management dashboard"""
dashboard = {
"dashboard": {
"title": "Vulnerability Management Dashboard",
"tags": ["security", "vulnerabilities"],
"timezone": "browser",
"panels": [
self.create_summary_panel(),
self.create_severity_distribution_panel(),
self.create_trend_panel(),
self.create_top_vulnerabilities_panel(),
# Asset-risk, remediation-metrics, and scanner-performance panels
# follow the same pattern; only the panels defined below are included
self.create_compliance_panel()
],
"time": {
"from": "now-30d",
"to": "now"
},
"refresh": "5m"
},
"overwrite": True
}
return dashboard
def create_summary_panel(self) -> Dict[str, Any]:
"""Summary statistics panel"""
return {
"gridPos": {"h": 8, "w": 24, "x": 0, "y": 0},
"type": "stat",
"title": "Vulnerability Summary",
"targets": [
{
"rawSql": """
SELECT COUNT(*) as "Total Open Vulnerabilities"
FROM vulnerabilities
WHERE status = 'OPEN' AND NOT false_positive
""",
"format": "table"
},
{
"rawSql": """
SELECT COUNT(*) as "Critical"
FROM vulnerabilities
WHERE status = 'OPEN' AND severity = 'CRITICAL'
""",
"format": "table"
},
{
"rawSql": """
SELECT COUNT(*) as "High"
FROM vulnerabilities
WHERE status = 'OPEN' AND severity = 'HIGH'
""",
"format": "table"
},
{
"rawSql": """
SELECT COUNT(DISTINCT asset_id) as "Affected Assets"
FROM vulnerabilities
WHERE status = 'OPEN'
""",
"format": "table"
}
],
"options": {
"colorMode": "background",
"graphMode": "none",
"orientation": "horizontal",
"reduceOptions": {
"calcs": ["lastNotNull"]
}
}
}
def create_severity_distribution_panel(self) -> Dict[str, Any]:
"""Severity distribution pie chart"""
return {
"gridPos": {"h": 8, "w": 12, "x": 0, "y": 8},
"type": "piechart",
"title": "Vulnerability Severity Distribution",
"targets": [{
"rawSql": """
SELECT severity, COUNT(*) as count
FROM vulnerabilities
WHERE status = 'OPEN' AND NOT false_positive
GROUP BY severity
""",
"format": "table"
}],
"options": {
"pieType": "donut",
"tooltipDisplayMode": "multi",
"legend": {
"displayMode": "table",
"placement": "right",
"values": ["value", "percent"]
}
},
"fieldConfig": {
"defaults": {
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{"color": "green", "value": null},
{"color": "yellow", "value": 10},
{"color": "orange", "value": 50},
{"color": "red", "value": 100}
]
}
}
}
}
def create_trend_panel(self) -> Dict[str, Any]:
"""Vulnerability trend over time"""
return {
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 8},
"type": "timeseries",
"title": "Vulnerability Trend",
"targets": [{
"rawSql": """
SELECT
date_trunc('day', detected_at) as time,
severity,
COUNT(*) as count
FROM vulnerabilities
WHERE detected_at > NOW() - INTERVAL '30 days'
GROUP BY time, severity
ORDER BY time
""",
"format": "time_series"
}],
"fieldConfig": {
"defaults": {
"custom": {
"lineInterpolation": "smooth",
"fillOpacity": 10,
"stacking": {
"mode": "normal"
}
}
}
}
}
def create_top_vulnerabilities_panel(self) -> Dict[str, Any]:
"""Table of top vulnerabilities"""
return {
"gridPos": {"h": 10, "w": 24, "x": 0, "y": 16},
"type": "table",
"title": "Top Vulnerabilities by Risk Score",
"targets": [{
"rawSql": """
SELECT
v.title,
v.severity,
v.cvss_score,
COUNT(DISTINCT v.asset_id) as affected_assets,
STRING_AGG(DISTINCT a.criticality, ', ') as asset_criticalities,
v.cve_ids[1] as primary_cve,
CASE
WHEN EXISTS (
SELECT 1 FROM remediation_tasks rt
WHERE rt.vulnerability_id = v.id
AND rt.status = 'COMPLETED'
) THEN 'Remediated'
WHEN EXISTS (
SELECT 1 FROM remediation_tasks rt
WHERE rt.vulnerability_id = v.id
AND rt.status IN ('PENDING', 'IN_PROGRESS')
) THEN 'In Progress'
ELSE 'Open'
END as remediation_status
FROM vulnerabilities v
JOIN assets a ON v.asset_id = a.id
WHERE v.status = 'OPEN' AND NOT v.false_positive
GROUP BY v.title, v.severity, v.cvss_score, v.cve_ids, v.id
ORDER BY
CASE v.severity
WHEN 'CRITICAL' THEN 1
WHEN 'HIGH' THEN 2
WHEN 'MEDIUM' THEN 3
ELSE 4
END,
v.cvss_score DESC NULLS LAST,
affected_assets DESC
LIMIT 20
""",
"format": "table"
}],
"options": {
"showHeader": true
},
"fieldConfig": {
"overrides": [
{
"matcher": {"id": "byName", "options": "severity"},
"properties": [{
"id": "custom.displayMode",
"value": "color-background"
}]
}
]
}
}
def create_compliance_panel(self) -> Dict[str, Any]:
"""Compliance metrics panel"""
return {
"gridPos": {"h": 8, "w": 12, "x": 0, "y": 26},
"type": "gauge",
"title": "Compliance Metrics",
"targets": [
{
"rawSql": """
SELECT
(COUNT(*) FILTER (WHERE
NOT EXISTS (
SELECT 1 FROM vulnerabilities v
WHERE v.asset_id = a.id
AND v.status = 'OPEN'
AND v.severity IN ('CRITICAL', 'HIGH')
)
) * 100.0 / COUNT(*))::INTEGER as "Critical Asset Compliance %"
FROM assets a
WHERE a.criticality = 'CRITICAL' AND a.active = TRUE
""",
"format": "table"
}
],
"options": {
"reduceOptions": {
"calcs": ["lastNotNull"]
},
"showThresholdLabels": true,
"showThresholdMarkers": true
},
"fieldConfig": {
"defaults": {
"thresholds": {
"mode": "absolute",
"steps": [
{"color": "red", "value": null},
{"color": "yellow", "value": 80},
{"color": "green", "value": 95}
]
},
"unit": "percent",
"min": 0,
"max": 100
}
}
}
def deploy_dashboard(self):
"""Deploy dashboard to Grafana"""
dashboard = self.create_dashboard()
response = requests.post(
f"{self.grafana_url}/api/dashboards/db",
headers=self.headers,
json=dashboard
)
if response.status_code == 200:
print(f"Dashboard created successfully: {response.json()}")
else:
print(f"Failed to create dashboard: {response.text}")
# Usage
dashboard_creator = VulnerabilityDashboard(
grafana_url="http://localhost:3000",
api_key="your-grafana-api-key"
)
dashboard_creator.deploy_dashboard()
Integration and Automation
The key to scaling vulnerability management is automation:
# Apache Airflow DAG for automated vulnerability management
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'security-team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'vulnerability_management',
default_args=default_args,
description='Automated vulnerability management workflow',
schedule_interval='0 */6 * * *', # Every 6 hours
catchup=False
)
# Asset discovery
asset_discovery = PythonOperator(
task_id='asset_discovery',
python_callable=run_asset_discovery,
dag=dag
)
# Vulnerability scanning
vulnerability_scan = PythonOperator(
task_id='vulnerability_scanning',
python_callable=run_vulnerability_scans,
dag=dag
)
# Create remediation tasks
create_remediations = PythonOperator(
task_id='create_remediation_tasks',
python_callable=create_remediation_tasks,
dag=dag
)
# Execute automated remediations
auto_remediate = PythonOperator(
task_id='automated_remediation',
python_callable=execute_automated_remediation,
dag=dag
)
# Generate reports
generate_reports = PythonOperator(
task_id='generate_reports',
python_callable=generate_vulnerability_reports,
dag=dag
)
# Define workflow
asset_discovery >> vulnerability_scan >> create_remediations >> auto_remediate >> generate_reports
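The python_callable functions referenced above aren't defined in the DAG snippet; in the real DAG file they would sit above the operator declarations as thin synchronous wrappers around the async orchestrators from earlier sections. A sketch of the pattern, where DB_CONFIG, SCANNER_CONFIG, and the network list are placeholders:

import asyncio

def run_asset_discovery():
    """Airflow-friendly synchronous wrapper around the async orchestrator."""
    orchestrator = AssetDiscoveryOrchestrator(DB_CONFIG)
    asyncio.run(orchestrator.run_discovery(['192.168.1.0/24', '10.0.0.0/24']))

def run_vulnerability_scans():
    scanner = VulnerabilityScanner(DB_CONFIG, SCANNER_CONFIG)
    asyncio.run(scanner.scan_assets())

# create_remediation_tasks, execute_automated_remediation, and
# generate_vulnerability_reports follow the same wrapper pattern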
Lessons Learned
After years of building and running vulnerability management programs:
1. Start with Asset Management
You can't secure what you don't know exists. Invest heavily in discovery.
2. Automate Everything Possible
Manual processes don't scale. Automate scanning, ticketing, and remediation.
3. Context is King
A critical vulnerability on a development server is not the same as a critical vulnerability on a production domain controller.
4. Measure What Matters
- Mean Time to Detect (MTTD)
- Mean Time to Remediate (MTTR), sketched after this list
- Vulnerability aging
- Coverage percentage
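MTTR, for example, falls straight out of the schema built earlier. A minimal asyncpg sketch, assuming the vulnerabilities and remediation_tasks tables from the previous sections:

async def mean_time_to_remediate(db) -> float:
    """Average days from detection to completed remediation, last 90 days."""
    row = await db.fetchrow('''
        SELECT AVG(EXTRACT(EPOCH FROM (rt.completed_at - v.detected_at)) / 86400.0)
               AS mttr_days
        FROM remediation_tasks rt
        JOIN vulnerabilities v ON rt.vulnerability_id = v.id
        WHERE rt.status = 'COMPLETED'
          AND rt.completed_at > NOW() - INTERVAL '90 days'
    ''')
    return row['mttr_days']  # None until at least one task has completed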
5. Integration is Essential
Vulnerability management tools must integrate with:
- CMDB/Asset Management
- Ticketing systems (see the sketch after this list)
- CI/CD pipelines
- SIEM/SOAR platforms
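For the ticketing leg, here's a sketch that opens a ticket for each new critical finding. GLPI and Request Tracker each have their own REST APIs; the endpoint and payload shape below are hypothetical placeholders to adapt:

import aiohttp

async def open_tickets_for_critical_vulns(db, ticket_url: str):
    """Create a ticket per open CRITICAL finding (hypothetical endpoint)."""
    vulns = await db.fetch('''
        SELECT v.id, v.title, a.hostname, a.ip_address
        FROM vulnerabilities v
        JOIN assets a ON v.asset_id = a.id
        WHERE v.status = 'OPEN' AND v.severity = 'CRITICAL'
    ''')
    async with aiohttp.ClientSession() as session:
        for v in vulns:
            payload = {  # hypothetical payload shape
                'subject': f"[VULN] {v['title']} on {v['hostname'] or v['ip_address']}",
                'priority': 'critical',
                'external_id': str(v['id'])
            }
            async with session.post(ticket_url, json=payload) as resp:
                resp.raise_for_status()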
Conclusion
Building an effective vulnerability management program with open source tools isn't just possible; done well, it can outperform commercial solutions. The key is thoughtful integration, automation, and continuous improvement.
Start small with asset discovery and basic scanning, then gradually add automation, integration, and advanced features. The framework presented here has successfully managed vulnerabilities across thousands of systems in production environments.
Remember: vulnerability management is a program, not a project. Build it to be sustainable, scalable, and automated from day one.
Questions about scaling vulnerability management? Want to share your open source security stack? Let's connect and improve our collective security posture!