Source code for indexers.weaviate_indexer

import os
from typing import Any, Dict, List, Tuple

import weaviate
from weaviate.classes.config import Configure, DataType, Property, VectorDistances
from weaviate.classes.init import AdditionalConfig, Timeout

from .base import BaseIndexer


[docs] class WeaviateIndexer(BaseIndexer): def __init__(self, dimension: int): super().__init__("Weaviate", dimension) # Using embedded Weaviate for local benchmarking self.client = weaviate.connect_to_embedded( version="1.27.0", persistence_data_path="./weaviate_data", port=8099, grpc_port=50060, additional_config=AdditionalConfig(timeout=Timeout(init=30, query=60, insert=120)), ) self.collection_name = "Benchmark" # Cleanup if exists if self.client.collections.exists(self.collection_name): self.client.collections.delete(self.collection_name) self.client.collections.create( name=self.collection_name, vectorizer_config=Configure.Vectorizer.none(), properties=[ Property(name="metadata_json", data_type=DataType.TEXT), ], vector_index_config=Configure.VectorIndex.hnsw(distance_metric=VectorDistances.COSINE), ) self.collection = self.client.collections.get(self.collection_name)
[docs] def build_index(self, embeddings: List[List[float]], metadata: List[Dict[str, Any]]) -> None: import json with self.collection.batch.dynamic() as batch: for emb, meta in zip(embeddings, metadata): batch.add_object(properties={"metadata_json": json.dumps(meta)}, vector=emb)
[docs] def search( self, query_embedding: List[float], top_k: int = 5 ) -> List[Tuple[Dict[str, Any], float]]: import json response = self.collection.query.near_vector( near_vector=query_embedding, limit=top_k, return_metadata=["distance"] ) results = [] for obj in response.objects: meta = json.loads(obj.properties["metadata_json"]) dist = obj.metadata.distance if obj.metadata.distance is not None else 0.0 results.append((meta, float(dist))) return results
[docs] def get_size(self) -> int: # Estimation based on data folder if exists size = 0 try: if os.path.exists("./weaviate_data"): for root, _dirs, files in os.walk("./weaviate_data"): for f in files: size += os.path.getsize(os.path.join(root, f)) except Exception: pass return size
[docs] def cleanup(self) -> None: self.client.close() import shutil try: if os.path.exists("./weaviate_data"): shutil.rmtree("./weaviate_data") except Exception: pass