Source code for mcp_server

import asyncio
import os
from typing import Any, Dict, List, Union

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import EmbeddedResource, ImageContent, TextContent, Tool

from core import Collection
from llm import Embedder

# --- MCP Server Implementation ---

app = Server("embenx-memory")

[docs] async def list_tools_impl() -> List[Tool]: """List available tools for agentic memory.""" return [ Tool( name="memory_add", description="Add embeddings and metadata to an Embenx collection.", inputSchema={ "type": "object", "properties": { "collection": {"type": "string", "default": "default"}, "texts": {"type": "array", "items": {"type": "string"}}, "metadata": {"type": "array", "items": {"type": "object"}}, "model": {"type": "string", "default": "ollama/nomic-embed-text"} }, "required": ["texts"] } ), Tool( name="memory_search", description="Search for relevant documents in an Embenx collection.", inputSchema={ "type": "object", "properties": { "collection": {"type": "string", "default": "default"}, "query": {"type": "string"}, "top_k": {"type": "number", "default": 5}, "model": {"type": "string", "default": "ollama/nomic-embed-text"} }, "required": ["query"] } ), Tool( name="list_collections", description="List all local Embenx collections available.", inputSchema={"type": "object", "properties": {}} ) ]
[docs] async def call_tool_impl(name: str, arguments: Dict[str, Any]) -> List[Union[TextContent, ImageContent, EmbeddedResource]]: """Handle tool calls from the AI agent.""" if name == "memory_add": texts = arguments["texts"] meta = arguments.get("metadata") col_name = arguments.get("collection", "default") model = arguments.get("model", "ollama/nomic-embed-text") emb = Embedder(model) vectors = emb.embed_texts(texts) col = Collection(name=col_name, indexer_type="faiss") col.add(vectors, meta) path = f"{col_name}.parquet" col.to_parquet(path) return [TextContent(type="text", text=f"Successfully added {len(texts)} items to collection '{col_name}'. Storage: {path}")] elif name == "memory_search": query = arguments["query"] col_name = arguments.get("collection", "default") top_k = int(arguments.get("top_k", 5)) model = arguments.get("model", "ollama/nomic-embed-text") path = f"{col_name}.parquet" if not os.path.exists(path): return [TextContent(type="text", text=f"Collection '{col_name}' not found. Path {path} does not exist.")] col = Collection.from_parquet(path) emb = Embedder(model) q_vec = emb.embed_query(query) results = col.search(q_vec, top_k=top_k) output = [] for meta, dist in results: text = meta.get("text", str(meta)) output.append(f"- {text} (Score: {dist:.4f})") return [TextContent(type="text", text="\n".join(output) if output else "No relevant matches found.")] elif name == "list_collections": import glob files = glob.glob("*.parquet") names = [f.replace(".parquet", "") for f in files] return [TextContent(type="text", text=f"Available collections: {', '.join(names) if names else 'None'}")] else: raise ValueError(f"Unknown tool: {name}")
# Register implementations with the app app.list_tools()(list_tools_impl) app.call_tool()(call_tool_impl)
[docs] async def run(): async with stdio_server() as (read_stream, write_server): await app.run(read_stream, write_server, app.create_initialization_options())
if __name__ == "__main__": asyncio.run(run())