Python Web Scraping: Automated Data Collection and Analysis
In this post, I’ll share insights from my Python Web Scraping project, which demonstrates advanced web scraping techniques, data collection automation, parsing strategies, and comprehensive data analysis using Python libraries and database integration.
Project Overview
The Python Web Scraping project is a comprehensive data collection system that automates the extraction of information from various websites, processes and cleans the data, and stores it in databases for analysis. Built with Python, it showcases modern web scraping techniques, anti-detection measures, and robust data processing pipelines.
Technical Architecture
Project Structure
WebScrapingProject/
├── src/
│ ├── scrapers/
│ │ ├── base_scraper.py
│ │ ├── news_scraper.py
│ │ ├── ecommerce_scraper.py
│ │ ├── social_media_scraper.py
│ │ └── job_board_scraper.py
│ ├── parsers/
│ │ ├── html_parser.py
│ │ ├── json_parser.py
│ │ ├── xml_parser.py
│ │ └── csv_parser.py
│ ├── data_processing/
│ │ ├── data_cleaner.py
│ │ ├── data_validator.py
│ │ ├── data_transformer.py
│ │ └── data_analyzer.py
│ ├── storage/
│ │ ├── database_manager.py
│ │ ├── file_manager.py
│ │ ├── cache_manager.py
│ │ └── backup_manager.py
│ ├── utils/
│ │ ├── logger.py
│ │ ├── config_manager.py
│ │ ├── proxy_manager.py
│ │ ├── user_agent_manager.py
│ │ └── rate_limiter.py
│ └── api/
│ ├── rest_api.py
│ ├── graphql_api.py
│ └── webhook_handler.py
├── config/
│ ├── settings.py
│ ├── database_config.py
│ └── scraping_config.py
├── tests/
│ ├── unit/
│ ├── integration/
│ └── fixtures/
├── data/
│ ├── raw/
│ ├── processed/
│ └── exports/
├── logs/
├── requirements.txt
├── README.md
└── main.py

