Artificial intelligence

How to Build an EverMem-Style Persistent AI Agent OS with Hierarchical Memory, FAISS Vector Retrieval, SQLite Storage, and Automated Memory Consolidation

class EverMemAgentOS:
   def __init__(
       self,
       workdir: str = "/content/evermem_agent_os",
       db_name: str = "evermem.sqlite",
       embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2",
       gen_model: str = "google/flan-t5-small",
       stm_max_turns: int = 10,
       ltm_topk: int = 6,
       consolidate_every: int = 8,
       consolidate_trigger_tokens: int = 1400,
       compress_target_chars: int = 420,
       seed: int = 7,
   ):
       """Set up storage (SQLite), retrieval (FAISS), and the embed/generate models.

       Args:
           workdir: Directory holding the SQLite DB and FAISS index files.
           db_name: SQLite file name created inside workdir.
           embedding_model: SentenceTransformer model id for memory embeddings.
           gen_model: Seq2seq model id loaded for text generation.
           stm_max_turns: Max turns kept verbatim in short-term memory.
           ltm_topk: Number of long-term memories retrieved per query.
           consolidate_every: Turn count that triggers consolidation.
           consolidate_trigger_tokens: Token estimate that also triggers consolidation.
           compress_target_chars: Target character length for compressed summaries.
           seed: NumPy RNG seed (memory-id generation uses np.random).
       """
       import torch  # local import; torch is already a hard dependency of transformers

       self.workdir = workdir
       _ensure_dir(self.workdir)
       self.db_path = os.path.join(self.workdir, db_name)

       # Embedding model and its output dimensionality (drives the FAISS index size).
       self.embedder = SentenceTransformer(embedding_model)
       self.embed_dim = self.embedder.get_sentence_embedding_dimension()

       # BUG FIX: the original called self.model.to(self.device) without ever
       # assigning self.device, raising AttributeError on construction.
       # Select GPU when available, otherwise fall back to CPU.
       self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

       self.tokenizer = AutoTokenizer.from_pretrained(gen_model)
       self.model = AutoModelForSeq2SeqLM.from_pretrained(gen_model)
       self.model.to(self.device)
       self.model.eval()

       self.stm_max_turns = stm_max_turns
       self.ltm_topk = ltm_topk
       self.consolidate_every = consolidate_every
       self.consolidate_trigger_tokens = consolidate_trigger_tokens
       self.compress_target_chars = compress_target_chars

       np.random.seed(seed)

       self._init_db()
       self._init_faiss()

       # Short-term memory: recent chat turns kept verbatim as role/text dicts.
       self.stm: List[Dict[str, str]] = []
       self.turns = 0


   def _init_db(self):
       conn = sqlite3.connect(self.db_path)
       cur = conn.cursor()
       cur.execute(
           """
           CREATE TABLE IF NOT EXISTS memories (
               mid TEXT PRIMARY KEY,
               role TEXT,
               text TEXT,
               created_ts INTEGER,
               importance REAL,
               tokens_est INTEGER,
               meta_json TEXT
           )
           """
       )
       cur.execute(
           """
           CREATE TABLE IF NOT EXISTS kv_store (
               k TEXT PRIMARY KEY,
               v_json TEXT,
               updated_ts INTEGER
           )
           """
       )
       cur.execute(
           """
           CREATE TABLE IF NOT EXISTS consolidations (
               cid TEXT PRIMARY KEY,
               created_ts INTEGER,
               summary TEXT,
               source_mids_json TEXT
           )
           """
       )
       conn.commit()
       conn.close()


   def _init_faiss(self):
       """Load the persisted FAISS index and id map if present, else start fresh."""
       self.faiss_index_path = os.path.join(self.workdir, "faiss.index")
       self.faiss_map_path = os.path.join(self.workdir, "faiss_map.json")

       have_snapshot = os.path.exists(self.faiss_index_path) and os.path.exists(self.faiss_map_path)
       if have_snapshot:
           self.index = faiss.read_index(self.faiss_index_path)
           with open(self.faiss_map_path, "r", encoding="utf-8") as f:
               raw_map = json.load(f)
           # JSON object keys are strings; restore int faiss-id -> memory-id keys.
           self.id_map = {int(fid): mid for fid, mid in raw_map.items()}
           self.next_faiss_id = (max(self.id_map) + 1) if self.id_map else 0
           return

       # Inner-product index; embeddings are normalized, so IP == cosine similarity.
       self.index = faiss.IndexFlatIP(self.embed_dim)
       self.id_map: Dict[int, str] = {}
       self.next_faiss_id = 0
       self._persist_faiss()


   def _persist_faiss(self):
       """Snapshot the FAISS index and its id map to disk."""
       faiss.write_index(self.index, self.faiss_index_path)
       # JSON requires string keys; invert of the int() coercion done on load.
       serializable = {str(fid): mid for fid, mid in self.id_map.items()}
       with open(self.faiss_map_path, "w", encoding="utf-8") as f:
           json.dump(serializable, f)


   def _embed(self, texts: List[str]) -> np.ndarray:
       vecs = self.embedder.encode(texts, convert_to_numpy=True, normalize_embeddings=True)
       if vecs.ndim == 1:
           vecs = vecs.reshape(1, -1)
       return vecs.astype("float32")


   def _tokens_est(self, text: str) -> int:
       text = text or ""
       return max(1, int(len(text.split()) * 1.25))


   def _importance_score(self, role: str, text: str, meta: Dict[str, Any]) -> float:
       base = 0.35
       length_bonus = min(0.45, math.log1p(len(text)) / 20.0)
       role_bonus = 0.08 if role == "user" else 0.03
       pin = 0.35 if meta.get("pinned") else 0.0
       signal = meta.get("signal", "")
       signal_bonus = 0.18 if signal in {"decision", "preference", "fact", "task"} else 0.0
       q_bonus = 0.06 if "?" in text else 0.0
       number_bonus = 0.05 if any(ch.isdigit() for ch in text) else 0.0
       return float(min(1.0, base + length_bonus + role_bonus + pin + signal_bonus + q_bonus + number_bonus))


   def upsert_kv(self, k: str, v: Any):
       """Insert or update key *k* in kv_store with the JSON-serialized value *v*."""
       payload = (k, json.dumps(v, ensure_ascii=False), _now_ts())
       sql = (
           "INSERT INTO kv_store (k, v_json, updated_ts) VALUES (?, ?, ?) "
           "ON CONFLICT(k) DO UPDATE SET v_json=excluded.v_json, updated_ts=excluded.updated_ts"
       )
       conn = sqlite3.connect(self.db_path)
       conn.execute(sql, payload)
       conn.commit()
       conn.close()


   def get_kv(self, k: str, default=None):
       conn = sqlite3.connect(self.db_path)
       cur = conn.cursor()
       cur.execute("SELECT v_json FROM kv_store WHERE k=?", (k,))
       row = cur.fetchone()
       conn.close()
       if not row:
           return default
       try:
           return json.loads(row[0])
       except Exception:
           return default


   def add_memory(self, role: str, text: str, meta: Optional[Dict[str, Any]] = None) -> str:
       """Persist one memory row in SQLite and index its embedding in FAISS.

       Returns the memory id: meta["mid"] when supplied, else a fresh hash id.
       NOTE(review): re-adding an existing mid replaces the SQLite row but still
       appends a second FAISS vector for it — confirm whether duplicates matter.
       """
       meta = meta or {}
       text = (text or "").strip()
       mid = meta.get("mid") or f"m:{_sha(f'{_now_ts()}::{role}::{text[:80]}::{np.random.randint(0, 10**9)}')}"
       created_ts = _now_ts()
       tokens_est = self._tokens_est(text)
       explicit = meta.get("importance")
       importance = float(explicit) if explicit is not None else self._importance_score(role, text, meta)

       row = (mid, role, text, created_ts, importance, tokens_est, json.dumps(meta, ensure_ascii=False))
       conn = sqlite3.connect(self.db_path)
       conn.execute(
           "INSERT OR REPLACE INTO memories (mid, role, text, created_ts, importance, tokens_est, meta_json) VALUES (?, ?, ?, ?, ?, ?, ?)",
           row,
       )
       conn.commit()
       conn.close()

       # Index the embedding under the next sequential FAISS id, then snapshot.
       faiss_id = self.next_faiss_id
       self.next_faiss_id += 1
       self.index.add(self._embed([text]))
       self.id_map[faiss_id] = mid
       self._persist_faiss()

       return mid

Related Articles

Leave a Reply

Your email address will not be published. Required fields are marked *

Back to top button