Scheduler Service¶
The Aegis Scheduler is a background daemon that executes recurring tasks using APScheduler. It handles cron jobs, health checks, cleanup operations, and scheduled intelligence gathering.
Current job inventory
This service page explains scheduler architecture. The live 2026-05-23 inventory of cron, systemd, Docker scheduler, Hermes, and Claude automations is in Current Automation.
Overview¶
- Framework: APScheduler (BackgroundScheduler)
- Container: aegis-scheduler
- Command: Infinite sleep after starting scheduler
- Dependencies: PostgreSQL, Docker socket, Playwright
Architecture¶
Main Scheduler¶
Location: /home/agent/projects/aegis-core/aegis/scheduler.py
from aegis.scheduler import scheduler
scheduler.start() # Start background scheduler
scheduler.stop() # Graceful shutdown
scheduler.list_jobs() # List all scheduled jobs
Job Configuration¶
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler(
    job_defaults={
        'coalesce': True,           # Merge missed runs
        'max_instances': 1,         # Only one instance at a time
        'misfire_grace_time': 300,  # 5 minute grace period
    }
)
Scheduled Jobs¶
Core System Jobs¶
| Job ID | Trigger | Schedule | Function | Description |
|---|---|---|---|---|
| health_check | Interval | Every 5 minutes | health_check() | Database connectivity check |
| docker_health_check | Interval | Every 5 minutes | docker_health_check() | Container health + auto-remediation |
| morning_status | Cron | 06:00 UTC | morning_status() | Daily morning status update |
| evening_summary | Cron | 22:00 UTC | evening_summary() | Daily digest generation |
| memory_cleanup | Cron | 00:30 UTC | memory_cleanup() | Delete old low-importance memories |
Module-Specific Jobs¶
Jobs are added by scheduler modules during initialization:
| Module | Location | Jobs Added |
|---|---|---|
| Email | aegis.email.scheduled | Email triage, draft checking |
| Monitor | aegis.monitor.scheduler | Website monitoring, screenshots |
| Anomaly | aegis.infra.anomaly.scheduler | Infrastructure anomaly detection |
| Discord | aegis.discord.scheduler | Poll #tasks channel for commands |
| Proactive | aegis.proactive.scheduler | Pattern learning, opportunity detection |
| Orchestration | aegis.orchestration.scheduler | Agent health checks, resource rebalancing |
| Revenue | aegis.revenue.scheduler | Demand detection, lead generation |
| Memory | aegis.memory.scheduler | Knowledge graph sync (episodic → Graphiti) |
| Scout | aegis.scout.scheduler | Trend discovery, Hacker News scanning |
| News | aegis.news.scheduler | Breaking news, regional news aggregation |
| Acquisition | aegis.acquisition.scheduler | Customer reply automation, closing |
Job Details¶
Health Check (health_check)¶
Frequency: Every 5 minutes
Purpose: Verify database connectivity
Actions:
1. Execute SELECT 1 against PostgreSQL
2. Update last_health_check state
3. Update db_healthy state
4. Record critical event if database unreachable
Code:
from datetime import datetime, timezone

def health_check(self):
    # Probe the database with a trivial query
    try:
        db.fetch_one("SELECT 1")
        db_healthy = True
    except Exception as e:
        db_healthy = False
        logger.error("health_check_db_failed", error=str(e))
    # Persist the check time and the result
    db.set_state("last_health_check", datetime.now(timezone.utc).isoformat())
    db.set_state("db_healthy", db_healthy)
Docker Health Check (docker_health_check)¶
Frequency: Every 5 minutes
Purpose: Monitor container health and auto-remediate failures
Three-Strike Protocol:
1. Strike 1: Log unhealthy status
2. Strike 2: Attempt container restart
3. Strike 3: Escalate to Discord/Telegram, await human intervention
Actions:
1. Check health status of all containers
2. Track strike count per container
3. Auto-restart unhealthy containers (if policy allows)
4. Record escalations as critical events
Code:
from aegis.infra.docker_monitor import DockerMonitor
docker_monitor = DockerMonitor()
result = docker_monitor.run_health_cycle()
# result contains:
# - healthy: count of healthy containers
# - unhealthy: count of unhealthy containers
# - actions_taken: number of restart attempts
# - escalations: list of containers escalated to human
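The three-strike escalation described above can be sketched as a small per-container counter. This is a minimal illustration, not the DockerMonitor implementation: the StrikeTracker class, its observe() method, and the container name are hypothetical, and the sketch assumes that a healthy check resets the strike count, which the source does not state.

```python
from dataclasses import dataclass, field


@dataclass
class StrikeTracker:
    """Hypothetical helper: counts consecutive unhealthy checks per container."""

    strikes: dict = field(default_factory=dict)

    def observe(self, container: str, healthy: bool) -> str:
        """Return the action for this check: 'ok', 'log', 'restart', or 'escalate'."""
        if healthy:
            # Assumption: a healthy check clears any accumulated strikes.
            self.strikes.pop(container, None)
            return "ok"
        count = self.strikes.get(container, 0) + 1
        self.strikes[container] = count
        if count == 1:
            return "log"        # Strike 1: log unhealthy status
        if count == 2:
            return "restart"    # Strike 2: attempt container restart
        return "escalate"       # Strike 3 and later: hand off to a human


tracker = StrikeTracker()
# "example-container" is a placeholder name, not a real Aegis container.
actions = [tracker.observe("example-container", healthy=False) for _ in range(3)]
print(actions)  # ['log', 'restart', 'escalate']
```

Keeping the counter keyed by container name means one flapping container does not affect the escalation state of the others.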
Morning Status (morning_status)¶
Schedule: 06:00 UTC (daily)
Purpose: Generate morning status report
Data Gathered:
- Yesterday's task statistics
- Event count from previous day
- Pending tasks
Output: Recorded to episodic memory with morning_status event type
Evening Summary (evening_summary)¶
Schedule: 22:00 UTC (daily)
Purpose: Generate comprehensive daily digest
Process:
1. Use DigestGenerator to analyze day's activity
2. Count tasks completed, events recorded, critical issues
3. Save digest to journal (~/memory/journal/YYYY-MM-DD.md)
4. Record to episodic memory
Code:
from aegis.digest.generator import DigestGenerator
generator = DigestGenerator()
digest = generator.generate()
digest_path = generator.save_to_journal(digest)
Digest Contents:
- Tasks completed vs pending
- Events recorded (total + critical)
- Key decisions made
- Errors encountered
- Tomorrow's priorities
Memory Cleanup (memory_cleanup)¶
Schedule: 00:30 UTC (daily)
Purpose: Delete old low-importance episodic memories
Retention Policy: Keep last 30 days of low-importance memories
Query:
Note: Medium, high, and critical memories are never deleted automatically.
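A retention rule of this shape can be illustrated with a short sketch. It is not the scheduler's actual query: it runs against an in-memory SQLite table as a stand-in for PostgreSQL, and it assumes the episodic_memory table has an importance column alongside the timestamp column. The function name cleanup_low_importance is hypothetical.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

RETENTION_DAYS = 30  # retention window for low-importance memories


def cleanup_low_importance(conn, now):
    """Delete low-importance rows older than the retention window; return the row count."""
    cutoff = (now - timedelta(days=RETENTION_DAYS)).isoformat()
    cur = conn.execute(
        # Assumed column names: importance, timestamp (ISO 8601 text in this sketch).
        "DELETE FROM episodic_memory WHERE importance = 'low' AND timestamp < ?",
        (cutoff,),
    )
    conn.commit()
    return cur.rowcount


# Demo data: SQLite stands in for PostgreSQL here.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE episodic_memory (summary TEXT, importance TEXT, timestamp TEXT)")
now = datetime(2026, 1, 25, 0, 30, tzinfo=timezone.utc)
conn.executemany(
    "INSERT INTO episodic_memory VALUES (?, ?, ?)",
    [
        ("old low", "low", (now - timedelta(days=45)).isoformat()),
        ("recent low", "low", (now - timedelta(days=5)).isoformat()),
        ("old high", "high", (now - timedelta(days=45)).isoformat()),
    ],
)
deleted = cleanup_low_importance(conn, now)
remaining = [row[0] for row in conn.execute("SELECT summary FROM episodic_memory ORDER BY summary")]
print(deleted, remaining)  # 1 ['old high', 'recent low']
```

Filtering on importance in the WHERE clause, rather than after fetching, is what keeps medium, high, and critical rows out of reach of the delete.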
Modular Job Registration¶
Each subsystem registers its own jobs via setup_*_scheduler() functions:
Example: Email Scheduler¶
# aegis/email/scheduled.py
from apscheduler.triggers.interval import IntervalTrigger

def setup_email_scheduler(scheduler):
    """Register email-related scheduled jobs."""
    # Email triage every hour
    scheduler.scheduler.add_job(
        check_unread_emails,
        IntervalTrigger(hours=1),
        id='email_triage',
        name='Email Triage',
    )
    # Draft reminder every 6 hours
    scheduler.scheduler.add_job(
        check_stale_drafts,
        IntervalTrigger(hours=6),
        id='draft_reminder',
        name='Draft Reminder',
    )
Example: Scout Scheduler¶
# aegis/scout/scheduler.py
from apscheduler.triggers.interval import IntervalTrigger

def setup_scout_scheduler(scheduler):
    """Register scout jobs for opportunity discovery."""
    # Trend discovery every 4 hours
    scheduler.scheduler.add_job(
        discover_trends,
        IntervalTrigger(hours=4),
        id='scout_trends',
        name='Scout Trend Discovery',
    )
    # Hacker News scan every 2 hours
    scheduler.scheduler.add_job(
        scan_hacker_news,
        IntervalTrigger(hours=2),
        id='scout_hn',
        name='Scout HN Scan',
    )
Configuration¶
Environment Variables¶
# Database (required)
POSTGRES_HOST=host.docker.internal
POSTGRES_PORT=5432
POSTGRES_USER=agent
POSTGRES_PASSWORD=agent
POSTGRES_DB=aegis
# LLM APIs (for job execution)
ZAI_API_KEY=<key>
ZAI_BASE_URL=https://api.z.ai/api/anthropic
CLAUDE_CODE_OAUTH_TOKEN=<token>
# Communication channels
DISCORD_ALERTS_CHANNEL=<channel_id>
DISCORD_JOURNAL_CHANNEL=<channel_id>
# Playwright for screenshots
PLAYWRIGHT_URL=http://playwright:3000
# Ollama for local inference
OLLAMA_HOST=http://host.docker.internal:11434
Volumes¶
volumes:
  - /var/run/docker.sock:/var/run/docker.sock:ro  # Docker API access
  - /home/agent/projects/aegis-core/data/monitor/screenshots:/tmp/aegis/monitor/screenshots
Extra Hosts¶
Job Management¶
List All Jobs¶
from aegis.scheduler import scheduler
jobs = scheduler.list_jobs()
for job in jobs:
    print(f"{job['id']}: {job['name']} (next run: {job['next_run']})")
Add Custom Job¶
from aegis.scheduler import scheduler

def my_task():
    print("Running custom task")

# Run daily at 08:30 using the "cron" trigger alias
scheduler.add_job(
    func=my_task,
    trigger="cron",
    job_id="my_custom_job",
    hour=8,
    minute=30,
)
Remove Job¶
Pause/Resume Job¶
Monitoring¶
Container Logs¶
Job Execution Logs¶
All jobs use structured logging:
2026-01-25T06:00:00Z event=morning_status_generated pending_tasks=5
2026-01-25T12:00:00Z event=health_check_complete db_healthy=true
2026-01-25T22:00:00Z event=evening_summary_generated tasks_completed=12 events=45
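Lines in this format are straightforward to post-process. The sketch below splits one line into a dictionary. It assumes the layout shown in the samples (a leading timestamp followed by space-separated key=value pairs whose values contain no spaces). The helper name parse_log_line is hypothetical.

```python
def parse_log_line(line: str) -> dict:
    """Split '<timestamp> key=value key=value ...' into a flat dict of strings."""
    timestamp, *pairs = line.split()
    record = {"timestamp": timestamp}
    for pair in pairs:
        # partition() keeps everything after the first '=' as the value
        key, _, value = pair.partition("=")
        record[key] = value
    return record


record = parse_log_line(
    "2026-01-25T22:00:00Z event=evening_summary_generated tasks_completed=12 events=45"
)
print(record["event"], record["tasks_completed"])  # evening_summary_generated 12
```

All values come back as strings, so numeric fields such as tasks_completed need an explicit int() before aggregation.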
Episodic Memory¶
Job executions are recorded to the episodic_memory table:
SELECT event_type, summary, timestamp
FROM episodic_memory
WHERE event_type IN ('morning_status', 'evening_summary', 'docker_health_check')
ORDER BY timestamp DESC
LIMIT 10;
Error Handling¶
Misfire Grace Time¶
If a job misses its scheduled time (e.g., scheduler was down), it has a 5-minute grace period to execute late:
Coalescing¶
Multiple missed runs are merged into a single execution:
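How the grace period and coalescing interact can be modelled in a few lines of plain Python. This is a simplified model of the behaviour as understood for APScheduler 3.x, not the library's code: coalescing first reduces the missed runs to the most recent one, then any remaining run that is later than misfire_grace_time is skipped. The function name runs_to_execute is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def runs_to_execute(missed, now, coalesce=True, misfire_grace_time=300):
    """Return the scheduled run times that would still execute at `now`."""
    due = sorted(t for t in missed if t <= now)
    if coalesce and due:
        due = due[-1:]  # merge all missed runs into the most recent one
    grace = timedelta(seconds=misfire_grace_time)
    # A run is skipped once it is more than the grace period late.
    return [t for t in due if now - t <= grace]


def utc(hour, minute):
    return datetime(2026, 1, 25, hour, minute, tzinfo=timezone.utc)


# 5-minute interval job; scheduler was down and comes back at 12:17.
interval_runs = runs_to_execute([utc(12, 5), utc(12, 10), utc(12, 15)], now=utc(12, 17))
print([t.strftime("%H:%M") for t in interval_runs])  # ['12:15']

# Daily 06:00 job; scheduler comes back at 06:10, outside the grace period.
cron_runs = runs_to_execute([utc(6, 0)], now=utc(6, 10))
print(cron_runs)  # []
```

Under this model a daily job that is missed by more than five minutes is not run until the next day, which is worth keeping in mind for morning_status and evening_summary.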
Max Instances¶
Only one instance of each job runs at a time:
Exception Handling¶
Jobs should handle exceptions internally:
def my_job():
    try:
        # Job logic
        pass
    except Exception as e:
        logger.error("job_failed", job_id="my_job", error=str(e))
        EpisodicMemory.record(
            event_type="job_error",
            summary=f"Job failed: {str(e)}",
            importance="high"
        )
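To avoid repeating the same try/except in every job, the pattern can be factored into a decorator. The following is a minimal sketch under stated assumptions: safe_job is a hypothetical helper, it uses the standard library logging module in place of the structured logger, and it omits the EpisodicMemory.record() call shown above.

```python
import functools
import logging

logger = logging.getLogger("aegis.scheduler.sketch")  # hypothetical logger name


def safe_job(job_id):
    """Wrap a job so exceptions are logged instead of propagating to the scheduler."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                # A real job would also record a job_error event here.
                logger.error("job_failed job_id=%s error=%s", job_id, exc)
                return None

        return wrapper

    return decorator


@safe_job("flaky_job")
def flaky_job():
    raise RuntimeError("boom")


result = flaky_job()
print(result)  # None
```

Because functools.wraps preserves the function name, a wrapped job still appears under its own name in job listings and logs.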
Development¶
Testing Jobs Locally¶
# Start scheduler container
docker compose up -d scheduler
# Trigger a job manually (Python shell)
docker compose exec scheduler python -c "
from aegis.scheduler import scheduler
scheduler.start()
scheduler.morning_status()
"
Adding a New Job Module¶
- Create aegis/my_module/scheduler.py:
- Import in aegis/scheduler.py:
- Call during initialization:
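The three steps can be sketched end to end as follows. This is an illustration only: my_module, my_module_task, and setup_my_module_scheduler are placeholder names, and the two stub classes stand in for the Aegis scheduler wrapper and its APScheduler instance so the example runs without either installed. The add_job call shape (function, trigger alias, trigger arguments, id, name) follows APScheduler's add_job signature.

```python
class _StubAPScheduler:
    """Stand-in for the APScheduler instance; records add_job calls."""

    def __init__(self):
        self.jobs = {}

    def add_job(self, func, trigger, id, name, **trigger_args):
        self.jobs[id] = {"func": func, "trigger": trigger, "name": name, **trigger_args}


class _StubAegisScheduler:
    """Stand-in for the Aegis wrapper, which exposes APScheduler as .scheduler."""

    def __init__(self):
        self.scheduler = _StubAPScheduler()


# Step 1: contents of a hypothetical aegis/my_module/scheduler.py
def my_module_task():
    print("my_module task ran")


def setup_my_module_scheduler(scheduler):
    """Register my_module jobs on the shared scheduler."""
    scheduler.scheduler.add_job(
        my_module_task,
        "interval",  # trigger alias, equivalent to IntervalTrigger(hours=1)
        hours=1,
        id="my_module_task",
        name="My Module Task",
    )


# Steps 2 and 3: aegis/scheduler.py would import the setup function
# and call it once during initialization.
aegis_scheduler = _StubAegisScheduler()
setup_my_module_scheduler(aegis_scheduler)
print(sorted(aegis_scheduler.scheduler.jobs))  # ['my_module_task']
```

Giving every job an explicit id, as the existing modules do, is what makes it addressable later for listing, pausing, or removal.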
Debugging Schedule Issues¶
Enable debug logging:
View next run times:
Troubleshooting¶
Job not running¶
- Check if scheduler is started:
- Check job is registered:
- Check for exceptions in logs:
Database connection issues¶
Jobs may fail if PostgreSQL is unreachable. Check:
Docker socket permission denied¶
Scheduler needs read access to Docker socket for health checks:
Verify:
Related Documentation¶
- dashboard.md - FastAPI dashboard (starts scheduler)
- postgresql.md - Database used by jobs
- traefik.md - Reverse proxy for service routing