agentic-automation-lib
v1.0.12
A production-grade library for agentic automation with persistent task storage, RabbitMQ queuing, CRON scheduling, and worker support.
Agentic Automation Library
A production-grade library for agentic automation with persistent task storage, RabbitMQ queuing, scheduling, and worker support.
Features
- 🔄 Recurring Tasks: Schedule tasks that repeat at specified intervals
- 🕒 Scheduled Execution: Set tasks to run at specific times in the future
- 📝 Persistent Storage: Tasks are stored in Redis for durability
- 📋 Task Management: Get, cancel, and monitor tasks with a simple API
- 🔎 Filtering: Find tasks by user, agent, or other criteria
- 📬 Message Queue: Uses RabbitMQ for reliable task distribution
- 🚀 Worker Support: Built-in worker functionality for processing tasks
Installation
npm install agentic-automation-lib
Getting Started
The library requires Redis and RabbitMQ services. Make sure they are running before using this library.
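Because both services must be reachable before the library is initialized, it can help to validate the connection URLs up front. The following is a minimal sketch using only the standard `URL` class. `parseServiceUrl`, `ServiceEndpoint`, and the default-port table are illustrative assumptions and not part of the library.

```typescript
// Hypothetical helper, not part of agentic-automation-lib.
interface ServiceEndpoint {
  protocol: string;
  host: string;
  port: number;
}

// Ports assumed when the URL omits one (the conventional Redis and AMQP ports).
const DEFAULT_PORTS: Record<string, number> = {
  "redis:": 6379,
  "amqp:": 5672,
};

function parseServiceUrl(raw: string): ServiceEndpoint {
  const url = new URL(raw); // throws a TypeError on malformed input
  const fallback = DEFAULT_PORTS[url.protocol];
  if (fallback === undefined) {
    throw new Error(`Unsupported protocol: ${url.protocol}`);
  }
  const port = url.port ? Number(url.port) : fallback;
  return { protocol: url.protocol, host: url.hostname, port };
}
```

Calling `parseServiceUrl` on each URL before constructing `Config` surfaces typos early, although it does not prove that the service is actually running.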
import {
  Config,
  RedisClient,
  RabbitMQService,
  TaskRepository,
  AgentTaskScheduler
} from 'agentic-automation-lib';
// Create a configuration instance
const config = new Config({
  rabbitmq_url: "amqp://localhost:5672",
  redis_url: "redis://localhost:6379",
  check_interval_ms: 1000
});
// Initialize services
const redisClient = new RedisClient(config.redisUrl);
const rabbitMQService = new RabbitMQService(config);
await rabbitMQService.init();
// Create repository and scheduler
const taskRepository = new TaskRepository(redisClient);
const scheduler = new AgentTaskScheduler(config, taskRepository, rabbitMQService);
// Now you can start scheduling tasks!
API Reference
Scheduling Tasks
Schedule a Single Task
// Schedule a task to run in 30 seconds
const now = new Date();
const thirtySecondsLater = new Date(now.getTime() + 30000);
const taskId = await scheduler.scheduleTask(
  "agent-123", // Agent ID
  "user-456", // User ID
  "What's the weather today?", // Task query
  "Check the current weather", // Task description
  thirtySecondsLater // When to execute
);
console.log(`Task scheduled with ID: ${taskId}`);
Schedule Multiple Tasks at Once
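The examples in this README express delays and intervals as raw millisecond counts (for instance `86400000` for one day). A small helper can make such values easier to read. This is only a sketch: `Duration` and `fromNow` are illustrative names and not part of the library.

```typescript
// Illustrative helpers, not part of agentic-automation-lib.
const Duration = {
  seconds: (n: number): number => n * 1000,
  minutes: (n: number): number => n * 60 * 1000,
  hours: (n: number): number => n * 60 * 60 * 1000,
  days: (n: number): number => n * 24 * 60 * 60 * 1000,
  weeks: (n: number): number => n * 7 * 24 * 60 * 60 * 1000,
};

// Returns a Date that lies `ms` milliseconds after `base` (default: now).
function fromNow(ms: number, base: Date = new Date()): Date {
  return new Date(base.getTime() + ms);
}

// Values matching the literals used in this README:
const dailyInterval = Duration.days(1); // 86400000
const oneWeek = Duration.weeks(1); // 604800000
```

With these helpers, a field such as `scheduledTime` could be written as `fromNow(Duration.seconds(45))` instead of `new Date(Date.now() + 45000)`.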
// Schedule several tasks with different parameters
const tasks = [
  {
    agentId: "agent-123",
    userId: "user-456",
    taskQuery: "What are my appointments today?",
    taskDescription: "Check calendar for today",
    scheduledTime: new Date(Date.now() + 45000), // 45 seconds later
  },
  {
    agentId: "agent-123",
    userId: "user-456",
    taskQuery: "Remind me to call John",
    taskDescription: "Call reminder",
    scheduledTime: new Date(Date.now() + 60000), // 60 seconds later
  },
  {
    agentId: "agent-789",
    userId: "user-456",
    taskQuery: "Daily news summary",
    taskDescription: "Get news summary",
    scheduledTime: new Date(Date.now() + 15000), // 15 seconds later
    recurrenceInterval: 86400000, // Daily (24 hours)
    recurrenceEndTime: new Date(Date.now() + 604800000), // 1 week from now
  }
];
const taskIds = await scheduler.scheduleMultipleTasks(tasks);
console.log(`Tasks scheduled with IDs: ${taskIds.join(", ")}`);
Schedule a Recurring Task
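To make the recurrence parameters concrete, the sketch below lists the run times implied by a first run, an interval, and an end time. It assumes that a run is skipped once its time would fall after `recurrenceEndTime`. The library's exact boundary behavior is not documented here, and `listRunTimes` is a hypothetical helper rather than a library function.

```typescript
// Hypothetical helper: enumerates run times under the assumption that
// runs occur at firstRun, firstRun + interval, ... up to and including endTime.
function listRunTimes(
  firstRun: Date,
  intervalMs: number,
  endTime: Date,
  maxRuns: number = 1000 // safety cap so a tiny interval cannot loop for long
): Date[] {
  if (intervalMs <= 0) {
    throw new RangeError("intervalMs must be positive");
  }
  const runs: Date[] = [];
  for (
    let t = firstRun.getTime();
    t <= endTime.getTime() && runs.length < maxRuns;
    t += intervalMs
  ) {
    runs.push(new Date(t));
  }
  return runs;
}

// First run after 5 s, every 10 s, stop after 60 s (measured from a fixed origin).
const origin = new Date(0);
const exampleRuns = listRunTimes(
  new Date(origin.getTime() + 5000),
  10000,
  new Date(origin.getTime() + 60000)
);
// Offsets in seconds: 5, 15, 25, 35, 45, 55 (six runs under the stated assumption)
```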
// Schedule a task to run every 10 seconds for 1 minute
const agentId = "agent-1";
const userId = "user-123";
const taskDescription = "Check the weather in Tokyo";
const firstRun = new Date(Date.now() + 5000); // first run in 5 seconds
const recurrenceInterval = 10000; // every 10 seconds
const recurrenceEndTime = new Date(Date.now() + 60000); // stop after 1 minute
const recurringTaskId = await scheduler.scheduleTask(
  agentId,
  userId,
  "Current weather in Tokyo?",
  taskDescription,
  firstRun,
  recurrenceInterval,
  recurrenceEndTime
);
console.log(`Recurring task scheduled with ID: ${recurringTaskId}`);
Managing Tasks
Get a Task by ID
const task = await scheduler.getTaskById("task-id-here");
if (task) {
  console.log(`Task ${task.id} is scheduled for ${task.scheduledTime}`);
  console.log(`Description: ${task.taskDescription}`);
  console.log(`Status: ${task.status}`);
} else {
  console.log("Task not found");
}
Get All Active Tasks
// Get all tasks that are not expired or completed
const activeTasks = await scheduler.getActiveTasks();
console.log(`Found ${activeTasks.length} active tasks:`);
activeTasks.forEach((task, index) => {
  console.log(`[${index + 1}] Task ${task.id}: ${task.taskDescription}`);
  console.log(` Scheduled for: ${task.scheduledTime}`);
  console.log(` Status: ${task.status}`);
});
Get Tasks by User ID
// Get all tasks for a specific user
const userTasks = await scheduler.getTasksByUserId("user-456");
console.log(`Found ${userTasks.length} tasks for user-456:`);
userTasks.forEach((task, index) => {
  console.log(`[${index + 1}] Task ${task.id}: ${task.taskDescription}`);
  console.log(` Scheduled for: ${task.scheduledTime}`);
});
Get Tasks by Agent ID
// Get all tasks assigned to a specific agent
const agentTasks = await scheduler.getTasksByAgentId("agent-123");
console.log(`Found ${agentTasks.length} tasks for agent-123:`);
agentTasks.forEach((task, index) => {
  console.log(`[${index + 1}] Task ${task.id}: ${task.taskDescription}`);
  console.log(` Scheduled for: ${task.scheduledTime}`);
});
Canceling Tasks
Cancel a Single Task
// Cancel a task by its ID
const cancelled = await scheduler.cancelTask("task-id-here");
if (cancelled) {
  console.log("Task cancelled successfully");
} else {
  console.log("Failed to cancel task");
}
Cancel Multiple Tasks
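Judging from the example in this section, `cancelMultipleTasks` appears to return an object that maps each task ID to a success flag. Under that assumption, the sketch below splits such a result into cancelled and failed IDs. `CancelResults` and `summarizeCancelResults` are hypothetical names, not part of the library.

```typescript
// Assumed result shape: task ID -> whether the cancellation succeeded.
type CancelResults = Record<string, boolean>;

interface CancelSummary {
  cancelled: string[];
  failed: string[];
}

// Hypothetical helper: partitions task IDs by outcome.
function summarizeCancelResults(results: CancelResults): CancelSummary {
  const cancelled: string[] = [];
  const failed: string[] = [];
  for (const [taskId, success] of Object.entries(results)) {
    (success ? cancelled : failed).push(taskId);
  }
  return { cancelled, failed };
}
```

The `failed` list could then be logged or retried, depending on why the cancellation did not succeed (for example, the task may already have run).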
// Cancel multiple tasks at once
const tasksToCancel = ["task-id-1", "task-id-2", "task-id-3"];
const cancelResults = await scheduler.cancelMultipleTasks(tasksToCancel);
// Check results for each task
for (const [taskId, success] of Object.entries(cancelResults)) {
  console.log(`Task ${taskId}: ${success ? "Cancelled successfully" : "Failed to cancel"}`);
}
Worker Setup
To process tasks, you need to set up a worker:
import { Worker, AgentTask } from 'agentic-automation-lib';
// Define how your tasks should be processed
const dispatchAgentTask = async (task: AgentTask) => {
  console.log(`Processing task ${task.id} for agent ${task.agentId}`);
  console.log(`Task instruction: "${task.taskDescription}"`);
  // Your agent's task processing logic here
  console.log(`Task ${task.id} completed successfully`);
};
// Create and start the worker
const worker = new Worker(rabbitMQService, dispatchAgentTask);
await worker.start();
// To stop the worker when needed
process.on('SIGTERM', async () => {
  await worker.stop();
  process.exit(0);
});
Complete Example
Here's a complete example of how to use the library:
import {
  Config,
  RedisClient,
  RabbitMQService,
  TaskRepository,
  AgentTaskScheduler,
  Worker,
  AgentTask
} from 'agentic-automation-lib';
async function main() {
  // Setup
  const config = new Config({
    rabbitmq_url: "amqp://localhost:5672",
    redis_url: "redis://localhost:6379",
    check_interval_ms: 1000
  });
  const redisClient = new RedisClient(config.redisUrl);
  const rabbitMQService = new RabbitMQService(config);
  await rabbitMQService.init();
  const taskRepository = new TaskRepository(redisClient);
  const scheduler = new AgentTaskScheduler(config, taskRepository, rabbitMQService);
  // Schedule a task
  const taskId = await scheduler.scheduleTask(
    "agent-123",
    "user-456",
    "What's the weather today?",
    "Check the current weather",
    new Date(Date.now() + 5000) // Run in 5 seconds
  );
  console.log(`Task scheduled with ID: ${taskId}`);
  // Setup a worker to process the task
  const processTask = async (task: AgentTask) => {
    console.log(`Processing task: ${task.taskDescription}`);
    // Your processing logic here
    console.log(`Task ${task.id} completed`);
  };
  const worker = new Worker(rabbitMQService, processTask);
  await worker.start();
  // Wait for 10 seconds to allow task processing
  console.log("Waiting for task to be processed...");
  await new Promise(resolve => setTimeout(resolve, 10000));
  // Cleanup
  await worker.stop();
  scheduler.stop();
  await redisClient.client.quit();
  console.log("Cleanup completed");
}
main().catch(console.error);
Task Data Structure
Each task has the following properties:
| Property | Type | Description |
|----------|------|-------------|
| id | string | Unique identifier for the task |
| userId | string | ID of the user who owns the task |
| agentId | string | ID of the agent that will process the task |
| taskQuery | string | The query or command for the agent |
| taskDescription | string | Human-readable description of the task |
| scheduledTime | Date | When the task should be executed |
| status | string | Current status: "pending", "queued", "running", "completed", or "failed" |
| recurrenceInterval | number (optional) | Time in milliseconds between recurrences |
| recurrenceEndTime | Date (optional) | When to stop recurring tasks |
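The properties above can be written down as a TypeScript type, which is useful when validating data that crosses a process boundary. This is a sketch derived from the table only. The library exports its own `AgentTask` type, whose exact definition may differ, so the names `AgentTaskShape`, `TaskStatus`, `isTaskStatus`, and `isRecurring` are illustrative.

```typescript
// Status values as listed in the table above.
type TaskStatus = "pending" | "queued" | "running" | "completed" | "failed";

// Illustrative shape mirroring the table; not the library's own definition.
interface AgentTaskShape {
  id: string;
  userId: string;
  agentId: string;
  taskQuery: string;
  taskDescription: string;
  scheduledTime: Date;
  status: TaskStatus;
  recurrenceInterval?: number; // milliseconds between recurrences
  recurrenceEndTime?: Date;
}

const TASK_STATUSES: readonly string[] = [
  "pending",
  "queued",
  "running",
  "completed",
  "failed",
];

// Type guard for status strings read from storage or a message payload.
function isTaskStatus(value: unknown): value is TaskStatus {
  return typeof value === "string" && TASK_STATUSES.indexOf(value) !== -1;
}

// A task is treated as recurring when it carries a positive interval.
function isRecurring(task: AgentTaskShape): boolean {
  return task.recurrenceInterval !== undefined && task.recurrenceInterval > 0;
}
```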
Error Handling
The library handles errors internally with try-catch blocks. For production use, add your own error handling and logging around scheduler calls and inside worker handlers.
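One way to add such handling is to wrap the task handler so that failures are logged and retried with a delay. The sketch below shows a generic pattern and not library behavior: `withRetry`, `backoffDelayMs`, and their default values are assumptions chosen for illustration, and whether the library itself retries or requeues failed tasks is not stated in this README.

```typescript
type TaskHandler<T> = (task: T) => Promise<void>;

// Exponential backoff: baseMs, 2*baseMs, 4*baseMs, ... capped at maxMs.
function backoffDelayMs(attempt: number, baseMs: number = 500, maxMs: number = 30000): number {
  return Math.min(maxMs, baseMs * Math.pow(2, attempt - 1));
}

// Wraps a handler so that each failure is reported and the call is retried
// up to maxAttempts times; the last error is rethrown to the caller.
function withRetry<T>(
  handler: TaskHandler<T>,
  maxAttempts: number = 3,
  onError: (err: unknown, attempt: number) => void = (err, attempt) =>
    console.error(`Attempt ${attempt} failed:`, err)
): TaskHandler<T> {
  return async (task: T): Promise<void> => {
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        await handler(task);
        return;
      } catch (err) {
        onError(err, attempt);
        if (attempt === maxAttempts) {
          throw err;
        }
        const delay = backoffDelayMs(attempt);
        await new Promise<void>((resolve) => setTimeout(resolve, delay));
      }
    }
  };
}
```

A wrapped handler could then be passed wherever a plain handler is expected, for example `new Worker(rabbitMQService, withRetry(dispatchAgentTask))`.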
Shutting Down
When your application is shutting down, make sure to clean up resources:
// Stop any workers first so no task is processed during teardown
await worker.stop();
// Stop the scheduler
scheduler.stop();
// Close the Redis connection last
await redisClient.client.quit();
License
MIT
