# Background Jobs
Build background jobs for scheduled tasks, content ingestion, cleanup, and async processing.
Background jobs handle scheduled tasks, async processing, and maintenance operations. Jobs are registered via the `jobs()` hook on the `Extension` trait and executed by the built-in scheduler. For the complete hook reference, see Lifecycle Hooks.
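The exact shape of the `jobs()` hook depends on the `Extension` trait definition in your version of the framework. As a rough, hypothetical sketch (the method signature, return type, and `WebExtension` name here are assumptions, not the real API):

```rust
// Hypothetical sketch -- check the Extension trait for the real signature.
impl Extension for WebExtension {
    fn jobs(&self) -> Vec<&'static dyn Job> {
        // Hand the scheduler every job this extension provides.
        vec![&ContentIngestionJob, &CleanupJob]
    }
}
```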
## Job Trait

Jobs implement the `Job` trait:

```rust
use systemprompt::traits::{Job, JobContext, JobResult};
use anyhow::Result;

pub struct MyJob;

#[async_trait::async_trait]
impl Job for MyJob {
    fn name(&self) -> &'static str {
        "my_job"
    }

    fn description(&self) -> &'static str {
        "Performs scheduled maintenance"
    }

    fn schedule(&self) -> &'static str {
        "0 0 * * * *" // Every hour
    }

    async fn execute(&self, ctx: &JobContext) -> Result<JobResult> {
        // Job logic here
        Ok(JobResult::success())
    }
}

// Register the job
systemprompt::traits::submit_job!(&MyJob);
```
## Job Registration

Jobs are registered using the `submit_job!` macro:

```rust
// At module level
systemprompt::traits::submit_job!(&MyJob);
```

The job is automatically discovered at startup and added to the scheduler.
## Cron Schedules

Jobs use 6-field cron expressions (seconds included):
| Field | Values | Description |
|---|---|---|
| Seconds | 0-59 | Second of minute |
| Minutes | 0-59 | Minute of hour |
| Hours | 0-23 | Hour of day |
| Day | 1-31 | Day of month |
| Month | 1-12 | Month of year |
| Weekday | 0-6 | Day of week (0=Sun) |
Common patterns:

```rust
"0 0 * * * *"    // Every hour
"0 */15 * * * *" // Every 15 minutes
"0 0 0 * * *"    // Daily at midnight
"0 0 3 * * *"    // Daily at 3am
"0 0 */2 * * *"  // Every 2 hours
"0 0 0 * * 0"    // Weekly on Sunday
```
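The leading seconds field is easy to forget when porting 5-field crontab entries. A quick stdlib-only sanity check (not part of the framework) can catch the mismatch before a schedule string reaches the scheduler:

```rust
/// Returns true if `expr` has exactly six whitespace-separated fields
/// (seconds, minutes, hours, day, month, weekday).
fn has_six_fields(expr: &str) -> bool {
    expr.split_whitespace().count() == 6
}

fn main() {
    assert!(has_six_fields("0 0 3 * * *")); // daily at 3am: valid 6-field form
    assert!(!has_six_fields("0 3 * * *"));  // 5-field crontab form: rejected
    println!("ok");
}
```

This only checks the field count; the scheduler itself validates the field values.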
## JobContext

The `JobContext` provides access to shared resources:

```rust
async fn execute(&self, ctx: &JobContext) -> Result<JobResult> {
    // Get database pool
    let db = ctx
        .db_pool::<DbPool>()
        .ok_or_else(|| anyhow::anyhow!("Database not available"))?;
    let pool = db
        .pool()
        .ok_or_else(|| anyhow::anyhow!("PgPool not available"))?;

    // Use the pool
    let count = sqlx::query_scalar!("SELECT COUNT(*) FROM my_table")
        .fetch_one(pool.as_ref())
        .await?;

    Ok(JobResult::success().with_stats(count.unwrap_or(0) as u64, 0))
}
```
## JobResult

Return `JobResult` to report execution status:

```rust
// Success with no details
JobResult::success()

// Success with statistics
JobResult::success()
    .with_stats(items_processed, items_failed)
    .with_duration(duration_ms)

// Success with a message
JobResult::success().with_message("Processed 42 items")

// Failure
JobResult::failure("Connection timeout")
```
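Note that `execute` has two failure channels: returning `Err(...)` and returning `Ok(JobResult::failure(...))`. How the scheduler treats each is framework-specific, but a common split (an assumption here, not confirmed by this page; `do_work` is a hypothetical helper) is to reserve `Err` for unexpected infrastructure errors and `JobResult::failure` for failures the job handled and wants reported:

```rust
// Sketch only: the Err-vs-failure semantics are an assumption about the scheduler.
async fn execute(&self, ctx: &JobContext) -> Result<JobResult> {
    let db = ctx
        .db_pool::<DbPool>()
        .ok_or_else(|| anyhow::anyhow!("Database not available"))?; // infrastructure error: propagate as Err

    match do_work(&db).await {
        // Expected, reportable outcome: surface it through JobResult.
        Ok(n) => Ok(JobResult::success().with_stats(n, 0)),
        Err(e) => Ok(JobResult::failure(format!("Work failed: {e}"))),
    }
}
```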
## Example: Content Ingestion Job

A complete example from the template:

```rust
use std::sync::Arc;
use anyhow::Result;
use sqlx::PgPool;
use systemprompt::database::DbPool;
use systemprompt::traits::{Job, JobContext, JobResult};

#[derive(Debug, Clone, Copy, Default)]
pub struct ContentIngestionJob;

#[async_trait::async_trait]
impl Job for ContentIngestionJob {
    fn name(&self) -> &'static str {
        "blog_content_ingestion"
    }

    fn description(&self) -> &'static str {
        "Ingests markdown content from configured directories"
    }

    fn schedule(&self) -> &'static str {
        "0 0 * * * *" // Hourly
    }

    async fn execute(&self, ctx: &JobContext) -> Result<JobResult> {
        let start = std::time::Instant::now();

        let db = ctx
            .db_pool::<DbPool>()
            .ok_or_else(|| anyhow::anyhow!("Database not available"))?;
        let pool = db
            .pool()
            .ok_or_else(|| anyhow::anyhow!("PgPool not available"))?;

        let mut processed = 0u64;
        let mut errors = 0u64;

        // Your ingestion logic here
        // ...

        let duration_ms = start.elapsed().as_millis() as u64;
        Ok(JobResult::success()
            .with_stats(processed, errors)
            .with_duration(duration_ms))
    }
}

systemprompt::traits::submit_job!(&ContentIngestionJob);
```
## CLI Commands

### List Jobs

```shell
systemprompt infra jobs list
```

### Show Job Details

```shell
systemprompt infra jobs show blog_content_ingestion
```

### Run Job Manually

```shell
systemprompt infra jobs run blog_content_ingestion
```

### View Execution History

```shell
systemprompt infra jobs history --job blog_content_ingestion --limit 10
```

### Enable/Disable Jobs

```shell
systemprompt infra jobs disable cleanup_job
systemprompt infra jobs enable cleanup_job
```
## Common Job Patterns

### Cleanup Jobs

```rust
pub struct CleanupJob;

#[async_trait::async_trait]
impl Job for CleanupJob {
    fn name(&self) -> &'static str { "cleanup_old_records" }
    fn description(&self) -> &'static str { "Deletes records older than 30 days" }
    fn schedule(&self) -> &'static str { "0 0 3 * * *" } // 3am daily

    async fn execute(&self, ctx: &JobContext) -> Result<JobResult> {
        // Avoid unwrap() in jobs: a missing pool would panic the job runner.
        let db = ctx
            .db_pool::<DbPool>()
            .ok_or_else(|| anyhow::anyhow!("Database not available"))?;
        let pool = db
            .pool()
            .ok_or_else(|| anyhow::anyhow!("PgPool not available"))?;

        let deleted = sqlx::query!(
            "DELETE FROM logs WHERE created_at < NOW() - INTERVAL '30 days'"
        )
        .execute(pool.as_ref())
        .await?
        .rows_affected();

        Ok(JobResult::success()
            .with_stats(deleted, 0)
            .with_message(format!("Deleted {} old records", deleted)))
    }
}
```
### Pipeline Jobs

Jobs that orchestrate other jobs:

```rust
pub struct PublishPipelineJob;

#[async_trait::async_trait]
impl Job for PublishPipelineJob {
    fn name(&self) -> &'static str { "publish_pipeline" }
    fn description(&self) -> &'static str { "Full content publishing pipeline" }
    fn schedule(&self) -> &'static str { "0 */15 * * * *" } // Every 15 minutes

    async fn execute(&self, ctx: &JobContext) -> Result<JobResult> {
        // Run ingestion
        ContentIngestionJob.execute(ctx).await?;

        // Copy assets
        CopyAssetsJob.execute(ctx).await?;

        Ok(JobResult::success().with_message("Pipeline completed"))
    }
}
```
### Disabled Jobs (On-Demand Only)

Return an empty schedule string for jobs that should only run manually (via `systemprompt infra jobs run`):

```rust
fn schedule(&self) -> &'static str {
    "" // No schedule - run manually only
}
```
## Project Structure

```text
extensions/web/src/
├── jobs/
│   ├── mod.rs
│   ├── ingestion.rs
│   ├── cleanup.rs
│   └── publish.rs
└── extension.rs
```

```rust
// jobs/mod.rs
mod ingestion;
mod cleanup;
mod publish;

pub use ingestion::ContentIngestionJob;
pub use cleanup::CleanupJob;
pub use publish::PublishPipelineJob;
```
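With this layout, each job file typically ends with its own `submit_job!` call, as in the ingestion example above, so declaring the module in `jobs/mod.rs` is enough to get the job discovered at startup. A sketch of one such file:

```rust
// jobs/cleanup.rs (sketch): define the job, then register it in the same file
pub struct CleanupJob;

// ... impl Job for CleanupJob (name, description, schedule, execute) ...

systemprompt::traits::submit_job!(&CleanupJob);
```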