Core Implementation
Base Scraper Class
# src/scrapers/base_scraper.py
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import random
from typing import List, Dict, Optional, Any
from abc import ABC, abstractmethod
import logging
class BaseScraper(ABC):
    """
    Base class for all web scrapers providing common functionality.

    Responsibilities:
      * a requests.Session pre-loaded with browser-like headers,
      * lazy creation of a Selenium Chrome driver with basic
        anti-bot-detection tweaks,
      * fetching pages via requests or Selenium,
      * HTML parsing and link extraction helpers,
      * polite-crawling utilities (random delays, proxy rotation).

    Subclasses implement scrape() and parse_data().
    """

    def __init__(self, config: Dict[str, Any]):
        # config may carry 'proxies' (list of requests-style proxy dicts)
        # plus whatever subclasses need (e.g. 'base_url', selector maps).
        self.config = config
        self.session = requests.Session()
        self.driver = None  # created on demand by _get_content_with_selenium
        self.logger = logging.getLogger(self.__class__.__name__)
        self._setup_session()

    def _setup_session(self):
        """Install a randomly picked User-Agent and browser-like headers."""
        ua_pool = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        ]
        browser_headers = {
            'User-Agent': random.choice(ua_pool),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }
        self.session.headers.update(browser_headers)

    def setup_selenium_driver(self, headless: bool = True) -> webdriver.Chrome:
        """Build a Chrome WebDriver configured to look less like a bot."""
        opts = Options()
        if headless:
            opts.add_argument('--headless')
        # Anti-detection measures
        for flag in ('--no-sandbox',
                     '--disable-dev-shm-usage',
                     '--disable-blink-features=AutomationControlled'):
            opts.add_argument(flag)
        opts.add_experimental_option("excludeSwitches", ["enable-automation"])
        opts.add_experimental_option('useAutomationExtension', False)
        # Randomise the window size so sessions don't all look identical.
        width = random.randint(1200, 1920)
        height = random.randint(800, 1080)
        opts.add_argument(f'--window-size={width},{height}')
        driver = webdriver.Chrome(options=opts)
        # Hide the tell-tale navigator.webdriver flag from page scripts.
        driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
        return driver

    def get_page_content(self, url: str, use_selenium: bool = False) -> Optional[str]:
        """Fetch a page's HTML, choosing the transport via use_selenium."""
        fetch = self._get_content_with_selenium if use_selenium else self._get_content_with_requests
        try:
            return fetch(url)
        except Exception as e:
            self.logger.error(f"Error getting content from {url}: {str(e)}")
            return None

    def _get_content_with_requests(self, url: str) -> Optional[str]:
        """Fetch via the shared requests session; None on any request error."""
        try:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            return response.text
        except requests.RequestException as e:
            self.logger.error(f"Requests error for {url}: {str(e)}")
            return None

    def _get_content_with_selenium(self, url: str) -> Optional[str]:
        """Fetch via Selenium, creating the driver lazily; None on error."""
        try:
            if not self.driver:
                self.driver = self.setup_selenium_driver()
            self.driver.get(url)
            # Block until at least the <body> element is present.
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
            # Short human-like pause before grabbing the DOM.
            time.sleep(random.uniform(1, 3))
            return self.driver.page_source
        except Exception as e:
            self.logger.error(f"Selenium error for {url}: {str(e)}")
            return None

    def parse_html(self, html_content: str) -> BeautifulSoup:
        """Wrap raw HTML in a BeautifulSoup tree (stdlib html.parser)."""
        return BeautifulSoup(html_content, 'html.parser')

    def extract_links(self, soup: BeautifulSoup, base_url: str) -> List[str]:
        """Collect unique hrefs, resolving site-relative ones against base_url.

        Fragment-only links ('#...') are skipped.  NOTE: the set() dedup
        below does not preserve discovery order.
        """
        collected = []
        for anchor in soup.find_all('a', href=True):
            target = anchor['href']
            if target.startswith('http'):
                collected.append(target)
            elif target.startswith('/'):
                collected.append(base_url + target)
            elif not target.startswith('#'):
                collected.append(base_url + '/' + target)
        return list(set(collected))  # Remove duplicates

    def wait_random_time(self, min_seconds: float = 1.0, max_seconds: float = 3.0):
        """Sleep a random interval so request timing looks human."""
        time.sleep(random.uniform(min_seconds, max_seconds))

    def rotate_proxy(self):
        """Point the session at a random proxy from config['proxies'], if any."""
        if self.config.get('proxies'):
            proxy = random.choice(self.config['proxies'])
            self.session.proxies.update(proxy)
            self.logger.info(f"Rotated to proxy: {proxy}")

    def cleanup(self):
        """Release the Selenium driver (safe to call repeatedly)."""
        if self.driver:
            self.driver.quit()
            self.driver = None

    @abstractmethod
    def scrape(self, url: str) -> List[Dict[str, Any]]:
        """Fetch and return structured records from the given URL."""

    @abstractmethod
    def parse_data(self, soup: BeautifulSoup) -> List[Dict[str, Any]]:
        """Extract structured records from a parsed HTML tree."""
pass

