PostgreSQL Database

PostgreSQL is the primary relational database for Project Aegis, storing structured data for memory, tasks, API keys, and all operational state.

Overview

  • Version: PostgreSQL 14+ (managed on host)
  • Host: localhost (on host), host.docker.internal (from containers)
  • Port: 5432
  • Database: aegis
  • User: agent
  • Extensions: pgvector (for embeddings)

Architecture

┌───────────────────────────────────────┐
│        PostgreSQL Server              │
│         (localhost:5432)              │
├───────────────────────────────────────┤
│  Database: aegis                      │
│    - 85+ tables                       │
│    - pgvector extension               │
│    - asyncpg connection pool          │
│    - Sync + async clients             │
└───────────────────────────────────────┘
         ↑                ↑
         │                │
   aegis-dashboard    aegis-scheduler
   (asyncpg pool)     (sync client)

Connection Details

From Containers

environment:
  POSTGRES_HOST: host.docker.internal
  POSTGRES_PORT: 5432
  POSTGRES_USER: agent
  POSTGRES_PASSWORD: agent
  POSTGRES_DB: aegis

Note: host.docker.internal is added via extra_hosts in docker-compose.yml
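
A minimal sketch of consuming these variables from inside a container (illustrative only; the actual services read them through their own settings objects, as shown in the asyncpg example below):

import os
import psycopg2

# Build a connection from the POSTGRES_* variables injected by docker-compose
conn = psycopg2.connect(
    host=os.environ.get("POSTGRES_HOST", "host.docker.internal"),
    port=int(os.environ.get("POSTGRES_PORT", "5432")),
    user=os.environ["POSTGRES_USER"],
    password=os.environ["POSTGRES_PASSWORD"],
    dbname=os.environ["POSTGRES_DB"],
)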

From Host

# psql
PGPASSWORD=agent psql -h localhost -U agent -d aegis

# Python
import psycopg2
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    user="agent",
    password="agent",
    database="aegis"
)

Schema Overview

The database contains 85+ tables organized by domain:

Core Tables

Table             Purpose                              Key Columns
---------------   ----------------------------------   --------------------------------------------------------
episodic_memory   Event log, decisions, interactions   id, event_type, summary, details, importance, timestamp
config            System configuration (key-value)     key, value
state             Operational state (key-value)        key, value, updated_at

Memory & Knowledge

Table                 Purpose
-------------------   -----------------------------------
episodic_memory       Timestamped event log
decision_outcomes     Decision tracking with alternatives
journal_entries       Structured journal entries
knowledge_documents   Document metadata
knowledge_chunks      Document chunks with embeddings
execution_traces      Task execution traces

Task Management

Table                Purpose
------------------   ---------------------------------
async_tasks          Background task queue
discord_tasks        Tasks from Discord #tasks channel
github_review_jobs   PR review jobs
deep_briefing_jobs   Intel briefing generation jobs

API Management

Table               Purpose
-----------------   ----------------------------------------
api_keys            API key authentication
api_products        API product catalog
api_rate_limits     Rate limit state
api_usage_events    Usage tracking
api_key_credits     Credit balances
intel_api_keys      Intel API keys (separate from main API)
intel_subscribers   Intel digest subscribers

Communication

Table                      Purpose
------------------------   ------------------------
email_cache                Gmail message cache
email_classifications      Triage classifications
email_drafts               Draft emails
discord_command_sessions   Discord command sessions
discord_command_messages   Discord message history

Monitoring & Analytics

Table                  Purpose
--------------------   --------------------------------
container_metrics      Docker container health
infra_anomaly_alerts   Infrastructure anomaly detection
competitor_pages       Competitor monitoring snapshots
competitor_changes     Detected changes
digest_history         Daily digest archive

Finance & Revenue

Table                             Purpose
-------------------------------   --------------------------
billing_events                    Stripe webhook events
marketplace_credit_transactions   Credit purchases and usage
crypto_tokens                     Token balance tracking

Market Intelligence

Table                    Purpose
----------------------   --------------------------
market_symbols           Stock symbols and metadata
market_quotes            Real-time quotes
market_ohlcv_daily       Daily OHLCV data
fundamentals             Company fundamentals
insider_transactions     Insider trading data
institutional_holdings   Institutional positions
market_decisions         Trading decisions log
alert_configs            User-configured alerts
alert_history            Alert trigger history

Workflows

Table                  Purpose
--------------------   --------------------------
workflow_checkpoints   Graph-based workflow state

Content & SEO

Table               Purpose
-----------------   -------------------------
blog_posts          Blog post metadata
content_repurpose   Content repurposing tasks
documents           Document management

WhatsApp & Sessions

Table                    Purpose
----------------------   ----------------------------------
claude_sessions          WhatsApp-triggered Claude sessions
claude_task_logs         Task execution logs
claude_file_operations   File operations during sessions

Geopolitical Intelligence

Table            Purpose
--------------   ---------------------------
geoint_reports   Stored intelligence reports

Calendars

Table             Purpose
---------------   ---------------------------
calendar_events   Google Calendar event cache
calendar_cache    Calendar sync state

GitHub Integration

Table                Purpose
------------------   ----------------------
github_repos         Tracked repositories
github_review_jobs   Automated review queue

File Sharing

Table         Purpose
-----------   ----------------------------------------------------
file_shares   Magic link tokens (managed by ShareTokenRepository)

Key Table Schemas

episodic_memory

Central event log for all system activity:

CREATE TABLE episodic_memory (
    id SERIAL PRIMARY KEY,
    event_type VARCHAR(100) NOT NULL,
    summary TEXT NOT NULL,
    details JSONB,
    importance VARCHAR(20),  -- low, medium, high, critical
    tags TEXT[],
    timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE INDEX idx_episodic_memory_timestamp ON episodic_memory(timestamp DESC);
CREATE INDEX idx_episodic_memory_type ON episodic_memory(event_type);
CREATE INDEX idx_episodic_memory_importance ON episodic_memory(importance);
CREATE INDEX idx_episodic_memory_tags ON episodic_memory USING gin(tags);

Common event types:

  • decision - Strategic decisions
  • implementation - Code implementation
  • error - Errors encountered
  • learning - Lessons learned
  • milestone - Project milestones
  • morning_status - Daily morning check
  • evening_summary - Daily summary
  • docker_health_check - Container health
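
A hedged example of writing such an event through the synchronous client documented under Database Clients (encoding details via json.dumps is an assumption about how the client expects JSONB parameters):

import json
from aegis.db import db

# Record a high-importance decision with structured details and tags
db.execute(
    """
    INSERT INTO episodic_memory (event_type, summary, details, importance, tags)
    VALUES (%s, %s, %s, %s, %s)
    """,
    (
        "decision",
        "Chose asyncpg for the dashboard",
        json.dumps({"alternatives": ["psycopg2"], "reason": "non-blocking I/O"}),
        "high",
        ["architecture", "database"],  # TEXT[] column; psycopg2 adapts Python lists
    ),
)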

workflow_checkpoints

State persistence for graph-based workflows:

CREATE TABLE workflow_checkpoints (
    id SERIAL PRIMARY KEY,
    workflow_id TEXT UNIQUE NOT NULL,
    workflow_type TEXT NOT NULL,
    state_json JSONB NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE INDEX idx_workflow_checkpoints_type ON workflow_checkpoints(workflow_type);

Used by: aegis.workflows for resumable workflows
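
A minimal save/load sketch against this table (the helper names are illustrative, not the actual aegis.workflows API):

import json
from aegis.db import db

def save_checkpoint(workflow_id: str, workflow_type: str, state: dict) -> None:
    # Upsert on workflow_id so re-running a workflow overwrites its old state
    db.execute(
        """
        INSERT INTO workflow_checkpoints (workflow_id, workflow_type, state_json)
        VALUES (%s, %s, %s)
        ON CONFLICT (workflow_id)
        DO UPDATE SET state_json = EXCLUDED.state_json, updated_at = NOW()
        """,
        (workflow_id, workflow_type, json.dumps(state)),
    )

def load_checkpoint(workflow_id: str):
    # Assumes fetch_one returns a mapping; adjust if it returns a tuple
    row = db.fetch_one(
        "SELECT state_json FROM workflow_checkpoints WHERE workflow_id = %s",
        (workflow_id,),
    )
    return row["state_json"] if row else None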

api_keys

API key management with tier-based rate limits:

CREATE TABLE api_keys (
    id SERIAL PRIMARY KEY,
    key_hash VARCHAR(255) UNIQUE NOT NULL,
    user_email VARCHAR(255),
    tier VARCHAR(50) DEFAULT 'basic',  -- basic, pro, enterprise
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    expires_at TIMESTAMP WITH TIME ZONE,
    is_active BOOLEAN DEFAULT TRUE,
    metadata JSONB
);

CREATE INDEX idx_api_keys_email ON api_keys(user_email);
CREATE INDEX idx_api_keys_tier ON api_keys(tier);
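
A hedged sketch of the verification pattern this schema implies (SHA-256 is an assumption; the real scheme behind key_hash may differ):

import hashlib
from aegis.db import db

def verify_api_key(raw_key: str):
    # Only the hash is stored, so hash the presented key before the lookup
    key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
    return db.fetch_one(
        """
        SELECT user_email, tier
        FROM api_keys
        WHERE key_hash = %s
          AND is_active
          AND (expires_at IS NULL OR expires_at > NOW())
        """,
        (key_hash,),
    )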

async_tasks

Background task queue:

CREATE TABLE async_tasks (
    id SERIAL PRIMARY KEY,
    task_type VARCHAR(100) NOT NULL,
    status VARCHAR(50) DEFAULT 'pending',  -- pending, running, completed, failed
    priority INTEGER DEFAULT 0,
    payload JSONB NOT NULL,
    result JSONB,
    error TEXT,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    started_at TIMESTAMP WITH TIME ZONE,
    completed_at TIMESTAMP WITH TIME ZONE
);

CREATE INDEX idx_async_tasks_status ON async_tasks(status);
CREATE INDEX idx_async_tasks_priority ON async_tasks(priority DESC);
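
Concurrent workers can claim from a queue like this safely with PostgreSQL's FOR UPDATE SKIP LOCKED; whether the actual scheduler uses this exact pattern is an assumption:

import psycopg2

conn = psycopg2.connect(host="localhost", port=5432, user="agent",
                        password="agent", dbname="aegis")

with conn:  # commits on success, rolls back on error
    with conn.cursor() as cur:
        # Claim the highest-priority pending task; SKIP LOCKED makes
        # competing workers pass over rows another worker already locked
        cur.execute(
            """
            UPDATE async_tasks
            SET status = 'running', started_at = NOW()
            WHERE id = (
                SELECT id FROM async_tasks
                WHERE status = 'pending'
                ORDER BY priority DESC, created_at
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            )
            RETURNING id, task_type, payload
            """
        )
        task = cur.fetchone()  # None when the queue is empty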

claude_sessions

WhatsApp-triggered Claude Code sessions:

CREATE TABLE claude_sessions (
    id SERIAL PRIMARY KEY,
    session_id VARCHAR(255) UNIQUE NOT NULL,
    user_phone VARCHAR(50) NOT NULL,
    status VARCHAR(50) DEFAULT 'active',  -- active, completed, error, terminated
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    initial_task TEXT,
    context_summary TEXT,
    metadata JSONB
);

CREATE INDEX idx_claude_sessions_user ON claude_sessions(user_phone);
CREATE INDEX idx_claude_sessions_status ON claude_sessions(status);

Database Clients

Synchronous Client (aegis.db)

For synchronous operations (used by scheduler, CLI tools):

from aegis.db import db

# Query
result = db.fetch_one("SELECT * FROM episodic_memory WHERE id = %s", (123,))
results = db.fetch_all("SELECT * FROM episodic_memory LIMIT 10")

# Execute
db.execute("INSERT INTO episodic_memory (event_type, summary) VALUES (%s, %s)",
           ("test", "Test event"))

# Config/State helpers
db.set_config("version", "4.16")
version = db.get_config("version")

db.set_state("operational_status", "active")
status = db.get_state("operational_status")

Implementation: Uses psycopg2 (blocking I/O)

Async Client (asyncpg)

For async operations (used by dashboard API):

# In dashboard app.py:
import asyncpg

pool = await asyncpg.create_pool(
    host=settings.postgres_host,
    port=settings.postgres_port,
    user=settings.postgres_user,
    password=settings.postgres_password,
    database=settings.postgres_db,
    min_size=2,
    max_size=10,
)

# In route handlers:
async def my_endpoint(request: Request):
    pool = request.app.state.pg_pool
    async with pool.acquire() as conn:
        result = await conn.fetch("SELECT * FROM episodic_memory LIMIT 10")
        return {"memories": [dict(r) for r in result]}

Connection Pool:

  • Min connections: 2
  • Max connections: 10
  • Shared across dashboard process

pgvector Extension

Used for embedding-based similarity search:

Installation

CREATE EXTENSION IF NOT EXISTS vector;

Usage

# Store embeddings
from aegis.db import db

embedding = [0.1, 0.2, 0.3, ...]  # 768-dim vector
db.execute(
    "INSERT INTO knowledge_chunks (content, embedding) VALUES (%s, %s)",
    (text, embedding)
)

# Similarity search (cosine distance)
results = db.fetch_all(
    """
    SELECT content, 1 - (embedding <=> %s::vector) AS similarity
    FROM knowledge_chunks
    ORDER BY embedding <=> %s::vector
    LIMIT 10
    """,
    (embedding, embedding)
)
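
Passing a plain Python list relies on PostgreSQL casting the resulting array to vector. If the pgvector Python package is installed (an assumption here), registering its adapter makes the conversion explicit:

import psycopg2
from pgvector.psycopg2 import register_vector

conn = psycopg2.connect(host="localhost", user="agent",
                        password="agent", dbname="aegis")
register_vector(conn)  # adapts Python lists / numpy arrays to vector and back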

Operators:

  • <-> - L2 distance
  • <#> - Negative inner product
  • <=> - Cosine distance

Indexes:

CREATE INDEX ON knowledge_chunks USING ivfflat (embedding vector_cosine_ops)
  WITH (lists = 100);
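
ivfflat trades recall for speed: queries scan only a subset of the lists. The number of probed lists can be raised per session; a sketch (the value 10 is illustrative, and it assumes both statements run on the same connection):

from aegis.db import db

# Probe more lists for better recall at some latency cost (pgvector default is 1)
db.execute("SET ivfflat.probes = 10")
results = db.fetch_all(
    "SELECT content FROM knowledge_chunks ORDER BY embedding <=> %s::vector LIMIT 10",
    (embedding,),
)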

Backup and Restore

Backup

# Full database dump
PGPASSWORD=agent pg_dump -h localhost -U agent aegis > /home/agent/backups/aegis-$(date +%Y%m%d).sql

# Compressed dump
PGPASSWORD=agent pg_dump -h localhost -U agent aegis | gzip > /home/agent/backups/aegis-$(date +%Y%m%d).sql.gz

# Schema only
PGPASSWORD=agent pg_dump -h localhost -U agent --schema-only aegis > /home/agent/backups/aegis-schema.sql

# Data only
PGPASSWORD=agent pg_dump -h localhost -U agent --data-only aegis > /home/agent/backups/aegis-data.sql

# Specific tables
PGPASSWORD=agent pg_dump -h localhost -U agent --table=episodic_memory aegis > episodic_memory.sql

Restore

# Drop and recreate database
PGPASSWORD=agent psql -h localhost -U agent -d postgres -c "DROP DATABASE aegis;"
PGPASSWORD=agent psql -h localhost -U agent -d postgres -c "CREATE DATABASE aegis;"

# Restore from dump
PGPASSWORD=agent psql -h localhost -U agent -d aegis < /home/agent/backups/aegis-20260125.sql

# Restore compressed dump
gunzip -c /home/agent/backups/aegis-20260125.sql.gz | PGPASSWORD=agent psql -h localhost -U agent -d aegis

Maintenance

Vacuum

Regular maintenance to reclaim space and update statistics:

-- Full vacuum (blocking)
VACUUM FULL;

-- Analyze (update statistics)
ANALYZE;

-- Vacuum and analyze
VACUUM ANALYZE;

-- Specific table
VACUUM ANALYZE episodic_memory;

Automated: PostgreSQL runs autovacuum in the background.

Reindex

Rebuild indexes to recover performance (note: REINDEX blocks writes to the table while it runs, so schedule it during quiet periods):

-- Reindex table
REINDEX TABLE episodic_memory;

-- Reindex database
REINDEX DATABASE aegis;

Check Table Sizes

SELECT
    schemaname,
    tablename,
    pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC
LIMIT 20;

Check Index Usage

SELECT
    schemaname,
    tablename,
    indexname,
    idx_scan,
    idx_tup_read,
    idx_tup_fetch
FROM pg_stat_user_indexes
WHERE schemaname = 'public'
ORDER BY idx_scan ASC
LIMIT 20;

Monitoring

Connection Count

SELECT count(*) FROM pg_stat_activity WHERE datname = 'aegis';

Active Queries

SELECT
    pid,
    usename,
    application_name,
    state,
    query,
    query_start
FROM pg_stat_activity
WHERE datname = 'aegis' AND state != 'idle'
ORDER BY query_start;

Long-Running Queries

SELECT
    pid,
    now() - query_start AS duration,
    usename,
    query
FROM pg_stat_activity
WHERE datname = 'aegis' AND state = 'active' AND now() - query_start > interval '1 minute'
ORDER BY duration DESC;

Database Size

SELECT pg_size_pretty(pg_database_size('aegis'));

Table Statistics

SELECT
    schemaname,
    tablename,
    n_live_tup AS live_rows,
    n_dead_tup AS dead_rows,
    last_vacuum,
    last_autovacuum,
    last_analyze,
    last_autoanalyze
FROM pg_stat_user_tables
WHERE schemaname = 'public'
ORDER BY n_live_tup DESC;
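
These statistics can also drive a simple automated bloat check; a sketch that flags tables with a high dead-row ratio (the 20% threshold is arbitrary, and rows are assumed to come back as mappings):

from aegis.db import db

# Flag tables where dead rows exceed 20% of live rows
rows = db.fetch_all(
    """
    SELECT relname, n_live_tup, n_dead_tup
    FROM pg_stat_user_tables
    WHERE n_live_tup > 0 AND n_dead_tup > 0.2 * n_live_tup
    ORDER BY n_dead_tup DESC
    """
)
for r in rows:
    print(f"{r['relname']}: {r['n_dead_tup']} dead rows; consider VACUUM ANALYZE")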

Performance Tuning

Connection Pooling

Use connection pooling to reduce overhead:

asyncpg (in dashboard):

pool = await asyncpg.create_pool(
    host=settings.postgres_host,
    min_size=2,    # Keep 2 connections warm
    max_size=10,   # Allow burst to 10
    max_inactive_connection_lifetime=300,  # Close idle after 5min
)

psycopg2 (sync):

from psycopg2 import pool

connection_pool = pool.SimpleConnectionPool(
    minconn=1,
    maxconn=5,
    host="localhost",
    user="agent",
    password="agent",
    database="aegis"
)

# Borrow a connection, use it, and always return it to the pool
conn = connection_pool.getconn()
try:
    with conn.cursor() as cur:
        cur.execute("SELECT 1")
finally:
    connection_pool.putconn(conn)

Query Optimization

  1. Use indexes: Create indexes on frequently filtered columns

    CREATE INDEX idx_episodic_memory_timestamp ON episodic_memory(timestamp DESC);
    

  2. Use EXPLAIN: Analyze query plans (see the Python sketch after this list)

    EXPLAIN ANALYZE SELECT * FROM episodic_memory WHERE timestamp > NOW() - INTERVAL '7 days';
    

  3. Limit results: Always use LIMIT for large tables

    SELECT * FROM episodic_memory ORDER BY timestamp DESC LIMIT 100;
    

  4. Use appropriate data types: JSONB for structured data, TEXT for unstructured

  5. Partition large tables: Consider partitioning by date for time-series data
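
As a hedged illustration of items 2 and 3, the plan for a limited query can be pulled through the synchronous client (assuming db.fetch_all returns the plan rows as text):

from aegis.db import db

# EXPLAIN output comes back as ordinary result rows, one plan line each
plan = db.fetch_all(
    """
    EXPLAIN (ANALYZE, BUFFERS)
    SELECT * FROM episodic_memory
    ORDER BY timestamp DESC
    LIMIT 100
    """
)
for line in plan:
    print(line)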

Troubleshooting

Connection refused

  1. Check PostgreSQL is running:

    sudo systemctl status postgresql
    

  2. Check port is listening:

    netstat -tlnp | grep 5432
    

  3. Check pg_hba.conf allows connections:

    sudo cat /etc/postgresql/*/main/pg_hba.conf | grep -v "^#"
    

Too many connections

  1. Check current connections:

    SELECT count(*) FROM pg_stat_activity;
    

  2. Increase max_connections (note: this setting requires a server restart; pg_reload_conf() cannot apply it):

    ALTER SYSTEM SET max_connections = 200;
    -- max_connections only takes effect after a restart:
    -- sudo systemctl restart postgresql
    

  3. Terminate idle connections:

    SELECT pg_terminate_backend(pid)
    FROM pg_stat_activity
    WHERE datname = 'aegis' AND state = 'idle' AND state_change < NOW() - INTERVAL '1 hour';
    

Slow queries

  1. Enable slow query logging:

    ALTER DATABASE aegis SET log_min_duration_statement = 1000;  -- Log queries > 1s
    

  2. Check logs:

    sudo tail -f /var/log/postgresql/postgresql-*-main.log | grep "duration:"
    

  3. Analyze with EXPLAIN:

    EXPLAIN (ANALYZE, BUFFERS) <your_query>;
    

Disk full

  1. Check disk usage:

    df -h /var/lib/postgresql
    

  2. Find largest tables:

    SELECT tablename, pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename))
    FROM pg_tables WHERE schemaname = 'public' ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;
    

  3. Vacuum full to reclaim space (caution: VACUUM FULL rewrites each table and temporarily needs roughly as much free space as the table being rewritten, so it can fail on a nearly full disk):

    VACUUM FULL;