OpenClaw (generally referring to a document-crawling or data-processing tool) can be integrated with PostgreSQL in several ways. The common approaches are outlined below.

Data Collection and Storage Integration
Direct JDBC Connection
// Java example
String url = "jdbc:postgresql://localhost:5432/database";
String user = "postgres";
String password = "password";
try (Connection conn = DriverManager.getConnection(url, user, password);
     PreparedStatement pstmt = conn.prepareStatement(
             "INSERT INTO crawled_data(url, content, timestamp) VALUES (?, ?, ?)")) {
    pstmt.setString(1, "https://example.com");
    pstmt.setString(2, "crawled content");
    pstmt.setTimestamp(3, new Timestamp(System.currentTimeMillis()));
    pstmt.executeUpdate();
}
Python + psycopg2
import psycopg2
from openclaw import OpenClaw

conn = psycopg2.connect(
    host="localhost",
    database="mydb",
    user="postgres",
    password="password"
)

# Crawl data with OpenClaw
claw = OpenClaw()
data = claw.crawl("https://example.com")

# Store the result in PostgreSQL
cursor = conn.cursor()
cursor.execute("""
    INSERT INTO crawled_pages (url, title, content, crawl_time)
    VALUES (%s, %s, %s, NOW())
""", (data['url'], data['title'], data['content']))
conn.commit()
Batch Processing Optimization
Bulk import with the COPY command
import psycopg2
from io import StringIO

def bulk_insert_pages(pages_data):
    conn = psycopg2.connect("your_connection_string")
    cursor = conn.cursor()
    # Prepare tab-separated data in memory
    # (field values must not contain tabs or newlines, or they need escaping)
    f = StringIO()
    for page in pages_data:
        f.write(f"{page['url']}\t{page['title']}\t{page['content']}\n")
    f.seek(0)
    cursor.copy_from(f, 'crawled_pages',
                     columns=('url', 'title', 'content'))
    conn.commit()
Connection Pool Configuration
from psycopg2.pool import ThreadedConnectionPool

# Create the connection pool
pool = ThreadedConnectionPool(
    minconn=5,
    maxconn=20,
    host="localhost",
    database="crawl_db",
    user="postgres",
    password="password"
)

# Borrow a connection, use it, then return it to the pool
conn = pool.getconn()
try:
    pass  # ... run queries here
finally:
    pool.putconn(conn)
Table Schema Design Recommendations
Basic table schema
CREATE TABLE crawled_pages (
    id SERIAL PRIMARY KEY,
    url VARCHAR(2048) UNIQUE NOT NULL,
    domain VARCHAR(255),
    title TEXT,
    content TEXT,
    html_content TEXT,
    crawl_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    status_code INTEGER,
    content_hash VARCHAR(64), -- used for deduplication
    metadata JSONB
);
-- Create indexes
-- (the UNIQUE constraint on url already creates an index, so idx_crawled_url is optional)
CREATE INDEX idx_crawled_url ON crawled_pages(url);
CREATE INDEX idx_crawl_time ON crawled_pages(crawl_timestamp);
CREATE INDEX idx_domain ON crawled_pages(domain);
Advanced schema (supporting incremental crawling)
CREATE TABLE crawl_queue (
    id SERIAL PRIMARY KEY,
    url VARCHAR(2048) UNIQUE NOT NULL,
    priority INTEGER DEFAULT 5,
    last_crawled TIMESTAMP,
    crawl_status VARCHAR(20), -- 'pending', 'success', 'failed'
    retry_count INTEGER DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE crawl_stats (
    id SERIAL PRIMARY KEY,
    domain VARCHAR(255),
    pages_crawled INTEGER DEFAULT 0,
    avg_response_time FLOAT,
    last_crawl TIMESTAMP
);
Asynchronous Processing
Using asyncpg (async Python)
import asyncio
import asyncpg
from openclaw.async_crawler import AsyncOpenClaw

async def crawl_and_store():
    # Connect to the database
    conn = await asyncpg.connect('postgresql://user:password@localhost/db')
    # Crawl asynchronously
    claw = AsyncOpenClaw()
    urls = await claw.fetch_urls_to_crawl(limit=100)
    for url in urls:
        data = await claw.crawl(url)
        # Store asynchronously, upserting on the url column
        await conn.execute('''
            INSERT INTO crawled_pages (url, content)
            VALUES ($1, $2)
            ON CONFLICT (url) DO UPDATE
            SET content = EXCLUDED.content
        ''', data['url'], data['content'])
    await conn.close()

asyncio.run(crawl_and_store())
Message Queue Integration
OpenClaw → Kafka/RabbitMQ → PostgreSQL consumer: the crawler publishes results to a message queue, and a separate consumer process writes them into PostgreSQL, decoupling crawling from storage.
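A minimal sketch of the consumer side, assuming OpenClaw publishes crawl results as JSON messages and that the kafka-python package is used; the topic name, broker address, and message fields are placeholders, not part of any official OpenClaw API:

import json
import psycopg2
from kafka import KafkaConsumer  # assumes the kafka-python package is installed

# Consume crawl results published by OpenClaw and upsert them into PostgreSQL
consumer = KafkaConsumer(
    "crawled_pages",                       # hypothetical topic name
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

conn = psycopg2.connect("dbname=crawl_db user=postgres password=password host=localhost")

for message in consumer:
    page = message.value  # expected keys: url, title, content
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO crawled_pages (url, title, content)
            VALUES (%s, %s, %s)
            ON CONFLICT (url) DO UPDATE SET
                title = EXCLUDED.title,
                content = EXCLUDED.content
            """,
            (page["url"], page["title"], page["content"]),
        )
    conn.commit()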
Monitoring and Maintenance
Performance monitoring queries
-- Crawl statistics per day
SELECT
    DATE(crawl_timestamp) AS crawl_date,
    COUNT(*) AS pages_crawled,
    AVG(LENGTH(content)) AS avg_content_size
FROM crawled_pages
GROUP BY DATE(crawl_timestamp)
ORDER BY crawl_date DESC;

-- Find duplicate content
SELECT content_hash, COUNT(*)
FROM crawled_pages
GROUP BY content_hash
HAVING COUNT(*) > 1;
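The duplicate-content query above only works if content_hash is populated at insert time. A minimal sketch of computing it with Python's hashlib; the column name matches the schema above, everything else is illustrative:

import hashlib

def content_fingerprint(content: str) -> str:
    """SHA-256 hex digest of the page content (64 hex chars, fits VARCHAR(64))."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()

# Pass the hash alongside the other columns when inserting, for example:
# cur.execute(
#     "INSERT INTO crawled_pages (url, content, content_hash) VALUES (%s, %s, %s)",
#     (data["url"], data["content"], content_fingerprint(data["content"])),
# )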
PostgreSQL Configuration Tuning
# postgresql.conf tuning suggestions
shared_buffers = 4GB
work_mem = 64MB
maintenance_work_mem = 1GB
effective_cache_size = 12GB
max_worker_processes = 8
max_parallel_workers_per_gather = 4
Complete Example Script
# complete_integration.py
import psycopg2
import logging
from datetime import datetime
from urllib.parse import urlparse
from openclaw import OpenClaw

class PostgreSQLCrawler:
    def __init__(self, db_config):
        self.db_config = db_config
        self.claw = OpenClaw()
        self.logger = logging.getLogger(__name__)

    def connect(self):
        """Open a database connection."""
        return psycopg2.connect(**self.db_config)

    def crawl_and_store(self, url):
        """Crawl a single URL and store the result."""
        try:
            # Crawl the page
            data = self.claw.crawl(url)
            # Store it in PostgreSQL (upsert on url)
            with self.connect() as conn:
                with conn.cursor() as cur:
                    cur.execute("""
                        INSERT INTO crawled_pages
                            (url, title, content, domain, crawl_timestamp)
                        VALUES (%s, %s, %s, %s, %s)
                        ON CONFLICT (url) DO UPDATE SET
                            title = EXCLUDED.title,
                            content = EXCLUDED.content,
                            crawl_timestamp = EXCLUDED.crawl_timestamp
                    """, (
                        data['url'],
                        data['title'],
                        data['content'],
                        self.extract_domain(data['url']),
                        datetime.now()
                    ))
                conn.commit()
            self.logger.info(f"Crawled successfully: {url}")
            return True
        except Exception as e:
            self.logger.error(f"Crawl failed for {url}: {str(e)}")
            return False

    def extract_domain(self, url):
        """Extract the domain from a URL."""
        parsed = urlparse(url)
        return parsed.netloc

    def batch_crawl(self, urls, batch_size=10):
        """Crawl a list of URLs in batches."""
        successful = 0
        for i in range(0, len(urls), batch_size):
            batch = urls[i:i + batch_size]
            for url in batch:
                if self.crawl_and_store(url):
                    successful += 1
        return successful

# Usage example
if __name__ == "__main__":
    db_config = {
        "host": "localhost",
        "database": "crawl_db",
        "user": "postgres",
        "password": "your_password"
    }
    crawler = PostgreSQLCrawler(db_config)
    # URLs to crawl
    urls = [
        "https://example.com/page1",
        "https://example.com/page2"
    ]
    # Run the batch crawl
    successful = crawler.batch_crawl(urls)
    print(f"Successfully crawled {successful}/{len(urls)} pages")
Best Practices
- Connection management: use a connection pool to avoid repeatedly opening connections
- Error handling: implement retries and error logging (see the sketch after this list)
- Data cleanup: purge old data regularly and consider partitioned tables
- Index optimization: create indexes that match your actual query patterns
- Backup strategy: back up crawled data on a regular schedule
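A minimal sketch of the retry idea, built on the PostgreSQLCrawler class from the complete example above; the function name, retry count, and linear backoff are illustrative choices, not a prescribed mechanism:

import time
import logging

logger = logging.getLogger(__name__)

def crawl_with_retry(crawler, url, max_retries=3, backoff_seconds=2.0):
    """Retry wrapper around PostgreSQLCrawler.crawl_and_store (illustrative only)."""
    for attempt in range(1, max_retries + 1):
        if crawler.crawl_and_store(url):
            return True
        logger.warning("Attempt %d/%d failed for %s", attempt, max_retries, url)
        time.sleep(backoff_seconds * attempt)  # simple linear backoff between attempts
    logger.error("Giving up on %s after %d attempts", url, max_retries)
    return False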
These integration approaches can be adapted to specific requirements; in particular, the table schemas should be designed around the kind of data actually being crawled.