
Overview

Follow these best practices to build robust, efficient applications with the Fintool API.

Authentication & Security

Never hardcode API keys in your application code. Use environment variables or secure credential management systems.
# ✅ Good - Use environment variables
import os
API_KEY = os.getenv('FINTOOL_API_KEY')

# ❌ Bad - Hardcoded key
API_KEY = "mint_abc123..."
Request separate API keys for development, staging, and production to maintain security and enable easier key rotation.
Rotate your API keys periodically, and design your application to handle key updates gracefully so rotation doesn't require a redeploy.
Always make API calls from your backend server. Never include API keys in client-side JavaScript, mobile apps, or public repositories.
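A small helper can keep per-environment keys straight. This is a minimal sketch; the environment-variable names below are our assumptions, not official Fintool names:

```python
import os

def api_key_for(environment: str) -> str:
    """Look up the API key for an environment.

    The variable names here are assumptions; use whatever naming
    convention your credential management system prescribes.
    """
    var = {
        "development": "FINTOOL_API_KEY_DEV",
        "staging": "FINTOOL_API_KEY_STAGING",
        "production": "FINTOOL_API_KEY_PROD",
    }[environment]
    key = os.getenv(var)
    if not key:
        raise RuntimeError(f"{var} is not set")
    return key
```

Failing fast on a missing variable makes misconfigured deployments obvious at startup rather than at the first API call.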

Session Management

Always Return session_data

The most important best practice for maintaining conversation context:
Always include the session_data from previous responses in your next request to maintain conversation context and get more accurate answers.
# Correct way to maintain context
conversation = []

# First query
response1 = client.chat(
    messages=[{"role": "user", "content": "What is Apple's revenue?"}]
)

# Store the complete assistant message with metadata
conversation.append({
    "role": "assistant",
    "content": response1['message']['content'],
    "metadata": response1['message']['metadata']  # ✅ Include session_data
})

# Follow-up query
conversation.append({"role": "user", "content": "How does that compare to last year?"})

response2 = client.chat(messages=conversation)

Why session_data Matters

  • Context Preservation: Maintains conversation history and context
  • Better Answers: AI understands what “that” and “it” refer to
  • Improved Performance: Reduces redundant processing
  • Cost Efficiency: Avoids re-analyzing the same documents

Streaming for Better UX

When to Use Streaming

✅ Use Streaming

  • Interactive chat applications
  • User-facing interfaces
  • Real-time analytics dashboards
  • Live data exploration

❌ Don't Use Streaming

  • Batch processing jobs
  • Background data analysis
  • Scheduled reports
  • Non-interactive workflows

Streaming Implementation Tips

# Show thinking states to keep users engaged
for event in client.chat(messages=messages, stream=True):
    if event.get('type') == 'message':
        message = event['message']

        # Display thinking/processing state
        if 'thinking' in message:
            print(f"💭 {message['thinking']}")

        # Stream content progressively
        if 'content' in message:
            print(message['content'], end='', flush=True)
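Streams can fail mid-response, so it helps to separate event consumption from display and keep whatever content arrived before the failure. A minimal sketch (`consume_stream` is a hypothetical helper operating on event dicts shaped like the ones above):

```python
def consume_stream(events):
    """Accumulate streamed content; on failure, return the partial output.

    `events` is any iterable of event dicts like those yielded by the
    streaming loop above.
    """
    chunks = []
    try:
        for event in events:
            if event.get('type') == 'message':
                message = event['message']
                if 'content' in message:
                    chunks.append(message['content'])
    except Exception as exc:
        # Surface the partial answer alongside the error so the UI
        # can show what arrived before the stream broke
        return ''.join(chunks), exc
    return ''.join(chunks), None
```

The caller can then decide whether to display the partial answer, retry, or show an error message.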

Filtering Strategies

Ticker Filtering

Always include ticker symbols when you know which companies are relevant:
# ✅ Good - Specific tickers
response = client.chat(
    messages=[{"role": "user", "content": "Compare revenue growth"}],
    filters={"tickers": ["AAPL", "MSFT"]}
)

# ❌ Less optimal - No ticker filter for company-specific question
response = client.chat(
    messages=[{"role": "user", "content": "What is Apple's revenue?"}]
)

Document Type Selection

Choose appropriate document types for your query:
  • Annual Data
  • Quarterly Updates
  • Management Commentary
  • Material Events
Use 10-K forms for comprehensive annual information:
filters={
    "doc_types": ["10-K"],
    "dates": {"start": "2024-01-01", "end": "2024-12-31"}
}

Date Range Filtering

Specify date ranges to focus on relevant time periods:
# Recent data only
filters={
    "dates": {
        "start": "2024-01-01",
        "end": "2024-12-31"
    }
}

# Specific quarter
filters={
    "dates": {
        "start": "2024-10-01",
        "end": "2024-12-31"
    }
}
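Rather than hardcoding dates, you can compute a trailing window at request time. A small sketch (the helper name is ours):

```python
from datetime import date, timedelta

def trailing_window(days: int = 90) -> dict:
    """Build a date filter covering the last `days` days."""
    end = date.today()
    start = end - timedelta(days=days)
    return {"dates": {"start": start.isoformat(), "end": end.isoformat()}}

# Usage: filters=trailing_window(90) for roughly the last quarter
```

This keeps scheduled jobs pointed at recent filings without manual date updates.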

Error Handling

Implement Retry Logic

Handle transient failures with exponential backoff:
import time
from requests.exceptions import HTTPError, Timeout

def chat_with_retry(client, messages, max_retries=3):
    """Chat request with exponential backoff retry"""

    for attempt in range(max_retries):
        try:
            return client.chat(messages=messages)

        except HTTPError as e:
            if e.response.status_code == 429:  # Rate limit
                wait_time = (2 ** attempt)  # Exponential backoff: 1s, 2s, 4s
                print(f"Rate limited. Waiting {wait_time}s...")
                time.sleep(wait_time)

            elif e.response.status_code >= 500:  # Server error
                if attempt < max_retries - 1:
                    print(f"Server error. Retrying... ({attempt + 1}/{max_retries})")
                    time.sleep(1)
                else:
                    raise

            else:
                # Client error (4xx) - don't retry
                raise

        except Timeout:
            if attempt < max_retries - 1:
                print(f"Timeout. Retrying... ({attempt + 1}/{max_retries})")
                time.sleep(1)
            else:
                raise

    raise Exception("Max retries exceeded")

Handle Specific Error Codes

try:
    response = client.chat(messages=messages)

except HTTPError as e:
    if e.response.status_code == 401:
        print("❌ Authentication failed. Check your API key.")

    elif e.response.status_code == 429:
        print("⚠️ Rate limit exceeded. Slow down requests.")

    elif e.response.status_code == 400:
        error_data = e.response.json()
        print(f"❌ Invalid request: {error_data.get('error', {}).get('message')}")

    else:
        print(f"❌ API error: {e}")

Citation Handling

Parse Citation Markers

Always parse and display citations to users:
import re
from typing import Dict, List

def extract_citations(content: str, citations: List[Dict]) -> str:
    """Replace citation markers with clickable references"""

    citation_map = {c['chunk_id']: c for c in citations}
    counter = 1
    footnotes = []

    def replace_marker(match):
        nonlocal counter
        chunk_id = match.group(1)

        if chunk_id in citation_map:
            citation = citation_map[chunk_id]
            footnotes.append(
                f"[{counter}] {citation['document_title']}, "
                f"Page {citation['page_number']}"
            )
            result = f"[{counter}]"
            counter += 1
            return result

        return match.group(0)

    # Replace markers
    formatted = re.sub(r'\*\*\[([^\]]+)\]\*\*', replace_marker, content)

    # Add footnotes
    if footnotes:
        formatted += "\n\nSources:\n" + "\n".join(footnotes)

    return formatted

Display Source Information

Make citations actionable for users:
# Extract and display citations
for citation in response['citations']:
    print(f"""
    📄 {citation['document_title']}
    📍 Page {citation['page_number']}
    🎯 Relevance: {citation['relevance_score']:.0%}
    """)

Performance Optimization

Use Reranking Strategically

Reranking improves relevance but adds latency (~200ms):
# ✅ Use reranking for user-facing searches
search_results = client.search(
    query="...",
    rerank=True  # Better quality, slightly slower
)

# ✅ Disable for speed-critical applications
search_results = client.search(
    query="...",
    rerank=False  # Faster, good enough for many use cases
)

Batch Similar Queries

If you need to analyze multiple companies, batch your queries:
# ✅ Good - Single query with multiple tickers
response = client.chat(
    messages=[{
        "role": "user",
        "content": "Compare revenue growth for Apple, Microsoft, and Google"
    }],
    filters={"tickers": ["AAPL", "MSFT", "GOOGL"]}
)

# ❌ Less efficient - Multiple separate queries
# (Though sometimes necessary for different questions)

Cache Responses When Appropriate

For data that doesn’t change frequently:
from functools import lru_cache
import json

@lru_cache(maxsize=100)
def cached_search(query: str, tickers: str):
    """Cache search results for repeated queries"""
    return client.search(
        query=query,
        filters={"tickers": json.loads(tickers)}
    )

# Usage
result = cached_search("revenue growth", json.dumps(["AAPL"]))
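One limitation of `lru_cache` is that entries never expire, while filings data does get updated. If staleness matters, a time-bounded cache may fit better. A minimal sketch (class name and default TTL are our choices):

```python
import time

class TTLCache:
    """Tiny time-based cache; entries expire after `ttl` seconds."""

    def __init__(self, ttl: float = 300):
        self.ttl = ttl
        self._store = {}

    def get(self, key):
        hit = self._store.get(key)
        if hit is None:
            return None
        value, stored_at = hit
        if time.time() - stored_at > self.ttl:
            # Entry is stale; drop it and report a miss
            del self._store[key]
            return None
        return value

    def set(self, key, value):
        self._store[key] = (value, time.time())
```

Check the cache before calling the API, and store the response on a miss; choose the TTL based on how quickly the underlying data changes.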

Rate Limiting

Respect Rate Limits

Monitor rate limit headers and implement backoff:
def check_rate_limit(response):
    """Check and log rate limit status"""
    headers = response.headers

    limit = headers.get('X-RateLimit-Limit')
    remaining = headers.get('X-RateLimit-Remaining')
    reset = headers.get('X-RateLimit-Reset')

    if remaining and int(remaining) < 10:
        print(f"⚠️ Warning: Only {remaining} requests remaining until {reset}")

    return response
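Beyond logging a warning, you can compute how long to pause once the budget is exhausted. A sketch under one assumption: that `X-RateLimit-Reset` is a Unix timestamp (verify the header semantics against the API reference):

```python
def backoff_seconds(headers: dict, now: float) -> float:
    """Seconds to pause before the next request.

    Assumes X-RateLimit-Reset is a Unix timestamp; confirm this
    against the actual API documentation.
    """
    remaining = headers.get('X-RateLimit-Remaining')
    reset = headers.get('X-RateLimit-Reset')
    if remaining is None or reset is None:
        return 0.0  # Headers absent; proceed normally
    if int(remaining) > 0:
        return 0.0  # Budget left; no pause needed
    return max(0.0, float(reset) - now)

# Usage: time.sleep(backoff_seconds(response.headers, time.time()))
```

Passing `now` explicitly keeps the function pure and easy to test.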

Implement Request Queuing

For high-volume applications:
import asyncio
from asyncio import Semaphore

class RateLimitedClient:
    def __init__(self, client, max_concurrent=5):
        self.client = client
        self.semaphore = Semaphore(max_concurrent)

    async def chat(self, messages):
        async with self.semaphore:
            response = await self.client.chat(messages)
            await asyncio.sleep(0.1)  # Small delay between requests
            return response
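Usage might look like the following. The class is repeated here so the sketch runs on its own, and `FakeClient` is a hypothetical stand-in for the real async client:

```python
import asyncio
from asyncio import Semaphore

class FakeClient:
    """Hypothetical stand-in for the real async client."""
    async def chat(self, messages):
        await asyncio.sleep(0)
        return {"message": {"role": "assistant", "content": "ok"}}

class RateLimitedClient:
    def __init__(self, client, max_concurrent=5):
        self.client = client
        self.semaphore = Semaphore(max_concurrent)

    async def chat(self, messages):
        async with self.semaphore:
            response = await self.client.chat(messages)
            await asyncio.sleep(0.01)  # Small delay between requests
            return response

async def run_batch(questions):
    """Fan out queries while the semaphore caps concurrency."""
    limited = RateLimitedClient(FakeClient(), max_concurrent=2)
    tasks = [limited.chat([{"role": "user", "content": q}]) for q in questions]
    return await asyncio.gather(*tasks)

results = asyncio.run(run_batch(["q1", "q2", "q3"]))
```

`asyncio.gather` submits all tasks at once, but the semaphore ensures no more than `max_concurrent` requests are in flight at any moment.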

Testing

Use Test Data

Create test cases with known answers:
import os
import pytest

def test_basic_query():
    """Test basic chat functionality"""
    client = FintoolClient(api_key=os.getenv('FINTOOL_TEST_API_KEY'))

    response = client.chat(
        messages=[{"role": "user", "content": "What is Apple's ticker symbol?"}]
    )

    assert response['message']['role'] == 'assistant'
    assert 'AAPL' in response['message']['content']
    assert len(response.get('citations', [])) > 0

def test_conversation_context():
    """Test multi-turn conversation"""
    client = FintoolClient()

    # First query
    r1 = client.chat(
        messages=[{"role": "user", "content": "What is Tesla's revenue?"}]
    )

    # Follow-up with context
    r2 = client.chat(
        messages=[
            {"role": "user", "content": "What is Tesla's revenue?"},
            r1['message'],
            {"role": "user", "content": "What about net income?"}
        ]
    )

    assert r2['message']['role'] == 'assistant'

Monitoring & Logging

Log API Interactions

Track usage and debug issues:
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def monitored_chat(client, messages):
    """Chat with logging"""
    logger.info(f"Sending chat request with {len(messages)} messages")

    try:
        response = client.chat(messages=messages)
        logger.info(f"Received response with {len(response.get('citations', []))} citations")
        return response

    except Exception as e:
        logger.error(f"Chat request failed: {e}")
        raise

Common Pitfalls

Avoid These Common Mistakes:
  1. ❌ Not including session_data in follow-up queries
  2. ❌ Exposing API keys in client-side code
  3. ❌ Ignoring rate limit headers
  4. ❌ Not handling streaming errors gracefully
  5. ❌ Forgetting to parse citation markers
  6. ❌ Making requests without appropriate filters
  7. ❌ Not implementing retry logic for transient failures

Summary Checklist

1. Security

  • API keys stored in environment variables
  • Keys never exposed client-side
  • Different keys for different environments

2. Context Management

  • Always return session_data in multi-turn conversations
  • Include full conversation history
  • Store assistant responses with metadata

3. User Experience

  • Use streaming for interactive applications
  • Display thinking states during processing
  • Show citations with source information

4. Reliability

  • Implement retry logic with exponential backoff
  • Handle specific error codes appropriately
  • Monitor rate limits

5. Performance

  • Use filters to narrow search scope
  • Enable/disable reranking based on use case
  • Cache when appropriate

Next Steps