Batch Scraping: Trafilatura + 16Yun Proxies in Production
Architecture for large-scale concurrent scraping with proxy rotation, retry strategies, error handling, and storage pipelines.
16Yun Engineering Team · Apr 27, 2026 · 1 min read
From Single Page to Scale
Single-page extraction is the foundation. Production requires a scalable, fault-tolerant, monitorable pipeline. This guide provides a ready-to-use architecture.
Architecture
┌─────────────────┐     ┌──────────────────┐     ┌────────────────┐
│ URL Discovery   │     │ Download Queue   │     │ Storage        │
│ Sitemap/Feed    │ ──→ │ Trafilatura      │ ──→ │ JSON/Markdown  │
│ URL list file   │     │ + Proxy Rotation │     │ Database       │
└─────────────────┘     └────────┬─────────┘     └────────────────┘
                                 │
                          ┌──────┴──────┐
                          │ Error       │
                          │ Handling    │
                          │ Retry/Skip  │
                          │ Logging     │
                          └─────────────┘
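As a minimal sketch of the "URL list file" input in the first stage, the helper below reads one URL per line and drops blanks, comment lines, non-HTTP entries, and duplicates before anything reaches the download queue. The name `load_urls` and the `#` comment convention are assumptions for illustration, not part of Trafilatura or the 16Yun tooling.

```python
from pathlib import Path
from urllib.parse import urlsplit


def load_urls(path):
    """Read one URL per line, skipping blanks, comments, and duplicates."""
    seen = set()
    urls = []
    for line in Path(path).read_text(encoding="utf-8").splitlines():
        url = line.strip()
        # Hypothetical convention: lines starting with "#" are comments.
        if not url or url.startswith("#"):
            continue
        parts = urlsplit(url)
        # Keep only absolute http(s) URLs; anything else cannot be fetched.
        if parts.scheme not in ("http", "https") or not parts.netloc:
            continue
        if url not in seen:
            seen.add(url)
            urls.append(url)
    return urls
```

The returned list preserves file order, so it can be passed directly to whichever download function the pipeline uses.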
Concurrency with Proxy Rotation
Using ThreadPoolExecutor
import json
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
import trafilatura

# Tunnel proxy endpoint; replace user:pass with your own credentials.
PROXY = {
    "http": "http://user:pass@proxy.16yun.cn:8888",
    "https": "http://user:pass@proxy.16yun.cn:8888",
}
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}


def process_url(url):
    """Download and extract a single article."""
    try:
        resp = requests.get(url, proxies=PROXY, headers=HEADERS, timeout=20)
        if resp.status_code != 200:
            return {"url": url, "status": resp.status_code, "error": f"HTTP {resp.status_code}"}
        # Assumes the target pages are UTF-8 encoded.
        resp.encoding = "utf-8"
        result = trafilatura.extract(
            resp.text, output_format="json", with_metadata=True, include_tables=True,
        )
        if result:
            data = json.loads(result)
            data["url"] = url
            data["status"] = 200
            return data
        return {"url": url, "status": 0, "error": "empty extraction"}
    except Exception as e:
        # status 0 marks failures that never produced a usable HTTP response.
        return {"url": url, "status": 0, "error": str(e)}


urls = ["https://example.com/article-1", "https://example.com/article-2"]

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(process_url, url): url for url in urls}
    results = [future.result() for future in as_completed(futures)]

success = [r for r in results if r.get("status") == 200]
failed = [r for r in results if r.get("status") != 200]
print(f"Success: {len(success)}, Failed: {len(failed)}")

# Write with an explicit encoding because ensure_ascii=False emits raw Unicode.
with open("articles.jsonl", "w", encoding="utf-8") as f:
    for article in success:
        f.write(json.dumps(article, ensure_ascii=False) + "\n")
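The example above holds every result in memory and writes the file only at the end, so a crash mid-run loses the whole batch. One possible variation, sketched here under assumptions, appends each record as soon as its future completes and sends failures to a separate file. The function name `run_batch`, its parameters, and the two-file layout are hypothetical; `worker` stands for any function that takes a URL and returns a dict with a `status` key, such as `process_url`.

```python
import json
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_batch(urls, worker, out_path, err_path, max_workers=5):
    """Run worker(url) concurrently and append each result as soon as it is ready."""
    counts = {"ok": 0, "failed": 0}
    with open(out_path, "a", encoding="utf-8") as out, \
         open(err_path, "a", encoding="utf-8") as err, \
         ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(worker, url): url for url in urls}
        for future in as_completed(futures):
            url = futures[future]
            try:
                record = future.result()
            except Exception as exc:
                # The worker raised instead of returning a dict.
                record = {"url": url, "status": 0, "error": str(exc)}
            ok = record.get("status") == 200
            target = out if ok else err
            # Only the main thread writes, so the files need no lock.
            target.write(json.dumps(record, ensure_ascii=False) + "\n")
            target.flush()
            counts["ok" if ok else "failed"] += 1
    return counts
```

Because both files are opened in append mode, rerunning the batch after a crash adds to the earlier output rather than overwriting it.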
API Proxy with Dynamic IP Pool
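import random

import requests
import trafilatura


def refresh_proxy_pool():
    """Fetch a fresh batch of proxies from the 16Yun API."""
    api_url = "http://ip.16yun.cn:817/myip/pl/xxx/?s=xxx&u=user&format=json&count=50"
    resp = requests.get(api_url, timeout=10)
    resp.raise_for_status()
    proxy_list = resp.json()
    return [
        {"http": f"http://user:pass@{p['ip']}:{p['port']}",
         "https": f"http://user:pass@{p['ip']}:{p['port']}"}
        for p in proxy_list
    ]


proxy_pool = refresh_proxy_pool()


def process_with_pool(url, pool):
    """Fetch a URL through a randomly chosen proxy and extract Markdown."""
    proxies = random.choice(pool)
    try:
        resp = requests.get(url, proxies=proxies, timeout=20)
        if resp.status_code != 200:
            return {"url": url, "success": False, "error": f"HTTP {resp.status_code}"}
        resp.encoding = "utf-8"
        result = trafilatura.extract(resp.text, output_format="markdown", with_metadata=True)
        if not result:
            # extract() returns None when no main content is found.
            return {"url": url, "success": False, "error": "empty extraction"}
        return {"url": url, "success": True, "content": result}
    except Exception as e:
        return {"url": url, "success": False, "error": str(e)}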
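Picking with `random.choice` keeps handing out proxies that have already gone bad. A possible refinement, sketched below, wraps the pool in a small thread-safe class that counts consecutive failures per proxy and evicts a proxy once it crosses a threshold. The class name `ProxyPool`, its methods, and the default of three failures are all assumptions for illustration; the proxy dicts are expected in the same `{"http": ..., "https": ...}` shape that `refresh_proxy_pool` returns.

```python
import random
import threading


class ProxyPool:
    """Thread-safe pool that drops proxies after repeated failures."""

    def __init__(self, proxies, max_failures=3):
        self._lock = threading.Lock()
        self._max_failures = max_failures
        # Key each proxy by its "http" URL so failures are counted per proxy.
        self._proxies = {p["http"]: p for p in proxies}
        self._failures = {key: 0 for key in self._proxies}

    def get(self):
        """Return a random live proxy, or raise if none are left."""
        with self._lock:
            if not self._proxies:
                raise RuntimeError("proxy pool is empty; refresh it")
            return random.choice(list(self._proxies.values()))

    def report(self, proxy, ok):
        """Record the outcome of a request made through the given proxy."""
        key = proxy["http"]
        with self._lock:
            if key not in self._proxies:
                return  # already evicted by another thread
            if ok:
                self._failures[key] = 0
                return
            self._failures[key] += 1
            if self._failures[key] >= self._max_failures:
                del self._proxies[key]
                del self._failures[key]

    def __len__(self):
        with self._lock:
            return len(self._proxies)
```

A worker would call `pool.get()` before each request and `pool.report(proxy, ok)` afterwards; when `len(pool)` drops low, the caller can build a new pool from a fresh API response.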
Error Handling with Retry
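import time
from functools import wraps

import requests
import trafilatura


def retry(max_retries=3, delay=2):
    """Retry a function that returns a dict with a "success" key."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_error = "all retries failed"
            for attempt in range(max_retries):
                try:
                    result = func(*args, **kwargs)
                    if result.get("success"):
                        return result
                    last_error = result.get("error", last_error)
                except Exception as e:
                    # Keep the reason so the caller can see why retries failed.
                    last_error = str(e)
                if attempt < max_retries - 1:
                    # Exponential backoff: delay, 2*delay, 4*delay, ...
                    time.sleep(delay * 2 ** attempt)
            return {"success": False, "error": last_error}
        return wrapper
    return decorator


@retry(max_retries=3, delay=2)
def fetch_with_retry(url, proxies):
    resp = requests.get(url, proxies=proxies, timeout=20)
    if resp.status_code == 429:
        raise Exception("rate_limited")
    elif resp.status_code == 504:
        raise Exception("gateway_timeout")
    elif resp.status_code != 200:
        # Any other non-200 response is also a failure, not extractable content.
        raise Exception(f"HTTP {resp.status_code}")
    resp.encoding = "utf-8"
    result = trafilatura.extract(resp.text, output_format="json", with_metadata=True)
    if not result:
        return {"success": False, "error": "empty extraction"}
    return {"success": True, "data": result}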
Status Code Strategy
| Status | Meaning | Action |
|---|---|---|
| 200 | OK | Extract |
| 407 | Proxy auth failed | Change proxy, verify credentials |
| 429 | Rate limited | Reduce concurrency, backoff |
| 504 | Target timeout | Retry 2-3 times, skip persistent failures |
| 403 | Blocked | Change IP, check User-Agent |
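The table can be turned into a small dispatch helper so that workers make the same decision everywhere. This is only a sketch: the function name `classify_status` and the action labels are hypothetical, and treating every status code not listed in the table as "skip" is an assumption rather than something the table specifies.

```python
# Status codes grouped by the action the table recommends.
RETRYABLE = {429, 504}      # back off, then try again
ROTATE_PROXY = {403, 407}   # switch proxy/IP before trying again


def classify_status(status_code):
    """Map an HTTP status code to one of: extract, retry, rotate, skip."""
    if status_code == 200:
        return "extract"
    if status_code in RETRYABLE:
        return "retry"
    if status_code in ROTATE_PROXY:
        return "rotate"
    # Assumption: codes outside the table (404, 500, ...) are not retried.
    return "skip"
```

A worker can branch on the returned label instead of repeating status checks inline, which keeps the policy in one place when the table changes.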
Storage Options
JSONL Output
import json


def save_jsonl(articles, filepath):
    with open(filepath, "a", encoding="utf-8") as f:
        for article in articles:
            f.write(json.dumps(article, ensure_ascii=False) + "\n")
SQLite
import json
import sqlite3


def init_db(db_path):
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS articles (
            url TEXT PRIMARY KEY, title TEXT, author TEXT,
            date TEXT, content TEXT, categories TEXT, tags TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    return conn


def save_article(conn, article):
    conn.execute("""
        INSERT OR REPLACE INTO articles (url, title, author, date, content, categories, tags)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """, (
        article.get("url"), article.get("title"), article.get("author"),
        article.get("date"), article.get("text"),
        json.dumps(article.get("categories", [])),
        json.dumps(article.get("tags", [])),
    ))
    conn.commit()
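Since `url` is the primary key of the `articles` table, the database can double as a checkpoint: before a run, filter out URLs that are already stored. The helper below is a minimal sketch of that idea; the name `pending_urls` is hypothetical, and it assumes the set of stored URLs fits comfortably in memory.

```python
import sqlite3


def pending_urls(conn, urls):
    """Return the URLs that are not yet stored in the articles table."""
    # Load stored URLs once instead of issuing one query per candidate URL.
    done = {row[0] for row in conn.execute("SELECT url FROM articles")}
    return [url for url in urls if url not in done]
```

After a crash, calling `pending_urls(conn, all_urls)` yields only the work that remains, in the original order.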
Production Checklist
□ Concurrency: 3-10 workers depending on proxy quota and target limits
□ Proxy strategy:
  · Crawler Proxy (tunnel): simple, good for medium scale
  · API Proxy (IP pool): flexible, good for large multi-site scraping
  · Dedicated Proxy: stable, good for fixed-exit scenarios
□ Retry: 3 attempts with exponential backoff (waits of 2s, then 4s)
□ Timeout: connect=10s, read=20s
□ Rate limit: max 2 QPS per IP
□ Checkpoint: track processed URLs for crash recovery
□ Output: JSONL for programmatic use, Markdown for human reading
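Two checklist items, the split timeout and the per-IP rate limit, can be sketched in a few lines. `requests` accepts a `(connect, read)` tuple for its `timeout` argument, which gives the 10s/20s split directly. The limiter class below is a hypothetical illustration, not a 16Yun or Trafilatura API: it spaces out calls so that each key (for example a proxy IP) is used at most `max_qps` times per second. The `clock` and `sleep` parameters exist only so the behavior can be checked without real waiting.

```python
import threading
import time

# requests accepts a (connect, read) tuple for its timeout argument.
TIMEOUT = (10, 20)


class PerKeyRateLimiter:
    """Space out calls so each key is used at most max_qps times per second."""

    def __init__(self, max_qps=2.0, clock=time.monotonic, sleep=time.sleep):
        self._interval = 1.0 / max_qps
        self._clock = clock
        self._sleep = sleep
        self._lock = threading.Lock()
        self._next_slot = {}  # key -> earliest time the next request may start

    def wait(self, key):
        """Block until the key's next slot; return the seconds waited."""
        with self._lock:
            now = self._clock()
            slot = max(now, self._next_slot.get(key, now))
            # Reserve the slot before sleeping so other threads queue behind it.
            self._next_slot[key] = slot + self._interval
        delay = slot - now
        if delay > 0:
            self._sleep(delay)
        return delay
```

In a worker this would look like `limiter.wait(proxy_host)` followed by `requests.get(url, proxies=proxies, timeout=TIMEOUT)`, where `proxy_host` is whatever string identifies the exit IP.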
CLI Batch Processing
trafilatura -i urls.txt --output-dir ./articles
trafilatura -i urls.txt --output-dir ./articles --parallel 3