attilasimko's picture
udates
18fe2e4
raw
history blame
3.97 kB
import time
import requests
import time
import os
import json
import streamlit as st
def model_predict(client, prompt):
for message in client.chat_completion(
messages=[{"role": "system", "content": "You are a chatbot evaluating github repositories, their python codes and corresponding readme files. Strictly answer the questions with Yes or No."}, {"role": "user", "content": prompt}],
max_tokens=500,
stream=True,
):
return message.choices[0].delta.content
return ""
def get_api_link(url):
username, repo_name = decompose_url(url)
if (username == None):
return ""
return f"https://api.github.com/repos/{username}/{repo_name}/zipball/"
def decompose_url(url):
try:
url = url.split("github.com")[1]
url = url.strip(".")
url = url.split(".git")[0]
url = url.strip("/")
parts = url.split("/")
username = parts[0]
repo_name = parts[1]
return username, repo_name
except:
return None, None
def fetch_repo_stars(verbose, repo_url, token):
headers = {"Authorization": f"token {token}"}
api_url = get_api_link(repo_url)
api_url = api_url.replace("/zipball/", "")
# Sending GET request to GitHub API
response = requests.get(api_url, headers=headers)
if response.status_code == 200:
return json.loads(response.content)["stargazers_count"]
if (response.status_code == 404):
log(verbose, "ERROR", "Repository private.")
def fetch_repo(verbose, repo_url, repo_name, token):
if (os.path.exists(repo_name)):
os.remove(repo_name)
if ("github.com" not in repo_url):
log(verbose, "ERROR", f"URL not for github repo, please evaluate manually ({repo_url}).")
return
headers = {"Authorization": f"token {token}"}
api_url = get_api_link(repo_url)
if (api_url == ""):
log(verbose, "ERROR", f"Failed to parse the URL, please evaluate manually ({repo_url}).")
return
# Sending GET request to GitHub API
response = requests.get(api_url, headers=headers)
if response.status_code == 200:
with open(repo_name, 'wb') as file:
file.write(response.content)
log(verbose, "LOG", "Repository downloaded successfully")
if (response.status_code == 404):
log(verbose, "ERROR", "Repository private.")
def fetch_readme(zip):
readme_files = [readme for readme in zip.namelist() if ((readme.endswith("README.MD") | readme.endswith("README.md") | readme.endswith("readme.md")) & (len(readme.split("/")) == 2))]
readme = ""
for readme_file in readme_files:
readme += zip.open(readme_file).read().decode("utf-8") + "\n\n"
return readme
def fetch_license(zip):
license_files = [license for license in zip.namelist() if (("LICENSE" in license) & (len(license.split("/")) == 2))]
license = None
if (len(license_files) > 0):
license = zip.open(license_files[0]).read().decode("utf-8")
return license
def fetch_openalex(verbose, paper_name, year):
api_url = f"https://api.openalex.org/works?filter=default.search:{paper_name},publication_year:{year}"
response = requests.get(api_url)
if response.status_code == 200:
return response.json()
else:
log(verbose, "WARNING", "Could not find OpenAlex information for paper.")
def log(verbose, log_type, log_text, hf=False):
if (verbose == 0):
return
# Align line-break
if (log_text.startswith("\n")):
print("\n")
log_text = log_text.lstrip('\n')
if (log_type == "LOG"):
log_text = f"LOG: {log_text}"
if (log_type == "ERROR"):
log_text = f"ERROR: {log_text}"
if (log_type == "WARNING"):
log_text = f"WARNING: {log_text}"
if (verbose == 1):
print(log_text)
return
if (verbose == 2):
st.write(log_text)
return
raise Exception(log_text)