Background Jobs

Build background jobs for scheduled tasks, content ingestion, cleanup, and async processing.

Background jobs handle scheduled tasks, async processing, and maintenance operations. Jobs are registered via the jobs() hook on the Extension trait and executed by the built-in scheduler. For the complete hook reference, see Lifecycle Hooks.

Job Trait

Jobs implement the Job trait:

use systemprompt::traits::{Job, JobContext, JobResult};
use anyhow::Result;

pub struct MyJob;

#[async_trait::async_trait]
impl Job for MyJob {
    fn name(&self) -> &'static str {
        "my_job"
    }

    fn description(&self) -> &'static str {
        "Performs scheduled maintenance"
    }

    fn schedule(&self) -> &'static str {
        "0 0 * * * *"  // Every hour
    }

    async fn execute(&self, ctx: &JobContext) -> Result<JobResult> {
        // Job logic here
        Ok(JobResult::success())
    }
}

// Register the job
systemprompt::traits::submit_job!(&MyJob);

Job Registration

Jobs are registered using the submit_job! macro:

// At module level
systemprompt::traits::submit_job!(&MyJob);

The job is automatically discovered at startup and added to the scheduler.

Cron Schedules

Jobs use 6-field cron expressions (seconds included):

Field Values Description
Seconds 0-59 Second of minute
Minutes 0-59 Minute of hour
Hours 0-23 Hour of day
Day 1-31 Day of month
Month 1-12 Month of year
Weekday 0-6 Day of week (0=Sun)

Common patterns:

"0 0 * * * *"      // Every hour
"0 */15 * * * *"   // Every 15 minutes
"0 0 0 * * *"      // Daily at midnight
"0 0 3 * * *"      // Daily at 3am
"0 0 */2 * * *"    // Every 2 hours
"0 0 0 * * 0"      // Weekly on Sunday

JobContext

The JobContext provides access to shared resources:

async fn execute(&self, ctx: &JobContext) -> Result<JobResult> {
    // Get database pool
    let db = ctx
        .db_pool::<DbPool>()
        .ok_or_else(|| anyhow::anyhow!("Database not available"))?;

    let pool = db.pool()
        .ok_or_else(|| anyhow::anyhow!("PgPool not available"))?;

    // Use the pool
    let count = sqlx::query_scalar!("SELECT COUNT(*) FROM my_table")
        .fetch_one(pool.as_ref())
        .await?;

    Ok(JobResult::success()
        .with_stats(count.unwrap_or(0) as u64, 0))
}

JobResult

Return JobResult to report execution status:

// Success with no details
JobResult::success()

// Success with statistics
JobResult::success()
    .with_stats(items_processed, items_failed)
    .with_duration(duration_ms)

// Success with message
JobResult::success()
    .with_message("Processed 42 items")

// Failure
JobResult::failure("Connection timeout")

Example: Content Ingestion Job

A complete example from the template:

use std::sync::Arc;
use anyhow::Result;
use sqlx::PgPool;
use systemprompt::database::DbPool;
use systemprompt::traits::{Job, JobContext, JobResult};

#[derive(Debug, Clone, Copy, Default)]
pub struct ContentIngestionJob;

#[async_trait::async_trait]
impl Job for ContentIngestionJob {
    fn name(&self) -> &'static str {
        "blog_content_ingestion"
    }

    fn description(&self) -> &'static str {
        "Ingests markdown content from configured directories"
    }

    fn schedule(&self) -> &'static str {
        "0 0 * * * *"  // Hourly
    }

    async fn execute(&self, ctx: &JobContext) -> Result<JobResult> {
        let start = std::time::Instant::now();
        let db = ctx.db_pool::<DbPool>()
            .ok_or_else(|| anyhow::anyhow!("Database not available"))?;
        let pool = db.pool()
            .ok_or_else(|| anyhow::anyhow!("PgPool not available"))?;

        let mut processed = 0u64;
        let mut errors = 0u64;

        // Your ingestion logic here
        // ...

        let duration_ms = start.elapsed().as_millis() as u64;

        Ok(JobResult::success()
            .with_stats(processed, errors)
            .with_duration(duration_ms))
    }
}

systemprompt::traits::submit_job!(&ContentIngestionJob);

CLI Commands

List Jobs

systemprompt infra jobs list

Show Job Details

systemprompt infra jobs show blog_content_ingestion

Run Job Manually

systemprompt infra jobs run blog_content_ingestion

View Execution History

systemprompt infra jobs history --job blog_content_ingestion --limit 10

Enable/Disable Jobs

systemprompt infra jobs disable cleanup_job
systemprompt infra jobs enable cleanup_job

Common Job Patterns

Cleanup Jobs

pub struct CleanupJob;

#[async_trait::async_trait]
impl Job for CleanupJob {
    fn name(&self) -> &'static str { "cleanup_old_records" }
    fn description(&self) -> &'static str { "Deletes records older than 30 days" }
    fn schedule(&self) -> &'static str { "0 0 3 * * *" }  // 3am daily

    async fn execute(&self, ctx: &JobContext) -> Result<JobResult> {
        let db = ctx.db_pool::<DbPool>().unwrap();
        let pool = db.pool().unwrap();

        let deleted = sqlx::query!(
            "DELETE FROM logs WHERE created_at < NOW() - INTERVAL '30 days'"
        )
        .execute(pool.as_ref())
        .await?
        .rows_affected();

        Ok(JobResult::success()
            .with_stats(deleted, 0)
            .with_message(format!("Deleted {} old records", deleted)))
    }
}

Pipeline Jobs

Jobs that orchestrate other jobs:

pub struct PublishPipelineJob;

#[async_trait::async_trait]
impl Job for PublishPipelineJob {
    fn name(&self) -> &'static str { "publish_pipeline" }
    fn description(&self) -> &'static str { "Full content publishing pipeline" }
    fn schedule(&self) -> &'static str { "0 */15 * * * *" }

    async fn execute(&self, ctx: &JobContext) -> Result<JobResult> {
        // Run ingestion
        ContentIngestionJob.execute(ctx).await?;

        // Copy assets
        CopyAssetsJob.execute(ctx).await?;

        Ok(JobResult::success()
            .with_message("Pipeline completed"))
    }
}

Disabled Jobs (On-Demand Only)

Set an empty schedule for manual-only jobs:

fn schedule(&self) -> &'static str {
    ""  // No schedule - run manually only
}

Project Structure

extensions/web/src/
├── jobs/
│   ├── mod.rs
│   ├── ingestion.rs
│   ├── cleanup.rs
│   └── publish.rs
└── extension.rs
// jobs/mod.rs
mod ingestion;
mod cleanup;
mod publish;

pub use ingestion::ContentIngestionJob;
pub use cleanup::CleanupJob;
pub use publish::PublishPipelineJob;