Scheduler Service

The Aegis Scheduler is a background daemon that executes recurring tasks using APScheduler. It handles cron jobs, health checks, cleanup operations, and scheduled intelligence gathering.

Overview

  • Framework: APScheduler (BackgroundScheduler)
  • Container: aegis-scheduler
  • Command: Starts the scheduler, then sleeps indefinitely to keep the container alive
  • Dependencies: PostgreSQL, Docker socket, Playwright

Architecture

Main Scheduler

Location: /home/agent/projects/aegis-core/aegis/scheduler.py

from aegis.scheduler import scheduler

scheduler.start()  # Start background scheduler
scheduler.stop()   # Graceful shutdown
scheduler.list_jobs()  # List all scheduled jobs

Job Configuration

scheduler = BackgroundScheduler(
    job_defaults={
        'coalesce': True,           # Merge missed runs
        'max_instances': 1,         # Only one instance at a time
        'misfire_grace_time': 300,  # 5 minute grace period
    }
)

Scheduled Jobs

Core System Jobs

| Job ID | Trigger | Schedule | Function | Description |
|--------|---------|----------|----------|-------------|
| health_check | Interval | Every 5 minutes | health_check() | Database connectivity check |
| docker_health_check | Interval | Every 5 minutes | docker_health_check() | Container health + auto-remediation |
| morning_status | Cron | 06:00 UTC | morning_status() | Daily morning status update |
| evening_summary | Cron | 22:00 UTC | evening_summary() | Daily digest generation |
| memory_cleanup | Cron | 00:30 UTC | memory_cleanup() | Delete old low-importance memories |

Module-Specific Jobs

Jobs are added by scheduler modules during initialization:

| Module | Location | Jobs Added |
|--------|----------|------------|
| Email | aegis.email.scheduled | Email triage, draft checking |
| Monitor | aegis.monitor.scheduler | Website monitoring, screenshots |
| Anomaly | aegis.infra.anomaly.scheduler | Infrastructure anomaly detection |
| Discord | aegis.discord.scheduler | Poll #tasks channel for commands |
| Proactive | aegis.proactive.scheduler | Pattern learning, opportunity detection |
| Orchestration | aegis.orchestration.scheduler | Agent health checks, resource rebalancing |
| Revenue | aegis.revenue.scheduler | Demand detection, lead generation |
| Memory | aegis.memory.scheduler | Knowledge graph sync (episodic → Graphiti) |
| Scout | aegis.scout.scheduler | Trend discovery, Hacker News scanning |
| News | aegis.news.scheduler | Breaking news, regional news aggregation |
| Acquisition | aegis.acquisition.scheduler | Customer reply automation, closing |
| Market | aegis.market.scheduler | Market data collection, screening alerts |

Job Details

Health Check (health_check)

Frequency: Every 5 minutes

Purpose: Verify database connectivity

Actions:

  1. Execute SELECT 1 against PostgreSQL
  2. Update last_health_check state
  3. Update db_healthy state
  4. Record a critical event if the database is unreachable

Code:

def health_check(self):
    try:
        db.fetch_one("SELECT 1")
        db_healthy = True
    except Exception as e:
        db_healthy = False
        logger.error("health_check_db_failed", error=str(e))

    db.set_state("last_health_check", datetime.now(timezone.utc).isoformat())
    db.set_state("db_healthy", db_healthy)

Docker Health Check (docker_health_check)

Frequency: Every 5 minutes

Purpose: Monitor container health and auto-remediate failures

Three-Strike Protocol:

  1. Strike 1: Log unhealthy status
  2. Strike 2: Attempt container restart
  3. Strike 3: Escalate to Discord/Telegram, await human intervention

Actions:

  1. Check health status of all containers
  2. Track strike count per container
  3. Auto-restart unhealthy containers (if policy allows)
  4. Record escalations as critical events

Code:

from aegis.infra.docker_monitor import DockerMonitor

docker_monitor = DockerMonitor()
result = docker_monitor.run_health_cycle()

# result contains:
# - healthy: count of healthy containers
# - unhealthy: count of unhealthy containers
# - actions_taken: number of restart attempts
# - escalations: list of containers escalated to human
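The three-strike protocol can be sketched in plain Python. This is an illustrative model only (the hypothetical StrikeTracker class below is not the real DockerMonitor internals, which may track state differently):

```python
# Hypothetical sketch of the three-strike escalation logic; the real
# DockerMonitor may store strike state differently (e.g. in the database).
class StrikeTracker:
    def __init__(self):
        self.strikes = {}  # container name -> consecutive unhealthy reports

    def record(self, container, healthy):
        """Return the action to take for this container's health report."""
        if healthy:
            self.strikes.pop(container, None)  # recovery resets the count
            return "ok"
        count = self.strikes.get(container, 0) + 1
        self.strikes[container] = count
        if count == 1:
            return "log"       # Strike 1: log unhealthy status
        if count == 2:
            return "restart"   # Strike 2: attempt container restart
        return "escalate"      # Strike 3+: escalate to a human
```

Note that a single healthy report resets the counter, so only consecutive failures escalate.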

Morning Status (morning_status)

Schedule: 06:00 UTC (daily)

Purpose: Generate morning status report

Data Gathered:

  • Yesterday's task statistics
  • Event count from previous day
  • Pending tasks

Output: Recorded to episodic memory with morning_status event type
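As a sketch, the report could be assembled from those counts like this (field names are illustrative, not the exact schema used by morning_status()):

```python
# Hypothetical sketch of assembling the morning status payload from the
# three data points above; the real function's fields may differ.
def build_morning_status(tasks_done_yesterday, events_yesterday, pending_tasks):
    return {
        "event_type": "morning_status",
        "summary": (
            f"Yesterday: {tasks_done_yesterday} tasks done, "
            f"{events_yesterday} events recorded. Pending: {pending_tasks}."
        ),
        "pending_tasks": pending_tasks,
    }
```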

Evening Summary (evening_summary)

Schedule: 22:00 UTC (daily)

Purpose: Generate comprehensive daily digest

Process:

  1. Use DigestGenerator to analyze the day's activity
  2. Count tasks completed, events recorded, critical issues
  3. Save digest to journal (~/memory/journal/YYYY-MM-DD.md)
  4. Record to episodic memory

Code:

from aegis.digest.generator import DigestGenerator

generator = DigestGenerator()
digest = generator.generate()
digest_path = generator.save_to_journal(digest)

Digest Contents:

  • Tasks completed vs pending
  • Events recorded (total + critical)
  • Key decisions made
  • Errors encountered
  • Tomorrow's priorities
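The journal path convention (~/memory/journal/YYYY-MM-DD.md) can be sketched with the standard library; the real save_to_journal() may build the path differently:

```python
from datetime import datetime, timezone
from pathlib import Path

# Sketch of the journal path convention; journal_path() here is a
# hypothetical helper, not the actual DigestGenerator API.
def journal_path(root="~/memory/journal", when=None):
    when = when or datetime.now(timezone.utc)
    return Path(root).expanduser() / f"{when:%Y-%m-%d}.md"
```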

Memory Cleanup (memory_cleanup)

Schedule: 00:30 UTC (daily)

Purpose: Delete old low-importance episodic memories

Retention Policy: Keep last 30 days of low-importance memories

Query:

DELETE FROM episodic_memory
WHERE importance = 'low'
  AND timestamp < NOW() - INTERVAL '30 days';

Note: Medium, high, and critical memories are never deleted automatically.
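The retention rule reduces to a simple predicate, sketched here over illustrative dicts rather than real episodic_memory rows:

```python
from datetime import datetime, timedelta, timezone

# Sketch of the memory_cleanup retention rule: only low-importance
# memories older than the retention window are eligible for deletion.
def should_delete(memory, now=None, retention_days=30):
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=retention_days)
    return memory["importance"] == "low" and memory["timestamp"] < cutoff
```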

Modular Job Registration

Each subsystem registers its own jobs via setup_*_scheduler() functions:

Example: Email Scheduler

# aegis/email/scheduled.py
from apscheduler.triggers.interval import IntervalTrigger

def setup_email_scheduler(scheduler):
    """Register email-related scheduled jobs."""

    # Email triage every hour
    scheduler.scheduler.add_job(
        check_unread_emails,
        IntervalTrigger(hours=1),
        id='email_triage',
        name='Email Triage',
    )

    # Draft reminder every 6 hours
    scheduler.scheduler.add_job(
        check_stale_drafts,
        IntervalTrigger(hours=6),
        id='draft_reminder',
        name='Draft Reminder',
    )

Example: Scout Scheduler

# aegis/scout/scheduler.py
from apscheduler.triggers.interval import IntervalTrigger

def setup_scout_scheduler(scheduler):
    """Register scout jobs for opportunity discovery."""

    # Trend discovery every 4 hours
    scheduler.scheduler.add_job(
        discover_trends,
        IntervalTrigger(hours=4),
        id='scout_trends',
        name='Scout Trend Discovery',
    )

    # Hacker News scan every 2 hours
    scheduler.scheduler.add_job(
        scan_hacker_news,
        IntervalTrigger(hours=2),
        id='scout_hn',
        name='Scout HN Scan',
    )

Configuration

Environment Variables

# Database (required)
POSTGRES_HOST=host.docker.internal
POSTGRES_PORT=5432
POSTGRES_USER=agent
POSTGRES_PASSWORD=agent
POSTGRES_DB=aegis

# LLM APIs (for job execution)
ZAI_API_KEY=<key>
ZAI_BASE_URL=https://api.z.ai/api/anthropic
CLAUDE_CODE_OAUTH_TOKEN=<token>

# Communication channels
DISCORD_ALERTS_CHANNEL=<channel_id>
DISCORD_JOURNAL_CHANNEL=<channel_id>

# Playwright for screenshots
PLAYWRIGHT_URL=http://playwright:3000

# Ollama for local inference
OLLAMA_HOST=http://host.docker.internal:11434

Volumes

volumes:
  - /var/run/docker.sock:/var/run/docker.sock:ro  # Docker API access
  - /home/agent/projects/aegis-core/data/monitor/screenshots:/tmp/aegis/monitor/screenshots

Extra Hosts

extra_hosts:
  - "host.docker.internal:host-gateway"  # Access host services

Job Management

List All Jobs

from aegis.scheduler import scheduler

jobs = scheduler.list_jobs()
for job in jobs:
    print(f"{job['id']}: {job['name']} (next run: {job['next_run']})")

Add Custom Job

from apscheduler.triggers.cron import CronTrigger

def my_task():
    print("Running custom task")

scheduler.scheduler.add_job(
    my_task,
    CronTrigger(hour=8, minute=30),  # daily at 08:30 UTC
    id="my_custom_job",
    name="My Custom Job",
)

Remove Job

scheduler.scheduler.remove_job("job_id")

Pause/Resume Job

scheduler.scheduler.pause_job("job_id")
scheduler.scheduler.resume_job("job_id")

Monitoring

Container Logs

docker compose logs -f scheduler

Job Execution Logs

All jobs use structured logging:

2026-01-25T06:00:00Z event=morning_status_generated pending_tasks=5
2026-01-25T12:00:00Z event=health_check_complete db_healthy=true
2026-01-25T22:00:00Z event=evening_summary_generated tasks_completed=12 events=45
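Lines in this key=value format are easy to parse programmatically. A minimal sketch (assumes values contain no spaces, which holds for the examples above):

```python
# Parse a structured log line like the examples above into a dict.
# All values come back as strings; convert types as needed.
def parse_log_line(line):
    timestamp, _, rest = line.partition(" ")
    fields = {"timestamp": timestamp}
    for pair in rest.split():
        key, _, value = pair.partition("=")
        fields[key] = value
    return fields
```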

Episodic Memory

Job executions are recorded to episodic_memory table:

SELECT event_type, summary, timestamp
FROM episodic_memory
WHERE event_type IN ('morning_status', 'evening_summary', 'docker_health_check')
ORDER BY timestamp DESC
LIMIT 10;

Error Handling

Misfire Grace Time

If a job misses its scheduled time (e.g., scheduler was down), it has a 5-minute grace period to execute late:

'misfire_grace_time': 300  # 5 minutes

Coalescing

Multiple missed runs are merged into a single execution:

'coalesce': True

Max Instances

Only one instance of each job runs at a time:

'max_instances': 1
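How these defaults interact when the scheduler wakes up late and finds missed run times can be illustrated in plain Python. This is a simplified model of the decision, not APScheduler's actual implementation:

```python
from datetime import datetime, timedelta, timezone

# Illustrative model of misfire_grace_time + coalesce: given the run
# times a job missed, decide which executions actually happen.
def runs_to_execute(missed_times, now, grace_seconds=300, coalesce=True):
    within_grace = [t for t in missed_times
                    if (now - t).total_seconds() <= grace_seconds]
    if not within_grace:
        return []                  # every misfire exceeded the grace period
    if coalesce:
        return [within_grace[-1]]  # merge missed runs into one execution
    return within_grace            # otherwise run each missed time late
```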

Exception Handling

Jobs should handle exceptions internally:

def my_job():
    try:
        # Job logic
        pass
    except Exception as e:
        logger.error("job_failed", job_id="my_job", error=str(e))
        EpisodicMemory.record(
            event_type="job_error",
            summary=f"Job failed: {str(e)}",
            importance="high"
        )
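To avoid repeating that try/except in every job, the pattern can be wrapped in a decorator. This is a hypothetical helper, not something the codebase necessarily provides (the EpisodicMemory.record call is omitted for brevity):

```python
import functools
import logging

logger = logging.getLogger("aegis.scheduler")

# Hypothetical decorator applying the exception-handling pattern above:
# a failing job logs the error and returns None instead of raising.
def safe_job(job_id):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            except Exception as e:
                logger.error("job_failed job_id=%s error=%s", job_id, e)
                return None
        return wrapper
    return decorator
```

A job would then be written as a plain function decorated with @safe_job("my_job"), keeping the body free of boilerplate.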

Development

Testing Jobs Locally

# Start scheduler container
docker compose up -d scheduler

# Trigger a job manually (Python shell)
docker compose exec scheduler python -c "
from aegis.scheduler import scheduler
scheduler.start()
scheduler.morning_status()
"

Adding a New Job Module

  1. Create aegis/my_module/scheduler.py:

    from apscheduler.triggers.interval import IntervalTrigger

    def setup_my_module_scheduler(scheduler):
        scheduler.scheduler.add_job(
            my_job_function,
            IntervalTrigger(hours=1),
            id='my_module_job',
            name='My Module Job',
        )
    

  2. Import in aegis/scheduler.py:

    from aegis.my_module.scheduler import setup_my_module_scheduler
    

  3. Call during initialization:

    def _setup_default_jobs(self):
        # ... existing jobs ...
        setup_my_module_scheduler(self)
    

Debugging Schedule Issues

Enable debug logging:

import logging
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)

View next run times:

for job in scheduler.scheduler.get_jobs():
    print(f"{job.id}: next run at {job.next_run_time}")

Troubleshooting

Job not running

  1. Check if scheduler is started:

    docker compose ps scheduler
    

  2. Check job is registered:

    scheduler.list_jobs()
    

  3. Check for exceptions in logs:

    docker compose logs scheduler | grep ERROR
    

Database connection issues

Jobs may fail if PostgreSQL is unreachable. Check:

docker compose exec scheduler ping -c 1 host.docker.internal

Docker socket permission denied

Scheduler needs read access to Docker socket for health checks:

volumes:
  - /var/run/docker.sock:/var/run/docker.sock:ro

Verify:

docker compose exec scheduler docker ps