Prompt_Uzmani / api_integrations.py
aimevzulari's picture
Upload 2 files
84d0efc verified
"""
API entegrasyonları için yardımcı fonksiyonlar.
Bu modül, OpenAI, Google Gemini ve OpenRouter API'leri ile etkileşim için gerekli fonksiyonları içerir.
"""
import os
import json
import time
import threading
from typing import Dict, Any, List, Optional
import openai
from google import generativeai as genai
import requests
from dotenv import load_dotenv
# Prompt şablonlarından model listelerini içe aktar
from prompt_templates import OPENAI_MODELS, GEMINI_MODELS, OPENROUTER_MODELS
# .env dosyasını yükle (varsa)
load_dotenv()
# Model önbelleği için global değişkenler
MODEL_CACHE = {
"openai": {
"models": [],
"last_updated": 0,
"update_interval": 3600 # 1 saat (saniye cinsinden)
},
"gemini": {
"models": [],
"last_updated": 0,
"update_interval": 3600 # 1 saat (saniye cinsinden)
},
"openrouter": {
"models": [],
"last_updated": 0,
"update_interval": 3600 # 1 saat (saniye cinsinden)
}
}
# Model önbelleği dosya yolu
CACHE_FILE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "model_cache.json")
def load_model_cache():
"""
Disk üzerindeki model önbelleğini yükler.
"""
global MODEL_CACHE
try:
if os.path.exists(CACHE_FILE_PATH):
with open(CACHE_FILE_PATH, 'r') as f:
cache_data = json.load(f)
MODEL_CACHE = cache_data
print(f"Model önbelleği yüklendi: {len(MODEL_CACHE['openai']['models'])} OpenAI, {len(MODEL_CACHE['gemini']['models'])} Gemini, {len(MODEL_CACHE['openrouter']['models'])} OpenRouter modeli")
except Exception as e:
print(f"Model önbelleği yüklenirken hata oluştu: {str(e)}")
def save_model_cache():
"""
Model önbelleğini diske kaydeder.
"""
try:
with open(CACHE_FILE_PATH, 'w') as f:
json.dump(MODEL_CACHE, f)
print("Model önbelleği diske kaydedildi")
except Exception as e:
print(f"Model önbelleği kaydedilirken hata oluştu: {str(e)}")
# Uygulama başlangıcında önbelleği yükle
load_model_cache()
class APIManager:
"""
API yönetimi için sınıf.
Bu sınıf, API anahtarlarını yönetir ve API'lerin durumunu kontrol eder.
"""
def __init__(self):
"""
API yöneticisini başlat.
"""
self.api_keys = {
"openai": os.getenv("OPENAI_API_KEY", ""),
"gemini": os.getenv("GEMINI_API_KEY", ""),
"openrouter": os.getenv("OPENROUTER_API_KEY", "")
}
def set_api_key(self, provider: str, api_key: str) -> None:
"""
Belirli bir sağlayıcı için API anahtarını ayarlar.
Args:
provider (str): API sağlayıcısı ('openai', 'gemini', 'openrouter')
api_key (str): API anahtarı
"""
if provider in self.api_keys:
self.api_keys[provider] = api_key
# API anahtarını ilgili kütüphane için de ayarla
if provider == "openai":
openai.api_key = api_key
elif provider == "gemini":
genai.configure(api_key=api_key)
# API anahtarı değiştiğinde model önbelleğini sıfırla
global MODEL_CACHE
MODEL_CACHE[provider]["last_updated"] = 0
def get_api_key(self, provider: str) -> str:
"""
Belirli bir sağlayıcı için API anahtarını döndürür.
Args:
provider (str): API sağlayıcısı ('openai', 'gemini', 'openrouter')
Returns:
str: API anahtarı
"""
return self.api_keys.get(provider, "")
def check_api_key_validity(self, provider: str) -> bool:
"""
Belirli bir sağlayıcı için API anahtarının geçerliliğini kontrol eder.
Args:
provider (str): API sağlayıcısı ('openai', 'gemini', 'openrouter')
Returns:
bool: API anahtarı geçerli ise True, değilse False
"""
api_key = self.get_api_key(provider)
if not api_key:
return False
try:
if provider == "openai":
openai.api_key = api_key
# Basit bir model listesi isteği ile API anahtarının geçerliliğini kontrol et
openai.models.list()
return True
elif provider == "gemini":
genai.configure(api_key=api_key)
# Kullanılabilir modelleri listeleyerek API anahtarının geçerliliğini kontrol et
genai.list_models()
return True
elif provider == "openrouter":
headers = {
"Authorization": f"Bearer {api_key}"
}
response = requests.get(
"https://openrouter.ai/api/v1/models",
headers=headers
)
return response.status_code == 200
return False
except Exception:
return False
class OpenAIHandler:
"""
OpenAI API ile etkileşim için sınıf.
"""
def __init__(self, api_key: str = ""):
"""
OpenAI işleyicisini başlat.
Args:
api_key (str, optional): OpenAI API anahtarı
"""
self.api_key = api_key
if api_key:
openai.api_key = api_key
def set_api_key(self, api_key: str) -> None:
"""
OpenAI API anahtarını ayarlar.
Args:
api_key (str): OpenAI API anahtarı
"""
self.api_key = api_key
openai.api_key = api_key
# API anahtarı değiştiğinde model önbelleğini sıfırla
global MODEL_CACHE
MODEL_CACHE["openai"]["last_updated"] = 0
def fetch_models_from_api(self) -> List[str]:
"""
OpenAI API'sinden modelleri çeker.
Returns:
List[str]: API'den çekilen modeller listesi
"""
if not self.api_key:
return []
try:
models = openai.models.list()
# Sadece GPT modellerini filtrele
gpt_models = [model.id for model in models.data if "gpt" in model.id.lower()]
return gpt_models
except Exception as e:
print(f"OpenAI modellerini çekerken hata oluştu: {str(e)}")
return []
def get_available_models(self, force_refresh: bool = False) -> List[str]:
"""
Kullanılabilir OpenAI modellerini döndürür.
Args:
force_refresh (bool): Önbelleği zorla yenileme
Returns:
List[str]: Kullanılabilir modeller listesi
"""
global MODEL_CACHE
current_time = time.time()
cache = MODEL_CACHE["openai"]
# Önbellek yenileme koşulları:
# 1. Zorla yenileme istenmiş
# 2. Önbellek boş
# 3. Önbellek güncellenme zamanı aşılmış
if (force_refresh or
not cache["models"] or
current_time - cache["last_updated"] > cache["update_interval"]):
# API'den modelleri çek
api_models = self.fetch_models_from_api()
if api_models:
# API'den modeller başarıyla çekildiyse önbelleği güncelle
all_models = list(set(api_models + OPENAI_MODELS))
# Varsayılan modelleri önceliklendir
sorted_models = sorted(
all_models,
key=lambda x: (
0 if x in OPENAI_MODELS else 1,
OPENAI_MODELS.index(x) if x in OPENAI_MODELS else float('inf')
)
)
# Önbelleği güncelle
MODEL_CACHE["openai"]["models"] = sorted_models
MODEL_CACHE["openai"]["last_updated"] = current_time
# Önbelleği diske kaydet
save_model_cache()
return sorted_models
else:
# API'den model çekilemediyse önbellekteki modelleri kullan
if cache["models"]:
return cache["models"]
else:
return OPENAI_MODELS
else:
# Önbellek güncel, önbellekteki modelleri kullan
return cache["models"] if cache["models"] else OPENAI_MODELS
def generate_response(self, prompt: str, model: str = "gpt-3.5-turbo", temperature: float = 0.7, max_tokens: int = 2000) -> Dict[str, Any]:
"""
OpenAI API kullanarak yanıt oluşturur.
Args:
prompt (str): Gönderilecek prompt
model (str): Kullanılacak model
temperature (float): Sıcaklık değeri (0.0-1.0)
max_tokens (int): Maksimum token sayısı
Returns:
Dict[str, Any]: Yanıt bilgilerini içeren sözlük
"""
if not self.api_key:
return {"success": False, "error": "OpenAI API anahtarı ayarlanmamış.", "content": ""}
try:
response = openai.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
temperature=temperature,
max_tokens=max_tokens
)
return {
"success": True,
"content": response.choices[0].message.content,
"model": model,
"usage": {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens
}
}
except Exception as e:
return {"success": False, "error": str(e), "content": ""}
class GeminiHandler:
"""
Google Gemini API ile etkileşim için sınıf.
"""
def __init__(self, api_key: str = ""):
"""
Gemini işleyicisini başlat.
Args:
api_key (str, optional): Gemini API anahtarı
"""
self.api_key = api_key
if api_key:
genai.configure(api_key=api_key)
def set_api_key(self, api_key: str) -> None:
"""
Gemini API anahtarını ayarlar.
Args:
api_key (str): Gemini API anahtarı
"""
self.api_key = api_key
genai.configure(api_key=api_key)
# API anahtarı değiştiğinde model önbelleğini sıfırla
global MODEL_CACHE
MODEL_CACHE["gemini"]["last_updated"] = 0
def fetch_models_from_api(self) -> List[str]:
"""
Gemini API'sinden modelleri çeker.
Returns:
List[str]: API'den çekilen modeller listesi
"""
if not self.api_key:
return []
try:
models = genai.list_models()
api_models = []
# Tüm model türlerini topla (gemini, imagen, veo, vb.)
for model in models:
model_name = model.name.split("/")[-1]
if any(keyword in model_name.lower() for keyword in ["gemini", "imagen", "veo"]):
api_models.append(model_name)
return api_models
except Exception as e:
print(f"Gemini modellerini çekerken hata oluştu: {str(e)}")
return []
def get_available_models(self, force_refresh: bool = False) -> List[str]:
"""
Kullanılabilir Gemini modellerini döndürür.
Args:
force_refresh (bool): Önbelleği zorla yenileme
Returns:
List[str]: Kullanılabilir modeller listesi
"""
global MODEL_CACHE
current_time = time.time()
cache = MODEL_CACHE["gemini"]
# Önbellek yenileme koşulları:
# 1. Zorla yenileme istenmiş
# 2. Önbellek boş
# 3. Önbellek güncellenme zamanı aşılmış
if (force_refresh or
not cache["models"] or
current_time - cache["last_updated"] > cache["update_interval"]):
# API'den modelleri çek
api_models = self.fetch_models_from_api()
if api_models:
# API'den modeller başarıyla çekildiyse önbelleği güncelle
all_models = list(set(api_models + GEMINI_MODELS))
# Varsayılan modelleri önceliklendir
sorted_models = sorted(
all_models,
key=lambda x: (
0 if x in GEMINI_MODELS else 1,
GEMINI_MODELS.index(x) if x in GEMINI_MODELS else float('inf')
)
)
# Önbelleği güncelle
MODEL_CACHE["gemini"]["models"] = sorted_models
MODEL_CACHE["gemini"]["last_updated"] = current_time
# Önbelleği diske kaydet
save_model_cache()
return sorted_models
else:
# API'den model çekilemediyse önbellekteki modelleri kullan
if cache["models"]:
return cache["models"]
else:
return GEMINI_MODELS
else:
# Önbellek güncel, önbellekteki modelleri kullan
return cache["models"] if cache["models"] else GEMINI_MODELS
def generate_response(self, prompt: str, model: str = "gemini-1.5-pro", temperature: float = 0.7) -> Dict[str, Any]:
"""
Gemini API kullanarak yanıt oluşturur.
Args:
prompt (str): Gönderilecek prompt
model (str): Kullanılacak model
temperature (float): Sıcaklık değeri (0.0-1.0)
Returns:
Dict[str, Any]: Yanıt bilgilerini içeren sözlük
"""
if not self.api_key:
return {"success": False, "error": "Gemini API anahtarı ayarlanmamış.", "content": ""}
try:
model_obj = genai.GenerativeModel(model)
response = model_obj.generate_content(
prompt,
generation_config=genai.types.GenerationConfig(
temperature=temperature
)
)
return {
"success": True,
"content": response.text,
"model": model
}
except Exception as e:
return {"success": False, "error": str(e), "content": ""}
class OpenRouterHandler:
"""
OpenRouter API ile etkileşim için sınıf.
"""
def __init__(self, api_key: str = ""):
"""
OpenRouter işleyicisini başlat.
Args:
api_key (str, optional): OpenRouter API anahtarı
"""
self.api_key = api_key
self.base_url = "https://openrouter.ai/api/v1"
def set_api_key(self, api_key: str) -> None:
"""
OpenRouter API anahtarını ayarlar.
Args:
api_key (str): OpenRouter API anahtarı
"""
self.api_key = api_key
# API anahtarı değiştiğinde model önbelleğini sıfırla
global MODEL_CACHE
MODEL_CACHE["openrouter"]["last_updated"] = 0
def fetch_models_from_api(self) -> List[str]:
"""
OpenRouter API'sinden modelleri çeker.
Returns:
List[str]: API'den çekilen modeller listesi
"""
if not self.api_key:
return []
try:
headers = {
"Authorization": f"Bearer {self.api_key}"
}
response = requests.get(
f"{self.base_url}/models",
headers=headers
)
if response.status_code == 200:
models_data = response.json()
api_models = [model["id"] for model in models_data["data"]]
return api_models
else:
print(f"OpenRouter modellerini çekerken HTTP hatası: {response.status_code}")
return []
except Exception as e:
print(f"OpenRouter modellerini çekerken hata oluştu: {str(e)}")
return []
def get_available_models(self, force_refresh: bool = False) -> List[str]:
"""
Kullanılabilir OpenRouter modellerini döndürür.
Args:
force_refresh (bool): Önbelleği zorla yenileme
Returns:
List[str]: Kullanılabilir modeller listesi
"""
global MODEL_CACHE
current_time = time.time()
cache = MODEL_CACHE["openrouter"]
# Önbellek yenileme koşulları:
# 1. Zorla yenileme istenmiş
# 2. Önbellek boş
# 3. Önbellek güncellenme zamanı aşılmış
if (force_refresh or
not cache["models"] or
current_time - cache["last_updated"] > cache["update_interval"]):
# API'den modelleri çek
api_models = self.fetch_models_from_api()
if api_models:
# API'den modeller başarıyla çekildiyse önbelleği güncelle
all_models = list(set(api_models + OPENROUTER_MODELS))
# Varsayılan modelleri önceliklendir
sorted_models = sorted(
all_models,
key=lambda x: (
0 if x in OPENROUTER_MODELS else 1,
OPENROUTER_MODELS.index(x) if x in OPENROUTER_MODELS else float('inf')
)
)
# Önbelleği güncelle
MODEL_CACHE["openrouter"]["models"] = sorted_models
MODEL_CACHE["openrouter"]["last_updated"] = current_time
# Önbelleği diske kaydet
save_model_cache()
return sorted_models
else:
# API'den model çekilemediyse önbellekteki modelleri kullan
if cache["models"]:
return cache["models"]
else:
return OPENROUTER_MODELS
else:
# Önbellek güncel, önbellekteki modelleri kullan
return cache["models"] if cache["models"] else OPENROUTER_MODELS
def generate_response(self, prompt: str, model: str = "openai/gpt-4-turbo", temperature: float = 0.7, max_tokens: int = 2000) -> Dict[str, Any]:
"""
OpenRouter API kullanarak yanıt oluşturur.
Args:
prompt (str): Gönderilecek prompt
model (str): Kullanılacak model
temperature (float): Sıcaklık değeri (0.0-1.0)
max_tokens (int): Maksimum token sayısı
Returns:
Dict[str, Any]: Yanıt bilgilerini içeren sözlük
"""
if not self.api_key:
return {"success": False, "error": "OpenRouter API anahtarı ayarlanmamış.", "content": ""}
try:
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}"
}
data = {
"model": model,
"messages": [{"role": "user", "content": prompt}],
"temperature": temperature,
"max_tokens": max_tokens
}
response = requests.post(
f"{self.base_url}/chat/completions",
headers=headers,
json=data
)
if response.status_code == 200:
response_data = response.json()
return {
"success": True,
"content": response_data["choices"][0]["message"]["content"],
"model": model,
"usage": response_data.get("usage", {})
}
else:
return {
"success": False,
"error": f"HTTP {response.status_code}: {response.text}",
"content": ""
}
except Exception as e:
return {"success": False, "error": str(e), "content": ""}
# Arka planda model listelerini güncelleyen fonksiyon
def background_model_updater():
"""
Arka planda çalışarak model listelerini periyodik olarak günceller.
"""
while True:
try:
# Her sağlayıcı için modelleri güncelle
if openai_handler.api_key:
openai_handler.get_available_models(force_refresh=True)
if gemini_handler.api_key:
gemini_handler.get_available_models(force_refresh=True)
if openrouter_handler.api_key:
openrouter_handler.get_available_models(force_refresh=True)
print("Model listeleri arka planda güncellendi")
# 1 saat bekle
time.sleep(3600)
except Exception as e:
print(f"Arka plan model güncelleyicisinde hata: {str(e)}")
time.sleep(60) # Hata durumunda 1 dakika bekle ve tekrar dene
# API işleyicilerini oluştur
api_manager = APIManager()
openai_handler = OpenAIHandler()
gemini_handler = GeminiHandler()
openrouter_handler = OpenRouterHandler()
# API anahtarlarını ayarla (varsa)
openai_api_key = os.getenv("OPENAI_API_KEY", "")
gemini_api_key = os.getenv("GEMINI_API_KEY", "")
openrouter_api_key = os.getenv("OPENROUTER_API_KEY", "")
if openai_api_key:
openai_handler.set_api_key(openai_api_key)
if gemini_api_key:
gemini_handler.set_api_key(gemini_api_key)
if openrouter_api_key:
openrouter_handler.set_api_key(openrouter_api_key)
# Arka plan model güncelleyicisini başlat
updater_thread = threading.Thread(target=background_model_updater, daemon=True)
updater_thread.start()
# Test fonksiyonu
def test_api_connections():
"""
API bağlantılarını test eder.
"""
print("API Bağlantı Testi:")
# OpenAI API testi
if openai_api_key:
print("\nOpenAI API Testi:")
try:
models = openai_handler.get_available_models(force_refresh=True)
print(f"Kullanılabilir modeller: {models[:5]}...")
print("OpenAI API bağlantısı başarılı.")
except Exception as e:
print(f"OpenAI API hatası: {str(e)}")
else:
print("\nOpenAI API anahtarı ayarlanmamış.")
print(f"Varsayılan modeller: {OPENAI_MODELS}")
# Gemini API testi
if gemini_api_key:
print("\nGemini API Testi:")
try:
models = gemini_handler.get_available_models(force_refresh=True)
print(f"Kullanılabilir modeller: {models}")
print("Gemini API bağlantısı başarılı.")
except Exception as e:
print(f"Gemini API hatası: {str(e)}")
else:
print("\nGemini API anahtarı ayarlanmamış.")
print(f"Varsayılan modeller: {GEMINI_MODELS}")
# OpenRouter API testi
if openrouter_api_key:
print("\nOpenRouter API Testi:")
try:
models = openrouter_handler.get_available_models(force_refresh=True)
print(f"Kullanılabilir modeller: {len(models)} model bulundu")
print(f"İlk 10 model: {models[:10]}...")
print("OpenRouter API bağlantısı başarılı.")
except Exception as e:
print(f"OpenRouter API hatası: {str(e)}")
else:
print("\nOpenRouter API anahtarı ayarlanmamış.")
print(f"Varsayılan modeller: {len(OPENROUTER_MODELS)} model bulundu")
print(f"İlk 10 model: {OPENROUTER_MODELS[:10]}...")
if __name__ == "__main__":
test_api_connections()