Hopsakee's picture
Upload folder using huggingface_hub
5fe3652 verified
import streamlit as st
import os
import pyperclip
from pathlib import Path
from src.fabrics_processor.database import initialize_qdrant_database
from src.fabrics_processor.database_updater import update_qdrant_database
from src.fabrics_processor.file_change_detector import detect_file_changes
from src.search_qdrant.database_query import query_qdrant_database
from src.fabrics_processor.obsidian2fabric import sync_folders
from src.fabrics_processor.logger import setup_logger
import logging
import atexit
from src.fabrics_processor.config import config
from src.fabrics_processor.deduplicator import remove_duplicates
# Configure logging
logger = setup_logger()
def init_session_state():
"""Initialize session state variables."""
if 'client' not in st.session_state:
client = initialize_qdrant_database(api_key=st.secrets["api_key"])
st.session_state.client = client
# Register cleanup function
atexit.register(lambda: client.close() if hasattr(client, '_transport') else None)
if 'selected_prompts' not in st.session_state:
st.session_state.selected_prompts = []
if 'comparing' not in st.session_state:
st.session_state.comparing = False
if 'comparison_selected' not in st.session_state:
st.session_state.comparison_selected = None
def show_comparison_view(prompts):
"""Show a full-width comparison view of the selected prompts."""
st.write("## Compare Selected Prompts")
# Add the back button at the top
if st.button("Back to search"):
st.session_state.comparing = False
st.rerun()
# Create columns for each prompt
cols = st.columns(len(prompts))
# Track which prompt is selected for copying
selected_idx = None
for idx, (col, prompt) in enumerate(zip(cols, prompts)):
with col:
st.markdown(f"### {prompt.metadata['filename']}")
# Create two columns for trigger and button
trigger_col, button_col = st.columns([0.7, 0.3])
with trigger_col:
# Add trigger field
current_trigger = prompt.metadata.get('trigger', '')
new_trigger = st.text_input("Trigger",
value=current_trigger,
key=f"trigger_{idx}")
# Update trigger if changed
if new_trigger != current_trigger:
try:
st.session_state.client.set_payload(
collection_name=config.embedding.collection_name,
payload={"trigger": new_trigger},
points=[prompt.id]
)
st.success(f"Updated trigger to: {new_trigger}")
except Exception as e:
st.error(f"Failed to update trigger: {str(e)}")
with button_col:
# Align button with text input using empty space
st.write("") # This creates some vertical space
if st.button(f"Use this prompt", key=f"compare_use_{idx}"):
selected_idx = idx
# Display content as markdown
st.markdown("### Content")
st.markdown(prompt.metadata["content"])
# Handle selection
if selected_idx is not None:
pyperclip.copy(prompts[selected_idx].metadata['content'])
st.success(f"Copied {prompts[selected_idx].metadata['filename']} to clipboard!")
# Clear comparison view
st.session_state.comparing = False
st.rerun()
def search_interface():
"""Show the search interface."""
if st.session_state.comparing:
show_comparison_view(st.session_state.selected_prompts)
return
st.subheader("Search for prompts")
query = st.text_area("What are you trying to accomplish? I will then search for good prompts to give you a good start.")
if query:
try:
results = query_qdrant_database(
query=query,
client=st.session_state.client,
num_results=5,
collection_name=config.embedding.collection_name
)
if results:
st.write("Which prompts would you like to investigate? Max 3.")
# Create checkboxes for selection
selected = []
for r in results:
if st.checkbox(f"{r.metadata['filename']}", key=f"select_{r.id}"):
selected.append(r)
st.session_state.selected_prompts = selected
if selected:
col1, col2 = st.columns(2)
with col1:
if st.button("Use: copy to clipboard"):
if len(selected) == 1:
pyperclip.copy(selected[0].metadata['content'])
st.success("Copied to clipboard!")
with col2:
if len(selected) > 1 and st.button("Compare"):
st.session_state.comparing = True
st.rerun()
except Exception as e:
logger.error(f"Error in search_interface: {e}", exc_info=True)
st.error(f"Error searching database: {e}")
def update_database():
"""Update the markdown folder with prompt files from Obsidian.
Then update the Qdrant database.
Finally based on the Qdrant database create a new espanso YAML file and
the Obsidian Textgenerator markdown files."""
try:
with st.spinner("Processing markdown files..."):
# First check if there are any changes in the prompt files in Obsidian.
# If so, add them to the markdown folder before updating the database.
sync_folders(source_dir=Path(config.obsidian_input_folder), target_dir=Path(config.fabric_patterns_folder))
# Get current collection info
collection_info = st.session_state.client.get_collection(config.embedding.collection_name)
initial_points = collection_info.points_count
# Detect file changes
new_files, modified_files, deleted_files = detect_file_changes(
client=st.session_state.client,
fabric_patterns_folder=config.fabric_patterns_folder
)
# Update the database if there are changes
if any([new_files, modified_files, deleted_files]):
st.info("Changes detected. Updating database...")
update_qdrant_database(
client=st.session_state.client,
collection_name=config.embedding.collection_name,
new_files=new_files,
modified_files=modified_files,
deleted_files=deleted_files
)
else:
st.info("No changes detected in input folders.")
# Create a separate section for deduplication - ALWAYS run this regardless of file changes
st.subheader("Deduplication Process")
with st.spinner("Checking for and removing duplicate entries..."):
# Run the deduplication process
duplicates_removed = remove_duplicates(
client=st.session_state.client,
collection_name=config.embedding.collection_name
)
if duplicates_removed > 0:
st.success(f"Successfully removed {duplicates_removed} duplicate entries from the database")
else:
st.info("No duplicate entries found in the database")
# Get updated collection info
collection_info = st.session_state.client.get_collection(config.embedding.collection_name)
final_points = collection_info.points_count
# Show summary
st.success(f"""
Database update completed successfully!
Changes detected:
- {len(new_files)} new files
- {len(modified_files)} modified files
- {len(deleted_files)} deleted files
- {duplicates_removed} duplicate entries removed
Database entries:
- Initial: {initial_points}
- Final: {final_points}
""")
except Exception as e:
logger.error(f"Error updating database: {e}", exc_info=True)
st.error(f"Error updating database: {e}")
def display_trigger_table():
"""Display the trigger table in the sidebar."""
with st.sidebar:
# Add some space to push the table to the bottom
st.markdown("<br>" * 10, unsafe_allow_html=True)
# Create the table
st.markdown("""
| trigger | description |
|---------|-------------|
| ;;c | code |
| ;;s | summarize and extract |
| ;;t | think |
""")
def main():
st.set_page_config(
page_title="Fabric to Espanso Prompt Manager",
layout="wide")
init_session_state()
# Sidebar
with st.sidebar:
# Add logo to sidebar
image_path = Path(__file__).parent.parent.parent / "data" / "Fab2Esp_transparent.png"
st.image(str(image_path), width=200, use_container_width=False)
st.title("Prompt Manager")
page = st.radio("Select Option:", ["Search for prompts", "Update database and prompt files"])
if st.button("Quit"):
if hasattr(st.session_state.client, '_transport'):
st.session_state.client.close()
st.success("Database connection closed.")
st.stop()
# Main content
if page == "Search for prompts":
search_interface()
else:
st.subheader("Update Database")
if st.button("Start Update"):
update_database()
# Add the trigger table at the end
display_trigger_table()
# Add credits at the bottom left
st.markdown("""
<style>
.credits {
position: fixed;
left: 1rem;
bottom: 1rem;
font-size: 0.8rem;
color: #666;
max-width: 600px;
}
</style>
<div class="credits">
This tool searches the great list of prompts available at <a href="https://github.com/danielmiessler/fabric">https://github.com/danielmiessler/fabric</a>.
A great commandline utilty build by Daniel Miessler to make the use of LLM more frictionless.<br>
All credits to him and his fellow fabric builders.
</div>
""", unsafe_allow_html=True)
if __name__ == "__main__":
main()