Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import time | |
import requests | |
import time | |
import os | |
import zipfile | |
import json | |
import streamlit as st | |
def fetch_code(zip_file):
    """Collect the text of every code/markdown file stored in a zip archive.

    Args:
        zip_file: An open ``zipfile.ZipFile`` (anything providing
            ``namelist()`` and ``open(name)``).

    Returns:
        dict: Maps each member name ending in ``.py``, ``.ipynb`` or ``.md``
        (compared case-insensitively) to its UTF-8 decoded content.

    Raises:
        UnicodeDecodeError: If a matching member is not valid UTF-8.
    """
    wanted_suffixes = (".py", ".ipynb", ".md")
    zip_content_dict = {}
    for file_name in zip_file.namelist():
        if not file_name.lower().endswith(wanted_suffixes):
            continue
        # Use a context manager so the member handle is closed right away
        # instead of being left for the garbage collector.
        with zip_file.open(file_name) as member:
            zip_content_dict[file_name] = member.read().decode("utf-8")
    return zip_content_dict
def get_api_link(url):
    """Build the GitHub API zipball URL for a repository URL.

    Args:
        url: URL of a GitHub repository, as accepted by ``decompose_url``.

    Returns:
        str: ``https://api.github.com/repos/<user>/<repo>/zipball/``, or the
        empty string when ``decompose_url`` could not extract a user name.
    """
    username, repo_name = decompose_url(url)
    # decompose_url reports failure with None; compare by identity.
    if username is None:
        return ""
    return f"https://api.github.com/repos/{username}/{repo_name}/zipball/"
def decompose_url(url):
    """Split a GitHub repository URL into its owner and repository name.

    Handles plain repository URLs, URLs with extra path segments
    (``/tree/main`` ...), a trailing ``.git``, ``?query`` / ``#fragment``
    suffixes and SSH-style remotes (``git@github.com:user/repo.git``).

    Args:
        url: The repository URL. Non-string values are treated as
            unparseable.

    Returns:
        tuple: ``(username, repo_name)``, or ``(None, None)`` when the URL
        does not contain ``github.com`` followed by both components.
    """
    try:
        # maxsplit=1 keeps everything after the host intact, so a repository
        # called "user.github.com" is not cut at its own name.
        path = url.split("github.com", 1)[1]
        # Drop stray dots (e.g. a sentence-final "."), then the separators
        # around the path ("/" for https URLs, ":" for SSH remotes).
        path = path.strip(".").strip("/:")
        parts = path.split("/")
        username = parts[0]
        repo_name = parts[1]
    except (AttributeError, IndexError, TypeError):
        # Not a string, no "github.com" in it, or fewer than two components.
        return None, None

    # A query string or fragment is not part of the repository name.
    repo_name = repo_name.split("?", 1)[0].split("#", 1)[0]
    # Only strip ".git" as a suffix: splitting on it anywhere would truncate
    # names such as "user.github.io".
    if repo_name.endswith(".git"):
        repo_name = repo_name[: -len(".git")]

    if not username or not repo_name:
        return None, None
    return username, repo_name
def fetch_repo_stars(verbose, repo_url, token):
    """Return the stargazer count of a GitHub repository.

    Args:
        verbose: Verbosity level forwarded to ``log``.
        repo_url: URL of the GitHub repository.
        token: GitHub API token sent in the ``Authorization`` header.

    Returns:
        The ``stargazers_count`` field of the API response on HTTP 200;
        ``None`` when the URL cannot be parsed or for any other status code.
    """
    zipball_url = get_api_link(repo_url)
    if zipball_url == "":
        # get_api_link returns "" for an unparseable URL; requesting an empty
        # URL would raise inside requests, so report it the same way
        # fetch_repo does and stop here.
        log(verbose, "ERROR", f"Failed to parse the URL, please evaluate manually ({repo_url}).")
        return None

    headers = {"Authorization": f"token {token}"}
    # The repository metadata endpoint is the zipball URL without its suffix.
    api_url = zipball_url.replace("/zipball/", "")

    # Sending GET request to GitHub API
    response = requests.get(api_url, headers=headers)
    if response.status_code == 200:
        return json.loads(response.content)["stargazers_count"]
    if response.status_code == 404:
        log(verbose, "ERROR", "Repository private.")
    return None
def fetch_repo(verbose, repo_url, repo_name, token):
    """Download a GitHub repository as a zip archive into ``repo_name``.

    Args:
        verbose: Verbosity level forwarded to ``log``.
        repo_url: URL of the GitHub repository.
        repo_name: Path of the file the archive is written to.
        token: GitHub API token sent in the ``Authorization`` header.

    Returns:
        None. On HTTP 200 the response body is written to ``repo_name``;
        an error is logged for non-GitHub URLs, unparseable URLs and
        HTTP 404. Other status codes are ignored silently.
    """
    # Delete whatever already sits at the target path before anything else.
    if os.path.exists(repo_name):
        os.remove(repo_name)

    if "github.com" not in repo_url:
        log(verbose, "ERROR", f"URL not for github repo, please evaluate manually ({repo_url}).")
        return

    zipball_url = get_api_link(repo_url)
    if zipball_url == "":
        log(verbose, "ERROR", f"Failed to parse the URL, please evaluate manually ({repo_url}).")
        return

    # Ask the GitHub API for the repository archive.
    reply = requests.get(zipball_url, headers={"Authorization": f"token {token}"})
    status = reply.status_code
    if status == 200:
        with open(repo_name, "wb") as archive:
            archive.write(reply.content)
    elif status == 404:
        log(verbose, "ERROR", "Repository private / Link broken.")
