Spaces:
Sleeping
Sleeping
import time | |
import requests | |
import time | |
import os | |
import json | |
import streamlit as st | |
def model_predict(client, prompt): | |
for message in client.chat_completion( | |
messages=[{"role": "system", "content": "You are a chatbot evaluating github repositories, their python codes and corresponding readme files. Strictly answer the questions with Yes or No."}, {"role": "user", "content": prompt}], | |
max_tokens=500, | |
stream=True, | |
): | |
return message.choices[0].delta.content | |
return "" | |
def get_api_link(url): | |
username, repo_name = decompose_url(url) | |
if (username == None): | |
return "" | |
return f"https://api.github.com/repos/{username}/{repo_name}/zipball/" | |
def decompose_url(url): | |
try: | |
url = url.split("github.com")[1] | |
url = url.strip(".") | |
url = url.split(".git")[0] | |
url = url.strip("/") | |
parts = url.split("/") | |
username = parts[0] | |
repo_name = parts[1] | |
return username, repo_name | |
except: | |
return None, None | |
def fetch_repo_stars(verbose, repo_url, token): | |
headers = {"Authorization": f"token {token}"} | |
api_url = get_api_link(repo_url) | |
api_url = api_url.replace("/zipball/", "") | |
# Sending GET request to GitHub API | |
response = requests.get(api_url, headers=headers) | |
if response.status_code == 200: | |
return json.loads(response.content)["stargazers_count"] | |
if (response.status_code == 404): | |
log(verbose, "ERROR", "Repository private.") | |
def fetch_repo(verbose, repo_url, repo_name, token): | |
if (os.path.exists(repo_name)): | |
os.remove(repo_name) | |
if ("github.com" not in repo_url): | |
log(verbose, "ERROR", f"URL not for github repo, please evaluate manually ({repo_url}).") | |
return | |
headers = {"Authorization": f"token {token}"} | |
api_url = get_api_link(repo_url) | |
if (api_url == ""): | |
log(verbose, "ERROR", f"Failed to parse the URL, please evaluate manually ({repo_url}).") | |
return | |
# Sending GET request to GitHub API | |
response = requests.get(api_url, headers=headers) | |
if response.status_code == 200: | |
with open(repo_name, 'wb') as file: | |
file.write(response.content) | |
log(verbose, "LOG", "Repository downloaded successfully") | |
if (response.status_code == 404): | |
log(verbose, "ERROR", "Repository private.") | |
def fetch_readme(zip): | |
readme_files = [readme for readme in zip.namelist() if ((readme.endswith("README.MD") | readme.endswith("README.md") | readme.endswith("readme.md")) & (len(readme.split("/")) == 2))] | |
readme = "" | |
for readme_file in readme_files: | |
readme += zip.open(readme_file).read().decode("utf-8") + "\n\n" | |
return readme | |
def fetch_license(zip): | |
license_files = [license for license in zip.namelist() if (("LICENSE" in license) & (len(license.split("/")) == 2))] | |
license = None | |
if (len(license_files) > 0): | |
license = zip.open(license_files[0]).read().decode("utf-8") | |
return license | |
def fetch_openalex(verbose, paper_name, year): | |
api_url = f"https://api.openalex.org/works?filter=default.search:{paper_name},publication_year:{year}" | |
response = requests.get(api_url) | |
if response.status_code == 200: | |
return response.json() | |
else: | |
log(verbose, "WARNING", "Could not find OpenAlex information for paper.") | |
def log(verbose, log_type, log_text, hf=False): | |
if (verbose == 0): | |
return | |
# Align line-break | |
if (log_text.startswith("\n")): | |
print("\n") | |
log_text = log_text.lstrip('\n') | |
if (log_type == "LOG"): | |
log_text = f"LOG: {log_text}" | |
if (log_type == "ERROR"): | |
log_text = f"ERROR: {log_text}" | |
if (log_type == "WARNING"): | |
log_text = f"WARNING: {log_text}" | |
if (verbose == 1): | |
print(log_text) | |
return | |
if (verbose == 2): | |
st.write(log_text) | |
return | |
raise Exception(log_text) |