News Scraper Implementation
# src/scrapers/news_scraper.py
from typing import List, Dict, Any
from datetime import datetime
import re
from .base_scraper import BaseScraper
class NewsScraper(BaseScraper):
    """
    Scraper for news websites with article extraction.

    Field selectors come from config['article_selectors'] (keys like
    'title_tag'/'title_class'), so the same scraper can be pointed at
    different news sites.  Relative links are resolved against
    config['base_url'].
    """

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        # Per-site selector configuration: '<field>_tag' / '<field>_class'.
        self.article_selectors = config.get('article_selectors', {})

    def scrape(self, url: str) -> List[Dict[str, Any]]:
        """Scrape article teasers from a listing page, then follow each
        article link to collect full content, images and tags."""
        self.logger.info(f"Scraping news from: {url}")
        html_content = self.get_page_content(url, use_selenium=True)
        if not html_content:
            return []
        soup = self.parse_html(html_content)
        articles = self.parse_data(soup)
        # Enrich every teaser that has a link with the full article page.
        detailed_articles = []
        for article in articles:
            if article.get('link'):
                detailed_article = self.scrape_article_details(article['link'])
                if detailed_article:
                    article.update(detailed_article)
            detailed_articles.append(article)
        return detailed_articles

    def parse_data(self, soup) -> List[Dict[str, Any]]:
        """Parse all article containers out of a listing-page tree."""
        articles = []
        article_containers = soup.find_all(
            self.article_selectors.get('container_tag', 'article'),
            class_=self.article_selectors.get('container_class')
        )
        for container in article_containers:
            try:
                article_data = self._extract_article_data(container)
                if article_data:
                    articles.append(article_data)
            except Exception as e:
                # One malformed teaser must not abort the whole listing.
                self.logger.error(f"Error parsing article: {str(e)}")
                continue
        return articles

    def _first_text(self, container, selector_key: str, default_tag: str):
        """Find the first element matching the configured '<key>_tag' /
        '<key>_class' selector and return its stripped text, or None if
        no such element exists."""
        element = container.find(
            self.article_selectors.get(f'{selector_key}_tag', default_tag),
            class_=self.article_selectors.get(f'{selector_key}_class')
        )
        return element.get_text(strip=True) if element is not None else None

    def _extract_article_data(self, container) -> Dict[str, Any]:
        """Extract the teaser fields (title, link, summary, date, author,
        category) from one article container; absent fields are omitted."""
        article_data = {}
        title = self._first_text(container, 'title', 'h2')
        if title is not None:
            article_data['title'] = title
        link_element = container.find('a', href=True)
        if link_element:
            href = link_element['href']
            if href.startswith('/'):
                href = self.config.get('base_url', '') + href
            article_data['link'] = href
        summary = self._first_text(container, 'summary', 'p')
        if summary is not None:
            article_data['summary'] = summary
        date_text = self._first_text(container, 'date', 'time')
        if date_text is not None:
            article_data['publish_date'] = self._parse_date(date_text)
        author = self._first_text(container, 'author', 'span')
        if author is not None:
            article_data['author'] = author
        category = self._first_text(container, 'category', 'span')
        if category is not None:
            article_data['category'] = category
        return article_data

    def scrape_article_details(self, article_url: str) -> Dict[str, Any]:
        """Scrape full content, image URLs and tags from one article page.

        Returns {} when the page could not be fetched.
        """
        self.logger.info(f"Scraping article details from: {article_url}")
        html_content = self.get_page_content(article_url, use_selenium=True)
        if not html_content:
            return {}
        soup = self.parse_html(html_content)
        details = {}
        content_element = soup.find(
            self.article_selectors.get('content_tag', 'div'),
            class_=self.article_selectors.get('content_class')
        )
        if content_element:
            # Strip embedded scripts/styles so only readable text remains.
            for script in content_element(["script", "style"]):
                script.decompose()
            details['content'] = content_element.get_text(strip=True)
        # Absolute-ify image sources against the configured base URL.
        images = []
        for img in soup.find_all('img'):
            src = img.get('src')
            if src:
                if src.startswith('/'):
                    src = self.config.get('base_url', '') + src
                images.append(src)
        details['images'] = images
        tags = []
        tag_elements = soup.find_all(
            self.article_selectors.get('tag_tag', 'a'),
            class_=self.article_selectors.get('tag_class')
        )
        for tag in tag_elements:
            tags.append(tag.get_text(strip=True))
        details['tags'] = tags
        return details

    def _parse_date(self, date_text: str) -> "Optional[datetime]":
        """Parse a date out of free-form text, or None if nothing matches.

        Each regex is paired with the strptime format(s) that can parse
        its match.  (The previous version tried only '%Y-%m-%d' and
        '%m/%d/%Y' for all four patterns, so textual dates such as
        'May 1, 2023' matched but could never parse.)  The return
        annotation is kept as a string because this module does not
        import ``Optional``.
        """
        pattern_formats = [
            (r'(\d{4}-\d{2}-\d{2})', ('%Y-%m-%d',)),
            (r'(\d{2}/\d{2}/\d{4})', ('%m/%d/%Y',)),
            (r'(\d{1,2}\s+\w+\s+\d{4})', ('%d %B %Y', '%d %b %Y')),
            (r'(\w+\s+\d{1,2},\s+\d{4})', ('%B %d, %Y', '%b %d, %Y')),
        ]
        for pattern, formats in pattern_formats:
            match = re.search(pattern, date_text)
            if not match:
                continue
            for fmt in formats:
                try:
                    return datetime.strptime(match.group(1), fmt)
                except ValueError:
                    continue
return None