def fetch_readme(zip):
    """Concatenate the README markdown files found one level into an archive.

    A member qualifies when its path contains exactly one ``/`` (presumably
    the single root folder GitHub puts in its zipballs - confirm against the
    caller) and its name ends with ``readme.md`` in any letter case, so
    ``README.md``, ``README.MD``, ``readme.md`` and ``Readme.md`` all match.

    Args:
        zip: An open ``zipfile.ZipFile``. The parameter name shadows the
            builtin but is kept for interface compatibility.

    Returns:
        str: The UTF-8 decoded content of every matching member, each
        followed by a blank line; ``""`` when nothing matches.
    """
    readme_names = [
        name
        for name in zip.namelist()
        if len(name.split("/")) == 2 and name.lower().endswith("readme.md")
    ]
    sections = []
    for name in readme_names:
        # Context manager closes the member handle deterministically.
        with zip.open(name) as member:
            sections.append(member.read().decode("utf-8") + "\n\n")
    return "".join(sections)
def fetch_license(zip):
    """Return the text of the first license file found one level into an archive.

    A member qualifies when its path contains exactly one ``/`` and the part
    after that slash contains ``LICENSE``. Only the file name is inspected,
    so a root folder whose own name contains ``LICENSE`` does not make every
    file underneath it look like a license.

    Args:
        zip: An open ``zipfile.ZipFile``. The parameter name shadows the
            builtin but is kept for interface compatibility.

    Returns:
        str or None: UTF-8 decoded content of the first matching member in
        ``namelist()`` order, or ``None`` when there is none.
    """
    license_names = []
    for name in zip.namelist():
        parts = name.split("/")
        if len(parts) == 2 and "LICENSE" in parts[1]:
            license_names.append(name)

    if not license_names:
        return None
    # Context manager closes the member handle deterministically.
    with zip.open(license_names[0]) as member:
        return member.read().decode("utf-8")
def fetch_openalex(verbose, paper_name, year):
    """Query the OpenAlex works endpoint for a paper by title and year.

    Args:
        verbose: Verbosity level forwarded to ``log``.
        paper_name: Title text placed in the ``default.search`` filter.
        year: Value placed in the ``publication_year`` filter.

    Returns:
        The decoded JSON response on HTTP 200; ``None`` (after logging a
        warning-level message) for any other status code.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote

    # Percent-encode the title so reserved characters such as "&" or "#"
    # stay inside the filter value instead of truncating or altering the
    # query string.
    encoded_name = quote(f"{paper_name}", safe="")
    api_url = f"https://api.openalex.org/works?filter=default.search:{encoded_name},publication_year:{year}"
    response = requests.get(api_url)
    if response.status_code == 200:
        return response.json()
    log(verbose, "WARNING", "Could not find OpenAlex information for paper.")
    return None
def log(verbose, log_type, log_text, hf=False):
    """Emit a formatted message through ``print`` or Streamlit.

    Args:
        verbose: Verbosity level. ``0`` is silent. ``3`` and ``4`` write with
            ``st.write``; every other non-zero level uses ``print``. Only
            levels ``2`` and ``4`` show ``TITLE``, ``TIP`` and ``LOG``
            messages; ``ERROR`` messages are shown at every non-zero level.
        log_type: One of ``"TITLE"``, ``"TIP"``, ``"LOG"`` or ``"ERROR"``.
        log_text: Message text. Leading newlines are emitted as one separate
            blank output before the message itself.
        hf: Unused here; kept for interface compatibility.

    Raises:
        ValueError: If ``verbose`` is non-zero and ``log_type`` is not one of
            the four supported types.
    """
    if verbose == 0:
        return

    # Validate before emitting anything so a bad call produces no output.
    if log_type not in ("TIP", "LOG", "ERROR", "TITLE"):
        raise ValueError("Invalid log type. Use 'TIP', 'LOG', 'TITLE' or 'ERROR'.")

    # Fall back to print for unrecognised levels; previously ``show`` stayed
    # unbound for them and the first output attempt raised UnboundLocalError.
    show = st.write if verbose in (3, 4) else print
    show_tips = verbose in (2, 4)

    # Align line-break
    if log_text.startswith("\n"):
        show("\n")
        log_text = log_text.lstrip("\n")

    if log_type == "ERROR":
        show(f"**{log_text}**")
    elif show_tips:
        # Only show tips in verbose mode 2 and 4
        if log_type == "TITLE":
            show(f"\n#### {log_text}")
        elif log_type == "TIP":
            show(f"*{log_text}*")
        else:
            show(f"{log_text}")
def init_llm(verbose): | |
log(verbose, "LOG", "Initializing LLM...") |