Batch Scraping: A Production-Grade Setup with Trafilatura + 亿牛云 (16yun) Proxies

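Architecture, concurrency control, error handling, and proxy rotation for large-scale concurrent scraping, integrated with 亿牛云's proxy products to build a stable production pipeline.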

亿牛云 Technical Team · April 27, 2026 · 1 min read

From a Single Page to Batches

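Single-page extraction is the foundation, but production needs a batch pipeline that scales, tolerates failures, and can be monitored. This chapter starts from the architecture and gives a batch-scraping setup you can use directly.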

Architecture

┌─────────────────┐     ┌──────────────────┐     ┌────────────────┐
│  URL discovery  │     │  Download queue  │     │  Storage       │
│  Sitemap/Feed   │ ──→ │  Trafilatura     │ ──→ │  JSON/Markdown │
│  URL list file  │     │  + proxy rotation│     │  Database      │
└─────────────────┘     └──────────────────┘     └────────────────┘
                               │
                        ┌──────┴─────────┐
                        │ Error handling │
                        │ retry / skip / │
                        │ log            │
                        └────────────────┘
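The diagram maps naturally onto a producer/consumer layout. Below is a minimal standard-library sketch of that shape, assuming a hypothetical `fetch_and_extract(url)` callable that wraps the download + Trafilatura step (it returns a dict on success, None for an empty extraction, or raises on failure). `run_pipeline` is likewise an illustrative name, and the in-memory lists only stand in for real storage and error logging.

```python
import queue
import threading
from typing import Callable, Iterable, Optional


def run_pipeline(
    urls: Iterable[str],
    fetch_and_extract: Callable[[str], Optional[dict]],
    num_workers: int = 3,
) -> tuple[list[dict], list[tuple[str, str]]]:
    """URL discovery -> download/extract workers -> storage, plus an error branch.

    fetch_and_extract is a hypothetical callable: dict on success,
    None for an empty extraction, or an exception on failure.
    """
    todo: "queue.Queue[Optional[str]]" = queue.Queue()
    stored: list[dict] = []              # stands in for JSONL / database storage
    errors: list[tuple[str, str]] = []   # error branch: logged for retry or skip
    lock = threading.Lock()

    def worker() -> None:
        while True:
            url = todo.get()
            if url is None:              # sentinel: no more work for this worker
                break
            try:
                doc = fetch_and_extract(url)
                with lock:
                    if doc is None:
                        errors.append((url, "empty extraction"))
                    else:
                        stored.append(doc)
            except Exception as exc:     # record the failure instead of killing the worker
                with lock:
                    errors.append((url, str(exc)))

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for url in urls:                     # discovery feeds the download queue
        todo.put(url)
    for _ in threads:
        todo.put(None)                   # one sentinel per worker
    for t in threads:
        t.join()
    return stored, errors
```

Because the workers start before the URLs are enqueued, downloads can begin while discovery (sitemaps, feeds) is still producing URLs; `ThreadPoolExecutor` gives the same shape with less code when the URL list is known up front.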

Concurrency Control and Proxy Rotation

Concurrency with ThreadPoolExecutor

import trafilatura
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import json

# Proxy configuration, using the crawler proxy tunnel (爬虫代理隧道) as an example
PROXY = {
    "http": "http://user:pass@proxy.16yun.cn:8888",
    "https": "http://user:pass@proxy.16yun.cn:8888",
}

HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}

def process_url(url):
    """Download and extract a single article"""
    try:
        resp = requests.get(
            url,
            proxies=PROXY,
            headers=HEADERS,
            timeout=(10, 20),  # (connect, read) in seconds
        )

        if resp.status_code != 200:
            return {"url": url, "status": resp.status_code, "error": f"HTTP {resp.status_code}"}

        # Pass raw bytes so Trafilatura detects the page encoding itself;
        # forcing utf-8 would garble GBK/GB2312 pages
        result = trafilatura.extract(
            resp.content,
            url=url,
            output_format="json",
            with_metadata=True,
            include_tables=True,
        )

        if result:
            data = json.loads(result)
            data["url"] = url
            data["status"] = 200
            return data
        else:
            return {"url": url, "status": 0, "error": "empty extraction"}

    except Exception as e:
        return {"url": url, "status": 0, "error": str(e)}

# URL list
urls = [
    "https://example.com/article-1",
    "https://example.com/article-2",
    # ...
]

# Concurrent scraping
results = []
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(process_url, url): url for url in urls}
    for future in as_completed(futures):
        result = future.result()
        results.append(result)

# Summary
success = [r for r in results if r.get("status") == 200]
failed = [r for r in results if r.get("status") != 200]

print(f"Succeeded: {len(success)}, failed: {len(failed)}")

# Save results (explicit utf-8 so ensure_ascii=False output is safe on every OS)
with open("articles.jsonl", "w", encoding="utf-8") as f:
    for article in success:
        f.write(json.dumps(article, ensure_ascii=False) + "\n")

API Proxies + a Dynamic IP Pool

When using 亿牛云's API proxy, you can maintain an IP pool and pick a random IP for each request:

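import random

import requests
import trafilatura

# Fetch an IP list from the API proxy endpoint
def refresh_proxy_pool():
    api_url = "http://ip.16yun.cn:817/myip/pl/xxx/?s=xxx&u=user&format=json&count=50"
    resp = requests.get(api_url, timeout=10)
    resp.raise_for_status()
    proxy_list = resp.json()
    pool = []
    for p in proxy_list:
        pool.append({
            "http": f"http://user:pass@{p['ip']}:{p['port']}",
            "https": f"http://user:pass@{p['ip']}:{p['port']}",
        })
    return pool

# Refresh the IP pool periodically (e.g. every 5 minutes);
# as written, this only loads it once at startup
proxy_pool = refresh_proxy_pool()

def process_url_with_pool(url, pool):
    try:
        proxies = random.choice(pool)  # an empty pool raises IndexError, caught below
        resp = requests.get(url, proxies=proxies, timeout=(10, 20))
        if resp.status_code != 200:
            return {"url": url, "success": False, "error": f"HTTP {resp.status_code}"}
        # Raw bytes: let Trafilatura detect the page encoding
        result = trafilatura.extract(resp.content, url=url, output_format="markdown", with_metadata=True)
        if result is None:
            return {"url": url, "success": False, "error": "empty extraction"}
        return {"url": url, "success": True, "content": result}
    except Exception as e:
        return {"url": url, "success": False, "error": str(e)}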
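The pool above is loaded once, while the comment calls for a refresh every 5 minutes. One way to get that periodic refresh is a small wrapper that reloads the list whenever it is older than a TTL. This is a sketch, assuming the loader returns a list of requests-style `proxies` dicts (as `refresh_proxy_pool` does); `RefreshingProxyPool` is a hypothetical helper, not part of any 亿牛云 SDK, and the 300-second default simply mirrors the 5-minute interval.

```python
import random
import threading
import time
from typing import Callable


class RefreshingProxyPool:
    """Hypothetical proxy pool that reloads its IP list once it is older than ttl seconds."""

    def __init__(self, loader: Callable[[], list], ttl: float = 300.0,
                 clock: Callable[[], float] = time.monotonic):
        self._loader = loader            # e.g. refresh_proxy_pool
        self._ttl = ttl
        self._clock = clock              # injectable for testing
        self._lock = threading.Lock()
        self._pool: list = []
        self._loaded_at = float("-inf")  # forces a load on first use

    def get(self) -> dict:
        """Return a random proxies dict, reloading the list first if it has expired."""
        with self._lock:  # only one thread calls the extraction API at a time
            if not self._pool or self._clock() - self._loaded_at >= self._ttl:
                self._pool = self._loader()
                self._loaded_at = self._clock()
            if not self._pool:
                raise RuntimeError("proxy pool is empty")
            return random.choice(self._pool)
```

Usage: create `pool = RefreshingProxyPool(refresh_proxy_pool, ttl=300)` once, then call `requests.get(url, proxies=pool.get(), timeout=(10, 20))` in each worker so every request draws from a list that is at most five minutes old.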

Error Handling and Retry Strategy

Production needs robust error handling. Below is an extractor with retries:

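import time
from functools import wraps

import requests
import trafilatura


class RetryableError(Exception):
    """Failures worth retrying, such as rate limiting or gateway timeouts"""


def retry(max_retries=3, delay=1):
    """Retry decorator: up to max_retries retries with exponential backoff (1s/2s/4s)"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(max_retries + 1):  # first try + max_retries retries
                try:
                    return func(*args, **kwargs)
                except (RetryableError, requests.RequestException) as e:
                    last_error = e  # keep the cause instead of silently dropping it

                if attempt < max_retries:
                    time.sleep(delay * 2 ** attempt)  # exponential backoff

            return {"success": False, "error": f"all retries failed: {last_error}"}
        return wrapper
    return decorator

@retry(max_retries=3, delay=1)
def fetch_with_retry(url, proxies):
    resp = requests.get(url, proxies=proxies, timeout=(10, 20))

    if resp.status_code in (429, 504):
        # Rate limited / gateway timeout: raise so the decorator retries
        raise RetryableError(f"HTTP {resp.status_code}")
    elif resp.status_code != 200:
        # Other errors (403, 404, 407, ...) are returned without retrying
        return {"success": False, "error": f"HTTP {resp.status_code}"}

    # Raw bytes: let Trafilatura detect the page encoding
    result = trafilatura.extract(resp.content, url=url, output_format="json", with_metadata=True)
    if result is None:
        return {"success": False, "error": "empty extraction"}
    return {"success": True, "data": result}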
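The decorator retries through the same proxy every time. When the failure is tied to the exit IP (a 403 block or a per-IP 429), switching to a different proxy between attempts is usually more useful. A sketch, assuming `fetch` is a hypothetical callable `(url, proxies) -> dict` that raises on retryable failures; `fetch_rotating` is an illustrative name, and the sleep function is injectable only so the backoff can be tested.

```python
import time
from typing import Callable, Sequence


def fetch_rotating(url: str, proxies_list: Sequence[dict],
                   fetch: Callable[[str, dict], dict],
                   max_retries: int = 3, delay: float = 1.0,
                   sleep: Callable[[float], None] = time.sleep) -> dict:
    """Try a different proxy on each attempt, with exponential backoff between attempts."""
    if not proxies_list:
        raise ValueError("empty proxy list")
    last_error = None
    for attempt in range(max_retries + 1):
        proxies = proxies_list[attempt % len(proxies_list)]  # new exit IP per attempt
        try:
            return fetch(url, proxies)
        except Exception as exc:  # treat any raised error as retryable in this sketch
            last_error = exc
        if attempt < max_retries:
            sleep(delay * 2 ** attempt)  # 1s, 2s, 4s with the defaults
    return {"success": False, "error": f"all retries failed: {last_error}"}
```

Cycling through the list in order (rather than `random.choice`) guarantees that consecutive attempts never reuse the same proxy as long as the list has more than one entry.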

Handling Common Status Codes

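Status code   Meaning                            Handling
200           OK                                 Run extraction
407           Proxy authentication failed        Switch proxy; verify credentials
429           Too many requests                  Lower concurrency; wait, then retry
504           Target site timed out (gateway)    Retry 2-3 times; skip persistent failures
403           Blocked                            Switch IP; check the User-Agent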
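The table can be encoded as a lookup so that every worker applies the same policy. A sketch: `Action`, `POLICY`, and `decide` are hypothetical names, the cap of 3 attempts is an assumption drawn from the "retry 2-3 times" guidance, and codes not in the table (such as 404) default to being skipped.

```python
from enum import Enum


class Action(Enum):
    EXTRACT = "extract"
    SWITCH_PROXY = "switch_proxy"
    BACKOFF_RETRY = "backoff_retry"
    RETRY = "retry"
    SKIP = "skip"


# Mirrors the status-code table; anything not listed is skipped and logged
POLICY = {
    200: Action.EXTRACT,
    403: Action.SWITCH_PROXY,   # blocked: change IP, check User-Agent
    407: Action.SWITCH_PROXY,   # proxy auth failed: change proxy, verify credentials
    429: Action.BACKOFF_RETRY,  # rate limited: lower concurrency, wait, retry
    504: Action.RETRY,          # target timed out: retry a few times
}

RETRYING = (Action.SWITCH_PROXY, Action.BACKOFF_RETRY, Action.RETRY)


def decide(status: int, attempts_so_far: int, max_attempts: int = 3) -> Action:
    """Map an HTTP status code (and attempt count) to the next pipeline step."""
    action = POLICY.get(status, Action.SKIP)
    if action in RETRYING and attempts_so_far >= max_attempts:
        return Action.SKIP  # give up on persistent failures
    return action
```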

Storage

Writing JSONL

import json

def save_jsonl(articles, filepath):
    with open(filepath, "a", encoding="utf-8") as f:
        for article in articles:
            f.write(json.dumps(article, ensure_ascii=False) + "\n")
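Because `save_jsonl` appends one record per line, the output file can double as the checkpoint for resuming after a crash. A sketch, assuming each saved record carries a `"url"` key (as the records built by `process_url` do); `load_done_urls` is a hypothetical helper name.

```python
import json
import os


def load_done_urls(filepath):
    """Return the set of URLs already saved in a JSONL file (empty if it does not exist)."""
    done = set()
    if not os.path.exists(filepath):
        return done
    with open(filepath, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                done.add(json.loads(line)["url"])
            except (json.JSONDecodeError, KeyError):
                continue  # e.g. a half-written last line left by a crash
    return done
```

Usage: `done = load_done_urls("articles.jsonl")` followed by `pending = [u for u in urls if u not in done]`. Since only successful articles are written, failed URLs are naturally retried on the next run.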

Writing to SQLite

import json
import sqlite3

def init_db(db_path):
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS articles (
            url TEXT PRIMARY KEY,
            title TEXT,
            author TEXT,
            date TEXT,
            content TEXT,
            categories TEXT,
            tags TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    return conn

def save_article(conn, article):
    conn.execute("""
        INSERT OR REPLACE INTO articles (url, title, author, date, content, categories, tags)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """, (
        article.get("url"),
        article.get("title"),
        article.get("author"),
        article.get("date"),
        article.get("text"),
        json.dumps(article.get("categories", [])),
        json.dumps(article.get("tags", [])),
    ))
    conn.commit()
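`save_article` commits once per row, which gets slow on large batches. A sketch of a batched variant, assuming the `articles` table created by `init_db`; `save_articles` is a hypothetical name. It uses `executemany` and the connection's context manager, which commits on success and rolls back if any row fails.

```python
import json
import sqlite3


def save_articles(conn: sqlite3.Connection, articles: list) -> int:
    """Insert many articles in a single transaction instead of committing per row."""
    rows = [
        (
            a.get("url"),
            a.get("title"),
            a.get("author"),
            a.get("date"),
            a.get("text"),
            json.dumps(a.get("categories", []), ensure_ascii=False),
            json.dumps(a.get("tags", []), ensure_ascii=False),
        )
        for a in articles
    ]
    with conn:  # commit on success, roll back on error
        conn.executemany(
            "INSERT OR REPLACE INTO articles "
            "(url, title, author, date, content, categories, tags) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            rows,
        )
    return len(rows)
```

Call it from the main thread (for example, on results collected from `as_completed`): by default a `sqlite3` connection may only be used by the thread that created it.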

Production Configuration Checklist

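□ Concurrency: tune max_workers to your proxy quota and the target site's limits (3-10 recommended)
□ Proxy option:
   · Crawler proxy tunnel (爬虫代理隧道): simple; suits small to medium scale
   · API proxy IP pool: flexible; suits large scale and many target sites
   · Dedicated proxy (独享代理): stable; suits workloads that need a fixed egress IP
□ Retries: 3 retries + exponential backoff (1s/2s/4s)
□ Timeouts: connect=10s, read=20s
□ Rate limiting: at most 2 QPS per IP (a reference value; adjust per target site)
□ Checkpoint/resume: record processed URLs so a crashed run can pick up where it left off
□ Output format: JSONL for downstream programs, Markdown for human reading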
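The per-IP rate-limiting item can be enforced with a small limiter keyed by exit IP or proxy address. A sketch, assuming you key it by whatever identifies the proxy you are about to use (for instance the proxy URL drawn from the pool); `PerKeyRateLimiter` is a hypothetical helper, and the 2 QPS default is just the reference value from the checklist. The clock and sleep functions are injectable only for testing.

```python
import threading
import time
from collections import defaultdict
from typing import Callable


class PerKeyRateLimiter:
    """Spaces out requests per key (e.g. per proxy IP) to at most `qps` per second."""

    def __init__(self, qps: float = 2.0,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep):
        self._interval = 1.0 / qps
        self._clock = clock
        self._sleep = sleep
        self._lock = threading.Lock()
        self._next_slot = defaultdict(float)  # key -> earliest allowed start time

    def wait(self, key: str) -> float:
        """Block until `key` may send another request; return the seconds waited."""
        with self._lock:
            now = self._clock()
            start = max(now, self._next_slot[key])
            self._next_slot[key] = start + self._interval  # reserve this slot
        delay = start - now
        if delay > 0:
            self._sleep(delay)  # sleep outside the lock so other keys are not blocked
        return delay
```

Reserving the slot inside the lock and sleeping outside it lets threads using different IPs proceed independently, while threads sharing one IP queue up at 0.5-second spacing.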

CLI Batch Processing

# Batch-extract from a file of URLs (one per line)
trafilatura --input-file urls.txt --output-dir ./articles

# Limit the number of parallel threads
trafilatura --input-file urls.txt --output-dir ./articles --parallel 3

Need an enterprise proxy solution?

We can provide a customized solution based on your target sites, concurrency scale, and stability goals.