options-analytics-agent-langgraph

Build AI agents for real-time financial options analysis with LangGraph, ChromaDB RAG, and Polygon.io data

---
name: options-analytics-agent-langgraph
description: Build AI agents for real-time financial options analysis with LangGraph, ChromaDB RAG, and Polygon.io data
triggers:
  - analyze stock options using LangGraph agent
  - build financial options analysis agent
  - create options trading analysis with RAG
  - integrate Polygon.io options data with AI
  - implement options analytics with persistent memory
  - set up ChromaDB for financial data caching
  - build multi-agent workflow for options trading
  - create intelligent options data processing system
---

# Options Analytics Agent with LangGraph

> Skill by [ara.so](https://ara.so) — Data Skills collection.

This project is a LangGraph-based agent that automates financial options analysis using real-time data from Polygon.io, caching via ChromaDB, and persistent conversation memory. It is intended for building trading assistants with RAG capabilities and a microservice architecture.

## What It Does

This project provides a complete AI agent system for:
- **Real-time options data retrieval** from Polygon.io with intelligent caching
- **RAG-powered knowledge base** using ChromaDB for semantic search
- **Persistent conversation memory** across sessions via SQLite
- **Professional options analysis** with Greeks, sentiment, and anomaly detection
- **Multi-format exports** (CSV, charts, reports)
- **LangGraph orchestration** for multi-agent workflows
- **FastAPI microservice** deployment

## Installation

### Prerequisites

```bash
# Python 3.10+
python --version

# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
```

### Install Dependencies

```bash
pip install -r requirements.txt
```

**Key dependencies:**
```
langchain>=0.3.0
langgraph>=0.2.45
langchain-openai>=0.2.6
langchain-chroma>=0.1.4
chromadb>=0.5.20
fastapi>=0.115.5
uvicorn>=0.32.1
pandas>=2.2.3
matplotlib>=3.9.2
tavily-python>=0.5.0
```

### Environment Configuration

Create a `.env` file in the project root:

```bash
# Required
OPENAI_API_KEY=your_openai_api_key
POLYGON_API_KEY=your_polygon_io_api_key

# Optional
TAVILY_API_KEY=your_tavily_api_key  # For web search
LANGCHAIN_API_KEY=your_langchain_api_key  # For tracing
LANGCHAIN_TRACING_V2=true
```

### Verify Installation

```python
# Test import
from agent_main import create_agent_workflow
from config.settings import validate_api_keys

# Validate API keys
validate_api_keys()
print("✓ Installation successful")
```

## Project Structure

```
project/
├── agent_main.py              # Main agent entry point
├── config/settings.py         # Configuration management
├── tools/
│   ├── search/               # Options search tools
│   ├── export/               # Data export tools
│   └── analysis/             # Analysis tools
├── rag/                      # RAG knowledge base
├── monitoring/               # Performance tracking
└── microservice/             # FastAPI deployment
```

## Core Usage

### Basic Agent Interaction

```python
from agent_main import create_agent_workflow
from langchain_core.messages import HumanMessage

# Create agent
workflow = create_agent_workflow()
app = workflow.compile()

# Simple query
config = {"configurable": {"thread_id": "session_1"}}
query = "Search for AAPL options expiring this week"

result = app.invoke(
    {"messages": [HumanMessage(content=query)]},
    config=config
)

print(result["messages"][-1].content)
```

### Interactive Chat Loop

```python
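from agent_main import create_agent_workflow
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.memory import MemorySaver

def chat():
    workflow = create_agent_workflow()
    # A checkpointer ties thread_id to stored history; without one,
    # each invoke() starts from an empty conversation
    app = workflow.compile(checkpointer=MemorySaver())
    session_id = "user_session_1"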
    
    print("Options Analytics Agent (type 'exit' to quit)")
    
    while True:
        user_input = input("\nYou: ").strip()
        if user_input.lower() in ['exit', 'quit']:
            break
            
        config = {"configurable": {"thread_id": session_id}}
        result = app.invoke(
            {"messages": [HumanMessage(content=user_input)]},
            config=config
        )
        
        response = result["messages"][-1].content
        print(f"\nAgent: {response}")

if __name__ == "__main__":
    chat()
```

## Key Tools & Commands

### 1. Options Search Tool

Search for options data with automatic caching:

```python
from tools.search.options_search import OptionsSearchTool

tool = OptionsSearchTool()

# Search with automatic caching
result = tool._run(
    ticker="NVDA",
    expiration_date="2024-12-20",
    option_type="call",
    force_refresh=False  # Use cache if available
)

# Force fresh API call
result = tool._run(
    ticker="NVDA",
    expiration_date="2024-12-20",
    option_type="call",
    force_refresh=True
)
```
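The cache-first behaviour controlled by `force_refresh` can be pictured with a small in-memory stand-in. This is only a sketch of the general pattern, not the project's ChromaDB-backed implementation: `TTLCache`, `get_or_fetch`, and `fake_fetch` are hypothetical names, and the time-to-live expiry is an assumption rather than something the project documents.

```python
import time
from typing import Any, Callable


class TTLCache:
    """Tiny in-memory cache; a stand-in for a persistent cache layer."""

    def __init__(self, ttl_seconds: float = 300.0,
                 clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._store = {}  # key -> (stored_at, value)

    def get_or_fetch(self, key: tuple, fetch: Callable[[], Any],
                     force_refresh: bool = False) -> Any:
        """Return an unexpired cached value, else call `fetch` and store it."""
        now = self._clock()
        entry = self._store.get(key)
        if not force_refresh and entry is not None and now - entry[0] < self._ttl:
            return entry[1]
        value = fetch()
        self._store[key] = (now, value)
        return value


calls = []


def fake_fetch():
    # Hypothetical fetcher; a real one would call the Polygon.io API.
    calls.append(1)
    return {"ticker": "NVDA", "fetch_number": len(calls)}


cache = TTLCache(ttl_seconds=60)
key = ("NVDA", "2024-12-20", "call")
first = cache.get_or_fetch(key, fake_fetch)                      # miss: fetches
second = cache.get_or_fetch(key, fake_fetch)                     # hit: no fetch
third = cache.get_or_fetch(key, fake_fetch, force_refresh=True)  # bypasses cache
```

Keying the cache on `(ticker, expiration_date, option_type)` mirrors the arguments passed to the tool, so two calls with the same arguments share one entry.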

### 2. Batch Search Tool

Search multiple tickers efficiently:

```python
from tools.search.batch_search import BatchOptionsSearchTool

tool = BatchOptionsSearchTool()

result = tool._run(
    tickers=["AAPL", "MSFT", "GOOGL"],
    expiration_date="2024-12-31",
    option_type="call"
)
```

### 3. RAG Knowledge Base

Query cached options data semantically:

```python
from rag.rag_tools import RAGQueryTool

rag_tool = RAGQueryTool()

# Semantic search
results = rag_tool._run(
    query="high volume AAPL calls near the money",
    top_k=5
)

# Date-based retrieval
from rag.rag_collection_tools import DateRangeCollectionTool

date_tool = DateRangeCollectionTool()
data = date_tool._run(
    ticker="AAPL",
    start_date="2024-12-01",
    end_date="2024-12-31"
)
```

### 4. Options Analysis

Professional-grade analysis:

```python
from tools.analysis.analysis_tools import AnalyzeOptionsTool

analysis_tool = AnalyzeOptionsTool()

result = analysis_tool._run(
    ticker="TSLA",
    expiration_date="2024-12-20",
    analysis_type="sentiment"  # or "greeks", "anomaly"
)

print(result)
```
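How `AnalyzeOptionsTool` computes its `"greeks"` output is not shown here. As a rough illustration of what such an analysis involves, the sketch below computes Black-Scholes delta for a European option on a non-dividend-paying stock. The function names are hypothetical, and listed US equity options are American-style, so treat the result as an approximation rather than the project's method.

```python
import math


def norm_cdf(x: float) -> float:
    """Standard normal cumulative distribution function."""
    return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0)))


def bs_delta(spot: float, strike: float, years: float, rate: float,
             vol: float, option_type: str = "call") -> float:
    """Black-Scholes delta; `years` is time to expiry, `vol` is annualized."""
    if years <= 0 or vol <= 0:
        raise ValueError("years and vol must be positive")
    d1 = (math.log(spot / strike) + (rate + 0.5 * vol ** 2) * years) / (
        vol * math.sqrt(years)
    )
    call_delta = norm_cdf(d1)
    # Put delta follows from put-call parity: delta_put = delta_call - 1
    return call_delta if option_type == "call" else call_delta - 1.0


call_delta = bs_delta(spot=100, strike=100, years=0.5, rate=0.05, vol=0.2)
put_delta = bs_delta(spot=100, strike=100, years=0.5, rate=0.05, vol=0.2,
                     option_type="put")
print(f"call delta: {call_delta:.4f}, put delta: {put_delta:.4f}")
```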

### 5. Data Export

Export to CSV or charts:

```python
from tools.export.csv_export import CSVExportTool
from tools.export.visualization import ChartVisualizationTool

# CSV export
csv_tool = CSVExportTool()
csv_tool._run(
    ticker="AAPL",
    expiration_date="2024-12-20",
    option_type="call",
    output_filename="aapl_calls.csv"
)

# Chart generation
chart_tool = ChartVisualizationTool()
chart_tool._run(
    ticker="AAPL",
    expiration_date="2024-12-20",
    chart_type="volume_oi"
)
```

## Configuration

### Settings Management

```python
# config/settings.py
from config.settings import (
    OPENAI_API_KEY,
    POLYGON_API_KEY,
    MODEL_NAME,
    CHROMA_PERSIST_DIR,
    validate_api_keys
)

# Validate all keys
validate_api_keys()

# Access configuration
print(f"Model: {MODEL_NAME}")
print(f"ChromaDB: {CHROMA_PERSIST_DIR}")
```

### Custom Agent Configuration

```python
from agent_main import create_agent_workflow
from langgraph.checkpoint.memory import MemorySaver

# Create with custom checkpointer
memory = MemorySaver()
workflow = create_agent_workflow()
app = workflow.compile(checkpointer=memory)

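# Or use the SQLite checkpointer (provided by the langgraph-checkpoint-sqlite package)
from langgraph.checkpoint.sqlite import SqliteSaver

# from_conn_string() is a context manager: use the compiled app inside the block
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
    app = workflow.compile(checkpointer=checkpointer)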
```

## Advanced Patterns

### Custom Tool Integration

```python
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent

@tool
def custom_options_analyzer(ticker: str, metric: str) -> str:
    """Analyze specific option metric.
    
    Args:
        ticker: Stock symbol
        metric: Metric to analyze (volatility, skew, etc.)
    """
    # Your custom logic
    return f"Analysis for {ticker}: {metric}"

# Add to agent
from config.settings import get_llm

llm = get_llm()
tools = [custom_options_analyzer]
agent = create_react_agent(llm, tools)
```

### RAG Knowledge Base Setup

```python
from rag.rag_knowledge_base import RAGKnowledgeBase

# Initialize
kb = RAGKnowledgeBase(
    persist_directory="./data/chroma_db",
    collection_name="options_data"
)

# Add documents
kb.add_documents([
    {
        "ticker": "AAPL",
        "expiration": "2024-12-20",
        "strike": 180.0,
        "type": "call",
        "volume": 5000,
        "open_interest": 10000
    }
])

# Query
results = kb.query(
    query_text="high volume Apple calls",
    n_results=5
)
```
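Semantic search works on text, so a structured option record has to be turned into a sentence to embed, plus metadata for filtering. How `RAGKnowledgeBase.add_documents` does this internally is not shown; the sketch below is one plausible approach, and `record_to_document` is a hypothetical helper, not part of the project.

```python
def record_to_document(record: dict) -> tuple:
    """Turn one option record into (text to embed, metadata for filtering)."""
    text = (
        f"{record['ticker']} {record['type']} option, strike {record['strike']}, "
        f"expiring {record['expiration']}, volume {record['volume']}, "
        f"open interest {record['open_interest']}"
    )
    # Keep only flat scalar values, which vector stores typically accept as metadata
    metadata = {
        key: value
        for key, value in record.items()
        if isinstance(value, (str, int, float, bool))
    }
    return text, metadata


record = {
    "ticker": "AAPL",
    "expiration": "2024-12-20",
    "strike": 180.0,
    "type": "call",
    "volume": 5000,
    "open_interest": 10000,
}
text, metadata = record_to_document(record)
print(text)
```

Putting the ticker, type, and volume into the embedded text is what lets a query such as "high volume Apple calls" land near the right records, while the metadata supports exact filters on ticker or expiration.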

### Persistent Memory Across Sessions

```python
from agent_main import create_agent_workflow
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.sqlite import SqliteSaver

workflow = create_agent_workflow()
config1 = {"configurable": {"thread_id": "user_123"}}

# from_conn_string() returns a context manager, so open it with `with`
with SqliteSaver.from_conn_string("./data/conversation_memory.db") as checkpointer:
    app = workflow.compile(checkpointer=checkpointer)

    # Session 1
    app.invoke({"messages": [HumanMessage("Search AAPL options")]}, config1)

    # Session 2 (same thread_id, so the previous context is restored)
    app.invoke({"messages": [HumanMessage("Show me the calls")]}, config1)
```

### Streaming Responses

```python
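from agent_main import create_agent_workflow
from langchain_core.messages import HumanMessage

workflow = create_agent_workflow()
app = workflow.compile()

config = {"configurable": {"thread_id": "session_1"}}
query = HumanMessage(content="Analyze TSLA options")

# stream_mode="values" yields the full state after each step, so every chunk
# has a "messages" key (the default "updates" mode keys chunks by node name)
for chunk in app.stream({"messages": [query]}, config, stream_mode="values"):
    if "messages" in chunk:
        print(chunk["messages"][-1].content, flush=True)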
```

## Microservice Deployment

### FastAPI Server

```python
# microservice/app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from agent_main import create_agent_workflow
from langchain_core.messages import HumanMessage

app = FastAPI(title="Options Analytics API")
workflow = create_agent_workflow()
agent_app = workflow.compile()

class QueryRequest(BaseModel):
    query: str
    session_id: str = "default"

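# Declared with plain `def` so FastAPI runs the blocking invoke() call in its
# thread pool instead of stalling the event loop
@app.post("/query")
def query_agent(request: QueryRequest):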
    try:
        config = {"configurable": {"thread_id": request.session_id}}
        result = agent_app.invoke(
            {"messages": [HumanMessage(content=request.query)]},
            config=config
        )
        return {
            "response": result["messages"][-1].content,
            "session_id": request.session_id
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# Run: uvicorn microservice.app:app --reload
```

### Docker Deployment

```dockerfile
# microservice/Dockerfile
FROM python:3.10-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "microservice.app:app", "--host", "0.0.0.0", "--port", "8000"]
```

```yaml
# docker-compose.yml
version: '3.8'

services:
  options-agent:
    build:
      context: .
      dockerfile: microservice/Dockerfile
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - POLYGON_API_KEY=${POLYGON_API_KEY}
    volumes:
      - ./data:/app/data
      - ./outputs:/app/outputs
```

**Start service:**
```bash
docker-compose up -d
```

## Common Workflows

### Complete Options Analysis Pipeline

```python
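from agent_main import create_agent_workflow
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.memory import MemorySaver

def analyze_options_workflow(ticker: str, expiration: str):
    workflow = create_agent_workflow()
    # Steps 2 and 3 build on the search from step 1, so keep state between calls
    app = workflow.compile(checkpointer=MemorySaver())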
    
    config = {"configurable": {"thread_id": f"analysis_{ticker}"}}
    
    # Step 1: Search options
    query1 = f"Search {ticker} options expiring {expiration}"
    result1 = app.invoke({"messages": [HumanMessage(query1)]}, config)
    
    # Step 2: Analyze sentiment
    query2 = f"Analyze sentiment for {ticker} options"
    result2 = app.invoke({"messages": [HumanMessage(query2)]}, config)
    
    # Step 3: Export to CSV
    query3 = f"Export {ticker} options to CSV"
    result3 = app.invoke({"messages": [HumanMessage(query3)]}, config)
    
    return {
        "search": result1["messages"][-1].content,
        "analysis": result2["messages"][-1].content,
        "export": result3["messages"][-1].content
    }

# Run pipeline
results = analyze_options_workflow("NVDA", "2024-12-31")
```

### Batch Processing Multiple Tickers

```python
from tools.search.batch_search import BatchOptionsSearchTool
from tools.analysis.analysis_tools import AnalyzeOptionsTool

def batch_analysis(tickers: list, expiration: str):
    search_tool = BatchOptionsSearchTool()
    analysis_tool = AnalyzeOptionsTool()
    
    results = {}
    
    # Batch search
    search_result = search_tool._run(
        tickers=tickers,
        expiration_date=expiration,
        option_type="call"
    )
    
    # Individual analysis
    for ticker in tickers:
        analysis = analysis_tool._run(
            ticker=ticker,
            expiration_date=expiration,
            analysis_type="sentiment"
        )
        results[ticker] = analysis
    
    return results

# Process watchlist
watchlist = ["AAPL", "MSFT", "GOOGL", "NVDA", "TSLA"]
results = batch_analysis(watchlist, "2024-12-31")
```
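The `"sentiment"` analysis above is computed inside the project's `AnalyzeOptionsTool`, whose internals are not shown. One widely used proxy for options sentiment is the put/call volume ratio, and a simple anomaly signal is volume that is large relative to open interest. The sketch below illustrates both under stated assumptions: the `OptionQuote` shape, the function names, and the 3x threshold are hypothetical.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class OptionQuote:
    """Hypothetical record shape; the project's real schema may differ."""
    option_type: str  # "call" or "put"
    volume: int
    open_interest: int


def put_call_ratio(quotes: List[OptionQuote]) -> Optional[float]:
    """Put volume divided by call volume, or None if no calls traded."""
    put_volume = sum(q.volume for q in quotes if q.option_type == "put")
    call_volume = sum(q.volume for q in quotes if q.option_type == "call")
    if call_volume == 0:
        return None
    return put_volume / call_volume


def unusual_volume(quotes: List[OptionQuote],
                   threshold: float = 3.0) -> List[OptionQuote]:
    """Contracts whose volume exceeds `threshold` times open interest."""
    return [
        q for q in quotes
        if q.open_interest > 0 and q.volume > threshold * q.open_interest
    ]


quotes = [
    OptionQuote("call", volume=5000, open_interest=10000),
    OptionQuote("put", volume=2500, open_interest=8000),
    OptionQuote("call", volume=9000, open_interest=1000),
]
ratio = put_call_ratio(quotes)
flagged = unusual_volume(quotes)
print(f"put/call ratio: {ratio:.3f}, unusual contracts: {len(flagged)}")
```

A ratio well below 1 is often read as call-heavy (bullish) positioning and a ratio above 1 as put-heavy, though the interpretation depends on the ticker and market conditions.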

### Real-time Monitoring with Callbacks

```python
from agent_main import create_agent_workflow
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import HumanMessage

class OptionsMonitorCallback(BaseCallbackHandler):
    def on_tool_start(self, serialized, input_str, **kwargs):
        name = (serialized or {}).get("name", "unknown")
        print(f"🔧 Tool: {name}")
    
    def on_tool_end(self, output, **kwargs):
        # output may be a ToolMessage rather than a str, so convert before slicing
        print(f"✅ Result: {str(output)[:100]}...")

# Use callback
workflow = create_agent_workflow()
app = workflow.compile()

config = {
    "configurable": {"thread_id": "monitor_session"},
    "callbacks": [OptionsMonitorCallback()]
}

result = app.invoke(
    {"messages": [HumanMessage("Search AAPL options")]},
    config=config
)
```

## Troubleshooting

### API Key Issues

```python
# Validate API keys
from config.settings import validate_api_keys

try:
    validate_api_keys()
    print("✓ All API keys valid")
except ValueError as e:
    print(f"✗ Missing: {e}")
    # Set missing keys in .env file
```

### ChromaDB Connection Errors

```python
# Reset ChromaDB
import shutil
import os

chroma_path = "./data/chroma_db"
if os.path.exists(chroma_path):
    shutil.rmtree(chroma_path)
    print("ChromaDB reset")

# Reinitialize
from rag.rag_knowledge_base import RAGKnowledgeBase
kb = RAGKnowledgeBase()
```

### Clear Conversation Memory

```python
# Clear SQLite memory
import os

memory_file = "./data/conversation_memory.db"
if os.path.exists(memory_file):
    os.remove(memory_file)
    print("Conversation memory cleared")
```

### Polygon.io Rate Limits

```python
# Use caching to reduce API calls
from tools.search.options_search import OptionsSearchTool

tool = OptionsSearchTool()

# Always try cache first
result = tool._run(
    ticker="AAPL",
    expiration_date="2024-12-20",
    option_type="call",
    force_refresh=False  # Use cached data
)

# Only force refresh when absolutely necessary
```
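Caching reduces how often the API is hit, but a forced refresh can still run into a rate limit. A common complement is retrying with exponential backoff and jitter. The sketch below is illustrative only: `RateLimitError` and `with_backoff` are hypothetical names, not part of the project or of any Polygon.io client, and a real fetcher would need to raise such an error when it receives an HTTP 429 response.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RateLimitError(Exception):
    """Hypothetical error a fetcher raises when the API returns HTTP 429."""


def with_backoff(fetch: Callable[[], T], max_attempts: int = 5,
                 base_delay: float = 1.0,
                 sleep: Callable[[float], None] = time.sleep) -> T:
    """Call `fetch`, retrying on RateLimitError with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return fetch()
        except RateLimitError:
            if attempt == max_attempts - 1:
                raise  # out of retries: surface the error to the caller
            # Delay doubles each attempt; jitter spreads out concurrent clients
            delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
            sleep(delay)
    raise RuntimeError("unreachable")


attempts = {"count": 0}
delays = []


def flaky_fetch() -> str:
    # Hypothetical fetcher that is rate limited on its first two calls
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RateLimitError("429 Too Many Requests")
    return "ok"


# Record the delays instead of sleeping so the demo finishes instantly
result = with_backoff(flaky_fetch, base_delay=0.1, sleep=delays.append)
```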

### Debug Agent State

```python
# Inspect agent state
from agent_main import create_agent_workflow
from langchain_core.messages import HumanMessage

workflow = create_agent_workflow()
app = workflow.compile()

config = {"configurable": {"thread_id": "debug_session"}}
result = app.invoke(
    {"messages": [HumanMessage("Search AAPL options")]},
    config=config
)

# Print full state
print("Messages:", result["messages"])
print("Tools called:", [getattr(m, "tool_calls", []) for m in result["messages"]])
```

### Enable Tracing

```python
# Set in .env
# LANGCHAIN_TRACING_V2=true
# LANGCHAIN_API_KEY=your_key

# Or in code
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your_key"

# View traces at https://smith.langchain.com
```

## Performance Optimization

### Batch Embeddings

```python
from rag.rag_knowledge_base import RAGKnowledgeBase

kb = RAGKnowledgeBase()

# Batch add for efficiency
documents = [
    {"ticker": "AAPL", "data": "..."},
    {"ticker": "MSFT", "data": "..."},
    # ... more documents
]

kb.add_documents(documents)  # Processes in batches
```

### Parallel Tool Execution

```python
from concurrent.futures import ThreadPoolExecutor
from tools.search.options_search import OptionsSearchTool

def search_ticker(ticker, date):
    tool = OptionsSearchTool()
    return tool._run(ticker=ticker, expiration_date=date, option_type="call")

tickers = ["AAPL", "MSFT", "GOOGL", "NVDA"]
date = "2024-12-31"

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(lambda t: search_ticker(t, date), tickers))
```
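With `executor.map`, an exception from one ticker is raised as soon as its result is consumed, and the remaining results are lost. A variant that isolates failures per ticker might look like the sketch below. `search_many` and `fake_search` are hypothetical; in practice `search_fn` would wrap the `OptionsSearchTool` call shown above.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def search_many(tickers, search_fn, max_workers=4):
    """Run `search_fn(ticker)` concurrently; return (results, errors) by ticker."""
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(search_fn, ticker): ticker for ticker in tickers}
        for future in as_completed(futures):
            ticker = futures[future]
            try:
                results[ticker] = future.result()
            except Exception as exc:  # keep going if one ticker fails
                errors[ticker] = exc
    return results, errors


def fake_search(ticker):
    # Hypothetical stand-in for a real search call
    if ticker == "BAD":
        raise ValueError("unknown ticker")
    return f"{ticker}: 42 contracts"


results, errors = search_many(["AAPL", "BAD", "MSFT"], fake_search)
print(sorted(results), sorted(errors))
```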

## Testing

### Unit Test Example

```python
import unittest
from tools.search.options_search import OptionsSearchTool

class TestOptionsSearch(unittest.TestCase):
    def setUp(self):
        self.tool = OptionsSearchTool()
    
    def test_search_call_options(self):
        result = self.tool._run(
            ticker="AAPL",
            expiration_date="2024-12-20",
            option_type="call"
        )
        self.assertIn("AAPL", result)
        self.assertIn("call", result.lower())

if __name__ == "__main__":
    unittest.main()
```
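The test above calls the real tool, so it depends on a valid API key, network access, and cached or live data. For tests that run offline, the tool can be replaced with a mock. The sketch below assumes a hypothetical helper, `summarize_calls`, that wraps the tool's `_run` call; it is not part of the project.

```python
import unittest
from unittest.mock import MagicMock


def summarize_calls(tool, ticker, expiration_date):
    """Hypothetical helper under test: wraps a search tool's `_run` call."""
    raw = tool._run(ticker=ticker, expiration_date=expiration_date,
                    option_type="call")
    return f"{ticker} calls: {raw}"


class TestSummarizeCalls(unittest.TestCase):
    def test_passes_arguments_and_formats_result(self):
        fake_tool = MagicMock()
        fake_tool._run.return_value = "3 contracts"

        summary = summarize_calls(fake_tool, "AAPL", "2024-12-20")

        self.assertEqual(summary, "AAPL calls: 3 contracts")
        fake_tool._run.assert_called_once_with(
            ticker="AAPL", expiration_date="2024-12-20", option_type="call"
        )


# Run the suite programmatically so the script does not exit afterwards
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestSummarizeCalls)
outcome = unittest.TextTestRunner(verbosity=0).run(suite)
```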

This skill covers the Options Analytics Agent project so that AI coding agents can help developers build financial analysis systems with LangGraph, RAG, and persistent memory.

Source

Creator's repository · aradotso/data-skills
