Building a Robust Multi-Layered Architecture for API Quota Management
The Challenge of API Rate Limits
If you've worked with external APIs, particularly those with generous free tiers like Google's Gemini AI, you've likely encountered the dreaded "429 Too Many Requests" error. During a recent project involving extensive LLM testing, my system repeatedly hit these quota limits, resulting in evaluation failures and interrupted workflows.
The logs told a clear story:
ERROR: Status code: 429
Response: {
  "error": {
    "code": 429,
    "message": "You exceeded your current quota, please check your plan and billing details",
    "status": "RESOURCE_EXHAUSTED"
  }
}
Rather than immediately upgrading to a paid tier, I decided to engineer a more efficient solution. This article details my multi-layered architecture for managing API quotas effectively, all while working entirely within free tier limitations.
My Solution: A Multi-Level Quota Management Architecture
I developed a comprehensive system with five interconnected components that work together to maximize API availability:
1. Intelligent Quota Manager (QuotaManager)
The cornerstone of my solution is a QuotaManager class that keeps track of API usage across different models and makes intelligent decisions about when and how to use them.
import logging
import threading
import time
from collections import defaultdict


class QuotaManager:
    def __init__(self):
        self.lock = threading.Lock()
        self.model_usage = defaultdict(list)      # Timestamp of each API call, per model
        self.cooldown_until = defaultdict(float)  # Timestamp when a model becomes available again

    def _refresh_counters(self):
        """Remove timestamps older than 60 seconds"""
        current_time = time.time()
        for model in self.model_usage:
            self.model_usage[model] = [t for t in self.model_usage[model]
                                       if current_time - t < 60]

    def _is_model_available(self, model: str) -> bool:
        """Check whether a model is available or still in cooldown"""
        if time.time() < self.cooldown_until[model]:
            return False
        # Stay below 12 requests per minute (under the actual limit of 15)
        # to keep a safety buffer
        return len(self.model_usage[model]) < 12

    def record_usage(self, model: str):
        """Record that we've used a model"""
        with self.lock:
            self.model_usage[model].append(time.time())

    def get_available_model(self, preferred_model: str) -> str:
        """Return the best model currently available"""
        with self.lock:
            self._refresh_counters()
            # Check if the preferred model is available
            if self._is_model_available(preferred_model):
                return preferred_model
            # Otherwise, walk the model cascade
            model_cascade = ["gemini-2.0-flash", "gemini-1.5-flash", "gemini-pro"]
            for model in model_cascade:
                if self._is_model_available(model) and model != preferred_model:
                    return model
            # If no model is available, return the one with the fewest recent calls
            if not self.model_usage:
                return preferred_model
            return min(self.model_usage.items(), key=lambda x: len(x[1]))[0]

    def set_cooldown(self, model: str, seconds: int = 60):
        """Put a model in cooldown when we hit quota limits"""
        with self.lock:
            self.cooldown_until[model] = time.time() + seconds
            logging.info(f"Model {model} in cooldown for {seconds} seconds")
This manager implements several key strategies: it tracks usage in a sliding 60-second window per model, caps itself at 12 requests per minute to stay safely under the published limit of 15, puts a model into cooldown as soon as a quota error appears, and falls back through a cascade of alternative models when the preferred one is unavailable.
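To make that behaviour concrete, here is a minimal sketch of the manager exercised in isolation; the model names and the simulated quota error are illustrative only:

quota = QuotaManager()

# Record a successful call against the preferred model
quota.record_usage("gemini-2.0-flash")

# Ask which model to use next; the preferred model is still under its budget
print(quota.get_available_model("gemini-2.0-flash"))  # -> gemini-2.0-flash

# Simulate a 429: put the model in cooldown and ask again
quota.set_cooldown("gemini-2.0-flash", seconds=60)
print(quota.get_available_model("gemini-2.0-flash"))  # -> gemini-1.5-flash (next in the cascade)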
2. API Key Rotation System (APIKeyManager)
To further extend my available quota, I implemented a key rotation system that distributes requests across multiple API keys:
import os
from typing import List


class APIKeyManager:
    def __init__(self):
        self.lock = threading.Lock()
        self.api_keys = self._load_api_keys()
        self.current_index = 0

    def _load_api_keys(self) -> List[str]:
        """Load API keys from the environment and the .env.apikeys file"""
        keys = []
        # Get the primary key from the environment
        env_key = os.environ.get("GEMINI_API_KEY")
        if env_key:
            keys.append(env_key)
        # Get additional keys from the .env.apikeys file, one per line
        try:
            with open(".env.apikeys", "r") as f:
                for line in f:
                    if line.strip() and not line.startswith("#"):
                        keys.append(line.strip())
        except FileNotFoundError:
            pass
        if not keys:
            raise ValueError("No API keys found in environment or .env.apikeys file")
        logging.info(f"Loaded {len(keys)} API keys")
        return keys

    def get_next_key(self) -> str:
        """Get the next API key in round-robin rotation"""
        with self.lock:
            key = self.api_keys[self.current_index]
            self.current_index = (self.current_index + 1) % len(self.api_keys)
            return key
This approach provides several benefits: requests are spread evenly across keys in round-robin order, the effective request budget grows with each additional key, and a single exhausted key no longer blocks the whole evaluation run.
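For reference, the loader above expects .env.apikeys to hold one key per line, skipping blank lines and lines that start with #. A hypothetical file might look like this (the values are placeholders, not real keys):

# .env.apikeys -- one key per line; blank lines and # comments are ignored
first-project-api-key
second-project-api-key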
3. Automatic Rate Limiting
To prevent hitting quota limits in the first place, I implemented automatic rate limiting at the request level:
class RateLimiter:
    def __init__(self, requests_per_minute: int = 12):
        self.lock = threading.Lock()
        self.min_interval = 60 / requests_per_minute  # Minimum seconds between requests
        self.last_request_time = 0

    def wait_if_needed(self):
        """Sleep if we're making requests too quickly"""
        with self.lock:
            elapsed = time.time() - self.last_request_time
            if elapsed < self.min_interval:
                sleep_time = self.min_interval - elapsed
                logging.debug(f"Rate limiting: sleeping for {sleep_time:.2f} seconds")
                time.sleep(sleep_time)
            self.last_request_time = time.time()


class BackoffStrategy:
    def __init__(self, base_delay: float = 2.0, max_retries: int = 5):
        self.base_delay = base_delay
        self.max_retries = max_retries

    def get_delay(self, attempt: int) -> float:
        """Calculate the exponential backoff delay for a given attempt"""
        if attempt >= self.max_retries:
            raise ValueError(f"Maximum retries ({self.max_retries}) exceeded")
        return self.base_delay ** attempt
The rate limiting approach works on two fronts: the RateLimiter proactively spaces requests at least five seconds apart (60 / 12) so the per-minute cap is never reached, while the BackoffStrategy reactively waits 2, 4, 8, then 16 seconds across successive failed attempts before giving up.
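As a quick sanity check, here is a minimal sketch of the delays this strategy produces with the defaults used later in create_gemini_client (base_delay=2.0, max_retries=5):

backoff = BackoffStrategy(base_delay=2.0, max_retries=5)

for attempt in range(1, 6):
    try:
        print(attempt, backoff.get_delay(attempt))
    except ValueError as e:
        print(attempt, e)

# 1 2.0
# 2 4.0
# 3 8.0
# 4 16.0
# 5 Maximum retries (5) exceeded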
4. Model Cascade Implementation
My system includes a model cascade that allows for graceful degradation when the preferred model is unavailable:
class ModelCascade:
    def __init__(self, quota_manager: QuotaManager):
        self.quota_manager = quota_manager
        self.primary_models = {
            "fast": "gemini-2.0-flash",
            "balanced": "gemini-1.5-flash",
            "powerful": "gemini-pro"
        }

    def get_model(self, preference: str = "balanced") -> str:
        """Get the best available model based on preference and availability"""
        preferred_model = self.primary_models.get(preference, "gemini-1.5-flash")
        return self.quota_manager.get_available_model(preferred_model)
This implementation maps a human-readable preference ("fast", "balanced", "powerful") onto a concrete model name and then defers to the QuotaManager, so callers ask for a quality level rather than a specific model and degrade gracefully to whatever is actually available.
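A small usage sketch, using a fresh QuotaManager for illustration, shows both the preference lookup and its fallback default:

cascade = ModelCascade(QuotaManager())

print(cascade.get_model("fast"))     # -> gemini-2.0-flash while it still has quota
print(cascade.get_model("unknown"))  # unknown preference falls back to gemini-1.5-flash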
5. Robust Error Handling and Retry Logic
Finally, I implemented comprehensive error handling specific to quota-related issues:
import google.generativeai as genai


class GeminiClient:
    def __init__(self, api_key_manager: APIKeyManager, quota_manager: QuotaManager,
                 rate_limiter: RateLimiter, backoff: BackoffStrategy):
        self.api_key_manager = api_key_manager
        self.quota_manager = quota_manager
        self.rate_limiter = rate_limiter
        self.backoff = backoff

    def _is_quota_error(self, error) -> bool:
        """Determine whether an error is related to quota limits"""
        if hasattr(error, 'code') and error.code == 429:
            return True
        if hasattr(error, 'message') and "quota" in error.message.lower():
            return True
        return False

    def generate_content(self, model: str, prompt: str, max_retries: int = 3) -> str:
        """Generate content with automatic retry and fallback"""
        attempts = 0
        used_models = set()
        current_model = model  # Keep defined even if the first attempt fails early
        while attempts < max_retries:
            try:
                # Apply proactive rate limiting
                self.rate_limiter.wait_if_needed()
                # Use the requested model first, then whatever the quota manager suggests
                current_model = model if model not in used_models else \
                    self.quota_manager.get_available_model(model)
                used_models.add(current_model)
                # Rotate to the next API key
                api_key = self.api_key_manager.get_next_key()
                # Configure the Gemini client for this key and model
                genai.configure(api_key=api_key)
                gemini_model = genai.GenerativeModel(current_model)
                # Record usage and make the actual API call
                self.quota_manager.record_usage(current_model)
                response = gemini_model.generate_content(prompt)
                return response.text
            except Exception as e:
                attempts += 1
                logging.warning(f"Attempt {attempts} failed: {str(e)}")
                if self._is_quota_error(e):
                    # Quota error: cool the model down and retry immediately with another one
                    self.quota_manager.set_cooldown(current_model)
                    continue
                # For other errors, use exponential backoff
                try:
                    delay = self.backoff.get_delay(attempts)
                    logging.info(f"Backing off for {delay} seconds")
                    time.sleep(delay)
                except ValueError:
                    # Backoff strategy has exhausted its retries
                    raise
        raise Exception(f"Failed to generate content after {max_retries} attempts")
This approach separates quota errors from everything else: a 429 immediately cools the offending model down and retries with an alternative, transient errors trigger exponential backoff, and anything that survives every retry surfaces as a single clear exception.
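To illustrate the detection path, here is a minimal sketch using a stand-in error object; FakeQuotaError is purely illustrative, while real calls raise the SDK's own exception types, which expose similar attributes:

class FakeQuotaError(Exception):
    """Stand-in for an SDK exception carrying a 429 status"""
    code = 429
    message = "You exceeded your current quota"

# The dependencies aren't needed just to classify errors, so pass None for each
checker = GeminiClient(None, None, None, None)
print(checker._is_quota_error(FakeQuotaError()))          # True  -> cooldown + model switch
print(checker._is_quota_error(RuntimeError("timeout")))   # False -> exponential backoff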
Putting It All Together
Here's how I integrated all these components into a unified system:
def create_gemini_client() -> GeminiClient:
    """Factory function to create a fully configured Gemini client"""
    api_key_manager = APIKeyManager()
    quota_manager = QuotaManager()
    rate_limiter = RateLimiter(requests_per_minute=12)
    backoff = BackoffStrategy(base_delay=2.0, max_retries=5)
    return GeminiClient(
        api_key_manager=api_key_manager,
        quota_manager=quota_manager,
        rate_limiter=rate_limiter,
        backoff=backoff
    )


# Usage example
client = create_gemini_client()
model_cascade = ModelCascade(client.quota_manager)

# Generate content with automatic quota management
for question in evaluation_questions:  # evaluation_questions: the prompts to evaluate
    model = model_cascade.get_model(preference="fast")
    try:
        response = client.generate_content(model, question)
        # Process response...
    except Exception as e:
        logging.error(f"Failed to process question: {str(e)}")
        # Handle failure...
Results and Benefits
After implementing this multi-layered architecture, my evaluation runs complete reliably within the free tier: the quota manager and rate limiter keep each model under its per-minute budget, the key rotation and model cascade absorb bursts of requests, and the retry logic recovers from the occasional 429 without manual intervention.
Lessons Learned
This project reinforced several important engineering principles: design for the constraints you actually have before paying your way around them, layer independent safeguards so that no single mechanism has to be perfect, and treat external services as unreliable by default, with fallbacks and backoff built in from the start.
Conclusion
API rate limits don't have to be a barrier to building reliable systems. With thoughtful architecture and implementation, you can maximize the value of free tiers and create robust applications that handle external service limitations gracefully.
My multi-layered approach has completely eliminated quota-related failures in my LLM evaluation system, allowing me to focus on my core work rather than managing API limitations.
What approaches have you used to handle API quota limits? I'd love to hear about your experiences and solutions in the comments!