Usage Example


Here is a memory-saving OpenClaw implementation. OpenClaw in this context refers to a document-processing tool; the version below is designed to keep memory usage low.


Memory-Optimized OpenClaw

import gc
import hashlib
import os
import pickle
import zlib
from collections import defaultdict
from typing import Any, Dict, List, Optional
class CompactDocument:
    """Compact document representation that minimizes per-object memory."""
    # __slots__ avoids a per-instance __dict__, which saves memory at scale.
    __slots__ = ['id', 'content_hash', 'chunks', 'metadata']

    def __init__(self, doc_id: str, content: str, metadata: Optional[Dict] = None):
        self.id = doc_id
        # Keep a hash of the content rather than a second full copy.
        self.content_hash = hashlib.md5(content.encode()).hexdigest()
        # Store the content in ~1000-character chunks.
        self.chunks = [content[i:i + 1000] for i in range(0, len(content), 1000)]
        self.metadata = metadata or {}
class MemoryEfficientOpenClaw:
    """Memory-efficient OpenClaw implementation."""

    def __init__(self, max_docs_in_memory: int = 100):
        """
        Initialize OpenClaw.

        Args:
            max_docs_in_memory: Maximum number of documents kept uncompressed
                in memory; beyond this, the oldest documents are compressed.
        """
        self.max_docs_in_memory = max_docs_in_memory
        self.documents: Dict[str, CompactDocument] = {}
        self.compressed_docs: Dict[str, bytes] = {}  # zlib-compressed documents
        self.index: Dict[str, List[str]] = defaultdict(list)  # inverted index
        self.document_count = 0
    def add_document(self, doc_id: str, content: str,
                     metadata: Optional[Dict] = None) -> None:
        """Add a document, managing memory automatically."""
        doc = CompactDocument(doc_id, content, metadata)
        # If the in-memory limit is reached, compress the oldest document.
        if len(self.documents) >= self.max_docs_in_memory:
            self._compress_oldest_docs()
        self.documents[doc_id] = doc
        self.document_count += 1
        # Update the inverted index.
        self._update_index(doc_id, doc.chunks)
        # Trigger garbage collection periodically.
        if self.document_count % 50 == 0:
            gc.collect()
    def _compress_oldest_docs(self) -> None:
        """Compress the oldest document to free memory."""
        if not self.documents:
            return
        # Dicts preserve insertion order, so the first key is the oldest document.
        oldest_id = next(iter(self.documents))
        doc = self.documents.pop(oldest_id)
        self.compressed_docs[oldest_id] = zlib.compress(pickle.dumps(doc))
    def _update_index(self, doc_id: str, chunks: List[str]) -> None:
        """Update the inverted index (memory-optimized)."""
        # Process chunks in batches to limit peak memory usage.
        batch_size = 1000
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i:i + batch_size]
            chunk_index = f"{doc_id}_{i // batch_size}"
            # Index the keywords of each batch.
            for word in self._extract_keywords(' '.join(batch)):
                self.index[word].append(chunk_index)
    def _extract_keywords(self, text: str, max_words: int = 20) -> List[str]:
        """Extract keywords (simplified: lowercase words longer than 2 chars)."""
        words = text.lower().split()
        # Deduplicate via a set; note set order is arbitrary, so the
        # truncation to max_words keeps an arbitrary subset of the keywords.
        return list(set(word for word in words if len(word) > 2))[:max_words]
    def search(self, query: str, limit: int = 10) -> List[Dict]:
        """Search documents (memory-efficient)."""
        query_words = query.lower().split()
        scores: Dict[str, int] = {}
        # Accumulate a simple term-frequency relevance score per document.
        for word in query_words:
            if word in self.index:
                for chunk_id in self.index[word]:
                    # rsplit so doc ids that contain '_' are parsed correctly.
                    doc_id, _ = chunk_id.rsplit('_', 1)
                    scores[doc_id] = scores.get(doc_id, 0) + 1
        # Sort by relevance.
        sorted_results = sorted(scores.items(), key=lambda x: x[1], reverse=True)
        # Lazily load content for the top hits only.
        results = []
        for doc_id, score in sorted_results[:limit]:
            doc_content = self._load_document_content(doc_id)
            if doc_content:
                results.append({
                    'id': doc_id,
                    'score': score,
                    'content': doc_content[:500] + '...' if len(doc_content) > 500 else doc_content,
                    'metadata': self._get_doc_metadata(doc_id)
                })
        return results
    def _load_document_content(self, doc_id: str) -> Optional[str]:
        """Load document content (from memory first, then compressed storage)."""
        if doc_id in self.documents:
            return ''.join(self.documents[doc_id].chunks)
        if doc_id in self.compressed_docs:
            compressed = self.compressed_docs.pop(doc_id)
            doc = pickle.loads(zlib.decompress(compressed))
            # Promote the document back into memory, evicting another if needed.
            if len(self.documents) >= self.max_docs_in_memory:
                self._compress_oldest_docs()
            self.documents[doc_id] = doc
            return ''.join(doc.chunks)
        return None
    def _get_doc_metadata(self, doc_id: str) -> Dict:
        """Fetch document metadata."""
        if doc_id in self.documents:
            return self.documents[doc_id].metadata
        if doc_id in self.compressed_docs:
            # Decompress temporarily just to read the metadata.
            doc = pickle.loads(zlib.decompress(self.compressed_docs[doc_id]))
            return doc.metadata
        return {}
    def optimize_memory(self) -> None:
        """Optimize memory usage."""
        # Compress all but the most recently added documents, keeping roughly
        # half of the in-memory budget resident.
        keep = max(1, self.max_docs_in_memory // 2)
        docs_to_compress = list(self.documents.keys())[:-keep]
        for doc_id in docs_to_compress:
            doc = self.documents.pop(doc_id)
            self.compressed_docs[doc_id] = zlib.compress(pickle.dumps(doc))
        # Drop index entries that point at deleted documents.
        self._cleanup_index()
        # Force garbage collection.
        gc.collect()
    def _cleanup_index(self) -> None:
        """Remove index entries for documents that no longer exist."""
        current_docs = set(self.documents.keys()) | set(self.compressed_docs.keys())
        # Rebuild the index, keeping only chunks of live documents.
        new_index = defaultdict(list)
        for word, chunk_ids in self.index.items():
            valid_chunks = []
            for chunk_id in chunk_ids:
                doc_id, _ = chunk_id.rsplit('_', 1)
                if doc_id in current_docs:
                    valid_chunks.append(chunk_id)
            if valid_chunks:
                new_index[word] = valid_chunks
        self.index = new_index
    def get_memory_stats(self) -> Dict[str, int]:
        """Report memory usage statistics."""
        return {
            'documents_in_memory': len(self.documents),
            'documents_compressed': len(self.compressed_docs),
            'index_entries': len(self.index),
            'total_documents': self.document_count
        }
def example_usage():
    """Usage example."""
    # Create a memory-optimized OpenClaw instance.
    claw = MemoryEfficientOpenClaw(max_docs_in_memory=50)
    # Add documents.
    documents = [
        ("doc1", "Python is a high-level programming language widely used in data science and machine learning."),
        ("doc2", "Machine learning is a branch of artificial intelligence that lets computers learn from data."),
        ("doc3", "Data science uses scientific methods, algorithms, and systems to extract knowledge and insight from data."),
        # ... more documents
    ]
    for doc_id, content in documents:
        claw.add_document(doc_id, content, {"source": "example"})
    # Search.
    results = claw.search("machine learning data science")
    print("Search results:", results)
    # Memory statistics.
    stats = claw.get_memory_stats()
    print("Memory stats:", stats)
    # Optimize memory.
    claw.optimize_memory()
    return claw
# A further optimization: disk-backed storage
class DiskBackedOpenClaw(MemoryEfficientOpenClaw):
    """Disk-backed OpenClaw that saves even more memory."""

    def __init__(self, max_docs_in_memory: int = 50, storage_dir: str = "./claw_storage"):
        super().__init__(max_docs_in_memory)
        self.storage_dir = storage_dir
        os.makedirs(storage_dir, exist_ok=True)
    def _compress_oldest_docs(self) -> None:
        """Evict the oldest document to disk instead of compressing in memory."""
        if not self.documents:
            return
        oldest_id = next(iter(self.documents))
        doc = self.documents.pop(oldest_id)
        # Pickle the document to disk.
        filepath = os.path.join(self.storage_dir, f"{oldest_id}.pkl")
        with open(filepath, 'wb') as f:
            pickle.dump(doc, f)
        # Here compressed_docs maps doc id -> file path (not bytes as in the base class).
        self.compressed_docs[oldest_id] = filepath
    def _load_document_content(self, doc_id: str) -> Optional[str]:
        """Load a document from memory first, then from disk."""
        # Do not delegate to the base implementation here: it would try to
        # zlib-decompress the file path string stored in compressed_docs.
        if doc_id in self.documents:
            return ''.join(self.documents[doc_id].chunks)
        if doc_id in self.compressed_docs:
            filepath = self.compressed_docs.pop(doc_id)
            if os.path.exists(filepath):
                with open(filepath, 'rb') as f:
                    doc = pickle.load(f)
                os.remove(filepath)
                # Promote back into memory, evicting another document if needed.
                if len(self.documents) >= self.max_docs_in_memory:
                    self._compress_oldest_docs()
                self.documents[doc_id] = doc
                return ''.join(doc.chunks)
        return None

    def _get_doc_metadata(self, doc_id: str) -> Dict:
        """Fetch metadata, reading from disk if the document was evicted."""
        # Override needed for the same reason as above: the base class would
        # attempt to zlib-decompress a file path.
        if doc_id in self.documents:
            return self.documents[doc_id].metadata
        filepath = self.compressed_docs.get(doc_id)
        if filepath is not None and os.path.exists(filepath):
            with open(filepath, 'rb') as f:
                return pickle.load(f).metadata
        return {}
if __name__ == "__main__":
    # Run the example.
    example_usage()
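
For completeness, here is a minimal usage sketch of the disk-backed variant. It is not part of the example above, and it assumes the process may create a ./claw_storage directory in the current working directory:

def disk_backed_example():
    """Minimal sketch: same API, but evicted documents are pickled to disk."""
    claw = DiskBackedOpenClaw(max_docs_in_memory=2, storage_dir="./claw_storage")
    for i in range(5):
        claw.add_document(f"doc{i}", f"sample document number {i} " * 50)
    # With a budget of 2, three of the five documents now live on disk.
    print(claw.get_memory_stats())
    # Searching transparently reloads evicted documents from disk.
    print(claw.search("sample"))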

Key memory-optimization techniques:

  1. Compact data structures

    • __slots__ to reduce per-object memory
    • Large documents stored in chunks
    • A content hash in place of a duplicate copy of the content
  2. Lazy loading

    • Document content is loaded only when needed
    • Tiered storage: memory -> compressed -> disk
  3. Incremental processing

    • Data is processed in batches to avoid memory spikes
    • Stale data is cleaned up promptly
  4. Compressed storage

    • zlib compression for rarely used documents
    • Optional disk storage for further savings
  5. Smart caching

    • FIFO eviction (oldest-first, not true LRU): the oldest in-memory document
      is compressed first, and documents reloaded from compressed storage rejoin
      the back of the queue
    • Old documents are compressed automatically

This implementation can substantially reduce memory usage when handling large numbers of documents while keeping search performance reasonable; the sketch below gives a rough sense of what __slots__ and zlib buy in practice.
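
As a quick, illustrative measurement (not from the article above; exact numbers vary by Python version and input data), the following compares per-instance size with and without __slots__ and shows a zlib round trip:

import sys
import zlib
import pickle

class WithDict:
    def __init__(self):
        self.a, self.b, self.c = 1, 2, 3

class WithSlots:
    __slots__ = ('a', 'b', 'c')
    def __init__(self):
        self.a, self.b, self.c = 1, 2, 3

d, s = WithDict(), WithSlots()
# The per-instance __dict__ of a regular object typically dominates its footprint.
print(sys.getsizeof(d) + sys.getsizeof(d.__dict__), "bytes with __dict__")
print(sys.getsizeof(s), "bytes with __slots__")

# zlib round trip: repetitive text compresses well; the ratio is data-dependent.
text = "machine learning and data science " * 200
blob = zlib.compress(pickle.dumps(text))
print(len(text), "chars ->", len(blob), "bytes compressed")
assert pickle.loads(zlib.decompress(blob)) == text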

