Scheduler Service¶
The Aegis Scheduler is a background daemon that executes recurring tasks using APScheduler. It handles cron jobs, health checks, cleanup operations, and scheduled intelligence gathering.
Overview¶
- Framework: APScheduler (BackgroundScheduler)
- Container: aegis-scheduler
- Command: Infinite sleep after starting the scheduler
- Dependencies: PostgreSQL, Docker socket, Playwright
Architecture¶
Main Scheduler¶
Location: /home/agent/projects/aegis-core/aegis/scheduler.py
```python
from aegis.scheduler import scheduler

scheduler.start()       # Start background scheduler
scheduler.stop()        # Graceful shutdown
scheduler.list_jobs()   # List all scheduled jobs
```
Job Configuration¶
```python
scheduler = BackgroundScheduler(
    job_defaults={
        'coalesce': True,           # Merge missed runs
        'max_instances': 1,         # Only one instance at a time
        'misfire_grace_time': 300,  # 5-minute grace period
    }
)
```
Scheduled Jobs¶
Core System Jobs¶
| Job ID | Trigger | Schedule | Function | Description |
|---|---|---|---|---|
| health_check | Interval | Every 5 minutes | health_check() | Database connectivity check |
| docker_health_check | Interval | Every 5 minutes | docker_health_check() | Container health + auto-remediation |
| morning_status | Cron | 06:00 UTC | morning_status() | Daily morning status update |
| evening_summary | Cron | 22:00 UTC | evening_summary() | Daily digest generation |
| memory_cleanup | Cron | 00:30 UTC | memory_cleanup() | Delete old low-importance memories |
Module-Specific Jobs¶
Jobs are added by scheduler modules during initialization:
| Module | Location | Jobs Added |
|---|---|---|
| Email | aegis.email.scheduled | Email triage, draft checking |
| Monitor | aegis.monitor.scheduler | Website monitoring, screenshots |
| Anomaly | aegis.infra.anomaly.scheduler | Infrastructure anomaly detection |
| Discord | aegis.discord.scheduler | Poll #tasks channel for commands |
| Proactive | aegis.proactive.scheduler | Pattern learning, opportunity detection |
| Orchestration | aegis.orchestration.scheduler | Agent health checks, resource rebalancing |
| Revenue | aegis.revenue.scheduler | Demand detection, lead generation |
| Memory | aegis.memory.scheduler | Knowledge graph sync (episodic → Graphiti) |
| Scout | aegis.scout.scheduler | Trend discovery, Hacker News scanning |
| News | aegis.news.scheduler | Breaking news, regional news aggregation |
| Acquisition | aegis.acquisition.scheduler | Customer reply automation, closing |
| Market | aegis.market.scheduler | Market data collection, screening alerts |
Job Details¶
Health Check (health_check)¶
Frequency: Every 5 minutes
Purpose: Verify database connectivity
Actions:
1. Execute SELECT 1 against PostgreSQL
2. Update last_health_check state
3. Update db_healthy state
4. Record critical event if database unreachable
Code:
```python
def health_check(self):
    try:
        db.fetch_one("SELECT 1")
        db_healthy = True
    except Exception as e:
        db_healthy = False
        logger.error("health_check_db_failed", error=str(e))
    db.set_state("last_health_check", datetime.now(timezone.utc).isoformat())
    db.set_state("db_healthy", db_healthy)
```
Docker Health Check (docker_health_check)¶
Frequency: Every 5 minutes
Purpose: Monitor container health and auto-remediate failures
Three-Strike Protocol:
1. Strike 1: Log unhealthy status
2. Strike 2: Attempt container restart
3. Strike 3: Escalate to Discord/Telegram, await human intervention

Actions:
1. Check health status of all containers
2. Track strike count per container
3. Auto-restart unhealthy containers (if policy allows)
4. Record escalations as critical events
Code:
```python
from aegis.infra.docker_monitor import DockerMonitor

docker_monitor = DockerMonitor()
result = docker_monitor.run_health_cycle()

# result contains:
# - healthy: count of healthy containers
# - unhealthy: count of unhealthy containers
# - actions_taken: number of restart attempts
# - escalations: list of containers escalated to a human
```
Morning Status (morning_status)¶
Schedule: 06:00 UTC (daily)
Purpose: Generate morning status report
Data Gathered:
- Yesterday's task statistics
- Event count from previous day
- Pending tasks
Output: Recorded to episodic memory with morning_status event type
Evening Summary (evening_summary)¶
Schedule: 22:00 UTC (daily)
Purpose: Generate comprehensive daily digest
Process:
1. Use DigestGenerator to analyze day's activity
2. Count tasks completed, events recorded, critical issues
3. Save digest to journal (~/memory/journal/YYYY-MM-DD.md)
4. Record to episodic memory
Code:
```python
from aegis.digest.generator import DigestGenerator

generator = DigestGenerator()
digest = generator.generate()
digest_path = generator.save_to_journal(digest)
```
Digest Contents:
- Tasks completed vs pending
- Events recorded (total + critical)
- Key decisions made
- Errors encountered
- Tomorrow's priorities
Memory Cleanup (memory_cleanup)¶
Schedule: 00:30 UTC (daily)
Purpose: Delete old low-importance episodic memories
Retention Policy: Keep last 30 days of low-importance memories
Query:
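The actual query was not captured here. A minimal sketch of the retention rule, assuming an episodic_memory table with importance and timestamp columns (demonstrated with SQLite for portability; the real table lives in PostgreSQL):

```python
import sqlite3
from datetime import datetime, timedelta, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE episodic_memory (id INTEGER PRIMARY KEY, importance TEXT, timestamp TEXT)"
)

now = datetime.now(timezone.utc)
rows = [
    ("low", (now - timedelta(days=45)).isoformat()),   # old + low-importance -> deleted
    ("low", (now - timedelta(days=5)).isoformat()),    # recent + low-importance -> kept
    ("high", (now - timedelta(days=45)).isoformat()),  # old but high-importance -> kept
]
conn.executemany(
    "INSERT INTO episodic_memory (importance, timestamp) VALUES (?, ?)", rows
)

# Keep the last 30 days of low-importance memories; higher tiers are never touched
cutoff = (now - timedelta(days=30)).isoformat()
cur = conn.execute(
    "DELETE FROM episodic_memory WHERE importance = 'low' AND timestamp < ?",
    (cutoff,),
)
print(cur.rowcount)  # 1
```

ISO-8601 timestamps in a single timezone sort lexicographically, so the string comparison against the cutoff is safe here.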
Note: Medium, high, and critical memories are never deleted automatically.
Modular Job Registration¶
Each subsystem registers its own jobs via setup_*_scheduler() functions:
Example: Email Scheduler¶
# aegis/email/scheduled.py
def setup_email_scheduler(scheduler):
"""Register email-related scheduled jobs."""
# Email triage every hour
scheduler.scheduler.add_job(
check_unread_emails,
IntervalTrigger(hours=1),
id='email_triage',
name='Email Triage',
)
# Draft reminder every 6 hours
scheduler.scheduler.add_job(
check_stale_drafts,
IntervalTrigger(hours=6),
id='draft_reminder',
name='Draft Reminder',
)
Example: Scout Scheduler¶
# aegis/scout/scheduler.py
def setup_scout_scheduler(scheduler):
"""Register scout jobs for opportunity discovery."""
# Trend discovery every 4 hours
scheduler.scheduler.add_job(
discover_trends,
IntervalTrigger(hours=4),
id='scout_trends',
name='Scout Trend Discovery',
)
# Hacker News scan every 2 hours
scheduler.scheduler.add_job(
scan_hacker_news,
IntervalTrigger(hours=2),
id='scout_hn',
name='Scout HN Scan',
)
Configuration¶
Environment Variables¶
```bash
# Database (required)
POSTGRES_HOST=host.docker.internal
POSTGRES_PORT=5432
POSTGRES_USER=agent
POSTGRES_PASSWORD=agent
POSTGRES_DB=aegis

# LLM APIs (for job execution)
ZAI_API_KEY=<key>
ZAI_BASE_URL=https://api.z.ai/api/anthropic
CLAUDE_CODE_OAUTH_TOKEN=<token>

# Communication channels
DISCORD_ALERTS_CHANNEL=<channel_id>
DISCORD_JOURNAL_CHANNEL=<channel_id>

# Playwright for screenshots
PLAYWRIGHT_URL=http://playwright:3000

# Ollama for local inference
OLLAMA_HOST=http://host.docker.internal:11434
```
Volumes¶
```yaml
volumes:
  - /var/run/docker.sock:/var/run/docker.sock:ro  # Docker API access
  - /home/agent/projects/aegis-core/data/monitor/screenshots:/tmp/aegis/monitor/screenshots
```
Extra Hosts¶
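The body of this section was not captured. Given that POSTGRES_HOST is set to host.docker.internal above, the compose entry is presumably the standard host-gateway mapping (an assumption, not confirmed by the source):

```yaml
extra_hosts:
  - "host.docker.internal:host-gateway"  # assumed: lets the container reach services on the host
```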
Job Management¶
List All Jobs¶
```python
from aegis.scheduler import scheduler

jobs = scheduler.list_jobs()
for job in jobs:
    print(f"{job['id']}: {job['name']} (next run: {job['next_run']})")
```
Add Custom Job¶
```python
from aegis.scheduler import scheduler

def my_task():
    print("Running custom task")

scheduler.add_job(
    func=my_task,
    trigger="cron",
    job_id="my_custom_job",
    hour=8,
    minute=30,
)
```
Remove Job¶
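This section's body is missing from the source. Assuming the underlying BackgroundScheduler is exposed as scheduler.scheduler (as in the module registration examples above), removal uses APScheduler's remove_job:

```python
from aegis.scheduler import scheduler

# Remove a job by ID; APScheduler raises JobLookupError for unknown IDs
scheduler.scheduler.remove_job("my_custom_job")
```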
Pause/Resume Job¶
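This section's body is also missing. A sketch using APScheduler's pause_job/resume_job on the underlying scheduler (job ID taken from the email example above):

```python
from aegis.scheduler import scheduler

scheduler.scheduler.pause_job("email_triage")   # job stays registered but stops firing
scheduler.scheduler.resume_job("email_triage")  # resumes on its original trigger
```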
Monitoring¶
Container Logs¶
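The command itself was not captured; following the compose service name used in the Development section below, logs can be tailed with:

```shell
docker compose logs -f scheduler
```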
Job Execution Logs¶
All jobs use structured logging:
```
2026-01-25T06:00:00Z event=morning_status_generated pending_tasks=5
2026-01-25T12:00:00Z event=health_check_complete db_healthy=true
2026-01-25T22:00:00Z event=evening_summary_generated tasks_completed=12 events=45
```
Episodic Memory¶
Job executions are recorded to the episodic_memory table:
```sql
SELECT event_type, summary, timestamp
FROM episodic_memory
WHERE event_type IN ('morning_status', 'evening_summary', 'docker_health_check')
ORDER BY timestamp DESC
LIMIT 10;
```
Error Handling¶
Misfire Grace Time¶
If a job misses its scheduled time (e.g., the scheduler was down), it has a 5-minute grace period (misfire_grace_time: 300) to execute late.
Coalescing¶
Multiple missed runs are merged into a single execution (coalesce: True).
Max Instances¶
Only one instance of each job runs at a time (max_instances: 1); a run that would overlap an in-flight one is skipped.
Exception Handling¶
Jobs should handle exceptions internally:
```python
def my_job():
    try:
        # Job logic
        pass
    except Exception as e:
        logger.error("job_failed", job_id="my_job", error=str(e))
        EpisodicMemory.record(
            event_type="job_error",
            summary=f"Job failed: {str(e)}",
            importance="high",
        )
```
Development¶
Testing Jobs Locally¶
```bash
# Start scheduler container
docker compose up -d scheduler

# Trigger a job manually (Python shell)
docker compose exec scheduler python -c "
from aegis.scheduler import scheduler
scheduler.start()
scheduler.morning_status()
"
```
Adding a New Job Module¶
1. Create aegis/my_module/scheduler.py with a setup_my_module_scheduler() function.
2. Import it in aegis/scheduler.py.
3. Call it during scheduler initialization.
Debugging Schedule Issues¶
Enable debug logging:
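APScheduler logs through Python's standard logging module, so raising its logger to DEBUG exposes trigger computations and job submissions:

```python
import logging

# Show APScheduler's internal scheduling decisions in the logs
logging.basicConfig(level=logging.INFO)
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
```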
View next run times with scheduler.list_jobs() (see Job Management above).
Troubleshooting¶
Job not running¶
1. Check the scheduler is started (scheduler.scheduler.running should be True).
2. Check the job is registered (its ID should appear in scheduler.list_jobs()).
3. Check for exceptions in the container logs (docker compose logs scheduler).
Database connection issues¶
Jobs may fail if PostgreSQL is unreachable. Check the POSTGRES_* environment variables and confirm the container can reach the database, e.g. by running the same SELECT 1 probe used by health_check.
Docker socket permission denied¶
The scheduler needs read access to the Docker socket for health checks. Verify the read-only socket mount from the Volumes section is present and readable inside the container, e.g. docker compose exec scheduler ls -l /var/run/docker.sock.
Related Documentation¶
- dashboard.md - FastAPI dashboard (starts scheduler)
- postgresql.md - Database used by jobs
- traefik.md - Reverse proxy for service routing