System Administration: Linux Infrastructure Management and Automation
In this post, I share insights from my System Administration project: Linux infrastructure management, automation, monitoring, and security best practices, demonstrated through practical administration tasks and tooling.
Project Overview
The project covers infrastructure automation, monitoring setup, security hardening, backup strategies, and performance optimization, reflecting real-world system administration practice rather than toy exercises.
Technical Architecture
Project Structure
SystemAdministration/
├── automation/
│ ├── backup_scripts/
│ │ ├── full_backup.sh
│ │ ├── incremental_backup.sh
│ │ └── restore_backup.sh
│ ├── deployment/
│ │ ├── deploy_app.sh
│ │ ├── rollback.sh
│ │ └── health_check.sh
│ ├── maintenance/
│ │ ├── log_rotation.sh
│ │ ├── disk_cleanup.sh
│ │ └── system_update.sh
│ └── monitoring/
│ ├── system_monitor.py
│ ├── alert_manager.py
│ └── log_analyzer.py
├── security/
│ ├── hardening/
│ │ ├── firewall_setup.sh
│ │ ├── ssh_hardening.sh
│ │ └── user_management.sh
│ ├── audit/
│ │ ├── security_audit.sh
│ │ ├── vulnerability_scan.py
│ │ └── compliance_check.sh
│ └── encryption/
│ ├── disk_encryption.sh
│ ├── file_encryption.py
│ └── ssl_setup.sh
├── networking/
│ ├── network_config/
│ │ ├── static_ip.sh
│ │ ├── dns_setup.sh
│ │ └── routing_config.sh
│ ├── services/
│ │ ├── nginx_setup.sh
│ │ ├── apache_setup.sh
│ │ └── database_setup.sh
│ └── load_balancing/
│ ├── haproxy_config.sh
│ ├── nginx_lb.sh
│ └── keepalived_setup.sh
├── storage/
│ ├── lvm/
│ │ ├── lvm_setup.sh
│ │ ├── lvm_extend.sh
│ │ └── lvm_snapshot.sh
│ ├── raid/
│ │ ├── raid_setup.sh
│ │ ├── raid_monitor.sh
│ │ └── raid_recovery.sh
│ └── nfs/
│ ├── nfs_server.sh
│ ├── nfs_client.sh
│ └── nfs_permissions.sh
├── containers/
│ ├── docker/
│ │ ├── docker_setup.sh
│ │ ├── docker_compose.yml
│ │ └── container_monitor.py
│ ├── kubernetes/
│ │ ├── k8s_setup.sh
│ │ ├── pod_deployment.yaml
│ │ └── service_config.yaml
│ └── virtualization/
│ ├── kvm_setup.sh
│ ├── vm_management.py
│ └── vm_backup.sh
└── documentation/
├── procedures/
├── troubleshooting/
└── best_practices/
Core Implementation
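Scripts like these are typically driven by a scheduler rather than run by hand. A hedged crontab sketch (the schedules and the /opt/sysadmin install path are assumptions, not part of the project):

```cron
# m h dom mon dow  command
30 2 * * 0    /opt/sysadmin/automation/backup_scripts/full_backup.sh
30 2 * * 1-6  /opt/sysadmin/automation/backup_scripts/incremental_backup.sh
0  4 * * *    /opt/sysadmin/automation/maintenance/disk_cleanup.sh
15 4 * * 0    /opt/sysadmin/automation/maintenance/system_update.sh
```

Full backups weekly, incrementals on the other nights, and cleanup daily is the usual split when the incremental script builds on the last full backup.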
Automated Backup System
#!/bin/bash
# automation/backup_scripts/full_backup.sh
set -euo pipefail
# Configuration
BACKUP_DIR="/backup"
SOURCE_DIRS=("/home" "/etc" "/var/log" "/opt")
RETENTION_DAYS=30
LOG_FILE="/var/log/backup.log"
EMAIL_ALERT="admin@company.com"
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# Logging function. -e renders the color escape codes; writing to stderr
# keeps stdout clean so command substitution can capture function results.
log() {
    echo -e "$(date '+%Y-%m-%d %H:%M:%S') - $1" | tee -a "$LOG_FILE" >&2
}
# Error handling
error_exit() {
log "${RED}ERROR: $1${NC}"
echo "$1" | mail -s "Backup Failed - $(hostname)" "$EMAIL_ALERT"
exit 1
}
# Success notification
success_notification() {
log "${GREEN}SUCCESS: $1${NC}"
echo "$1" | mail -s "Backup Completed - $(hostname)" "$EMAIL_ALERT"
}
# Check prerequisites
check_prerequisites() {
log "Checking prerequisites..."
# Check if running as root
if [[ $EUID -ne 0 ]]; then
error_exit "This script must be run as root"
fi
# Ensure the backup directory exists before checking its free space
# (df fails on a nonexistent path, and 'set -e' would abort the script)
if [[ ! -d "$BACKUP_DIR" ]]; then
    log "Creating backup directory: $BACKUP_DIR"
    mkdir -p "$BACKUP_DIR"
fi
# Check available disk space
local available_space
available_space=$(df "$BACKUP_DIR" | awk 'NR==2 {print $4}')
local required_space=5000000 # 5GB in KB
if [[ $available_space -lt $required_space ]]; then
    error_exit "Insufficient disk space. Required: 5GB, Available: ${available_space}KB"
fi
log "Prerequisites check completed"
}
# Create backup
create_backup() {
local backup_name="full_backup_$(date +%Y%m%d_%H%M%S)"
local backup_path="$BACKUP_DIR/$backup_name"
log "Starting full backup: $backup_name"
# Create backup directory
mkdir -p "$backup_path"
# Backup each source directory
for source_dir in "${SOURCE_DIRS[@]}"; do
if [[ -d "$source_dir" ]]; then
log "Backing up: $source_dir"
# Use rsync for efficient backup. Test the command directly: under
# 'set -e' a failing rsync would exit before a separate '$?' check runs.
# Its output is redirected so the function's stdout stays clean for the
# final echo of the backup name.
if rsync -av --delete --exclude='*.tmp' --exclude='*.log' \
    "$source_dir/" "$backup_path/$(basename "$source_dir")/" >> "$LOG_FILE" 2>&1; then
    log "Successfully backed up: $source_dir"
else
    error_exit "Failed to backup: $source_dir"
fi
else
log "${YELLOW}Warning: Source directory not found: $source_dir${NC}"
fi
done
# Create compressed archive
log "Creating compressed archive..."
cd "$BACKUP_DIR"
# Test tar directly rather than checking '$?' afterwards ('set -e' would
# have already aborted the script on failure)
if tar -czf "${backup_name}.tar.gz" "$backup_name"; then
    log "Archive created: ${backup_name}.tar.gz"
    rm -rf "$backup_path" # Remove uncompressed directory
else
    error_exit "Failed to create archive"
fi
# Calculate backup size
local backup_size=$(du -h "${backup_name}.tar.gz" | cut -f1)
log "Backup size: $backup_size"
echo "$backup_name"
}
# Verify backup integrity
verify_backup() {
local backup_name="$1"
local backup_file="$BACKUP_DIR/${backup_name}.tar.gz"
log "Verifying backup integrity..."
# Test archive integrity
if tar -tzf "$backup_file" > /dev/null; then
    log "Backup integrity verified"
else
    error_exit "Backup integrity check failed"
fi
}
# Cleanup old backups
cleanup_old_backups() {
log "Cleaning up backups older than $RETENTION_DAYS days..."
if find "$BACKUP_DIR" -name "full_backup_*.tar.gz" -type f -mtime +"$RETENTION_DAYS" -delete; then
    log "Old backups cleaned up"
else
    log "${YELLOW}Warning: Failed to clean up old backups${NC}"
fi
}
# Generate backup report
generate_report() {
local backup_name="$1"
local backup_file="$BACKUP_DIR/${backup_name}.tar.gz"
log "Generating backup report..."
local report_file="/tmp/backup_report_$(date +%Y%m%d).txt"
cat > "$report_file" << EOF
Backup Report - $(date)
================================
Backup Name: $backup_name
Backup File: $backup_file
Backup Size: $(du -h "$backup_file" | cut -f1)
Backup Date: $(date)
Hostname: $(hostname)
Source Directories:
$(printf '%s\n' "${SOURCE_DIRS[@]}")
Disk Usage:
$(df -h "$BACKUP_DIR")
Backup Status: SUCCESS
EOF
log "Report generated: $report_file"
}
# Main execution
main() {
log "Starting full backup process..."
check_prerequisites
local backup_name=$(create_backup)
verify_backup "$backup_name"
cleanup_old_backups
generate_report "$backup_name"
success_notification "Full backup completed successfully: $backup_name"
log "Full backup process completed"
}
# Run main function
main "$@"
System Monitoring Script
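The monitor below runs an infinite loop, so in practice it would be supervised rather than launched by hand. A hedged systemd unit sketch (the unit name and install path are assumptions, not part of the project):

```ini
# /etc/systemd/system/system-monitor.service (hypothetical path)
[Unit]
Description=System resource monitor
After=network.target

[Service]
ExecStart=/usr/bin/python3 /opt/sysadmin/automation/monitoring/system_monitor.py --daemon
Restart=on-failure
RestartSec=30
User=root

[Install]
WantedBy=multi-user.target
```

Restart=on-failure provides the retry behavior the script's own exception handler cannot: recovery when the process itself dies.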
#!/usr/bin/env python3
# automation/monitoring/system_monitor.py
import psutil
import time
import json
import logging
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
import argparse
import os
class SystemMonitor:
def __init__(self, config_file=None):
self.setup_logging()
self.load_config(config_file)
self.alerts_sent = set()
def setup_logging(self):
"""Setup logging configuration"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/var/log/system_monitor.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def load_config(self, config_file):
"""Load monitoring configuration"""
default_config = {
'cpu_threshold': 80.0,
'memory_threshold': 85.0,
'disk_threshold': 90.0,
'check_interval': 60,
'email_alerts': True,
'email_recipients': ['admin@company.com'],
'smtp_server': 'localhost',
'smtp_port': 587,
'smtp_username': '',
'smtp_password': ''
}
if config_file and os.path.exists(config_file):
with open(config_file, 'r') as f:
self.config = {**default_config, **json.load(f)}
else:
self.config = default_config
self.logger.info("Configuration loaded")
def get_system_metrics(self):
"""Collect system metrics"""
metrics = {
'timestamp': datetime.now().isoformat(),
'cpu_percent': psutil.cpu_percent(interval=1),
'memory': psutil.virtual_memory(),
'disk': psutil.disk_usage('/'),
'load_average': os.getloadavg() if hasattr(os, 'getloadavg') else None,
'network': psutil.net_io_counters(),
'processes': len(psutil.pids())
}
return metrics
def check_thresholds(self, metrics):
"""Check if metrics exceed thresholds"""
alerts = []
# CPU check
if metrics['cpu_percent'] > self.config['cpu_threshold']:
alerts.append({
'type': 'cpu',
'value': metrics['cpu_percent'],
'threshold': self.config['cpu_threshold'],
'message': f"High CPU usage: {metrics['cpu_percent']:.1f}%"
})
# Memory check
memory_percent = metrics['memory'].percent
if memory_percent > self.config['memory_threshold']:
alerts.append({
'type': 'memory',
'value': memory_percent,
'threshold': self.config['memory_threshold'],
'message': f"High memory usage: {memory_percent:.1f}%"
})
# Disk check
disk_percent = (metrics['disk'].used / metrics['disk'].total) * 100
if disk_percent > self.config['disk_threshold']:
alerts.append({
'type': 'disk',
'value': disk_percent,
'threshold': self.config['disk_threshold'],
'message': f"High disk usage: {disk_percent:.1f}%"
})
return alerts
def send_alert(self, alert):
"""Send alert notification"""
if not self.config['email_alerts']:
return
# Create alert key to prevent spam
alert_key = f"{alert['type']}_{int(time.time() / 300)}" # 5-minute window
if alert_key in self.alerts_sent:
return
try:
msg = MIMEMultipart()
msg['From'] = self.config['smtp_username']
msg['To'] = ', '.join(self.config['email_recipients'])
msg['Subject'] = f"System Alert - {alert['type'].upper()}"
body = f"""
System Alert Detected
Alert Type: {alert['type'].upper()}
Message: {alert['message']}
Current Value: {alert['value']:.1f}%
Threshold: {alert['threshold']:.1f}%
Timestamp: {datetime.now()}
Hostname: {os.uname().nodename}
Please investigate this issue immediately.
"""
msg.attach(MIMEText(body, 'plain'))
# Send email
server = smtplib.SMTP(self.config['smtp_server'], self.config['smtp_port'])
server.starttls()
server.login(self.config['smtp_username'], self.config['smtp_password'])
text = msg.as_string()
server.sendmail(self.config['smtp_username'],
self.config['email_recipients'], text)
server.quit()
self.alerts_sent.add(alert_key)
self.logger.info(f"Alert sent: {alert['message']}")
except Exception as e:
self.logger.error(f"Failed to send alert: {e}")
def log_metrics(self, metrics):
"""Log metrics to file"""
log_entry = {
'timestamp': metrics['timestamp'],
'cpu_percent': metrics['cpu_percent'],
'memory_percent': metrics['memory'].percent,
'memory_used_gb': metrics['memory'].used / (1024**3),
'memory_total_gb': metrics['memory'].total / (1024**3),
'disk_percent': (metrics['disk'].used / metrics['disk'].total) * 100,
'disk_used_gb': metrics['disk'].used / (1024**3),
'disk_total_gb': metrics['disk'].total / (1024**3),
'processes': metrics['processes']
}
# Log to JSON file for analysis
with open('/var/log/system_metrics.json', 'a') as f:
f.write(json.dumps(log_entry) + '\n')
def run_monitoring_loop(self):
"""Main monitoring loop"""
self.logger.info("Starting system monitoring")
while True:
try:
metrics = self.get_system_metrics()
alerts = self.check_thresholds(metrics)
# Send alerts
for alert in alerts:
self.send_alert(alert)
# Log metrics
self.log_metrics(metrics)
# Log status
self.logger.info(f"CPU: {metrics['cpu_percent']:.1f}%, "
f"Memory: {metrics['memory'].percent:.1f}%, "
f"Disk: {(metrics['disk'].used/metrics['disk'].total)*100:.1f}%")
time.sleep(self.config['check_interval'])
except KeyboardInterrupt:
self.logger.info("Monitoring stopped by user")
break
except Exception as e:
self.logger.error(f"Monitoring error: {e}")
time.sleep(60) # Wait before retrying
def generate_report(self, hours=24):
"""Generate system report"""
report_file = f"/tmp/system_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html"
# Read metrics from log file
metrics_data = []
try:
with open('/var/log/system_metrics.json', 'r') as f:
for line in f:
metrics_data.append(json.loads(line.strip()))
except FileNotFoundError:
self.logger.error("No metrics data found")
return
# Filter data for specified hours
cutoff_time = datetime.now().timestamp() - (hours * 3600)
recent_metrics = [m for m in metrics_data
if datetime.fromisoformat(m['timestamp']).timestamp() > cutoff_time]
if not recent_metrics:
self.logger.error("No recent metrics data found")
return
# Generate HTML report
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<title>System Report - {hours}h</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.metric {{ margin: 10px 0; padding: 10px; border: 1px solid #ddd; }}
.high {{ background-color: #ffebee; }}
.medium {{ background-color: #fff3e0; }}
.low {{ background-color: #e8f5e8; }}
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
th {{ background-color: #f2f2f2; }}
</style>
</head>
<body>
<h1>System Report - Last {hours} Hours</h1>
<p>Generated: {datetime.now()}</p>
<h2>Summary Statistics</h2>
<table>
<tr><th>Metric</th><th>Average</th><th>Maximum</th><th>Minimum</th></tr>
<tr><td>CPU Usage</td><td>{sum(m['cpu_percent'] for m in recent_metrics)/len(recent_metrics):.1f}%</td>
<td>{max(m['cpu_percent'] for m in recent_metrics):.1f}%</td>
<td>{min(m['cpu_percent'] for m in recent_metrics):.1f}%</td></tr>
<tr><td>Memory Usage</td><td>{sum(m['memory_percent'] for m in recent_metrics)/len(recent_metrics):.1f}%</td>
<td>{max(m['memory_percent'] for m in recent_metrics):.1f}%</td>
<td>{min(m['memory_percent'] for m in recent_metrics):.1f}%</td></tr>
<tr><td>Disk Usage</td><td>{sum(m['disk_percent'] for m in recent_metrics)/len(recent_metrics):.1f}%</td>
<td>{max(m['disk_percent'] for m in recent_metrics):.1f}%</td>
<td>{min(m['disk_percent'] for m in recent_metrics):.1f}%</td></tr>
</table>
<h2>Recent Metrics</h2>
<table>
<tr><th>Timestamp</th><th>CPU %</th><th>Memory %</th><th>Disk %</th><th>Processes</th></tr>
"""
for metric in recent_metrics[-50:]: # Last 50 entries
html_content += f"""
<tr>
<td>{metric['timestamp']}</td>
<td>{metric['cpu_percent']:.1f}</td>
<td>{metric['memory_percent']:.1f}</td>
<td>{metric['disk_percent']:.1f}</td>
<td>{metric['processes']}</td>
</tr>
"""
html_content += """
</table>
</body>
</html>
"""
with open(report_file, 'w') as f:
f.write(html_content)
self.logger.info(f"Report generated: {report_file}")
def main():
parser = argparse.ArgumentParser(description='System Monitoring Tool')
parser.add_argument('--config', help='Configuration file path')
parser.add_argument('--report', type=int, help='Generate report for last N hours')
parser.add_argument('--daemon', action='store_true', help='Run as daemon')
args = parser.parse_args()
monitor = SystemMonitor(args.config)
if args.report:
monitor.generate_report(args.report)
else:
monitor.run_monitoring_loop()
if __name__ == '__main__':
    main()
Security Hardening Script
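configure_database in the script below accepts a comma-separated IP allowlist, and a malformed entry silently becomes a dead iptables rule. Validating the list first is cheap insurance; a hedged helper sketch (not part of the project):

```python
import ipaddress

def validate_allowlist(csv_ips):
    """Validate a comma-separated IPv4/IPv6 allowlist before it reaches
    iptables; raises ValueError on any malformed entry."""
    ips = [s.strip() for s in csv_ips.split(',') if s.strip()]
    for ip in ips:
        ipaddress.ip_address(ip)  # raises ValueError on a bad address
    return ips

print(validate_allowlist("127.0.0.1, 10.0.0.5"))  # ['127.0.0.1', '10.0.0.5']
```

Run it over the allowlist before invoking the firewall script, and a typo fails loudly instead of leaving a port silently unreachable.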
#!/bin/bash
# security/hardening/firewall_setup.sh
set -euo pipefail
# Configuration
IPTABLES_RULES="/etc/iptables/rules.v4"
IPTABLES_BACKUP="/etc/iptables/rules.v4.backup.$(date +%Y%m%d_%H%M%S)"
LOG_FILE="/var/log/firewall_setup.log"
# Colors
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'
log() {
    # -e so the color escape sequences actually render
    echo -e "$(date '+%Y-%m-%d %H:%M:%S') - $1" | tee -a "$LOG_FILE"
}
error_exit() {
log "${RED}ERROR: $1${NC}"
exit 1
}
# Backup existing rules
backup_rules() {
log "Backing up existing iptables rules..."
if [[ -f "$IPTABLES_RULES" ]]; then
cp "$IPTABLES_RULES" "$IPTABLES_BACKUP"
log "Rules backed up to: $IPTABLES_BACKUP"
fi
}
# Install required packages
install_packages() {
log "Installing required packages..."
if command -v apt-get &> /dev/null; then
    apt-get update
    # ufw is omitted: it would fight the raw iptables rules below.
    # iptables-persistent prompts during install, hence the noninteractive frontend.
    DEBIAN_FRONTEND=noninteractive apt-get install -y iptables-persistent fail2ban
elif command -v yum &> /dev/null; then
    # firewalld is omitted for the same reason; fail2ban needs EPEL on RHEL/CentOS
    yum install -y iptables-services fail2ban
else
    error_exit "Unsupported package manager"
fi
}
# Configure basic firewall rules
configure_basic_rules() {
log "Configuring basic firewall rules..."
# Flush existing rules
iptables -F
iptables -X
iptables -t nat -F
iptables -t nat -X
iptables -t mangle -F
iptables -t mangle -X
# Set default policies
iptables -P INPUT DROP
iptables -P FORWARD DROP
iptables -P OUTPUT ACCEPT
# Allow loopback
iptables -A INPUT -i lo -j ACCEPT
iptables -A OUTPUT -o lo -j ACCEPT
# Allow established connections
iptables -A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT
# Allow ICMP (ping)
iptables -A INPUT -p icmp --icmp-type echo-request -j ACCEPT
log "Basic rules configured"
}
# Configure SSH access
configure_ssh() {
local ssh_port=${1:-22}
log "Configuring SSH access on port $ssh_port..."
# Rate limiting for SSH. These rules must come BEFORE the ACCEPT rule:
# iptables matches top-down, and an earlier ACCEPT would shadow them.
iptables -A INPUT -p tcp --dport "$ssh_port" -m state --state NEW -m recent --set --name SSH
iptables -A INPUT -p tcp --dport "$ssh_port" -m state --state NEW -m recent --update --seconds 60 --hitcount 4 --name SSH -j DROP
# Allow SSH
iptables -A INPUT -p tcp --dport "$ssh_port" -m state --state NEW,ESTABLISHED -j ACCEPT
log "SSH access configured with rate limiting"
}
# Configure web server access
configure_web_server() {
local http_port=${1:-80}
local https_port=${2:-443}
log "Configuring web server access..."
# Allow HTTP
iptables -A INPUT -p tcp --dport "$http_port" -m state --state NEW,ESTABLISHED -j ACCEPT
# Allow HTTPS
iptables -A INPUT -p tcp --dport "$https_port" -m state --state NEW,ESTABLISHED -j ACCEPT
log "Web server access configured"
}
# Configure database access
configure_database() {
local db_port=${1:-3306}
local allowed_ips=${2:-"127.0.0.1"}
log "Configuring database access..."
# Allow database access from specific IPs
IFS=',' read -ra IP_ARRAY <<< "$allowed_ips"
for ip in "${IP_ARRAY[@]}"; do
iptables -A INPUT -p tcp --dport "$db_port" -s "$ip" -m state --state NEW,ESTABLISHED -j ACCEPT
done
log "Database access configured for IPs: $allowed_ips"
}
# Configure fail2ban
configure_fail2ban() {
log "Configuring fail2ban..."
# Create fail2ban configuration
cat > /etc/fail2ban/jail.local << EOF
[DEFAULT]
bantime = 3600
findtime = 600
maxretry = 3
[sshd]
enabled = true
port = ssh
filter = sshd
logpath = /var/log/auth.log
maxretry = 3
[nginx-http-auth]
enabled = true
filter = nginx-http-auth
port = http,https
logpath = /var/log/nginx/error.log
[apache-auth]
enabled = true
filter = apache-auth
port = http,https
logpath = /var/log/apache2/error.log
EOF
# Start and enable fail2ban
systemctl enable fail2ban
systemctl start fail2ban
log "Fail2ban configured and started"
}
# Save rules
save_rules() {
log "Saving iptables rules..."
# Create directory if it doesn't exist
mkdir -p "$(dirname "$IPTABLES_RULES")"
# Save rules
iptables-save > "$IPTABLES_RULES"
# Make rules persistent (the iptables-persistent package ships the
# netfilter-persistent binary; there is no 'iptables-persistent' command)
if command -v netfilter-persistent &> /dev/null; then
    netfilter-persistent save
elif command -v systemctl &> /dev/null; then
    systemctl enable iptables
    systemctl start iptables
fi
log "Rules saved and made persistent"
}
# Test firewall
test_firewall() {
log "Testing firewall configuration..."
# Test basic connectivity
if ping -c 1 8.8.8.8 &> /dev/null; then
log "${GREEN}External connectivity: OK${NC}"
else
log "${YELLOW}Warning: External connectivity test failed${NC}"
fi
# Test SSH (if configured)
if iptables -L INPUT | grep -q "tcp dpt:ssh"; then
log "${GREEN}SSH access: Configured${NC}"
else
log "${YELLOW}Warning: SSH access not configured${NC}"
fi
# Show current rules
log "Current iptables rules:"
iptables -L -n -v
}
# Main function
main() {
log "Starting firewall setup..."
# Check if running as root
if [[ $EUID -ne 0 ]]; then
error_exit "This script must be run as root"
fi
backup_rules
install_packages
configure_basic_rules
configure_ssh
configure_web_server
configure_database
configure_fail2ban
save_rules
test_firewall
log "${GREEN}Firewall setup completed successfully${NC}"
}
# Run main function
main "$@"
Log Analysis Script
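The analyzer below keys on regular expressions such as its SSH pattern. A quick self-contained check against a synthetic log line; note the built-in assumption of ISO-style timestamps (e.g. rsyslog configured for RFC 3339 output), since classic "Jan 12 10:00:00" syslog timestamps will not match:

```python
import re

# Same shape as the analyzer's 'ssh' pattern
SSH_RE = re.compile(
    r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}).*sshd.*'
    r'(?:Failed password|Accepted password).*from (\d+\.\d+\.\d+\.\d+)'
)

# Synthetic example line, not taken from a real log
line = ('2024-05-01 10:15:42 host sshd[1234]: Failed password '
        'for invalid user admin from 203.0.113.7 port 4242 ssh2')
m = SSH_RE.search(line)
print(m.group(1), m.group(2))  # 2024-05-01 10:15:42 203.0.113.7
```

Keeping a tiny fixture like this next to each pattern makes regex changes safe to refactor.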
#!/usr/bin/env python3
# automation/monitoring/log_analyzer.py
import re
import json
import argparse
from datetime import datetime, timedelta
from collections import defaultdict, Counter
import os
import sys
class LogAnalyzer:
def __init__(self):
        # NOTE: timestamp formats vary by distro and logger configuration.
        # The ssh/system/kernel patterns assume ISO-style timestamps (e.g.
        # rsyslog with RFC 3339 output); classic "Jan 12 10:00:00" syslog
        # timestamps will not match. The apache/nginx patterns assume the
        # combined access log format, where the status code is unquoted.
        self.log_patterns = {
            'ssh': r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}).*sshd.*(?:Failed password|Accepted password).*from (\d+\.\d+\.\d+\.\d+)',
            'apache': r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4})\] "[^"]*" (\d{3}) \S+ "[^"]*" "([^"]*)"',
            'nginx': r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4})\] "[^"]*" (\d{3}) \S+ "[^"]*" "([^"]*)"',
            'system': r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}).*(?:error|warning|critical)',
            'kernel': r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}).*kernel.*(?:error|warning|panic)'
        }
def parse_log_file(self, log_file, log_type):
"""Parse log file and extract relevant information"""
if not os.path.exists(log_file):
print(f"Error: Log file {log_file} not found")
return []
pattern = self.log_patterns.get(log_type)
if not pattern:
print(f"Error: Unknown log type {log_type}")
return []
entries = []
try:
with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
for line_num, line in enumerate(f, 1):
match = re.search(pattern, line)
if match:
entries.append({
'timestamp': match.group(1),
'line_number': line_num,
'raw_line': line.strip(),
'matches': match.groups()
})
except Exception as e:
print(f"Error reading log file: {e}")
return []
return entries
def analyze_ssh_logs(self, entries):
"""Analyze SSH log entries for security issues"""
analysis = {
'total_attempts': len(entries),
'failed_attempts': 0,
'successful_logins': 0,
'suspicious_ips': defaultdict(int),
'time_patterns': defaultdict(int),
'recommendations': []
}
for entry in entries:
line = entry['raw_line']
timestamp = entry['timestamp']
if 'Failed password' in line:
analysis['failed_attempts'] += 1
ip_match = re.search(r'from (\d+\.\d+\.\d+\.\d+)', line)
if ip_match:
ip = ip_match.group(1)
analysis['suspicious_ips'][ip] += 1
elif 'Accepted password' in line:
analysis['successful_logins'] += 1
# Extract hour for time pattern analysis
hour = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S').hour
analysis['time_patterns'][hour] += 1
# Generate recommendations
if analysis['failed_attempts'] > 100:
analysis['recommendations'].append("High number of failed SSH attempts detected")
suspicious_ips = [ip for ip, count in analysis['suspicious_ips'].items() if count > 10]
if suspicious_ips:
analysis['recommendations'].append(f"Suspicious IPs detected: {', '.join(suspicious_ips)}")
return analysis
def analyze_web_logs(self, entries, log_type):
"""Analyze web server log entries"""
analysis = {
'total_requests': len(entries),
'status_codes': Counter(),
'user_agents': Counter(),
'ip_addresses': Counter(),
'error_requests': 0,
'recommendations': []
}
for entry in entries:
matches = entry['matches']
if len(matches) >= 3:
status_code = matches[1]
user_agent = matches[2] if len(matches) > 2 else 'Unknown'
analysis['status_codes'][status_code] += 1
analysis['user_agents'][user_agent] += 1
# Extract IP from raw line
ip_match = re.search(r'(\d+\.\d+\.\d+\.\d+)', entry['raw_line'])
if ip_match:
analysis['ip_addresses'][ip_match.group(1)] += 1
# Count error requests
if status_code.startswith('4') or status_code.startswith('5'):
analysis['error_requests'] += 1
# Generate recommendations
error_rate = (analysis['error_requests'] / analysis['total_requests']) * 100
if error_rate > 10:
analysis['recommendations'].append(f"High error rate: {error_rate:.1f}%")
# Check for suspicious user agents
suspicious_agents = [ua for ua, count in analysis['user_agents'].items()
if 'bot' in ua.lower() or 'crawler' in ua.lower()]
if suspicious_agents:
analysis['recommendations'].append("Suspicious user agents detected")
return analysis
def generate_report(self, analyses, output_file):
"""Generate comprehensive log analysis report"""
report = {
'generated_at': datetime.now().isoformat(),
'analyses': analyses
}
# Generate HTML report
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<title>Log Analysis Report</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.section {{ margin: 20px 0; padding: 15px; border: 1px solid #ddd; }}
.warning {{ background-color: #fff3cd; border-color: #ffeaa7; }}
.error {{ background-color: #f8d7da; border-color: #f5c6cb; }}
.success {{ background-color: #d4edda; border-color: #c3e6cb; }}
table {{ border-collapse: collapse; width: 100%; margin: 10px 0; }}
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
th {{ background-color: #f2f2f2; }}
</style>
</head>
<body>
<h1>Log Analysis Report</h1>
<p>Generated: {datetime.now()}</p>
"""
for log_type, analysis in analyses.items():
html_content += f"""
<div class="section">
<h2>{log_type.upper()} Analysis</h2>
"""
if log_type == 'ssh':
html_content += f"""
<p><strong>Total Attempts:</strong> {analysis['total_attempts']}</p>
<p><strong>Failed Attempts:</strong> {analysis['failed_attempts']}</p>
<p><strong>Successful Logins:</strong> {analysis['successful_logins']}</p>
<h3>Top Suspicious IPs</h3>
<table>
<tr><th>IP Address</th><th>Attempts</th></tr>
"""
for ip, count in sorted(analysis['suspicious_ips'].items(),
key=lambda x: x[1], reverse=True)[:10]:
html_content += f"<tr><td>{ip}</td><td>{count}</td></tr>"
html_content += "</table>"
elif log_type in ['apache', 'nginx']:
html_content += f"""
<p><strong>Total Requests:</strong> {analysis['total_requests']}</p>
<p><strong>Error Requests:</strong> {analysis['error_requests']}</p>
<h3>Status Code Distribution</h3>
<table>
<tr><th>Status Code</th><th>Count</th></tr>
"""
for status, count in analysis['status_codes'].most_common():
html_content += f"<tr><td>{status}</td><td>{count}</td></tr>"
html_content += "</table>"
# Add recommendations
if analysis['recommendations']:
html_content += "<h3>Recommendations</h3><ul>"
for rec in analysis['recommendations']:
html_content += f"<li>{rec}</li>"
html_content += "</ul>"
html_content += "</div>"
html_content += """
</body>
</html>
"""
with open(output_file, 'w') as f:
f.write(html_content)
print(f"Report generated: {output_file}")
def main():
parser = argparse.ArgumentParser(description='Log Analysis Tool')
parser.add_argument('--log-file', required=True, help='Log file to analyze')
parser.add_argument('--log-type', required=True,
choices=['ssh', 'apache', 'nginx', 'system', 'kernel'],
help='Type of log file')
parser.add_argument('--output', default='log_analysis_report.html',
help='Output report file')
args = parser.parse_args()
analyzer = LogAnalyzer()
# Parse log file
entries = analyzer.parse_log_file(args.log_file, args.log_type)
if not entries:
print("No relevant entries found in log file")
return
# Analyze entries
if args.log_type == 'ssh':
analysis = analyzer.analyze_ssh_logs(entries)
elif args.log_type in ['apache', 'nginx']:
analysis = analyzer.analyze_web_logs(entries, args.log_type)
else:
print(f"Analysis for {args.log_type} not implemented yet")
return
# Generate report
analyzer.generate_report({args.log_type: analysis}, args.output)
if __name__ == '__main__':
    main()
Lessons Learned
System Administration
- Automation: Scripting routine tasks end to end pays off quickly
- Monitoring: Real-time metrics and alerting catch problems early
- Security: Hardening and vulnerability management must be ongoing
- Backup: A backup is only as good as its verified restore
Infrastructure Management
- Configuration Management: Consistent system configuration
- Service Management: Proper service configuration and management
- Network Security: Firewall and network security implementation
- Log Management: Comprehensive log analysis and monitoring
DevOps Practices
- Infrastructure as Code: Scripted infrastructure management
- Monitoring: Proactive monitoring and alerting
- Security: Security-first approach to system administration
- Documentation: Comprehensive documentation and procedures
Future Enhancements
Advanced Features
- Container Orchestration: Kubernetes and Docker management
- Cloud Integration: AWS, Azure, and GCP integration
- Configuration Management: Ansible and Puppet integration
- Monitoring: Prometheus and Grafana integration
Security Improvements
- Intrusion Detection: Advanced intrusion detection systems
- Vulnerability Management: Automated vulnerability scanning
- Compliance: Automated compliance checking
- Incident Response: Automated incident response procedures
Conclusion
The System Administration project demonstrates practical Linux system administration and infrastructure management. Key achievements include:
- Automation: Routine administration tasks scripted end to end
- Monitoring: Real-time monitoring and alerting
- Security: System hardening and vulnerability management
- Backup: Verified backup and recovery procedures
- Documentation: Clear procedures and runbooks
- Best Practices: Industry best practices applied throughout
The project is available on GitHub and serves as a working example of Linux system administration and infrastructure management.
This project reflects my approach to system administration: automation, monitoring, and security integrated into robust, maintainable infrastructure. The lessons learned here continue to shape how I approach infrastructure management and DevOps practices.