Connecting to PostgreSQL

OpenClaw (generally a document-crawling or data-processing tool) can be integrated with PostgreSQL in several ways. The common approaches are outlined below:

Data Collection and Storage Integration

Direct JDBC Connection

// Java example (requires the PostgreSQL JDBC driver and java.sql imports)
String url = "jdbc:postgresql://localhost:5432/database";
String user = "postgres";
String password = "password";
String sql = "INSERT INTO crawled_data(url, content, timestamp) VALUES (?, ?, ?)";
try (Connection conn = DriverManager.getConnection(url, user, password);
     PreparedStatement pstmt = conn.prepareStatement(sql)) {
    pstmt.setString(1, "https://example.com");
    pstmt.setString(2, "crawled page content");
    pstmt.setTimestamp(3, new Timestamp(System.currentTimeMillis()));
    pstmt.executeUpdate();
}

Python + psycopg2

import psycopg2
from openclaw import OpenClaw
conn = psycopg2.connect(
    host="localhost",
    database="mydb",
    user="postgres",
    password="password"
)
# Crawl data with OpenClaw
claw = OpenClaw()
data = claw.crawl("https://example.com")
# Store in PostgreSQL
cursor = conn.cursor()
cursor.execute("""
    INSERT INTO crawled_pages 
    (url, title, content, crawl_time) 
    VALUES (%s, %s, %s, NOW())
""", (data['url'], data['title'], data['content']))
conn.commit()
conn.close()

Batch Processing Optimization

Bulk import with the COPY command

import psycopg2
from io import StringIO
def bulk_insert_pages(pages_data):
    conn = psycopg2.connect("your_connection_string")
    cursor = conn.cursor()
    # Build tab-separated rows in memory (copy_from's default format)
    f = StringIO()
    for page in pages_data:
        f.write(f"{page['url']}\t{page['title']}\t{page['content']}\n")
    f.seek(0)
    cursor.copy_from(f, 'crawled_pages', 
                     columns=('url', 'title', 'content'))
    conn.commit()
    conn.close()
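
Note that the plain-text COPY format above assumes content contains no tab or newline characters. As an alternative sketch (same crawled_pages table assumed), psycopg2.extras.execute_values batches multi-row INSERTs and lets the driver handle escaping:

import psycopg2
from psycopg2.extras import execute_values

def bulk_insert_pages_values(pages_data):
    # Multi-row INSERT; parameters are escaped by the driver,
    # so tabs and newlines in content are safe
    conn = psycopg2.connect("your_connection_string")
    with conn, conn.cursor() as cursor:
        execute_values(
            cursor,
            "INSERT INTO crawled_pages (url, title, content) VALUES %s",
            [(p['url'], p['title'], p['content']) for p in pages_data],
            page_size=500  # rows per generated statement
        )
    conn.close()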

Connection Pool Configuration

from psycopg2.pool import ThreadedConnectionPool
# Create the connection pool
pool = ThreadedConnectionPool(
    minconn=5,
    maxconn=20,
    host="localhost",
    database="crawl_db",
    user="postgres",
    password="password"
)
# Borrow a connection from the pool
conn = pool.getconn()
# ... run queries
pool.putconn(conn)
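
If an operation raises before putconn runs, the connection leaks. A small helper (a sketch built on the pool object created above) guarantees it is always returned:

from contextlib import contextmanager

@contextmanager
def pooled_connection(pool):
    # Borrow a connection and always return it, even on error
    conn = pool.getconn()
    try:
        yield conn
    finally:
        pool.putconn(conn)

# Usage
with pooled_connection(pool) as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM crawled_pages")
        print(cur.fetchone()[0])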

Table Schema Design Recommendations

Basic Table Schema

CREATE TABLE crawled_pages (
    id SERIAL PRIMARY KEY,
    url VARCHAR(2048) UNIQUE NOT NULL,
    domain VARCHAR(255),
    title TEXT,
    content TEXT,
    html_content TEXT,
    crawl_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    status_code INTEGER,
    content_hash VARCHAR(64),  -- used for deduplication
    metadata JSONB
);
-- Create indexes (url is already indexed by its UNIQUE constraint)
CREATE INDEX idx_crawl_time ON crawled_pages(crawl_timestamp);
CREATE INDEX idx_domain ON crawled_pages(domain);
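
The content_hash column is only useful if the crawler fills it in. A minimal sketch of one way to do that (the SHA-256 choice and helper name are assumptions, not part of OpenClaw):

import hashlib

def store_with_hash(conn, url, content):
    # Hash the content so duplicate pages can be detected later;
    # a SHA-256 hex digest is 64 characters, matching VARCHAR(64)
    content_hash = hashlib.sha256(content.encode("utf-8")).hexdigest()
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO crawled_pages (url, content, content_hash)
            VALUES (%s, %s, %s)
            ON CONFLICT (url) DO UPDATE
            SET content = EXCLUDED.content,
                content_hash = EXCLUDED.content_hash
        """, (url, content, content_hash))
    conn.commit()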

Advanced Schema (for Incremental Crawling)

CREATE TABLE crawl_queue (
    id SERIAL PRIMARY KEY,
    url VARCHAR(2048) UNIQUE NOT NULL,
    priority INTEGER DEFAULT 5,
    last_crawled TIMESTAMP,
    crawl_status VARCHAR(20),  -- 'pending', 'success', 'failed'
    retry_count INTEGER DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE crawl_stats (
    id SERIAL PRIMARY KEY,
    domain VARCHAR(255),
    pages_crawled INTEGER DEFAULT 0,
    avg_response_time FLOAT,
    last_crawl TIMESTAMP
);
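
With a crawl_queue table like this, one common way for a worker to claim pending URLs without two workers grabbing the same rows is SELECT ... FOR UPDATE SKIP LOCKED. A sketch (the 'in_progress' status, priority ordering, and function name are assumptions beyond the schema above):

def claim_pending_urls(conn, batch_size=20):
    """Atomically claim a batch of pending URLs for this worker."""
    with conn.cursor() as cur:
        cur.execute("""
            UPDATE crawl_queue
            SET crawl_status = 'in_progress'
            WHERE id IN (
                SELECT id FROM crawl_queue
                WHERE crawl_status = 'pending'
                ORDER BY priority DESC, created_at
                LIMIT %s
                FOR UPDATE SKIP LOCKED
            )
            RETURNING id, url
        """, (batch_size,))
        rows = cur.fetchall()
    conn.commit()
    return rows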

Asynchronous Processing

Using asyncpg (async Python)

import asyncio
import asyncpg
from openclaw.async_crawler import AsyncOpenClaw
async def crawl_and_store():
    # Connect to the database
    conn = await asyncpg.connect('postgresql://user:password@localhost/db')
    # Crawl asynchronously
    claw = AsyncOpenClaw()
    urls = await claw.fetch_urls_to_crawl(limit=100)
    for url in urls:
        data = await claw.crawl(url)
        # Store asynchronously
        await conn.execute('''
            INSERT INTO crawled_pages(url, content)
            VALUES($1, $2)
            ON CONFLICT (url) DO UPDATE
            SET content = EXCLUDED.content
        ''', data['url'], data['content'])
    await conn.close()

asyncio.run(crawl_and_store())

Message Queue Integration

OpenClaw → Kafka/RabbitMQ → PostgreSQL consumer
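
A minimal sketch of the PostgreSQL consumer side of such a pipeline, assuming RabbitMQ via the pika library and JSON messages with url and content fields (the queue name and message format are assumptions):

import json
import pika
import psycopg2

conn = psycopg2.connect(host="localhost", database="crawl_db",
                        user="postgres", password="password")

def on_message(channel, method, properties, body):
    page = json.loads(body)
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO crawled_pages (url, content)
            VALUES (%s, %s)
            ON CONFLICT (url) DO UPDATE SET content = EXCLUDED.content
        """, (page["url"], page["content"]))
    conn.commit()
    channel.basic_ack(delivery_tag=method.delivery_tag)  # ack only after commit

mq = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
ch = mq.channel()
ch.queue_declare(queue="crawled_pages", durable=True)
ch.basic_consume(queue="crawled_pages", on_message_callback=on_message)
ch.start_consuming()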

Monitoring and Maintenance

Performance Monitoring Queries

-- Crawl statistics per day
SELECT 
    DATE(crawl_timestamp) as crawl_date,
    COUNT(*) as pages_crawled,
    AVG(LENGTH(content)) as avg_content_size
FROM crawled_pages
GROUP BY DATE(crawl_timestamp)
ORDER BY crawl_date DESC;
-- Find duplicate content
SELECT content_hash, COUNT(*) 
FROM crawled_pages 
GROUP BY content_hash 
HAVING COUNT(*) > 1;

PostgreSQL Configuration Tuning

# postgresql.conf tuning suggestions (example values for a dedicated server with ~16 GB RAM; adjust to your hardware)
shared_buffers = 4GB
work_mem = 64MB
maintenance_work_mem = 1GB
effective_cache_size = 12GB
max_worker_processes = 8
max_parallel_workers_per_gather = 4
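
Some of these settings (e.g. shared_buffers) only take effect after a server restart; others can be changed with ALTER SYSTEM plus a configuration reload. A sketch of applying a reloadable setting from Python (requires superuser privileges; values mirror the examples above):

import psycopg2

conn = psycopg2.connect(host="localhost", database="crawl_db",
                        user="postgres", password="password")
conn.autocommit = True  # ALTER SYSTEM cannot run inside a transaction block
with conn.cursor() as cur:
    cur.execute("ALTER SYSTEM SET work_mem = '64MB'")
    cur.execute("SELECT pg_reload_conf()")  # reload; shared_buffers still needs a restart
conn.close()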

Complete Example Script

# complete_integration.py
import psycopg2
import logging
from datetime import datetime
from openclaw import OpenClaw
class PostgreSQLCrawler:
    def __init__(self, db_config):
        self.db_config = db_config
        self.claw = OpenClaw()
        self.logger = logging.getLogger(__name__)
    def connect(self):
        """建立数据库连接"""
        return psycopg2.connect(**self.db_config)
    def crawl_and_store(self, url):
        """抓取并存储单个URL"""
        try:
            # Crawl the page
            data = self.claw.crawl(url)
            # Store in PostgreSQL; exiting the connection context manager
            # commits the transaction (or rolls it back on error)
            with self.connect() as conn:
                with conn.cursor() as cur:
                    cur.execute("""
                        INSERT INTO crawled_pages 
                        (url, title, content, domain, crawl_timestamp)
                        VALUES (%s, %s, %s, %s, %s)
                        ON CONFLICT (url) DO UPDATE SET
                            title = EXCLUDED.title,
                            content = EXCLUDED.content,
                            crawl_timestamp = EXCLUDED.crawl_timestamp
                    """, (
                        data['url'],
                        data['title'],
                        data['content'],
                        self.extract_domain(data['url']),
                        datetime.now()
                    ))
            # The context manager does not close the connection, so close it here
            conn.close()
            self.logger.info(f"成功抓取: {url}")
            return True
        except Exception as e:
            self.logger.error(f"抓取失败 {url}: {str(e)}")
            return False
    def extract_domain(self, url):
        """提取域名"""
        from urllib.parse import urlparse
        parsed = urlparse(url)
        return parsed.netloc
    def batch_crawl(self, urls, batch_size=10):
        """批量抓取"""
        successful = 0
        for i in range(0, len(urls), batch_size):
            batch = urls[i:i+batch_size]
            for url in batch:
                if self.crawl_and_store(url):
                    successful += 1
        return successful
# Usage example
if __name__ == "__main__":
    db_config = {
        "host": "localhost",
        "database": "crawl_db",
        "user": "postgres",
        "password": "your_password"
    }
    crawler = PostgreSQLCrawler(db_config)
    # URLs to crawl
    urls = [
        "https://example.com/page1",
        "https://example.com/page2"
    ]
    # Run the batch crawl
    successful = crawler.batch_crawl(urls)
    print(f"成功抓取 {successful}/{len(urls)} 个页面")

Best Practice Recommendations

  1. Connection management: use a connection pool to avoid repeatedly opening and closing connections
  2. Error handling: implement a retry mechanism and error logging (see the sketch after this list)
  3. Data cleanup: periodically purge old data and consider partitioned tables
  4. Index optimization: create indexes that match your actual query patterns
  5. Backup strategy: back up crawled data regularly
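
For point 2, a minimal retry sketch (the backoff values and broad exception handling are placeholders; adapt them to how OpenClaw actually reports failures):

import logging
import time

def crawl_with_retry(crawler, url, max_retries=3, base_delay=2.0):
    """Retry a failed crawl with exponential backoff, logging each attempt."""
    for attempt in range(1, max_retries + 1):
        try:
            return crawler.crawl(url)
        except Exception as exc:
            logging.warning("Attempt %d/%d failed for %s: %s",
                            attempt, max_retries, url, exc)
            if attempt == max_retries:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))  # 2s, 4s, 8s, ...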

This integration approach can be adapted to your specific requirements; in particular, the table schemas should be designed around the kind of data you actually crawl.
