Overview
Follow these best practices to build robust, efficient applications with the Fintool API.
Authentication & Security
Never hardcode API keys in your application code. Use environment variables or secure credential management systems.

# ✅ Good - Use environment variables
import os
API_KEY = os.getenv('FINTOOL_API_KEY')

# ❌ Bad - Hardcoded key
API_KEY = "mint_abc123..."
Use Different Keys for Different Environments
Request separate API keys for development, staging, and production to maintain security and enable easier key rotation.
Rotate Keys Regularly
Periodically rotate your API keys as a security best practice. Plan for key rotation by designing your application to handle key updates gracefully.
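One simple way to handle rotation gracefully is to resolve the key at request time instead of caching it at startup. A minimal sketch (the helper below is illustrative, not part of the SDK):

import os

def get_api_key():
    """Read the key on each use so a rotated value is picked up without a redeploy.
    Swap in your secret manager's fetch call here if you use one."""
    return os.getenv('FINTOOL_API_KEY')

# Construct the client per job/request rather than once at import time
client = FintoolClient(api_key=get_api_key())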
Never Expose Keys Client-Side
Always make API calls from your backend server. Never include API keys in client-side JavaScript, mobile apps, or public repositories.
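For browser or mobile front ends, route requests through a thin endpoint on your own server so the key never leaves your infrastructure. A minimal sketch using Flask (the route name and payload shape are illustrative assumptions, not prescribed by the API):

import os
from flask import Flask, request, jsonify

app = Flask(__name__)
# FintoolClient as instantiated elsewhere in these docs; the key stays server-side
client = FintoolClient(api_key=os.getenv('FINTOOL_API_KEY'))

@app.route('/api/chat', methods=['POST'])
def chat_proxy():
    # The browser sends only the messages; it never sees the API key
    messages = request.json.get('messages', [])
    response = client.chat(messages=messages)
    return jsonify(response)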
Session Management
Always Return session_data
The most important best practice for maintaining conversation context:
Always include the session_data from previous responses in your next request to maintain conversation context and get more accurate answers.
# Correct way to maintain context
conversation = [{"role": "user", "content": "What is Apple's revenue?"}]

# First query
response1 = client.chat(messages=conversation)

# Store the complete assistant message with metadata
conversation.append({
    "role": "assistant",
    "content": response1['message']['content'],
    "metadata": response1['message']['metadata']  # ✅ Include session_data
})

# Follow-up query
conversation.append({"role": "user", "content": "How does that compare to last year?"})
response2 = client.chat(messages=conversation)
Why session_data Matters
Context Preservation: Maintains conversation history and context
Better Answers: AI understands what “that” and “it” refer to
Improved Performance: Reduces redundant processing
Cost Efficiency: Avoids re-analyzing the same documents
Streaming for Better UX
When to Use Streaming
✅ Use Streaming
Interactive chat applications
User-facing interfaces
Real-time analytics dashboards
Live data exploration
❌ Don't Use Streaming
Batch processing jobs
Background data analysis
Scheduled reports
Non-interactive workflows
Streaming Implementation Tips
# Show thinking states to keep users engaged
for event in client.chat(messages=messages, stream=True):
    if event.get('type') == 'message':
        message = event['message']

        # Display thinking/processing state
        if 'thinking' in message:
            print(f"💭 {message['thinking']}")

        # Stream content progressively
        if 'content' in message:
            print(message['content'], end='', flush=True)
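Streams can fail mid-response, so wrap the loop and keep whatever content has already arrived. A sketch (the exception types your HTTP layer raises may differ):

# Collect partial output so a dropped stream doesn't lose everything
collected = []
try:
    for event in client.chat(messages=messages, stream=True):
        if event.get('type') == 'message' and 'content' in event['message']:
            chunk = event['message']['content']
            collected.append(chunk)
            print(chunk, end='', flush=True)
except Exception as e:
    # Fall back to the partial answer and surface the error to the user
    print(f"\n⚠️ Stream interrupted: {e}")
partial_answer = ''.join(collected)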
Filtering Strategies
Ticker Filtering
Always include ticker symbols when you know which companies are relevant:
# ✅ Good - Specific tickers
response = client.chat(
    messages=[{"role": "user", "content": "Compare revenue growth"}],
    filters={"tickers": ["AAPL", "MSFT"]}
)

# ❌ Less optimal - No ticker filter for a company-specific question
response = client.chat(
    messages=[{"role": "user", "content": "What is Apple's revenue?"}]
)
Document Type Selection
Choose a document type that matches your query: 10-K filings for comprehensive annual data, 10-Q filings for quarterly updates, and 8-K filings for material events.

# Use 10-K forms for comprehensive annual information
filters = {
    "doc_types": ["10-K"],
    "dates": {"start": "2024-01-01", "end": "2024-12-31"}
}
Date Range Filtering
Specify date ranges to focus on relevant time periods:
# Recent data only
filters = {
    "dates": {
        "start": "2024-01-01",
        "end": "2024-12-31"
    }
}

# Specific quarter
filters = {
    "dates": {
        "start": "2024-10-01",
        "end": "2024-12-31"
    }
}
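The filter fields shown above can also be combined in a single request to narrow by ticker, document type, and date range at once:

# Narrow to Apple's 2024 annual report in one request
response = client.chat(
    messages=[{"role": "user", "content": "Summarize the key risk factors"}],
    filters={
        "tickers": ["AAPL"],
        "doc_types": ["10-K"],
        "dates": {"start": "2024-01-01", "end": "2024-12-31"}
    }
)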
Error Handling
Implement Retry Logic
Handle transient failures with exponential backoff:
import time
from requests.exceptions import HTTPError, Timeout

def chat_with_retry(client, messages, max_retries=3):
    """Chat request with exponential backoff retry"""
    for attempt in range(max_retries):
        try:
            return client.chat(messages=messages)
        except HTTPError as e:
            if e.response.status_code == 429:  # Rate limit
                wait_time = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s
                print(f"Rate limited. Waiting {wait_time}s...")
                time.sleep(wait_time)
            elif e.response.status_code >= 500:  # Server error
                if attempt < max_retries - 1:
                    print(f"Server error. Retrying... ({attempt + 1}/{max_retries})")
                    time.sleep(1)
                else:
                    raise
            else:
                # Client error (4xx) - don't retry
                raise
        except Timeout:
            if attempt < max_retries - 1:
                print(f"Timeout. Retrying... ({attempt + 1}/{max_retries})")
                time.sleep(1)
            else:
                raise
    raise Exception("Max retries exceeded")
Handle Specific Error Codes
try:
    response = client.chat(messages=messages)
except HTTPError as e:
    if e.response.status_code == 401:
        print("❌ Authentication failed. Check your API key.")
    elif e.response.status_code == 429:
        print("⚠️ Rate limit exceeded. Slow down requests.")
    elif e.response.status_code == 400:
        error_data = e.response.json()
        print(f"❌ Invalid request: {error_data.get('error', {}).get('message')}")
    else:
        print(f"❌ API error: {e}")
Citation Handling
Parse Citation Markers
Always parse and display citations to users:
import re
from typing import Dict, List

def extract_citations(content: str, citations: List[Dict]) -> str:
    """Replace citation markers with clickable references"""
    citation_map = {c['chunk_id']: c for c in citations}
    counter = 1
    footnotes = []

    def replace_marker(match):
        nonlocal counter
        chunk_id = match.group(1)
        if chunk_id in citation_map:
            citation = citation_map[chunk_id]
            footnotes.append(
                f"[{counter}] {citation['document_title']}, "
                f"Page {citation['page_number']}"
            )
            result = f"[{counter}]"
            counter += 1
            return result
        return match.group(0)

    # Replace markers
    formatted = re.sub(r'\*\*\[([^\]]+)\]\*\*', replace_marker, content)

    # Add footnotes
    if footnotes:
        formatted += "\n\nSources:\n" + "\n".join(footnotes)
    return formatted
Make citations actionable for users:
# Extract and display citations
for citation in response['citations']:
    print(f"""
    📄 {citation['document_title']}
    📍 Page {citation['page_number']}
    🎯 Relevance: {citation['relevance_score']:.0%}
    """)
Use Reranking Strategically
Reranking improves relevance but adds latency (~200ms):
# ✅ Use reranking for user-facing searches
search_results = client.search(
    query="...",
    rerank=True  # Better quality, slightly slower
)

# ✅ Disable for speed-critical applications
search_results = client.search(
    query="...",
    rerank=False  # Faster, good enough for many use cases
)
Batch Similar Queries
If you need to analyze multiple companies, batch your queries:
# ✅ Good - Single query with multiple tickers
response = client.chat(
    messages=[{
        "role": "user",
        "content": "Compare revenue growth for Apple, Microsoft, and Google"
    }],
    filters={"tickers": ["AAPL", "MSFT", "GOOGL"]}
)

# ❌ Less efficient - Multiple separate queries
# (Though sometimes necessary for different questions)
Cache Responses When Appropriate
For data that doesn’t change frequently:
from functools import lru_cache
import json

@lru_cache(maxsize=100)
def cached_search(query: str, tickers: str):
    """Cache search results for repeated queries"""
    return client.search(
        query=query,
        filters={"tickers": json.loads(tickers)}
    )

# Usage - pass tickers as a JSON string so the arguments stay hashable
result = cached_search("revenue growth", json.dumps(["AAPL"]))
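One caveat: lru_cache never expires entries. A common workaround is to fold a coarse time bucket into the cache key so results refresh on a schedule; a minimal sketch (the hourly bucket is an arbitrary choice, not an API requirement):

import time

@lru_cache(maxsize=100)
def cached_search_ttl(query: str, tickers: str, _bucket: int = 0):
    """Same as above, but the _bucket argument makes entries go stale"""
    return client.search(query=query, filters={"tickers": json.loads(tickers)})

# A new hour produces a new _bucket value, which forces a cache miss
result = cached_search_ttl("revenue growth", json.dumps(["AAPL"]), _bucket=int(time.time() // 3600))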
Rate Limiting
Respect Rate Limits
Monitor rate limit headers and implement backoff:
def check_rate_limit(response):
    """Check and log rate limit status"""
    headers = response.headers
    limit = headers.get('X-RateLimit-Limit')
    remaining = headers.get('X-RateLimit-Remaining')
    reset = headers.get('X-RateLimit-Reset')

    if remaining and int(remaining) < 10:
        print(f"⚠️ Warning: Only {remaining} requests remaining until {reset}")
    return response
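You can take this a step further and pause when the quota is exhausted. A sketch that assumes X-RateLimit-Reset is a Unix timestamp (verify the actual header format in the API reference):

import time

def wait_if_exhausted(response):
    """Sleep until the window resets when no requests remain"""
    remaining = response.headers.get('X-RateLimit-Remaining')
    reset = response.headers.get('X-RateLimit-Reset')
    if remaining is not None and int(remaining) == 0 and reset:
        delay = max(0, int(reset) - time.time())  # assumes reset is a Unix timestamp
        print(f"⏳ Rate limit exhausted. Sleeping {delay:.0f}s until reset...")
        time.sleep(delay)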
Implement Request Queuing
For high-volume applications:
import asyncio
from asyncio import Semaphore

class RateLimitedClient:
    def __init__(self, client, max_concurrent=5):
        self.client = client
        self.semaphore = Semaphore(max_concurrent)

    async def chat(self, messages):
        async with self.semaphore:
            response = await self.client.chat(messages)
            await asyncio.sleep(0.1)  # Small delay between requests
            return response
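A quick usage sketch (it assumes the SDK client exposes an async chat method, as the wrapper above does):

limited = RateLimitedClient(client, max_concurrent=5)

async def analyze_companies(tickers):
    # At most 5 requests are in flight at once thanks to the semaphore
    tasks = [
        limited.chat([{"role": "user", "content": f"What is {t}'s revenue?"}])
        for t in tickers
    ]
    return await asyncio.gather(*tasks)

# responses = asyncio.run(analyze_companies(["AAPL", "MSFT", "GOOGL"]))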
Testing
Use Test Data
Create test cases with known answers:
import os
import pytest

def test_basic_query():
    """Test basic chat functionality"""
    client = FintoolClient(api_key=os.getenv('FINTOOL_TEST_API_KEY'))
    response = client.chat(
        messages=[{"role": "user", "content": "What is Apple's ticker symbol?"}]
    )
    assert response['message']['role'] == 'assistant'
    assert 'AAPL' in response['message']['content']
    assert len(response.get('citations', [])) > 0

def test_conversation_context():
    """Test multi-turn conversation"""
    client = FintoolClient()

    # First query
    r1 = client.chat(
        messages=[{"role": "user", "content": "What is Tesla's revenue?"}]
    )

    # Follow-up with context
    r2 = client.chat(
        messages=[
            {"role": "user", "content": "What is Tesla's revenue?"},
            r1['message'],
            {"role": "user", "content": "What about net income?"}
        ]
    )
    assert r2['message']['role'] == 'assistant'
Monitoring & Logging
Log API Interactions
Track usage and debug issues:
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def monitored_chat(client, messages):
    """Chat with logging"""
    logger.info(f"Sending chat request with {len(messages)} messages")
    try:
        response = client.chat(messages=messages)
        logger.info(f"Received response with {len(response.get('citations', []))} citations")
        return response
    except Exception as e:
        logger.error(f"Chat request failed: {e}")
        raise
Common Pitfalls
Avoid These Common Mistakes:
❌ Not including session_data in follow-up queries
❌ Exposing API keys in client-side code
❌ Ignoring rate limit headers
❌ Not handling streaming errors gracefully
❌ Forgetting to parse citation markers
❌ Making requests without appropriate filters
❌ Not implementing retry logic for transient failures
Summary Checklist
✅ Store API keys in environment variables, with separate keys per environment
✅ Include session_data from previous responses in every follow-up request
✅ Stream responses in interactive, user-facing applications
✅ Filter by ticker, document type, and date range whenever possible
✅ Retry transient failures (429, 5xx, timeouts) with exponential backoff
✅ Parse citation markers and display sources to users
✅ Watch rate limit headers and queue requests in high-volume applications
✅ Cache responses for data that doesn't change frequently
✅ Test against known answers and log API interactions
Next Steps