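Batch Scraping: A Production-Grade Setup with Trafilatura and 亿牛云 Proxies
Architecture, concurrency control, error handling, and proxy rotation for large-scale concurrent scraping, integrated with 亿牛云 proxy products to build a stable production pipeline.
亿牛云 Technical Team · April 27, 2026 · 1 min read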
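From Single Pages to Batches
Single-page extraction is the foundation, but production needs a batch pipeline that is scalable, fault-tolerant, and observable. This chapter starts from the architecture and works up to a batch-scraping setup that can be used directly.
Architecture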
┌─────────────────┐     ┌──────────────────┐     ┌────────────────┐
│ URL discovery   │     │ Download queue   │     │ Storage        │
│ Sitemap/Feed    │ ──→ │ Trafilatura      │ ──→ │ JSON/Markdown  │
│ URL list file   │     │ + proxy rotation │     │ Database       │
└─────────────────┘     └──────────────────┘     └────────────────┘
                                 │
                         ┌───────┴────────┐
                         │ Error handling │
                         │ retry / skip / │
                         │ log            │
                         └────────────────┘
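The three stages in the diagram can be wired together as plain callables. The following is a minimal sketch under stated assumptions, not the article's implementation: `run_pipeline`, `discover`, `fetch`, and `store` are hypothetical names, and each stage is passed in so that the URL source, the downloader, and the storage backend can be swapped independently.

```python
from typing import Callable, Iterable


def run_pipeline(
    discover: Callable[[], Iterable[str]],
    fetch: Callable[[str], dict],
    store: Callable[[dict], None],
) -> dict:
    """Run discovery -> fetch -> store, routing failures to an error list.

    Hypothetical skeleton: `fetch` is expected to return a dict, and a dict
    with a truthy "error" key is treated as a failed page.
    """
    stats = {"stored": 0, "errors": []}
    seen = set()
    for url in discover():
        if url in seen:  # skip duplicate URLs from overlapping sources
            continue
        seen.add(url)
        try:
            record = fetch(url)
        except Exception as exc:  # one failed page must not stop the batch
            stats["errors"].append({"url": url, "error": str(exc)})
            continue
        if record.get("error"):
            stats["errors"].append({"url": url, "error": record["error"]})
            continue
        store(record)
        stats["stored"] += 1
    return stats
```

In a real run, `discover` might read a URL list file, `fetch` might wrap a download-and-extract function, and `store` might append to a JSONL file; the sequential loop here is only meant to show how errors are routed away from the main flow.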
Concurrency Control and Proxy Rotation
Concurrency with ThreadPoolExecutor
import json
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
import trafilatura

# Proxy configuration, using the crawler proxy tunnel as an example
PROXY = {
    "http": "http://user:pass@proxy.16yun.cn:8888",
    "https": "http://user:pass@proxy.16yun.cn:8888",
}
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}


def process_url(url):
    """Download and extract a single article."""
    try:
        resp = requests.get(
            url,
            proxies=PROXY,
            headers=HEADERS,
            timeout=20,
        )
        # Forces UTF-8 decoding; for sites in other encodings (e.g. GBK),
        # resp.apparent_encoding is a safer choice.
        resp.encoding = "utf-8"
        if resp.status_code != 200:
            return {"url": url, "status": resp.status_code, "error": f"HTTP {resp.status_code}"}
        result = trafilatura.extract(
            resp.text,
            output_format="json",
            with_metadata=True,
            include_tables=True,
        )
        if result:
            data = json.loads(result)
            data["url"] = url
            data["status"] = 200
            return data
        # Status 0 marks failures that are not HTTP errors
        return {"url": url, "status": 0, "error": "empty extraction"}
    except Exception as e:
        return {"url": url, "status": 0, "error": str(e)}


# URL list
urls = [
    "https://example.com/article-1",
    "https://example.com/article-2",
    # ...
]

# Concurrent scraping
results = []
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(process_url, url): url for url in urls}
    for future in as_completed(futures):
        result = future.result()
        results.append(result)

# Statistics
success = [r for r in results if r.get("status") == 200]
failed = [r for r in results if r.get("status") != 200]
print(f"Succeeded: {len(success)}, failed: {len(failed)}")

# Save results (explicit UTF-8, since ensure_ascii=False writes raw non-ASCII text)
with open("articles.jsonl", "w", encoding="utf-8") as f:
    for article in success:
        f.write(json.dumps(article, ensure_ascii=False) + "\n")
API Proxy + Dynamic IP Pool
With the 亿牛云 API proxy, you can maintain an IP pool and pick a proxy at random for each request:
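import random

import requests
import trafilatura


def refresh_proxy_pool():
    """Fetch an IP list from the API proxy and build a proxy pool."""
    api_url = "http://ip.16yun.cn:817/myip/pl/xxx/?s=xxx&u=user&format=json&count=50"
    resp = requests.get(api_url, timeout=10)
    resp.raise_for_status()
    # Assumes the response is a JSON list of objects with "ip" and "port" keys
    proxy_list = resp.json()
    pool = []
    for p in proxy_list:
        proxy_url = f"http://user:pass@{p['ip']}:{p['port']}"
        pool.append({"http": proxy_url, "https": proxy_url})
    return pool


# Built once here; call refresh_proxy_pool() again every 5 minutes to keep the pool fresh
proxy_pool = refresh_proxy_pool()


def process_url_with_pool(url, pool):
    """Fetch a URL through a randomly chosen proxy and extract it as Markdown."""
    proxies = random.choice(pool)
    try:
        resp = requests.get(url, proxies=proxies, timeout=20)
        resp.encoding = "utf-8"
        if resp.status_code != 200:
            return {"url": url, "success": False, "error": f"HTTP {resp.status_code}"}
        result = trafilatura.extract(resp.text, output_format="markdown", with_metadata=True)
        if not result:
            return {"url": url, "success": False, "error": "empty extraction"}
        return {"url": url, "success": True, "content": result}
    except Exception as e:
        return {"url": url, "success": False, "error": str(e)}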
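The snippet above builds the pool a single time, so the five-minute refresh still has to be scheduled somewhere. One possible approach, sketched here under assumptions, is a small wrapper that re-fetches the list lazily once it is older than a time-to-live. `ProxyPool`, `fetch_pool`, `ttl`, and `clock` are hypothetical names introduced for illustration; `fetch_pool` can be any callable that returns a list of requests-style proxy dicts.

```python
import random
import threading
import time
from typing import Callable, Dict, List

Proxy = Dict[str, str]


class ProxyPool:
    """Thread-safe proxy pool that re-fetches its IP list after `ttl` seconds.

    Hypothetical helper: `fetch_pool` returns a list of proxy dicts in the
    {"http": ..., "https": ...} shape that requests expects.
    """

    def __init__(
        self,
        fetch_pool: Callable[[], List[Proxy]],
        ttl: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._fetch_pool = fetch_pool
        self._ttl = ttl
        self._clock = clock
        self._lock = threading.Lock()
        self._pool: List[Proxy] = []
        self._expires_at = float("-inf")  # forces a fetch on first use

    def get(self) -> Proxy:
        """Return a random proxy, refreshing the pool first if it has expired."""
        with self._lock:
            now = self._clock()
            if now >= self._expires_at or not self._pool:
                fresh = self._fetch_pool()
                if fresh:  # keep the old pool if the API returns nothing
                    self._pool = fresh
                    self._expires_at = now + self._ttl
            if not self._pool:
                raise RuntimeError("proxy pool is empty")
            return random.choice(self._pool)
```

A worker would then call `pool.get()` before each request instead of `random.choice(pool)`. The refresh happens while the lock is held, which keeps the sketch simple at the cost of briefly blocking other threads during the API call.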
Error Handling and Retry Strategy
Production environments need robust error handling. Below is an extractor with retries:
import time
from functools import wraps

import requests
import trafilatura


def retry(max_retries=3, delay=2):
    """Retry decorator with exponential backoff."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_error = "unknown error"
            for attempt in range(max_retries):
                try:
                    result = func(*args, **kwargs)
                    if result.get("success"):
                        return result
                    last_error = result.get("error", last_error)
                except Exception as e:
                    last_error = str(e)
                if attempt < max_retries - 1:
                    # Exponential backoff: delay, 2 * delay, 4 * delay, ...
                    time.sleep(delay * 2 ** attempt)
            return {"success": False, "error": f"all retries failed: {last_error}"}
        return wrapper
    return decorator


@retry(max_retries=3, delay=2)
def fetch_with_retry(url, proxies):
    resp = requests.get(url, proxies=proxies, timeout=20)
    resp.encoding = "utf-8"
    if resp.status_code == 429:
        raise Exception("rate_limited")
    elif resp.status_code == 504:
        raise Exception("gateway_timeout")
    elif resp.status_code != 200:
        return {"success": False, "error": f"HTTP {resp.status_code}"}
    result = trafilatura.extract(resp.text, output_format="json", with_metadata=True)
    if not result:
        return {"success": False, "error": "empty extraction"}
    return {"success": True, "data": result}
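Handling Common Status Codes
| Status code | Meaning | Handling |
|---|---|---|
| 200 | OK | Run extraction |
| 407 | Proxy authentication failed | Switch proxy, check credentials |
| 429 | Request rate too high | Lower concurrency, wait, then retry |
| 504 | Gateway timeout (target site timed out) | Retry 2-3 times, skip persistent failures |
| 403 | Blocked | Switch IP, check the User-Agent |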
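The table can be turned into a small dispatch function so that every worker reacts to a status code the same way. This is a sketch rather than a prescribed policy: `next_action` and its action names are hypothetical, and skipping status codes that the table does not list is an assumption made here.

```python
def next_action(status_code: int, attempt: int, max_retries: int = 3) -> str:
    """Map an HTTP status to one of: extract, rotate_proxy, backoff, retry, skip.

    `attempt` counts the attempts made so far for this URL, including the
    one that produced `status_code`.
    """
    if status_code == 200:
        return "extract"
    if attempt >= max_retries:
        return "skip"  # give up on persistent failures
    if status_code in (403, 407):
        return "rotate_proxy"  # blocked or proxy auth failed: try another proxy
    if status_code == 429:
        return "backoff"  # slow down, then retry
    if status_code == 504:
        return "retry"
    return "skip"  # assumption: codes outside the table are not retried
```

For example, `next_action(504, attempt=1)` yields `"retry"`, while the same status on the third attempt yields `"skip"`.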
Storage
Writing JSONL
import json


def save_jsonl(articles, filepath):
    """Append articles to a JSONL file, one JSON object per line."""
    with open(filepath, "a", encoding="utf-8") as f:
        for article in articles:
            f.write(json.dumps(article, ensure_ascii=False) + "\n")
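Because the JSONL file is opened in append mode, it can double as a checkpoint: after a crash, the URLs already present in the file can be skipped on the next run. The helpers below are a minimal sketch of that idea; `load_done_urls` and `pending_urls` are hypothetical names, and the sketch assumes every stored record carries a `url` key.

```python
import json
import os


def load_done_urls(filepath: str) -> set:
    """Collect the URLs already present in a JSONL output file."""
    done = set()
    if not os.path.exists(filepath):
        return done
    with open(filepath, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue  # tolerate a half-written last line after a crash
            if isinstance(record, dict) and record.get("url"):
                done.add(record["url"])
    return done


def pending_urls(urls, filepath):
    """Return the URLs not yet in the output file, preserving input order."""
    done = load_done_urls(filepath)
    return [u for u in urls if u not in done]
```

Calling `pending_urls(urls, "articles.jsonl")` before submitting work to the thread pool would restrict a restarted run to the pages that are still missing.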
Writing to SQLite
import json
import sqlite3


def init_db(db_path):
    """Open the database and create the articles table if it is missing."""
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS articles (
            url TEXT PRIMARY KEY,
            title TEXT,
            author TEXT,
            date TEXT,
            content TEXT,
            categories TEXT,
            tags TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    return conn


def save_article(conn, article):
    """Insert one article, replacing any existing row with the same URL."""
    conn.execute("""
        INSERT OR REPLACE INTO articles (url, title, author, date, content, categories, tags)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """, (
        article.get("url"),
        article.get("title"),
        article.get("author"),
        article.get("date"),
        article.get("text"),
        # Lists are stored as JSON strings
        json.dumps(article.get("categories", [])),
        json.dumps(article.get("tags", [])),
    ))
    conn.commit()
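Production Configuration Checklist
□ Concurrency: tune max_workers to your proxy quota and the target site's limits (3-10 recommended)
□ Proxy options:
  · Crawler proxy tunnel: simple, suited to small and medium scale
  · API proxy IP pool: flexible, suited to large scale and multiple sites
  · Dedicated proxy: stable, suited to fixed-egress requirements
□ Retry policy: 3 retries + exponential backoff (1s/2s/4s)
□ Timeouts: connect=10s, read=20s
□ Rate limiting: no more than 2 QPS per IP (reference value; adjust per target site)
□ Resumable runs: record processed URLs so a crashed run can be recovered
□ Output format: JSONL suits downstream programs, Markdown suits human reading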
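Two of the checklist items translate directly into code. Requests accepts a `(connect, read)` tuple for `timeout`, which covers the separate connect and read limits, and the per-IP rate cap can be enforced with a small limiter. The sketch below is illustrative only: `TIMEOUT` and `PerKeyRateLimiter` are hypothetical names, and the limiter simply spaces calls for the same key at least `1 / qps` seconds apart.

```python
import threading
import time
from collections import defaultdict

# (connect, read) timeouts in seconds, for requests.get(..., timeout=TIMEOUT)
TIMEOUT = (10, 20)


class PerKeyRateLimiter:
    """Allow at most `qps` calls per second for each key (e.g. a proxy IP)."""

    def __init__(self, qps=2.0, clock=time.monotonic, sleep=time.sleep):
        self._interval = 1.0 / qps
        self._clock = clock
        self._sleep = sleep
        self._lock = threading.Lock()
        self._next_slot = defaultdict(float)  # key -> earliest allowed start time

    def acquire(self, key):
        """Block until `key` may be used again; return the seconds waited."""
        with self._lock:
            now = self._clock()
            slot = max(now, self._next_slot[key])
            # Reserve the slot so concurrent callers queue up behind it
            self._next_slot[key] = slot + self._interval
        wait = slot - now
        if wait > 0:
            self._sleep(wait)  # sleep outside the lock so other keys are not blocked
        return wait
```

A worker would call `limiter.acquire(proxy_ip)` immediately before each request through that proxy. With a tunnel proxy that changes the exit IP on its own, the key could instead be the target hostname.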
CLI Batch Processing
# Batch-extract from a URL list file (-i is short for --input-file)
trafilatura -i urls.txt --output-dir ./articles
# Limit concurrency
trafilatura -i urls.txt --output-dir ./articles --parallel 3