Automating Home Network Security with Python and Open Source Tools
Practical automation scripts and tools I've built to keep my home network secure without constant manual intervention
Reading time: 9 minutes
The Problem: Security Doesn't Scale Without Automation
Managing home network security is like being a one-person SOC (Security Operations Center). You've got multiple devices, various family members with different tech literacy levels, and new threats emerging daily. Manual security management simply doesn't scale – especially when you're also trying to be present for bedtime stories.
This post shares the Python scripts and automation workflows I've developed to maintain security without sacrificing family time.
The Foundation: Network Discovery and Asset Management
First challenge: knowing what's actually on your network. New devices appear constantly – kids' friends' phones, that new smart gadget someone bought, the mysterious device that might be the neighbor's printer.
Automated Device Discovery
Here's my network discovery script that runs hourly:
#!/usr/bin/env python3
import nmap
import json
import sqlite3
from datetime import datetime
from pathlib import Path
class NetworkAssetManager:
def __init__(self, db_path="network_assets.db"):
self.db_path = db_path
self.nm = nmap.PortScanner()
self.setup_database()
def setup_database(self):
conn = sqlite3.connect(self.db_path)
conn.execute('''
CREATE TABLE IF NOT EXISTS devices (
mac_address TEXT PRIMARY KEY,
ip_address TEXT,
hostname TEXT,
vendor TEXT,
first_seen TIMESTAMP,
last_seen TIMESTAMP,
trusted BOOLEAN DEFAULT 0,
notes TEXT
)
''')
conn.commit()
conn.close()
def scan_network(self, network="192.168.1.0/24"):
"""Scan network and identify all devices"""
print(f"Scanning {network}...")
self.nm.scan(hosts=network, arguments='-sn')
devices = []
for host in self.nm.all_hosts():
if 'mac' in self.nm[host]['addresses']:
device = {
'ip': host,
'mac': self.nm[host]['addresses']['mac'],
'hostname': self.nm[host].hostname(),
'vendor': self.nm[host]['vendor'].get(
self.nm[host]['addresses']['mac'], 'Unknown'
)
}
devices.append(device)
self.update_device(device)
return devices
def update_device(self, device):
"""Update or insert device in database"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO devices (mac_address, ip_address, hostname, vendor, first_seen, last_seen)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(mac_address) DO UPDATE SET
ip_address = excluded.ip_address,
hostname = excluded.hostname,
last_seen = excluded.last_seen
''', (
device['mac'],
device['ip'],
device['hostname'],
device['vendor'],
datetime.now(),
datetime.now()
))
conn.commit()
conn.close()
def get_new_devices(self, hours=1):
"""Get devices first seen in the last N hours"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT * FROM devices
WHERE first_seen > datetime('now', '-{} hours')
AND trusted = 0
'''.format(hours))
new_devices = cursor.fetchall()
conn.close()
return new_devices
# Alert on new devices
if __name__ == "__main__":
manager = NetworkAssetManager()
manager.scan_network()
new_devices = manager.get_new_devices()
if new_devices:
print(f"⚠️ Found {len(new_devices)} new devices!")
# Send notification (covered in the notification section)
DNS Monitoring and Ad Blocking
One of the most effective security measures is controlling DNS. I use Pi-hole for ad blocking but enhanced it with security monitoring.
Detecting Suspicious DNS Queries
This script monitors DNS logs for suspicious patterns:
import re
import sqlite3
from collections import defaultdict
from datetime import datetime, timedelta
class DNSSecurityMonitor:
def __init__(self, pihole_db="/etc/pihole/pihole-FTL.db"):
self.pihole_db = pihole_db
self.suspicious_patterns = [
r'.*\.tk$', # Suspicious TLD
r'.*\.ml$',
r'.*\.ga$',
r'[0-9]{1,3}-[0-9]{1,3}-[0-9]{1,3}-[0-9]{1,3}', # IP in domain
r'.*\..*\..*\..*\..*\.', # Excessive subdomains
r'.*(bitcoin|crypto|miner).*', # Crypto mining
r'^[a-f0-9]{32}\..*', # Potential DGA domains
]
# Known malware C2 domains (simplified list)
self.known_bad_domains = self.load_threat_intel()
def check_dns_queries(self, time_window_minutes=60):
"""Analyze recent DNS queries for suspicious activity"""
conn = sqlite3.connect(self.pihole_db)
cursor = conn.cursor()
# Get recent queries
timestamp = int((datetime.now() - timedelta(minutes=time_window_minutes)).timestamp())
cursor.execute('''
SELECT domain, client, COUNT(*) as count
FROM queries
WHERE timestamp > ?
GROUP BY domain, client
''', (timestamp,))
alerts = []
for domain, client, count in cursor.fetchall():
# Check against patterns
for pattern in self.suspicious_patterns:
if re.match(pattern, domain, re.IGNORECASE):
alerts.append({
'type': 'suspicious_pattern',
'domain': domain,
'client': client,
'pattern': pattern,
'count': count
})
# Check against threat intel
if domain in self.known_bad_domains:
alerts.append({
'type': 'known_malware',
'domain': domain,
'client': client,
'count': count,
'severity': 'high'
})
# Check for DNS tunneling (high frequency of unique subdomains)
if self.detect_dns_tunneling(domain, count):
alerts.append({
'type': 'possible_dns_tunnel',
'domain': domain,
'client': client,
'count': count
})
conn.close()
return alerts
def detect_dns_tunneling(self, domain, query_count):
"""Detect potential DNS tunneling based on query patterns"""
# High number of queries to single domain with varying subdomains
if query_count > 100:
# Additional logic to check subdomain randomness
return True
return False
def load_threat_intel(self):
"""Load threat intelligence feeds"""
# In production, this would fetch from multiple sources
# For now, a simple set of known bad domains
return {
'malware-c2-server.com',
'evil-phishing-site.net',
# Add more from threat feeds
}
Automated Vulnerability Scanning
Keeping devices patched is crucial. This script runs weekly to identify vulnerable services:
import nmap
import vulners
import json
from datetime import datetime
class HomeVulnerabilityScanner:
def __init__(self):
self.nm = nmap.PortScanner()
self.vulners_api = vulners.Vulners()
def scan_device(self, ip_address):
"""Perform vulnerability scan on a single device"""
print(f"Scanning {ip_address}...")
# Service detection scan
self.nm.scan(ip_address, '1-65535', '-sV')
vulnerabilities = []
for port in self.nm[ip_address]['tcp']:
service_info = self.nm[ip_address]['tcp'][port]
if 'product' in service_info and 'version' in service_info:
# Search for CVEs
search_query = f"{service_info['product']} {service_info['version']}"
try:
results = self.vulners_api.search(search_query, limit=5)
for vuln in results:
if vuln['cvss']['score'] >= 7.0: # High severity
vulnerabilities.append({
'port': port,
'service': service_info['product'],
'version': service_info['version'],
'cve': vuln.get('id'),
'cvss': vuln['cvss']['score'],
'description': vuln.get('description', '')[:200]
})
except:
pass
return vulnerabilities
def generate_report(self, scan_results):
"""Generate actionable vulnerability report"""
report = {
'scan_date': datetime.now().isoformat(),
'total_devices': len(scan_results),
'vulnerable_devices': sum(1 for r in scan_results if r['vulnerabilities']),
'critical_findings': []
}
for result in scan_results:
if result['vulnerabilities']:
for vuln in result['vulnerabilities']:
if vuln['cvss'] >= 9.0:
report['critical_findings'].append({
'device': result['ip'],
'vulnerability': vuln
})
return report
Smart Firewall Rules Management
Static firewall rules don't adapt to changing threats. Here's how I automate rule updates:
import subprocess
import ipaddress
from datetime import datetime, timedelta
class DynamicFirewall:
def __init__(self):
self.rules_db = "firewall_rules.db"
self.setup_database()
def add_temporary_block(self, ip_address, duration_hours=24, reason=""):
"""Add temporary firewall block"""
expiry = datetime.now() + timedelta(hours=duration_hours)
# Add to pfSense (using pfSsh.php)
command = f"""
pfSsh.php playback addtable bruteforce {ip_address}
"""
subprocess.run(['ssh', 'pfsense', command])
# Log the action
conn = sqlite3.connect(self.rules_db)
conn.execute('''
INSERT INTO temporary_blocks (ip_address, reason, created_at, expires_at)
VALUES (?, ?, ?, ?)
''', (ip_address, reason, datetime.now(), expiry))
conn.commit()
conn.close()
def auto_block_failed_auth(self, threshold=5):
"""Automatically block IPs with excessive failed authentications"""
# Parse auth logs
failed_attempts = defaultdict(int)
with open('/var/log/auth.log', 'r') as f:
for line in f:
if 'Failed password' in line:
# Extract IP address
ip_match = re.search(r'from (\d+\.\d+\.\d+\.\d+)', line)
if ip_match:
failed_attempts[ip_match.group(1)] += 1
# Block IPs exceeding threshold
for ip, count in failed_attempts.items():
if count >= threshold:
self.add_temporary_block(
ip,
duration_hours=48,
reason=f"Failed auth attempts: {count}"
)
def update_geo_blocks(self):
"""Update geo-blocking rules based on threat intelligence"""
# Fetch current threat landscape
high_risk_countries = self.get_high_risk_countries()
for country_code in high_risk_countries:
# Update pfBlockerNG lists
self.update_pf_blocker_list(country_code)
Notification System
All this automation is useless if you don't know what's happening. Here's my notification system:
import smtplib
import requests
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class SecurityNotifier:
def __init__(self, config):
self.config = config
self.notification_history = []
def send_alert(self, alert_type, message, severity="medium"):
"""Send security alert through multiple channels"""
# Determine notification method based on severity
if severity == "critical":
self.send_sms(message)
self.send_email(f"CRITICAL: {alert_type}", message)
self.send_pushover(message, priority=2)
elif severity == "high":
self.send_email(f"Alert: {alert_type}", message)
self.send_pushover(message, priority=1)
else:
self.send_email(f"Notice: {alert_type}", message)
# Log notification
self.notification_history.append({
'timestamp': datetime.now(),
'type': alert_type,
'severity': severity,
'message': message
})
def send_pushover(self, message, priority=0):
"""Send push notification to phone"""
requests.post("https://api.pushover.net/1/messages.json", data={
"token": self.config['pushover_token'],
"user": self.config['pushover_user'],
"message": message,
"priority": priority
})
def daily_summary(self):
"""Send daily security summary"""
summary = self.generate_daily_summary()
html_content = f"""
<html>
<body>
<h2>Daily Security Summary</h2>
<p>Report for {datetime.now().strftime('%Y-%m-%d')}</p>
<h3>Network Status</h3>
<ul>
<li>Total Devices: {summary['total_devices']}</li>
<li>New Devices: {summary['new_devices']}</li>
<li>Security Events: {summary['security_events']}</li>
</ul>
<h3>Blocked Threats</h3>
<ul>
<li>Malicious IPs: {summary['blocked_ips']}</li>
<li>Suspicious DNS: {summary['blocked_dns']}</li>
<li>Failed Auth: {summary['failed_auth']}</li>
</ul>
<h3>Action Items</h3>
{self.format_action_items(summary['action_items'])}
</body>
</html>
"""
self.send_email("Daily Security Summary", html_content, html=True)
Putting It All Together
The real power comes from orchestrating these scripts. Here's my master automation script:
#!/usr/bin/env python3
"""
Home Security Automation Orchestrator
Runs various security checks and responds to threats
"""
import schedule
import time
from threading import Thread
class SecurityOrchestrator:
def __init__(self):
self.asset_manager = NetworkAssetManager()
self.dns_monitor = DNSSecurityMonitor()
self.vuln_scanner = HomeVulnerabilityScanner()
self.firewall = DynamicFirewall()
self.notifier = SecurityNotifier(config)
def hourly_tasks(self):
"""Run every hour"""
# Network discovery
self.asset_manager.scan_network()
new_devices = self.asset_manager.get_new_devices()
if new_devices:
message = f"Found {len(new_devices)} new devices on network"
self.notifier.send_alert("New Devices", message, "medium")
# DNS monitoring
dns_alerts = self.dns_monitor.check_dns_queries()
for alert in dns_alerts:
if alert['type'] == 'known_malware':
self.firewall.add_temporary_block(alert['client'])
self.notifier.send_alert("Malware Blocked",
f"Blocked {alert['client']} accessing {alert['domain']}",
"high")
def daily_tasks(self):
"""Run daily"""
# Update firewall rules
self.firewall.auto_block_failed_auth()
self.firewall.update_geo_blocks()
# Send summary
self.notifier.daily_summary()
def weekly_tasks(self):
"""Run weekly"""
# Vulnerability scanning
devices = self.asset_manager.get_all_devices()
scan_results = []
for device in devices:
if device['trusted']:
vulns = self.vuln_scanner.scan_device(device['ip'])
scan_results.append({
'ip': device['ip'],
'vulnerabilities': vulns
})
report = self.vuln_scanner.generate_report(scan_results)
if report['critical_findings']:
self.notifier.send_alert("Critical Vulnerabilities Found",
json.dumps(report, indent=2), "critical")
def run(self):
"""Start the orchestrator"""
# Schedule tasks
schedule.every().hour.do(self.hourly_tasks)
schedule.every().day.at("09:00").do(self.daily_tasks)
schedule.every().monday.at("10:00").do(self.weekly_tasks)
# Run scheduler
while True:
schedule.run_pending()
time.sleep(60)
if __name__ == "__main__":
orchestrator = SecurityOrchestrator()
orchestrator.run()
Lessons Learned
1. Start with Visibility
You can't secure what you can't see. Network discovery and asset management should be your first automation project.
2. Alert Fatigue is Real
Fine-tune your alerts. Too many notifications and you'll start ignoring them. I learned this the hard way when my phone wouldn't stop buzzing.
3. Family-Friendly Automation
Your security automation shouldn't impact family life. My scripts include:
- Whitelisting for family devices
- "Quiet hours" for non-critical alerts
- Easy override mechanisms
4. Test in Isolation
Always test security automation in an isolated environment first. I once accidentally blocked my entire home network. The family was... not amused.
5. Document Everything
Future you (or your family when you're not home) needs to understand how to disable things. I maintain a simple wiki with:
- What each script does
- How to temporarily disable automation
- Emergency contacts
Tools and Resources
Here are the key tools I use:
- nmap: Network discovery and port scanning
- Pi-hole: DNS filtering and logging
- pfSense: Firewall and routing
- Python libraries: python-nmap, vulners, schedule
- Notification: Pushover for mobile alerts
What's Next?
Security automation is an ongoing journey. My upcoming projects include:
- Machine learning for anomaly detection
- Automated incident response playbooks
- Integration with threat intelligence feeds
- Voice alerts for critical events ("Alexa, announce security alert")
Conclusion
Automating home network security has transformed my approach to protecting my family's digital life. Instead of constantly checking logs and running manual scans, I can focus on improving defenses while automation handles the routine work.
The best part? When my kids ask what I do for work, I can show them our "home security robot" in action. Nothing beats seeing their faces light up when they understand that Dad's job is basically being a cyber superhero.
Remember: the goal isn't to build Fort Knox, it's to raise the bar high enough that attackers move on to easier targets. Automation helps you maintain that bar without burning out.
Have questions about any of these scripts? Want to share your own automation ideas? Drop me a line – I love connecting with fellow security automation enthusiasts!
Related Posts
eBPF for Security Monitoring: A Practical Guide
Learn how to leverage eBPF for real-time security monitoring in Linux environments with practical ex...
Building a Security Mindset: Lessons from the Field
After 15+ years in cybersecurity, here are the key lessons about developing a security-first mindset...
From IT Support to Senior InfoSec Engineer: My 15+ Year Journey
The winding path from fixing printers to securing federal systems – lessons learned, mistakes made,...