LangChain Scheduler

Schedule recurring LangChain agent tasks: RAG data refreshes, memory cleanup, periodic workflows, and automated chains.

Overview

LangChain agents often need to run on schedules: refreshing vector store data, cleaning up conversation memory, generating periodic reports, or re-processing documents. ClawTick provides a reliable scheduler for LangChain workflows without managing infrastructure.

Why Schedule LangChain Agents?

  • RAG Data Freshness: Keep vector stores updated with latest documents
  • Memory Management: Archive old conversations, clean up temporary data
  • Periodic Analysis: Daily summaries, weekly reports, monthly insights
  • Batch Processing: Process queued documents, update embeddings

Common Use Cases

RAG Data Refresh

Update vector store with latest documents from your knowledge base

0 2 * * * (Daily at 2 AM)

Memory Cleanup

Archive conversations older than 30 days to optimize retrieval performance

0 3 * * 0 (Weekly, Sunday 3 AM)

Embedding Updates

Re-embed documents when switching models or improving chunking strategy

0 */6 * * * (Every 6 hours)

Periodic Reports

Generate daily summaries of agent conversations and user interactions

0 9 * * * (Daily at 9 AM)
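
The five cron fields used above are minute, hour, day of month, month, and day of week. As a sanity check for your own schedules, here is a minimal sketch (an illustration, not ClawTick's parser) that matches a timestamp against an expression, supporting only `*`, `*/n` steps, and plain numbers:

```python
from datetime import datetime

def field_matches(field: str, value: int) -> bool:
    """Match one cron field ('*', '*/n', or a number) against a value."""
    if field == "*":
        return True
    if field.startswith("*/"):
        return value % int(field[2:]) == 0
    return int(field) == value

def cron_matches(expr: str, when: datetime) -> bool:
    """True if `when` matches a five-field cron expression (no ranges or lists)."""
    minute, hour, dom, month, dow = expr.split()
    return (field_matches(minute, when.minute)
            and field_matches(hour, when.hour)
            and field_matches(dom, when.day)
            and field_matches(month, when.month)
            and field_matches(dow, when.isoweekday() % 7))  # cron convention: 0 = Sunday

# "0 2 * * *" fires daily at 02:00; "0 3 * * 0" fires Sundays at 03:00
print(cron_matches("0 2 * * *", datetime(2024, 1, 15, 2, 0)))   # True
print(cron_matches("0 3 * * 0", datetime(2024, 1, 14, 3, 0)))   # True (a Sunday)
```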

Setup Guide

Architecture

ClawTick triggers your LangChain agent via webhook at scheduled times. Your agent runs the workflow, processes data, and returns results.

ClawTick (Cloud Scheduler)
↓ HTTP POST at scheduled time
Your Webhook Endpoint (FastAPI/Flask)
↓ Invokes LangChain workflow
LangChain Agent Execution
↓ Returns results
Response → ClawTick logs
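
The POST body is a small JSON document. A sketch of the shape your endpoint should expect — the field names mirror the `WebhookPayload` model defined later in this guide; any extra fields ClawTick may attach are an assumption to verify against your own job logs:

```python
import json

# Example request body (fields mirror the WebhookPayload model used in this guide)
raw_body = json.dumps({
    "message": "Refresh the vector store with documents from the last 24 hours",
    "agent": "main",
    "taskType": "scheduled",
})

# Your endpoint parses this and hands `message` to the agent as its task prompt
payload = json.loads(raw_body)
print(payload["taskType"])  # scheduled
```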

Prerequisites

  • LangChain installed: pip install langchain langchain-openai
  • Webhook server (FastAPI, Flask, or Express)
  • ClawTick account (free plan works for testing)

Complete Integration Example

FastAPI Webhook Server

Create a webhook endpoint that receives scheduled triggers from ClawTick:

```python
from fastapi import FastAPI, HTTPException, Header
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.tools import Tool
from pydantic import BaseModel
import os

app = FastAPI()

# Authentication
WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET", "your-secret-token")

class WebhookPayload(BaseModel):
    message: str
    agent: str = "main"
    taskType: str = "scheduled"

def create_langchain_agent():
    """Initialize LangChain agent with tools"""
    llm = ChatOpenAI(temperature=0, model="gpt-4")

    # Define custom tools (a Tool's func receives the tool input as a single string)
    tools = [
        Tool(
            name="refresh_vector_store",
            func=refresh_vector_store,
            description="Refresh RAG vector store with latest documents"
        ),
        Tool(
            name="cleanup_memory",
            func=cleanup_old_memory,
            description="Archive conversations older than 30 days"
        ),
    ]

    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful assistant that manages data workflows."),
        MessagesPlaceholder(variable_name="chat_history", optional=True),
        ("human", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ])

    agent = create_openai_tools_agent(llm, tools, prompt)
    return AgentExecutor(agent=agent, tools=tools, verbose=True)

def refresh_vector_store(tool_input: str = "") -> str:
    """Refresh vector store with latest documents"""
    # Your vector store refresh logic here
    # Example: Fetch new docs, chunk them, embed, upsert to vector DB
    return "Vector store refreshed successfully with 150 new documents"

def cleanup_old_memory(tool_input: str = "") -> str:
    """Archive old conversation memory"""
    # Your memory cleanup logic here
    # Example: Move conversations >30 days to cold storage
    return "Archived 42 conversations older than 30 days"

@app.post("/webhook/langchain")
async def langchain_webhook(
    payload: WebhookPayload,
    authorization: str = Header(None)
):
    """Webhook endpoint for ClawTick scheduled triggers"""
    # Verify authentication
    if authorization != f"Bearer {WEBHOOK_SECRET}":
        raise HTTPException(status_code=401, detail="Unauthorized")

    try:
        # Initialize agent
        agent_executor = create_langchain_agent()

        # Run the agent with the scheduled task message
        result = agent_executor.invoke({"input": payload.message})

        return {
            "success": True,
            "output": result["output"],
            "agent": payload.agent
        }
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Agent execution failed: {str(e)}"
        )

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
```
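
The plain string comparison on the Authorization header works, but a constant-time comparison avoids leaking information through response timing. A small sketch using the standard library's `hmac.compare_digest`, assuming the same `Bearer <token>` scheme as above:

```python
import hmac

WEBHOOK_SECRET = "your-secret-token"

def is_authorized(authorization) -> bool:
    """Constant-time check of the `Authorization: Bearer <token>` header."""
    if not authorization:
        return False
    expected = f"Bearer {WEBHOOK_SECRET}"
    # compare_digest runs in time independent of where the strings differ
    return hmac.compare_digest(authorization, expected)

print(is_authorized("Bearer your-secret-token"))  # True
print(is_authorized("Bearer wrong-token"))        # False
```

Drop this into the webhook handler in place of the `!=` check if timing attacks are a concern for your deployment.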

Environment Variables

```bash
# .env
OPENAI_API_KEY=sk-...
WEBHOOK_SECRET=your-secret-token-here
PINECONE_API_KEY=...   # If using Pinecone for vectors
POSTGRES_URL=...       # If using PostgreSQL for memory
```

Run the Server

```bash
# Install dependencies
pip install fastapi uvicorn langchain langchain-openai

# Start the server
python webhook_server.py

# Server runs on http://localhost:8000
```

RAG Data Refresh Pattern

Keep your RAG system up to date by scheduling regular vector store refreshes. This pattern fetches new documents, chunks them, generates embeddings, and updates your vector database.

Vector Store Refresh Tool

```python
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from datetime import datetime, timedelta

def refresh_vector_store():
    """
    Scheduled task: Refresh vector store with documents from the last 24 hours
    """
    # Fetch new documents (from your data source)
    new_docs = fetch_documents_since(datetime.now() - timedelta(days=1))

    if not new_docs:
        return "No new documents to process"

    # Chunk documents
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200
    )
    chunks = text_splitter.split_documents(new_docs)

    # Generate embeddings and upsert to vector store
    embeddings = OpenAIEmbeddings()
    vector_store = PineconeVectorStore.from_documents(
        documents=chunks,
        embedding=embeddings,
        index_name="your-index",
        namespace="knowledge-base"
    )

    return f"Successfully embedded and stored {len(chunks)} chunks from {len(new_docs)} documents"

def fetch_documents_since(timestamp):
    """Fetch documents from your data source"""
    # Your logic to fetch new docs
    # Could be: Notion pages, Google Docs, database records, API responses
    pass
```

Schedule in ClawTick

```bash
clawtick job create \
  --name "RAG Vector Store Refresh" \
  --cron "0 2 * * *" \
  --message "Refresh the vector store with documents from the last 24 hours" \
  --webhook-url "https://your-server.com/webhook/langchain" \
  --webhook-secret "your-secret-token"
```

Runs daily at 2 AM UTC

Memory Cleanup Pattern

Prevent memory bloat by archiving old conversations. This keeps your agent's memory retrieval fast while preserving historical data for analysis.

Memory Cleanup Tool

```python
from datetime import datetime, timedelta
import os
import psycopg2

def cleanup_old_memory(days_threshold=30):
    """
    Archive conversations older than threshold to cold storage
    """
    cutoff_date = datetime.now() - timedelta(days=days_threshold)

    # Connect to your memory store
    conn = psycopg2.connect(os.getenv("POSTGRES_URL"))
    cursor = conn.cursor()

    # Find old conversations
    cursor.execute("""
        SELECT session_id, COUNT(*) AS message_count
        FROM message_store
        WHERE created_at < %s
        GROUP BY session_id
    """, (cutoff_date,))
    old_sessions = cursor.fetchall()

    # Archive to cold storage (S3, separate table, etc.)
    archived_count = 0
    for session_id, count in old_sessions:
        archive_session_to_s3(session_id)

        # Delete from active memory
        cursor.execute("""
            DELETE FROM message_store
            WHERE session_id = %s
        """, (session_id,))
        archived_count += count

    conn.commit()
    cursor.close()
    conn.close()

    return f"Archived {archived_count} messages from {len(old_sessions)} conversations"

def archive_session_to_s3(session_id):
    """Archive session to S3 for long-term storage"""
    # Your S3 upload logic
    pass
```

Schedule Weekly Cleanup

```bash
clawtick job create \
  --name "Memory Cleanup" \
  --cron "0 3 * * 0" \
  --message "Archive conversations older than 30 days to optimize memory retrieval" \
  --webhook-url "https://your-server.com/webhook/langchain" \
  --webhook-secret "your-secret-token"
```

Runs every Sunday at 3 AM UTC

Error Handling Best Practices

Robust Error Handling

```python
import logging
from datetime import datetime

from tenacity import retry, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
def refresh_vector_store_with_retry():
    """Vector store refresh with automatic retries"""
    try:
        return refresh_vector_store()
    except Exception as e:
        logger.error(f"Vector store refresh failed: {str(e)}")
        raise

@app.post("/webhook/langchain")
async def langchain_webhook(payload: WebhookPayload, authorization: str = Header(None)):
    """Enhanced webhook with comprehensive error handling"""
    # Authentication
    if authorization != f"Bearer {WEBHOOK_SECRET}":
        logger.warning("Unauthorized webhook attempt")
        raise HTTPException(status_code=401, detail="Unauthorized")

    try:
        agent_executor = create_langchain_agent()
        agent_executor.max_execution_time = 300  # 5 minute timeout

        result = agent_executor.invoke({"input": payload.message})

        # Log success
        logger.info(f"Agent task completed: {payload.message[:50]}...")

        return {
            "success": True,
            "output": result["output"],
            "timestamp": datetime.now().isoformat()
        }
    except TimeoutError:
        logger.error("Agent execution timeout")
        raise HTTPException(status_code=504, detail="Task timeout")
    except Exception as e:
        logger.error(f"Agent execution failed: {str(e)}", exc_info=True)
        # Return structured error for ClawTick logs
        raise HTTPException(
            status_code=500,
            detail={
                "error": str(e),
                "task": payload.message,
                "timestamp": datetime.now().isoformat()
            }
        )
```

Error Handling Checklist

  • Use retries with exponential backoff for transient failures
  • Set execution timeouts to prevent hung tasks
  • Log errors with context for debugging
  • Return structured errors to ClawTick for visibility
  • Monitor failed runs in ClawTick dashboard
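
If you prefer not to depend on tenacity, the same retry-with-backoff behavior fits in a few lines of standard library code. A minimal sketch — `flaky_task` is a hypothetical stand-in for any transient operation such as a vector store refresh:

```python
import time

def retry_with_backoff(fn, attempts=3, base_delay=1.0, max_delay=10.0):
    """Call fn(), retrying on failure with exponentially growing delays."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of retries; let the webhook handler surface the error
            time.sleep(min(base_delay * (2 ** attempt), max_delay))

# Hypothetical transient task: fails twice, then succeeds
calls = {"n": 0}
def flaky_task():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

print(retry_with_backoff(flaky_task, base_delay=0.01))  # ok
```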

Advanced Patterns

Conditional Workflows

Skip execution if conditions aren't met (e.g., no new documents, quota exceeded):

```python
def conditional_refresh(last_refresh_time):
    """Only refresh if new documents exist since the last run"""
    new_docs = fetch_documents_since(last_refresh_time)

    if not new_docs:
        return {
            "success": True,
            "skipped": True,
            "reason": "No new documents to process"
        }

    # Proceed with refresh
    result = refresh_vector_store()
    return {"success": True, "processed": len(new_docs), "output": result}
```

Multi-Stage Workflows

Chain multiple LangChain operations in a single scheduled task:

```python
def daily_maintenance_workflow():
    """Multi-stage daily maintenance; every stage runs even if an earlier one fails"""
    results = {"stages": [], "success": True}

    stages = [
        ("vector_refresh", refresh_vector_store),
        ("memory_cleanup", cleanup_old_memory),
        ("summary", generate_daily_summary),
    ]

    for stage_name, stage_func in stages:
        try:
            output = stage_func()
            results["stages"].append({
                "stage": stage_name,
                "status": "success",
                "output": output
            })
        except Exception as e:
            results["stages"].append({
                "stage": stage_name,
                "status": "failed",
                "error": str(e)
            })
            results["success"] = False

    return results
```

Dynamic Scheduling

Update job schedules programmatically based on workload:

```python
import os
import requests

API_KEY = os.getenv("CLAWTICK_API_KEY")

def adjust_schedule_based_on_load(job_id):
    """Adjust refresh frequency based on document volume"""
    recent_volume = get_recent_document_volume()  # your metrics source

    if recent_volume > 1000:
        # High volume: refresh every 6 hours
        new_cron = "0 */6 * * *"
    elif recent_volume > 100:
        # Medium volume: refresh daily
        new_cron = "0 2 * * *"
    else:
        # Low volume: refresh weekly
        new_cron = "0 2 * * 0"

    # Update via ClawTick API
    response = requests.put(
        f"https://api.clawtick.com/v1/jobs/{job_id}",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={"cron": new_cron}
    )
    response.raise_for_status()

    return f"Updated schedule to: {new_cron}"
```

Next Steps

Ready to automate your LangChain workflows? Start by deploying a webhook endpoint and creating your first scheduled job.