E-commerce Scraper
# src/scrapers/ecommerce_scraper.py
from typing import List, Dict, Any
import re
from .base_scraper import BaseScraper
class EcommerceScraper(BaseScraper):
    """
    Scraper for e-commerce websites with product extraction.

    Field selectors come from config['product_selectors'] (keys like
    'name_tag'/'name_class'), so the scraper can be configured per site.
    Relative links/images are resolved against config['base_url'].

    NOTE: this module does not import ``typing.Optional``, so the
    Optional annotations below are written as strings (lazy) to avoid a
    NameError when the class is defined.
    """

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        # Per-site selector configuration: '<field>_tag' / '<field>_class'.
        self.product_selectors = config.get('product_selectors', {})

    def scrape(self, url: str) -> List[Dict[str, Any]]:
        """Scrape all products found on the given listing page URL."""
        self.logger.info(f"Scraping products from: {url}")
        html_content = self.get_page_content(url, use_selenium=True)
        if not html_content:
            return []
        soup = self.parse_html(html_content)
        products = self.parse_data(soup)
        return products

    def parse_data(self, soup) -> List[Dict[str, Any]]:
        """Parse every product container out of a listing-page tree."""
        products = []
        product_containers = soup.find_all(
            self.product_selectors.get('container_tag', 'div'),
            class_=self.product_selectors.get('container_class')
        )
        for container in product_containers:
            try:
                product_data = self._extract_product_data(container)
                if product_data:
                    products.append(product_data)
            except Exception as e:
                # One malformed listing must not abort the whole page.
                self.logger.error(f"Error parsing product: {str(e)}")
                continue
        return products

    def _extract_product_data(self, container) -> Dict[str, Any]:
        """Extract name, price(s), link, image, rating and availability
        from one product container; absent fields are omitted."""
        product_data = {}
        name_element = container.find(
            self.product_selectors.get('name_tag', 'h3'),
            class_=self.product_selectors.get('name_class')
        )
        if name_element:
            product_data['name'] = name_element.get_text(strip=True)
        price_element = container.find(
            self.product_selectors.get('price_tag', 'span'),
            class_=self.product_selectors.get('price_class')
        )
        if price_element:
            price_text = price_element.get_text(strip=True)
            product_data['price'] = self._extract_price(price_text)
        # Original (pre-discount) price, present only for sale items.
        original_price_element = container.find(
            self.product_selectors.get('original_price_tag', 'span'),
            class_=self.product_selectors.get('original_price_class')
        )
        if original_price_element:
            original_price_text = original_price_element.get_text(strip=True)
            product_data['original_price'] = self._extract_price(original_price_text)
        link_element = container.find('a', href=True)
        if link_element:
            href = link_element['href']
            if href.startswith('/'):
                href = self.config.get('base_url', '') + href
            product_data['link'] = href
        img_element = container.find('img')
        if img_element:
            # 'data-src' covers lazily-loaded images.
            src = img_element.get('src') or img_element.get('data-src')
            if src:
                if src.startswith('/'):
                    src = self.config.get('base_url', '') + src
                product_data['image'] = src
        rating_element = container.find(
            self.product_selectors.get('rating_tag', 'span'),
            class_=self.product_selectors.get('rating_class')
        )
        if rating_element:
            rating_text = rating_element.get_text(strip=True)
            product_data['rating'] = self._extract_rating(rating_text)
        availability_element = container.find(
            self.product_selectors.get('availability_tag', 'span'),
            class_=self.product_selectors.get('availability_class')
        )
        if availability_element:
            product_data['availability'] = availability_element.get_text(strip=True)
        return product_data

    def _extract_price(self, price_text: str) -> "Optional[float]":
        """Extract a numeric price from text like '$1,234.56'; None if absent.

        Thousands separators are stripped before matching, so the pattern
        only needs digits and an optional decimal part.
        """
        price_match = re.search(r'\d+\.?\d*', price_text.replace(',', ''))
        if price_match:
            try:
                return float(price_match.group())
            except ValueError:
                return None
        return None

    def _extract_rating(self, rating_text: str) -> "Optional[float]":
        """Extract a numeric rating from text like '4.5 out of 5'; None if absent."""
        rating_match = re.search(r'(\d+\.?\d*)', rating_text)
        if rating_match:
            try:
                return float(rating_match.group(1))
            except ValueError:
                return None
