Exceeding any of the limits below results in a `429 Too Many Requests` error response.
| Endpoint | Limit | Window | Notes |
| --- | --- | --- | --- |
| `/chat_generate` | 100 | Per minute | Non-streaming requests |
| `/chat_generate` (streaming) | 30 | Per minute | Streaming requests (beta) |
| `/audio` | 250 | Per minute | Non-streaming requests |
| `/audio` (streaming) | 100 | Per minute | Streaming requests |
| All endpoints combined | 1,000 | Per hour | Total requests per API key |
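For example, the 100-requests-per-minute `/chat_generate` limit works out to an average of one request every 600 ms. A minimal pacing sketch under that assumption (`call_endpoint` is a hypothetical function that issues a single API request):

```python
import time

CHAT_GENERATE_PER_MINUTE = 100  # Non-streaming limit from the table above
MIN_INTERVAL = 60.0 / CHAT_GENERATE_PER_MINUTE  # 0.6 seconds between requests

def paced_calls(call_endpoint, payloads):
    """Issue calls no faster than the per-minute limit allows."""
    results = []
    for payload in payloads:
        started = time.monotonic()
        results.append(call_endpoint(payload))  # call_endpoint is hypothetical
        elapsed = time.monotonic() - started
        # Sleep off the remainder of the interval if the call returned early
        if elapsed < MIN_INTERVAL:
            time.sleep(MIN_INTERVAL - elapsed)
    return results
```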
The `usage_metadata` field in API responses provides information about token usage:
"usage_metadata": {"prompt_token_count": 12,"candidates_token_count": 8,"total_token_count": 20}
Individual requests are also subject to input and output size limits:

| Field | Limit | Notes |
| --- | --- | --- |
| `prompt` | 32,000 tokens | Approximately 24,000 words |
| `text` (TTS) | 2,000 characters | For text-to-speech requests |
| Response length | 4,096 tokens | Maximum generated text length |
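Checking these limits client-side before sending avoids wasted requests. A rough sketch based on the table above; the tokens-per-word ratio is an approximation derived from the "32,000 tokens ≈ 24,000 words" guidance, not the model's actual tokenizer:

```python
PROMPT_TOKEN_LIMIT = 32_000  # From the table above
TTS_CHAR_LIMIT = 2_000       # From the table above

def check_prompt_size(prompt):
    """Raise if a prompt is likely to exceed the token limit."""
    # ~4/3 tokens per word, per the 32,000 tokens / 24,000 words guidance
    approx_tokens = len(prompt.split()) * 4 / 3
    if approx_tokens > PROMPT_TOKEN_LIMIT:
        raise ValueError(f"Prompt is likely over {PROMPT_TOKEN_LIMIT} tokens")

def check_tts_size(text):
    """Raise if TTS input exceeds the character limit."""
    if len(text) > TTS_CHAR_LIMIT:
        raise ValueError(f"TTS text exceeds {TTS_CHAR_LIMIT} characters")
```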
When you exceed a rate limit, the API returns a `429 Too Many Requests` response with a `Retry-After` header indicating the number of seconds to wait before retrying:
```http
HTTP/1.1 429 Too Many Requests
Content-Type: application/json
Retry-After: 5

{
  "status": "error",
  "error": {
    "code": "RATE_LIMIT_EXCEEDED",
    "message": "Rate limit exceeded. Please reduce request frequency."
  }
}
```
One strategy is exponential backoff: honor the `Retry-After` value, then increase the wait on each successive retry:

```javascript
async function callWithBackoff(fn, maxRetries = 5) {
  let retries = 0;
  while (retries < maxRetries) {
    try {
      return await fn();
    } catch (error) {
      if (error.status === 429) {
        retries++;
        const retryAfter = parseInt(error.headers.get("Retry-After") || "1", 10);
        const backoffTime = retryAfter * 1000 * Math.pow(1.5, retries - 1);
        console.log(
          `Rate limited. Retrying after ${backoffTime}ms (retry ${retries}/${maxRetries})`,
        );
        await new Promise((resolve) => setTimeout(resolve, backoffTime));
      } else {
        throw error; // Re-throw non-rate-limit errors
      }
    }
  }
  throw new Error(`Failed after ${maxRetries} retries due to rate limiting`);
}
```
Another is to queue requests client-side so they are dispatched no faster than your per-minute budget:

```javascript
class RequestQueue {
  constructor(requestsPerMinute) {
    this.queue = [];
    this.processing = false;
    this.interval = 60000 / requestsPerMinute; // Time between requests
  }

  async add(requestFn) {
    return new Promise((resolve, reject) => {
      this.queue.push({ requestFn, resolve, reject });
      if (!this.processing) {
        this.process();
      }
    });
  }

  async process() {
    if (this.queue.length === 0) {
      this.processing = false;
      return;
    }
    this.processing = true;
    const { requestFn, resolve, reject } = this.queue.shift();
    try {
      const result = await requestFn();
      resolve(result);
    } catch (error) {
      reject(error);
    }
    // Wait before processing the next request
    setTimeout(() => this.process(), this.interval);
  }
}

// Usage
const apiQueue = new RequestQueue(50); // 50 requests per minute

async function callAPI(params) {
  return apiQueue.add(() => actualAPICall(params));
}
```
You can also track token usage from `usage_metadata` and warn before approaching a daily quota:

```javascript
let tokenUsage = {
  daily: 0,
  monthly: 0,
};

function updateTokenUsage(response) {
  if (response.usage_metadata) {
    tokenUsage.daily += response.usage_metadata.total_token_count;
    tokenUsage.monthly += response.usage_metadata.total_token_count;

    // Check if approaching limits
    const dailyLimit = 50000; // Example limit
    if (tokenUsage.daily > dailyLimit * 0.8) {
      console.warn(
        `Warning: Using ${tokenUsage.daily}/${dailyLimit} daily tokens (${Math.round(
          (tokenUsage.daily / dailyLimit) * 100,
        )}%)`,
      );
    }
  }
}
```
For multi-turn conversations, prune older messages to keep the prompt within its token budget:

```javascript
function pruneConversationHistory(history, maxTokens = 2000) {
  // Start with the most recent messages
  let prunedHistory = [...history].reverse();
  let tokenCount = 0;
  let keepMessages = [];

  // Estimate token count (rough approximation)
  for (const message of prunedHistory) {
    const messageTokens = message.content.split(/\s+/).length * 1.3; // Rough estimation
    if (tokenCount + messageTokens <= maxTokens) {
      keepMessages.unshift(message); // Add to the beginning to maintain order
      tokenCount += messageTokens;
    } else {
      break;
    }
  }

  return keepMessages;
}
```
Finally, caching responses to repeated prompts avoids spending quota on identical requests:

```javascript
const responseCache = new Map();

async function getCachedResponse(prompt, language) {
  const cacheKey = `${prompt}:${language}`;

  // Check cache first
  if (responseCache.has(cacheKey)) {
    return responseCache.get(cacheKey);
  }

  // Make API call if not cached
  const response = await callAddisAI(prompt, language);

  // Cache the response (with TTL)
  responseCache.set(cacheKey, response);
  setTimeout(() => responseCache.delete(cacheKey), 3600000); // 1 hour TTL

  return response;
}
```
Each API response includes headers you can use to monitor your rate limit status:

- `X-RateLimit-Limit`: Your total request limit
- `X-RateLimit-Remaining`: Remaining requests in the current window
- `X-RateLimit-Reset`: Time when the current window resets (Unix timestamp)

The `usage_metadata` field in each response provides token usage for that specific request.
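A small sketch of reading these headers from a `requests` response; it assumes the headers are present exactly as named above:

```python
import time

def rate_limit_status(response):
    """Extract rate limit state from a response's headers."""
    limit = int(response.headers.get("X-RateLimit-Limit", "0"))
    remaining = int(response.headers.get("X-RateLimit-Remaining", "0"))
    reset_at = int(response.headers.get("X-RateLimit-Reset", "0"))  # Unix timestamp
    seconds_until_reset = max(0, reset_at - int(time.time()))
    return limit, remaining, seconds_until_reset
```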
The following Python example handles `429` responses by waiting for the `Retry-After` interval and retrying:

```python
import requests
import time

def call_with_rate_limit_handling(prompt, target_language):
    """Call the Addis AI API with basic rate limit handling."""
    api_key = "YOUR_API_KEY"
    url = "https://api.addisassistant.com/api/v1/chat_generate"
    headers = {
        "Content-Type": "application/json",
        "X-API-Key": api_key
    }
    data = {
        "prompt": prompt,
        "target_language": target_language
    }

    try:
        response = requests.post(url, headers=headers, json=data)

        # Check for rate limit
        if response.status_code == 429:
            # Get retry-after time
            retry_after = int(response.headers.get("Retry-After", "60"))
            print(f"Rate limit exceeded. Waiting for {retry_after} seconds before retrying.")

            # Wait for the specified time
            time.sleep(retry_after)

            # Retry the request
            return call_with_rate_limit_handling(prompt, target_language)

        # Process successful response
        if response.status_code == 200:
            return response.json()

        # Handle other errors
        print(f"Error: {response.status_code}")
        try:
            print(response.json())
        except ValueError:
            print(response.text)
        return None

    except requests.exceptions.RequestException as e:
        print(f"Request failed: {str(e)}")
        return None

# Example usage
response = call_with_rate_limit_handling(
    prompt="What is the capital of Ethiopia?",
    target_language="am"
)
if response:
    print(response["response_text"])
```
For higher-volume applications, a client-side token bucket limiter can keep you under the limit proactively:

```python
import requests
import time
import threading
from collections import deque

class RateLimiter:
    """Implements a token bucket algorithm for rate limiting API calls."""

    def __init__(self, rate_per_minute, max_burst=None):
        self.rate = rate_per_minute / 60.0  # Tokens per second
        self.max_tokens = max_burst or rate_per_minute
        self.tokens = self.max_tokens
        self.last_update = time.time()
        self.lock = threading.Lock()

        # Queue for tracking token usage
        self.token_usage_queue = deque()
        self.daily_token_limit = 50000  # Example limit
        self.daily_token_usage = 0

    def _update_tokens(self):
        """Update the token count based on elapsed time."""
        now = time.time()
        elapsed = now - self.last_update
        new_tokens = elapsed * self.rate
        with self.lock:
            self.tokens = min(self.max_tokens, self.tokens + new_tokens)
            self.last_update = now

    def try_acquire(self):
        """Try to acquire a token. Returns True if successful, False otherwise."""
        self._update_tokens()
        with self.lock:
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False

    def acquire(self, block=True, timeout=None):
        """Acquire a token, waiting if necessary.

        Args:
            block (bool): Whether to block until a token is available.
            timeout (float): Maximum time to wait for a token.

        Returns:
            bool: True if a token was acquired, False otherwise.
        """
        if not block:
            return self.try_acquire()

        start_time = time.time()
        while timeout is None or time.time() - start_time < timeout:
            if self.try_acquire():
                return True
            # Sleep for a small interval
            time.sleep(0.01)
        return False

    def update_token_usage(self, token_count):
        """Update token usage tracking.

        Args:
            token_count (int): The number of tokens used in the current request.
        """
        with self.lock:
            now = time.time()
            # Add current usage
            self.token_usage_queue.append((now, token_count))
            self.daily_token_usage += token_count

            # Clean up old entries (older than 24 hours)
            day_ago = now - 86400  # 24 hours in seconds
            while self.token_usage_queue and self.token_usage_queue[0][0] < day_ago:
                _, old_count = self.token_usage_queue.popleft()
                self.daily_token_usage -= old_count

    def check_token_limits(self):
        """Check if we're approaching token limits.

        Returns:
            tuple: (is_approaching_limit, percentage_used)
        """
        with self.lock:
            percentage = (self.daily_token_usage / self.daily_token_limit) * 100
            is_approaching = percentage > 80
            return is_approaching, percentage


class AddisAIClient:
    """Client for the Addis AI API with rate limiting."""

    def __init__(self, api_key, rate_per_minute=60):
        self.api_key = api_key
        self.base_url = "https://api.addisassistant.com/api/v1"

        # Create rate limiter
        self.rate_limiter = RateLimiter(rate_per_minute)

        # Headers
        self.headers = {
            "Content-Type": "application/json",
            "X-API-Key": api_key
        }

    def chat_generate(self, prompt, target_language,
                      conversation_history=None, generation_config=None):
        """Call the chat_generate endpoint with rate limiting."""
        # Check token limits
        approaching_limit, percentage = self.rate_limiter.check_token_limits()
        if approaching_limit:
            print(f"Warning: Approaching daily token limit ({percentage:.1f}%)")

        # Acquire token from rate limiter
        if not self.rate_limiter.acquire(timeout=300):  # Wait up to 5 minutes
            raise Exception("Failed to acquire rate limit token after timeout")

        # Prepare request
        url = f"{self.base_url}/chat_generate"
        data = {
            "prompt": prompt,
            "target_language": target_language
        }
        if conversation_history:
            data["conversation_history"] = conversation_history
        if generation_config:
            data["generation_config"] = generation_config

        # Make request
        try:
            response = requests.post(url, headers=self.headers, json=data)

            # Handle rate limiting
            if response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", "60"))
                print(f"Rate limit exceeded. Waiting for {retry_after} seconds.")
                time.sleep(retry_after + 1)  # Add 1 second buffer
                # Retry after waiting
                return self.chat_generate(prompt, target_language,
                                          conversation_history, generation_config)

            # Parse response
            if response.status_code == 200:
                result = response.json()
                # Update token usage tracking
                if "usage_metadata" in result:
                    token_count = result["usage_metadata"].get("total_token_count", 0)
                    self.rate_limiter.update_token_usage(token_count)
                return result
            else:
                print(f"Error: {response.status_code}")
                try:
                    print(response.json())
                except ValueError:
                    print(response.text)
                return None

        except requests.exceptions.RequestException as e:
            print(f"Request failed: {str(e)}")
            return None


# Example usage
client = AddisAIClient(api_key="YOUR_API_KEY", rate_per_minute=50)

# Process multiple requests with rate limiting
requests_to_process = [
    ("What is the capital of Ethiopia?", "am"),
    ("How many regions are in Ethiopia?", "am"),
    ("Tell me about Ethiopian cuisine", "am"),
    # Add more requests...
]

for prompt, language in requests_to_process:
    response = client.chat_generate(prompt, language)
    if response:
        print(f"Prompt: {prompt}")
        print(f"Response: {response['response_text']}")
        print("---")
    # No need to add an artificial delay; the rate limiter handles this
```
To manage token limits, estimate token counts and prune conversation history before each call:

```python
def estimate_token_count(text):
    """Estimate the number of tokens in a string.

    This is a simple approximation - actual tokenization varies by model.
    """
    # Rough approximation: 1 token ≈ 4 characters for English, less for non-Latin scripts
    return len(text) / 3  # Adjust for Amharic/Afan Oromo which may use more bytes per character

def prune_conversation_history(history, max_tokens=2000):
    """Prune conversation history to stay under token limits.

    Args:
        history (list): The conversation history
        max_tokens (int): Maximum token budget

    Returns:
        list: Pruned conversation history
    """
    if not history:
        return []

    # Calculate token counts for each message
    token_counts = []
    for message in history:
        if "content" in message:
            token_counts.append(estimate_token_count(message["content"]))
        elif "parts" in message:
            # Sum tokens in text parts
            parts_count = 0
            for part in message["parts"]:
                if "text" in part:
                    parts_count += estimate_token_count(part["text"])
            token_counts.append(parts_count)
        else:
            # Default if we can't determine
            token_counts.append(50)  # Assume a default size

    # Start with most recent messages and work backward
    total_tokens = 0
    keep_indices = []
    for i in range(len(history) - 1, -1, -1):
        if total_tokens + token_counts[i] <= max_tokens:
            keep_indices.append(i)
            total_tokens += token_counts[i]
        else:
            break

    # Sort indices to maintain original order
    keep_indices.sort()

    # If we couldn't keep any messages, at least keep the latest one
    # (possibly truncated)
    if not keep_indices and history:
        latest_msg = history[-1].copy()
        # Truncate content if needed
        if "content" in latest_msg:
            content = latest_msg["content"]
            # Estimate how much to keep
            keep_chars = int(max_tokens * 3)  # Convert tokens to approximate char count
            if len(content) > keep_chars:
                latest_msg["content"] = content[:keep_chars] + "..."
        return [latest_msg]

    return [history[i] for i in keep_indices]

# Example usage
conversation_history = [
    {"role": "user", "content": "What is Ethiopia?"},
    {"role": "assistant", "content": "Ethiopia is a country located in the Horn of Africa..."},
    {"role": "user", "content": "Tell me about its history."},
    {"role": "assistant", "content": "Ethiopia has a rich history dating back thousands of years..."},
    {"role": "user", "content": "What about its culture?"}
]

# Prune to stay under the token limit
pruned_history = prune_conversation_history(conversation_history, max_tokens=500)
print(f"Kept {len(pruned_history)} of {len(conversation_history)} messages")

# Use pruned history in the API call
client = AddisAIClient(api_key="YOUR_API_KEY")
response = client.chat_generate(
    prompt="Tell me more about Ethiopian cuisine",
    target_language="am",
    conversation_history=pruned_history
)
```