LangChain Scheduler
Schedule recurring LangChain agent tasks: RAG data refreshes, memory cleanup, periodic workflows, and automated chains.
Overview
LangChain agents often need to run on schedules: refreshing vector store data, cleaning up conversation memory, generating periodic reports, or re-processing documents. ClawTick provides a reliable scheduler for LangChain workflows without managing infrastructure.
Why Schedule LangChain Agents?
- RAG Data Freshness: Keep vector stores updated with the latest documents
- Memory Management: Archive old conversations, clean up temporary data
- Periodic Analysis: Daily summaries, weekly reports, monthly insights
- Batch Processing: Process queued documents, update embeddings
Common Use Cases
RAG Data Refresh
Update vector store with latest documents from your knowledge base
Memory Cleanup
Archive conversations older than 30 days to optimize retrieval performance
Embedding Updates
Re-embed documents when switching models or improving chunking strategy
Periodic Reports
Generate daily summaries of agent conversations and user interactions
Setup Guide
Architecture
ClawTick triggers your LangChain agent via webhook at scheduled times. Your agent runs the workflow, processes data, and returns results.
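Concretely, the trigger is a plain HTTPS POST with a JSON body. The sketch below shows the shape the example server in this guide expects (the field names come from its `WebhookPayload` model, not from a guaranteed ClawTick wire format):

```python
import json

# Hypothetical JSON body of a scheduled trigger, matching the
# WebhookPayload model used by the example server in this guide.
trigger_body = json.dumps({
    "message": "Refresh the vector store with documents from the last 24 hours",
    "agent": "main",
    "taskType": "scheduled",
})

# The receiving endpoint parses it back before validating the fields.
payload = json.loads(trigger_body)
```

The request also carries an `Authorization: Bearer <secret>` header, which the webhook server verifies before running the agent.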
Prerequisites
- LangChain installed: pip install langchain langchain-openai
- Webhook server (FastAPI, Flask, or Express)
- ClawTick account (free plan works for testing)
Complete Integration Example
FastAPI Webhook Server
Create a webhook endpoint that receives scheduled triggers from ClawTick:
from fastapi import FastAPI, HTTPException, Header
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.tools import Tool
from pydantic import BaseModel
import os

app = FastAPI()

# Authentication
WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET", "your-secret-token")

class WebhookPayload(BaseModel):
    message: str
    agent: str = "main"
    taskType: str = "scheduled"

def create_langchain_agent():
    """Initialize LangChain agent with tools"""
    llm = ChatOpenAI(temperature=0, model="gpt-4")

    # Define custom tools. Tool passes the agent's string input to func,
    # so the wrapped functions accept (and ignore) one argument.
    tools = [
        Tool(
            name="refresh_vector_store",
            func=refresh_vector_store,
            description="Refresh RAG vector store with latest documents"
        ),
        Tool(
            name="cleanup_memory",
            func=cleanup_old_memory,
            description="Archive conversations older than 30 days"
        ),
    ]

    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful assistant that manages data workflows."),
        MessagesPlaceholder(variable_name="chat_history", optional=True),
        ("human", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ])

    agent = create_openai_tools_agent(llm, tools, prompt)
    return AgentExecutor(agent=agent, tools=tools, verbose=True)

def refresh_vector_store(_input: str = "") -> str:
    """Refresh vector store with latest documents"""
    # Your vector store refresh logic here
    # Example: fetch new docs, chunk them, embed, upsert to vector DB
    return "Vector store refreshed successfully with 150 new documents"

def cleanup_old_memory(_input: str = "") -> str:
    """Archive old conversation memory"""
    # Your memory cleanup logic here
    # Example: move conversations >30 days to cold storage
    return "Archived 42 conversations older than 30 days"

@app.post("/webhook/langchain")
async def langchain_webhook(
    payload: WebhookPayload,
    authorization: str = Header(None)
):
    """Webhook endpoint for ClawTick scheduled triggers"""
    # Verify authentication
    if authorization != f"Bearer {WEBHOOK_SECRET}":
        raise HTTPException(status_code=401, detail="Unauthorized")

    try:
        # Initialize agent
        agent_executor = create_langchain_agent()

        # Run the agent with the scheduled task message
        result = agent_executor.invoke({"input": payload.message})

        return {
            "success": True,
            "output": result["output"],
            "agent": payload.agent
        }
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Agent execution failed: {str(e)}"
        )

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Environment Variables
# .env
OPENAI_API_KEY=sk-...
WEBHOOK_SECRET=your-secret-token-here
PINECONE_API_KEY=...   # If using Pinecone for vectors
POSTGRES_URL=...       # If using PostgreSQL for memory

Run the Server
# Install dependencies
pip install fastapi uvicorn langchain langchain-openai

# Start the server
python webhook_server.py

# Server runs on http://localhost:8000

RAG Data Refresh Pattern
Keep your RAG system up-to-date by scheduling regular vector store refreshes. This pattern fetches new documents, chunks them, generates embeddings, and updates your vector database.
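The chunk-size/overlap idea the splitter relies on can be sketched without LangChain: a sliding window where consecutive chunks share `overlap` characters, so context spans chunk boundaries. (Illustrative only — `RecursiveCharacterTextSplitter` additionally prefers splitting on separators like paragraphs and sentences.)

```python
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> list[str]:
    """Naive sliding-window chunker: each chunk starts `overlap` characters
    before the previous chunk ended."""
    step = chunk_size - overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

# 2500 chars with step 800 -> chunks start at 0, 800, 1600, 2400
text = "".join(str(i % 10) for i in range(2500))
chunks = chunk_text(text)
```

The last 200 characters of each chunk reappear as the first 200 of the next, which is what keeps retrieval robust when an answer straddles a boundary.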
Vector Store Refresh Tool
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from datetime import datetime, timedelta

def refresh_vector_store():
    """
    Scheduled task: Refresh vector store with documents from last 24 hours
    """
    # Fetch new documents (from your data source)
    new_docs = fetch_documents_since(
        datetime.now() - timedelta(days=1)
    )
    if not new_docs:
        return "No new documents to process"

    # Chunk documents
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200
    )
    chunks = text_splitter.split_documents(new_docs)

    # Generate embeddings and upsert to vector store
    embeddings = OpenAIEmbeddings()
    vector_store = PineconeVectorStore.from_documents(
        documents=chunks,
        embedding=embeddings,
        index_name="your-index",
        namespace="knowledge-base"
    )

    return f"Successfully embedded and stored {len(chunks)} chunks from {len(new_docs)} documents"

def fetch_documents_since(timestamp):
    """Fetch documents from your data source"""
    # Your logic to fetch new docs
    # Could be: Notion pages, Google Docs, database records, API responses
    pass

Schedule in ClawTick
clawtick job create \
  --name "RAG Vector Store Refresh" \
  --cron "0 2 * * *" \
  --message "Refresh the vector store with documents from the last 24 hours" \
  --webhook-url "https://your-server.com/webhook/langchain" \
  --webhook-secret "your-secret-token"

Runs daily at 2 AM UTC
Memory Cleanup Pattern
Prevent memory bloat by archiving old conversations. This keeps your agent's memory retrieval fast while preserving historical data for analysis.
Memory Cleanup Tool
from datetime import datetime, timedelta
import os
import psycopg2

def cleanup_old_memory(days_threshold=30):
    """
    Archive conversations older than threshold to cold storage
    """
    cutoff_date = datetime.now() - timedelta(days=days_threshold)

    # Connect to your memory store
    conn = psycopg2.connect(os.getenv("POSTGRES_URL"))
    cursor = conn.cursor()

    # Find old conversations
    cursor.execute("""
        SELECT session_id, COUNT(*) AS message_count
        FROM message_store
        WHERE created_at < %s
        GROUP BY session_id
    """, (cutoff_date,))
    old_sessions = cursor.fetchall()

    # Archive to cold storage (S3, separate table, etc.)
    archived_count = 0
    for session_id, count in old_sessions:
        archive_session_to_s3(session_id)
        # Delete from active memory
        cursor.execute("""
            DELETE FROM message_store
            WHERE session_id = %s
        """, (session_id,))
        archived_count += count

    conn.commit()
    cursor.close()
    conn.close()

    return f"Archived {archived_count} messages from {len(old_sessions)} conversations"

def archive_session_to_s3(session_id):
    """Archive session to S3 for long-term storage"""
    # Your S3 upload logic
    pass

Schedule Weekly Cleanup
clawtick job create \
  --name "Memory Cleanup" \
  --cron "0 3 * * 0" \
  --message "Archive conversations older than 30 days to optimize memory retrieval" \
  --webhook-url "https://your-server.com/webhook/langchain" \
  --webhook-secret "your-secret-token"

Runs every Sunday at 3 AM UTC
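The `--cron` expressions in this guide use the standard five-field layout: minute, hour, day-of-month, month, day-of-week (0 = Sunday). A minimal sketch that names the fields of an expression (not a full cron parser — it doesn't expand ranges or lists):

```python
FIELD_NAMES = ["minute", "hour", "day_of_month", "month", "day_of_week"]

def describe_cron(expr: str) -> dict[str, str]:
    """Map each of the five cron fields to its name."""
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError(f"expected 5 fields, got {len(fields)}")
    return dict(zip(FIELD_NAMES, fields))

weekly = describe_cron("0 3 * * 0")  # Sundays at 03:00 UTC
```

Reading `0 3 * * 0` this way — minute 0, hour 3, any day-of-month, any month, weekday 0 — gives exactly the "every Sunday at 3 AM UTC" schedule above.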
Error Handling Best Practices
Robust Error Handling
import logging
from datetime import datetime

from tenacity import retry, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
def refresh_vector_store_with_retry():
    """Vector store refresh with automatic retries"""
    try:
        return refresh_vector_store()
    except Exception as e:
        logger.error(f"Vector store refresh failed: {str(e)}")
        raise

@app.post("/webhook/langchain")
async def langchain_webhook(payload: WebhookPayload, authorization: str = Header(None)):
    """Enhanced webhook with comprehensive error handling"""
    # Authentication
    if authorization != f"Bearer {WEBHOOK_SECRET}":
        logger.warning("Unauthorized webhook attempt")
        raise HTTPException(status_code=401, detail="Unauthorized")

    try:
        agent_executor = create_langchain_agent()
        # Cap runtime: max_execution_time is an AgentExecutor setting,
        # not an invoke() config key
        agent_executor.max_execution_time = 300  # 5 minute timeout

        result = agent_executor.invoke({"input": payload.message})

        # Log success
        logger.info(f"Agent task completed: {payload.message[:50]}...")

        return {
            "success": True,
            "output": result["output"],
            "timestamp": datetime.now().isoformat()
        }
    except TimeoutError:
        logger.error("Agent execution timeout")
        raise HTTPException(status_code=504, detail="Task timeout")
    except Exception as e:
        logger.error(f"Agent execution failed: {str(e)}", exc_info=True)
        # Return structured error for ClawTick logs
        raise HTTPException(
            status_code=500,
            detail={
                "error": str(e),
                "task": payload.message,
                "timestamp": datetime.now().isoformat()
            }
        )

Error Handling Checklist
- ✓ Use retries with exponential backoff for transient failures
- ✓ Set execution timeouts to prevent hung tasks
- ✓ Log errors with context for debugging
- ✓ Return structured errors to ClawTick for visibility
- ✓ Monitor failed runs in ClawTick dashboard
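One hardening step beyond the checklist: the `!=` string comparison used for the Bearer token is vulnerable in principle to timing attacks. The standard library's `hmac.compare_digest` compares in constant time:

```python
import hmac

def verify_token(authorization, secret: str) -> bool:
    """Constant-time check of an 'Authorization: Bearer <token>' header."""
    if not authorization or not authorization.startswith("Bearer "):
        return False
    token = authorization[len("Bearer "):]
    return hmac.compare_digest(token, secret)

# Drop-in replacement for the header check in the webhook handler:
# if not verify_token(authorization, WEBHOOK_SECRET):
#     raise HTTPException(status_code=401, detail="Unauthorized")
```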
Advanced Patterns
Conditional Workflows
Skip execution if conditions aren't met (e.g., no new documents, quota exceeded):
def conditional_refresh():
    """Only refresh if new documents exist"""
    # last_refresh_time: however you track it (e.g. a DB row or state file)
    new_docs = fetch_documents_since(last_refresh_time)

    if not new_docs:
        return {
            "success": True,
            "skipped": True,
            "reason": "No new documents to process"
        }

    # Proceed with refresh
    result = refresh_vector_store()
    return {"success": True, "processed": len(new_docs)}

Multi-Stage Workflows
Chain multiple LangChain operations in a single scheduled task:
def daily_maintenance_workflow():
    """Multi-stage daily maintenance"""
    results = {"stages": [], "success": True}

    stages = [
        ("vector_refresh", refresh_vector_store),
        ("memory_cleanup", cleanup_old_memory),
        ("summary", generate_daily_summary),
    ]

    # Run every stage even if an earlier one fails,
    # recording per-stage status for ClawTick logs
    for stage_name, stage_func in stages:
        try:
            output = stage_func()
            results["stages"].append({
                "stage": stage_name,
                "status": "success",
                "output": output
            })
        except Exception as e:
            results["stages"].append({
                "stage": stage_name,
                "status": "failed",
                "error": str(e)
            })
            results["success"] = False

    return results

Dynamic Scheduling
Update job schedules programmatically based on workload:
import requests

def adjust_schedule_based_on_load():
    """Adjust refresh frequency based on document volume"""
    recent_volume = get_recent_document_volume()

    if recent_volume > 1000:
        # High volume: refresh every 6 hours
        new_cron = "0 */6 * * *"
    elif recent_volume > 100:
        # Medium volume: refresh daily
        new_cron = "0 2 * * *"
    else:
        # Low volume: refresh weekly
        new_cron = "0 2 * * 0"

    # Update via ClawTick API (job_id and API_KEY come from your config)
    response = requests.put(
        f"https://api.clawtick.com/v1/jobs/{job_id}",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={"cron": new_cron}
    )
    response.raise_for_status()

    return f"Updated schedule to: {new_cron}"

Next Steps
Ready to automate your LangChain workflows? Start by deploying a webhook endpoint and creating your first scheduled job.