PostgreSQL Database¶
PostgreSQL is the primary relational database for Project Aegis, storing structured data for memory, tasks, API keys, and all operational state.
Overview¶
- Version: PostgreSQL 14+ (managed on host)
- Host: localhost (on host), host.docker.internal (from containers)
- Port: 5432
- Database: aegis
- User: agent
- Extensions: pgvector (for embeddings)
Architecture¶
┌───────────────────────────────────────┐
│ PostgreSQL Server │
│ (localhost:5432) │
├───────────────────────────────────────┤
│ Database: aegis │
│ - 85+ tables │
│ - pgvector extension │
│ - asyncpg connection pool │
│ - Sync + async clients │
└───────────────────────────────────────┘
↑ ↑
│ │
aegis-dashboard aegis-scheduler
(asyncpg pool) (sync client)
Connection Details¶
From Containers¶
environment:
POSTGRES_HOST: host.docker.internal
POSTGRES_PORT: 5432
POSTGRES_USER: agent
POSTGRES_PASSWORD: agent
POSTGRES_DB: aegis
Note: host.docker.internal is added via extra_hosts in docker-compose.yml
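In docker-compose.yml this is usually a host-gateway alias; a rough sketch (the service name is illustrative, and the exact mapping in this project may differ):
services:
  aegis-dashboard:
    extra_hosts:
      - "host.docker.internal:host-gateway"  # resolves to the Docker host's gateway IP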
From Host¶
# psql
PGPASSWORD=agent psql -h localhost -U agent -d aegis
# Python
import psycopg2
conn = psycopg2.connect(
host="localhost",
port=5432,
user="agent",
password="agent",
database="aegis"
)
Schema Overview¶
The database contains 85+ tables organized by domain:
Core Tables¶
| Table | Purpose | Key Columns |
|---|---|---|
| episodic_memory | Event log, decisions, interactions | id, event_type, summary, details, importance, timestamp |
| config | System configuration (key-value) | key, value |
| state | Operational state (key-value) | key, value, updated_at |
Memory & Knowledge¶
| Table | Purpose |
|---|---|
| episodic_memory | Timestamped event log |
| decision_outcomes | Decision tracking with alternatives |
| journal_entries | Structured journal entries |
| knowledge_documents | Document metadata |
| knowledge_chunks | Document chunks with embeddings |
| execution_traces | Task execution traces |
Task Management¶
| Table | Purpose |
|---|---|
| async_tasks | Background task queue |
| discord_tasks | Tasks from the Discord #tasks channel |
| github_review_jobs | PR review jobs |
| deep_briefing_jobs | Intel briefing generation jobs |
API Management¶
| Table | Purpose |
|---|---|
| api_keys | API key authentication |
| api_products | API product catalog |
| api_rate_limits | Rate limit state |
| api_usage_events | Usage tracking |
| api_key_credits | Credit balances |
| intel_api_keys | Intel API keys (separate from main API) |
| intel_subscribers | Intel digest subscribers |
Communication¶
| Table | Purpose |
|---|---|
| email_cache | Gmail message cache |
| email_classifications | Triage classifications |
| email_drafts | Draft emails |
| discord_command_sessions | Discord command sessions |
| discord_command_messages | Discord message history |
Monitoring & Analytics¶
| Table | Purpose |
|---|---|
| container_metrics | Docker container health |
| infra_anomaly_alerts | Infrastructure anomaly detection |
| competitor_pages | Competitor monitoring snapshots |
| competitor_changes | Detected changes |
| digest_history | Daily digest archive |
Finance & Revenue¶
| Table | Purpose |
|---|---|
| billing_events | Stripe webhook events |
| marketplace_credit_transactions | Credit purchases and usage |
| crypto_tokens | Token balance tracking |
Market Intelligence¶
| Table | Purpose |
|---|---|
| market_symbols | Stock symbols and metadata |
| market_quotes | Real-time quotes |
| market_ohlcv_daily | Daily OHLCV data |
| fundamentals | Company fundamentals |
| insider_transactions | Insider trading data |
| institutional_holdings | Institutional positions |
| market_decisions | Trading decisions log |
| alert_configs | User-configured alerts |
| alert_history | Alert trigger history |
Workflows¶
| Table | Purpose |
|---|---|
| workflow_checkpoints | Graph-based workflow state |
Content & SEO¶
| Table | Purpose |
|---|---|
| blog_posts | Blog post metadata |
| content_repurpose | Content repurposing tasks |
| documents | Document management |
WhatsApp & Sessions¶
| Table | Purpose |
|---|---|
| claude_sessions | WhatsApp-triggered Claude sessions |
| claude_task_logs | Task execution logs |
| claude_file_operations | File operations during sessions |
Geopolitical Intelligence¶
| Table | Purpose |
|---|---|
| geoint_reports | Stored intelligence reports |
Calendars¶
| Table | Purpose |
|---|---|
| calendar_events | Google Calendar event cache |
| calendar_cache | Calendar sync state |
GitHub Integration¶
| Table | Purpose |
|---|---|
| github_repos | Tracked repositories |
| github_review_jobs | Automated review queue |
File Sharing¶
| Table | Purpose |
|---|---|
| file_shares | Magic link tokens (managed by ShareTokenRepository) |
Key Table Schemas¶
episodic_memory¶
Central event log for all system activity:
CREATE TABLE episodic_memory (
id SERIAL PRIMARY KEY,
event_type VARCHAR(100) NOT NULL,
summary TEXT NOT NULL,
details JSONB,
importance VARCHAR(20), -- low, medium, high, critical
tags TEXT[],
timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE INDEX idx_episodic_memory_timestamp ON episodic_memory(timestamp DESC);
CREATE INDEX idx_episodic_memory_type ON episodic_memory(event_type);
CREATE INDEX idx_episodic_memory_importance ON episodic_memory(importance);
CREATE INDEX idx_episodic_memory_tags ON episodic_memory USING gin(tags);
Common event types:
- decision - Strategic decisions
- implementation - Code implementation
- error - Errors encountered
- learning - Lessons learned
- milestone - Project milestones
- morning_status - Daily morning check
- evening_summary - Daily summary
- docker_health_check - Container health
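For example, a query for recent high-importance events that leans on the timestamp and tags indexes (the tag value is illustrative):
-- Recent high-importance events tagged 'docker', newest first
SELECT id, event_type, summary, importance, timestamp
FROM episodic_memory
WHERE importance IN ('high', 'critical')
  AND tags @> ARRAY['docker']
ORDER BY timestamp DESC
LIMIT 20;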
workflow_checkpoints¶
State persistence for graph-based workflows:
CREATE TABLE workflow_checkpoints (
id SERIAL PRIMARY KEY,
workflow_id TEXT UNIQUE NOT NULL,
workflow_type TEXT NOT NULL,
state_json JSONB NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE INDEX idx_workflow_checkpoints_type ON workflow_checkpoints(workflow_type);
Used by: aegis.workflows for resumable workflows
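Because workflow_id is UNIQUE, saving a checkpoint is naturally an upsert. A sketch of the pattern (the values, and the exact SQL issued by aegis.workflows, are assumptions):
-- Insert a new checkpoint or refresh an existing one for this workflow run
INSERT INTO workflow_checkpoints (workflow_id, workflow_type, state_json)
VALUES ('wf-2026-001', 'briefing', '{"step": 3}')
ON CONFLICT (workflow_id)
DO UPDATE SET state_json = EXCLUDED.state_json,
              updated_at = NOW();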
api_keys¶
API key management with tier-based rate limits:
CREATE TABLE api_keys (
id SERIAL PRIMARY KEY,
key_hash VARCHAR(255) UNIQUE NOT NULL,
user_email VARCHAR(255),
tier VARCHAR(50) DEFAULT 'basic', -- basic, pro, enterprise
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
expires_at TIMESTAMP WITH TIME ZONE,
is_active BOOLEAN DEFAULT TRUE,
metadata JSONB
);
CREATE INDEX idx_api_keys_email ON api_keys(user_email);
CREATE INDEX idx_api_keys_tier ON api_keys(tier);
async_tasks¶
Background task queue:
CREATE TABLE async_tasks (
id SERIAL PRIMARY KEY,
task_type VARCHAR(100) NOT NULL,
status VARCHAR(50) DEFAULT 'pending', -- pending, running, completed, failed
priority INTEGER DEFAULT 0,
payload JSONB NOT NULL,
result JSONB,
error TEXT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
started_at TIMESTAMP WITH TIME ZONE,
completed_at TIMESTAMP WITH TIME ZONE
);
CREATE INDEX idx_async_tasks_status ON async_tasks(status);
CREATE INDEX idx_async_tasks_priority ON async_tasks(priority DESC);
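A queue shaped like this is commonly drained with FOR UPDATE SKIP LOCKED so concurrent workers never claim the same row; a sketch of that pattern (the scheduler's actual polling SQL may differ):
-- Atomically claim the highest-priority pending task
WITH next_task AS (
    SELECT id
    FROM async_tasks
    WHERE status = 'pending'
    ORDER BY priority DESC, created_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
UPDATE async_tasks t
SET status = 'running', started_at = NOW()
FROM next_task
WHERE t.id = next_task.id
RETURNING t.id, t.task_type, t.payload;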
claude_sessions¶
WhatsApp-triggered Claude Code sessions:
CREATE TABLE claude_sessions (
id SERIAL PRIMARY KEY,
session_id VARCHAR(255) UNIQUE NOT NULL,
user_phone VARCHAR(50) NOT NULL,
status VARCHAR(50) DEFAULT 'active', -- active, completed, error, terminated
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
initial_task TEXT,
context_summary TEXT,
metadata JSONB
);
CREATE INDEX idx_claude_sessions_user ON claude_sessions(user_phone);
CREATE INDEX idx_claude_sessions_status ON claude_sessions(status);
Database Clients¶
Synchronous Client (aegis.db)¶
For synchronous operations (used by scheduler, CLI tools):
from aegis.db import db
# Query
result = db.fetch_one("SELECT * FROM episodic_memory WHERE id = %s", (123,))
results = db.fetch_all("SELECT * FROM episodic_memory LIMIT 10")
# Execute
db.execute("INSERT INTO episodic_memory (event_type, summary) VALUES (%s, %s)",
("test", "Test event"))
# Config/State helpers
db.set_config("version", "4.16")
version = db.get_config("version")
db.set_state("operational_status", "active")
status = db.get_state("operational_status")
Implementation: Uses psycopg2 (blocking I/O)
Async Client (asyncpg)¶
For async operations (used by dashboard API):
# In dashboard app.py:
pool = await asyncpg.create_pool(
host=settings.postgres_host,
port=settings.postgres_port,
user=settings.postgres_user,
password=settings.postgres_password,
database=settings.postgres_db,
min_size=2,
max_size=10,
)
# In route handlers:
async def my_endpoint(request: Request):
pool = request.app.state.pg_pool
async with pool.acquire() as conn:
result = await conn.fetch("SELECT * FROM episodic_memory LIMIT 10")
return {"memories": [dict(r) for r in result]}
Connection Pool:
- Min connections: 2
- Max connections: 10
- Shared across the dashboard process
pgvector Extension¶
Used for embedding-based similarity search:
Installation¶
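The extension is enabled once per database (assuming the pgvector package is already installed on the host):
-- Enable pgvector in the aegis database
CREATE EXTENSION IF NOT EXISTS vector;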
Usage¶
# Store embeddings
from aegis.db import db
embedding = [0.1, 0.2, 0.3, ...] # 768-dim vector
db.execute(
"INSERT INTO knowledge_chunks (content, embedding) VALUES (%s, %s)",
(text, embedding)
)
# Similarity search (cosine distance)
results = db.fetch_all(
"""
SELECT content, 1 - (embedding <=> %s::vector) AS similarity
FROM knowledge_chunks
ORDER BY embedding <=> %s::vector
LIMIT 10
""",
(embedding, embedding)
)
Operators:
- <-> - L2 distance
- <#> - Inner product
- <=> - Cosine distance
Indexes:
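An approximate-nearest-neighbor index keeps these queries fast as knowledge_chunks grows; a typical choice for cosine search (the index name and lists value are illustrative):
-- IVFFlat index for cosine distance (recent pgvector versions also support HNSW)
CREATE INDEX idx_knowledge_chunks_embedding
ON knowledge_chunks
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);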
Backup and Restore¶
Backup¶
# Full database dump
pg_dump -h localhost -U agent aegis > /home/agent/backups/aegis-$(date +%Y%m%d).sql
# Compressed dump
pg_dump -h localhost -U agent aegis | gzip > /home/agent/backups/aegis-$(date +%Y%m%d).sql.gz
# Schema only
pg_dump -h localhost -U agent --schema-only aegis > /home/agent/backups/aegis-schema.sql
# Data only
pg_dump -h localhost -U agent --data-only aegis > /home/agent/backups/aegis-data.sql
# Specific tables
pg_dump -h localhost -U agent --table=episodic_memory aegis > episodic_memory.sql
Restore¶
# Drop and recreate database
PGPASSWORD=agent psql -h localhost -U agent -d postgres -c "DROP DATABASE aegis;"
PGPASSWORD=agent psql -h localhost -U agent -d postgres -c "CREATE DATABASE aegis;"
# Restore from dump
PGPASSWORD=agent psql -h localhost -U agent -d aegis < /home/agent/backups/aegis-20260125.sql
# Restore compressed dump
gunzip -c /home/agent/backups/aegis-20260125.sql.gz | PGPASSWORD=agent psql -h localhost -U agent -d aegis
Maintenance¶
Vacuum¶
Regular maintenance to reclaim space and update statistics:
-- Full vacuum (blocking)
VACUUM FULL;
-- Analyze (update statistics)
ANALYZE;
-- Vacuum and analyze
VACUUM ANALYZE;
-- Specific table
VACUUM ANALYZE episodic_memory;
Automated: PostgreSQL runs autovacuum in the background
Reindex¶
Rebuild indexes for performance:
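For example (REINDEX takes locks, so prefer table-level runs, or CONCURRENTLY on PostgreSQL 12+, during normal operation):
-- Rebuild indexes on a single hot table
REINDEX TABLE episodic_memory;

-- Rebuild every index in the database
REINDEX DATABASE aegis;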
Check Table Sizes¶
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC
LIMIT 20;
Check Index Usage¶
SELECT
schemaname,
tablename,
indexname,
idx_scan,
idx_tup_read,
idx_tup_fetch
FROM pg_stat_user_indexes
WHERE schemaname = 'public'
ORDER BY idx_scan ASC
LIMIT 20;
Monitoring¶
Connection Count¶
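For example:
-- Connections to the aegis database, grouped by state
SELECT state, count(*)
FROM pg_stat_activity
WHERE datname = 'aegis'
GROUP BY state;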
Active Queries¶
SELECT
pid,
usename,
application_name,
state,
query,
query_start
FROM pg_stat_activity
WHERE datname = 'aegis' AND state != 'idle'
ORDER BY query_start;
Long-Running Queries¶
SELECT
pid,
now() - query_start AS duration,
usename,
query
FROM pg_stat_activity
WHERE datname = 'aegis' AND state = 'active' AND now() - query_start > interval '1 minute'
ORDER BY duration DESC;
Database Size¶
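For example:
-- Total on-disk size of the aegis database
SELECT pg_size_pretty(pg_database_size('aegis')) AS database_size;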
Table Statistics¶
SELECT
schemaname,
tablename,
n_live_tup AS live_rows,
n_dead_tup AS dead_rows,
last_vacuum,
last_autovacuum,
last_analyze,
last_autoanalyze
FROM pg_stat_user_tables
WHERE schemaname = 'public'
ORDER BY n_live_tup DESC;
Performance Tuning¶
Connection Pooling¶
Use connection pooling to reduce overhead:
asyncpg (in dashboard):
pool = await asyncpg.create_pool(
host=settings.postgres_host,
min_size=2, # Keep 2 connections warm
max_size=10, # Allow burst to 10
max_inactive_connection_lifetime=300, # Close idle after 5min
)
psycopg2 (sync):
from psycopg2 import pool
connection_pool = pool.SimpleConnectionPool(
minconn=1,
maxconn=5,
host="localhost",
user="agent",
password="agent",
database="aegis"
)
Query Optimization¶
- Use indexes: Create indexes on frequently filtered columns
- Use EXPLAIN: Analyze query plans (see the example below)
- Limit results: Always use LIMIT for large tables
- Use appropriate data types: JSONB for structured data, TEXT for unstructured
- Partition large tables: Consider partitioning by date for time-series data
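An EXPLAIN example against the event log (the filter values are illustrative):
-- EXPLAIN ANALYZE runs the query and reports the actual plan, row counts, and timings
EXPLAIN ANALYZE
SELECT id, summary
FROM episodic_memory
WHERE event_type = 'error'
ORDER BY timestamp DESC
LIMIT 10;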
Troubleshooting¶
Connection refused¶
- Check PostgreSQL is running
- Check the port is listening
- Check pg_hba.conf allows connections
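Example commands for each check; the service name, port, and config path are assumptions that may differ on this host:
# Is the server process running? (assumes a systemd-managed install)
sudo systemctl status postgresql

# Is anything listening on port 5432?
ss -tlnp | grep 5432

# Does pg_hba.conf allow the connection? (path varies by distribution and version)
sudo grep -v '^#' /etc/postgresql/14/main/pg_hba.conf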
Too many connections¶
- Check current connections
- Increase max_connections
- Terminate idle connections
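Example statements for each step; the limit and idle threshold are illustrative, not the configured values:
-- Current connections per database
SELECT datname, count(*) FROM pg_stat_activity GROUP BY datname;

-- Raise the limit (requires a server restart to take effect)
ALTER SYSTEM SET max_connections = 200;

-- Terminate connections idle for more than 10 minutes
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE datname = 'aegis'
  AND state = 'idle'
  AND state_change < now() - interval '10 minutes';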
Slow queries¶
- Enable slow query logging
- Check logs
- Analyze with EXPLAIN
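Example statements; the threshold and query are illustrative, and the log location varies by install (often under /var/log/postgresql/ on Debian/Ubuntu):
-- Log any statement slower than one second, then reload the configuration
ALTER SYSTEM SET log_min_duration_statement = '1s';
SELECT pg_reload_conf();

-- Inspect the plan of a suspect query
EXPLAIN ANALYZE SELECT * FROM episodic_memory WHERE event_type = 'error';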
Disk full¶
- Check disk usage
- Find largest tables
- Vacuum full to reclaim space
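Example commands; the data directory path is an assumption, and VACUUM FULL takes an exclusive lock while it rewrites the table:
# Disk usage on the data volume (path may differ on this host)
df -h /var/lib/postgresql

# Reclaim space from the largest table found with the "Check Table Sizes" query above
PGPASSWORD=agent psql -h localhost -U agent -d aegis -c "VACUUM FULL ANALYZE episodic_memory;"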
Related Documentation¶
- dashboard.md - Uses asyncpg pool for API queries
- scheduler.md - Uses sync client for scheduled jobs
- falkordb.md - Complementary graph database