return None

Data Processing Pipeline
# src/data_processing/data_cleaner.py
import re
import pandas as pd
from typing import List, Dict, Any, Optional
import logging
class DataCleaner:
    """
    Data cleaning and preprocessing utilities for scraped records.

    All methods are side-effect free: they return cleaned values and
    never mutate their inputs in place.
    """

    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)

    def clean_text(self, text: str) -> str:
        """Collapse whitespace, normalize curly quotes, strip odd symbols.

        Quote normalization must run BEFORE the character filter: the
        previous order stripped the typographic quotes first, which made
        the normalization dead code (and the mangled ``replace('"', '"')``
        calls were no-ops anyway).
        """
        if not text:
            return ""
        # Collapse all whitespace runs to single spaces.
        text = re.sub(r'\s+', ' ', text)
        # Map typographic (curly) quotes to their ASCII equivalents.
        text = text.replace('\u201c', '"').replace('\u201d', '"')
        text = text.replace('\u2018', "'").replace('\u2019', "'")
        # Remove special characters but keep basic punctuation and quotes.
        text = re.sub(r'[^\w\s.,!?;:\-()\'"]', '', text)
        return text.strip()

    def clean_price(self, price_text: str) -> Optional[float]:
        """Extract a numeric price from text like '$1,234.56'; None if absent."""
        if not price_text:
            return None
        # Strip thousands separators, then match digits + optional decimals.
        price_match = re.search(r'\d+\.?\d*', price_text.replace(',', ''))
        if price_match:
            try:
                return float(price_match.group())
            except ValueError:
                return None
        return None

    def clean_date(self, date_text: str) -> Optional[str]:
        """Return the first recognizable date substring, else None.

        This only isolates the date text; it does not reformat it.
        """
        if not date_text:
            return None
        date_patterns = [
            r'(\d{4}-\d{2}-\d{2})',          # 2023-05-01
            r'(\d{2}/\d{2}/\d{4})',          # 05/01/2023
            r'(\d{1,2}\s+\w+\s+\d{4})',      # 1 May 2023
            r'(\w+\s+\d{1,2},\s+\d{4})'      # May 1, 2023
        ]
        for pattern in date_patterns:
            match = re.search(pattern, date_text)
            if match:
                return match.group(1)
        return None

    def clean_url(self, url: str, base_url: str = "") -> str:
        """Resolve relative URLs against base_url; absolute URLs pass through."""
        if not url:
            return ""
        if url.startswith('/'):
            url = base_url + url
        elif not url.startswith('http'):
            url = base_url + '/' + url
        # Fragments/query strings are intentionally preserved; strip here
        # if a caller ever needs canonical URLs.
        return url

    def remove_duplicates(self, data: List[Dict[str, Any]], key_field: str) -> List[Dict[str, Any]]:
        """Drop records whose key_field value was already seen (or is falsy),
        keeping the first occurrence and the original order."""
        seen = set()
        unique_data = []
        for item in data:
            key_value = item.get(key_field)
            if key_value and key_value not in seen:
                seen.add(key_value)
                unique_data.append(item)
        return unique_data

    def validate_data(self, data: List[Dict[str, Any]], schema: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return only the items that satisfy the schema; invalid ones are
        logged and skipped, never raised on."""
        valid_data = []
        for item in data:
            if self._validate_item(item, schema):
                valid_data.append(item)
            else:
                self.logger.warning(f"Invalid item skipped: {item}")
        return valid_data

    def _validate_item(self, item: Dict[str, Any], schema: Dict[str, Any]) -> bool:
        """Check one item against per-field rules: 'required', 'type',
        'min_length', 'max_length'.  Falsy values skip type/length checks."""
        for field, rules in schema.items():
            value = item.get(field)
            if rules.get('required', False) and not value:
                return False
            if value and 'type' in rules:
                if not isinstance(value, rules['type']):
                    return False
            if value and 'min_length' in rules:
                if len(str(value)) < rules['min_length']:
                    return False
            if value and 'max_length' in rules:
                if len(str(value)) > rules['max_length']:
                    return False
        return True

    def normalize_data(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return a copy of data with snake_case keys and cleaned values."""
        normalized_data = []
        for item in data:
            normalized_item = {}
            for key, value in item.items():
                normalized_item[self._normalize_key(key)] = self._normalize_value(value)
            normalized_data.append(normalized_item)
        return normalized_data

    def _normalize_key(self, key: str) -> str:
        """Lower-case a field name and squash non-alphanumerics to single '_'."""
        normalized = re.sub(r'[^a-zA-Z0-9_]', '_', key.lower())
        normalized = re.sub(r'_+', '_', normalized)
        return normalized.strip('_')

    def _normalize_value(self, value: Any) -> Any:
        """Recursively clean strings and normalize nested containers;
        numbers and unknown types pass through unchanged."""
        if isinstance(value, str):
            value = self.clean_text(value)
        elif isinstance(value, list):
            value = [self._normalize_value(item) for item in value]
        elif isinstance(value, dict):
            value = {self._normalize_key(k): self._normalize_value(v) for k, v in value.items()}
        return value
return value

Database Integration
# src/storage/database_manager.py
import json
import logging
import sqlite3
from datetime import datetime
from typing import List, Dict, Any, Optional

import psycopg2
class DatabaseManager:
    """
    Database manager for storing scraped data.

    Supports SQLite and PostgreSQL.  The two drivers differ in parameter
    style (sqlite3 uses '?', psycopg2 uses '%s') and in DDL
    (SERIAL/JSONB are PostgreSQL-only; SQLite needs INTEGER PRIMARY KEY
    AUTOINCREMENT and stores JSON as TEXT), so both are chosen from
    config['type'].  The previous version used '%s' and SERIAL/JSONB
    unconditionally, which broke every query on SQLite.
    """

    def __init__(self, config: Dict[str, Any]):
        # config keys: 'type' ('sqlite' | 'postgresql'), 'database';
        # for postgresql also 'host', 'port', 'user', 'password'.
        self.config = config
        self.logger = logging.getLogger(self.__class__.__name__)
        self.connection = None
        self.db_type = config.get('type', 'sqlite')
        # Driver-appropriate SQL parameter placeholder.
        self.placeholder = '?' if self.db_type == 'sqlite' else '%s'

    def connect(self) -> bool:
        """Open a connection; returns True on success, False on failure."""
        try:
            if self.db_type == 'sqlite':
                self.connection = sqlite3.connect(self.config['database'])
            elif self.db_type == 'postgresql':
                self.connection = psycopg2.connect(
                    host=self.config['host'],
                    port=self.config['port'],
                    database=self.config['database'],
                    user=self.config['user'],
                    password=self.config['password']
                )
            else:
                # Caught below so connect() keeps its bool contract.
                raise ValueError(f"Unsupported database type: {self.db_type}")
            self.logger.info(f"Connected to {self.db_type} database")
            return True
        except Exception as e:
            self.logger.error(f"Database connection failed: {str(e)}")
            return False

    def create_tables(self) -> bool:
        """Create the scraped_data and processing_log tables if missing."""
        try:
            cursor = self.connection.cursor()
            if self.db_type == 'sqlite':
                # SQLite only auto-increments INTEGER PRIMARY KEY columns,
                # and has no JSONB type — JSON goes in as TEXT.
                pk_clause = 'INTEGER PRIMARY KEY AUTOINCREMENT'
                json_type = 'TEXT'
            else:
                pk_clause = 'SERIAL PRIMARY KEY'
                json_type = 'JSONB'
            cursor.execute(f"""
                CREATE TABLE IF NOT EXISTS scraped_data (
                    id {pk_clause},
                    source_url TEXT NOT NULL,
                    data_type TEXT NOT NULL,
                    data {json_type} NOT NULL,
                    scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    processed_at TIMESTAMP,
                    status TEXT DEFAULT 'raw'
                )
            """)
            cursor.execute(f"""
                CREATE TABLE IF NOT EXISTS processing_log (
                    id {pk_clause},
                    data_id INTEGER REFERENCES scraped_data(id),
                    operation TEXT NOT NULL,
                    status TEXT NOT NULL,
                    message TEXT,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            self.connection.commit()
            self.logger.info("Database tables created successfully")
            return True
        except Exception as e:
            self.logger.error(f"Error creating tables: {str(e)}")
            return False

    def insert_data(self, source_url: str, data_type: str, data: List[Dict[str, Any]]) -> bool:
        """Insert one batch of scraped records, serialized as JSON."""
        try:
            cursor = self.connection.cursor()
            ph = self.placeholder
            cursor.execute(
                f"INSERT INTO scraped_data (source_url, data_type, data) "
                f"VALUES ({ph}, {ph}, {ph})",
                (source_url, data_type, json.dumps(data))
            )
            self.connection.commit()
            self.logger.info(f"Inserted {len(data)} records for {data_type}")
            return True
        except Exception as e:
            self.logger.error(f"Error inserting data: {str(e)}")
            return False

    def get_data(self, data_type: str, limit: int = 100) -> List[Dict[str, Any]]:
        """Return up to `limit` most recently scraped rows of data_type."""
        try:
            cursor = self.connection.cursor()
            ph = self.placeholder
            cursor.execute(f"""
                SELECT id, source_url, data, scraped_at
                FROM scraped_data
                WHERE data_type = {ph}
                ORDER BY scraped_at DESC
                LIMIT {ph}
            """, (data_type, limit))
            results = []
            for row in cursor.fetchall():
                payload = row[2]
                if isinstance(payload, str):
                    # SQLite stores JSON as TEXT; psycopg2 already decodes JSONB.
                    payload = json.loads(payload)
                results.append({
                    'id': row[0],
                    'source_url': row[1],
                    'data': payload,
                    'scraped_at': row[3]
                })
            return results
        except Exception as e:
            self.logger.error(f"Error retrieving data: {str(e)}")
            return []

    def update_status(self, data_id: int, status: str) -> bool:
        """Mark a row's processing status and stamp processed_at."""
        try:
            cursor = self.connection.cursor()
            ph = self.placeholder
            cursor.execute(f"""
                UPDATE scraped_data
                SET status = {ph}, processed_at = CURRENT_TIMESTAMP
                WHERE id = {ph}
            """, (status, data_id))
            self.connection.commit()
            return True
        except Exception as e:
            self.logger.error(f"Error updating status: {str(e)}")
            return False

    def close(self):
        """Close the database connection, if one was opened."""
        if self.connection:
            self.connection.close()
self.logger.info("Database connection closed")

Lessons Learned
Web Scraping Techniques
- Anti-Detection: Implementing measures to avoid bot detection
- Rate Limiting: Proper rate limiting to respect website resources
- Error Handling: Comprehensive error handling and retry mechanisms
- Data Validation: Robust data validation and cleaning
Python Development
- Object-Oriented Design: Clean class hierarchies and abstractions
- Async Programming: Asynchronous operations for better performance
- Data Processing: Efficient data processing and transformation
- Database Integration: Multiple database support and optimization
Data Management
- Data Cleaning: Comprehensive data cleaning and normalization
- Data Validation: Schema validation and data quality assurance
- Storage Optimization: Efficient data storage and retrieval
- Backup Strategies: Data backup and recovery mechanisms
System Architecture
- Modular Design: Clean separation of concerns
- Configuration Management: Flexible configuration system
- Logging: Comprehensive logging and monitoring
- Testing: Unit and integration testing strategies
Future Enhancements
Advanced Features
- Machine Learning: ML-based data extraction and analysis
- Real-time Processing: Real-time data processing pipelines
- API Integration: RESTful API for data access
- Visualization: Data visualization and dashboard
Technical Improvements
- Performance: Optimized scraping performance
- Scalability: Distributed scraping architecture
- Monitoring: Advanced monitoring and alerting
- Security: Enhanced security measures
Conclusion
The Python Web Scraping project demonstrates comprehensive web scraping expertise and data processing capabilities. Key achievements include:
- Web Scraping: Advanced scraping techniques with anti-detection
- Data Processing: Comprehensive data cleaning and validation
- Database Integration: Multiple database support
- Error Handling: Robust error handling and recovery
- Testing: Comprehensive testing framework
- Documentation: Clear documentation and examples
The project is available on GitHub and serves as a comprehensive example of Python web scraping and data processing.
This project represents my deep dive into web scraping and data collection, demonstrating how Python can be used to build robust, efficient data collection systems. The lessons learned here continue to influence my approach to data engineering and automation.