Spaces:
Sleeping
Sleeping
from __future__ import annotations | |
import asyncio | |
import contextlib | |
import enum | |
import logging | |
from functools import partial | |
from typing import ( | |
Any, | |
Callable, | |
Dict, | |
Generator, | |
Iterable, | |
List, | |
Optional, | |
Tuple, | |
Type, | |
) | |
import numpy as np | |
import sqlalchemy | |
from langchain.docstore.document import Document | |
from langchain.schema.embeddings import Embeddings | |
from langchain.utils import get_from_dict_or_env | |
from langchain.vectorstores.base import VectorStore | |
from langchain.vectorstores.pgvector import BaseModel | |
from langchain.vectorstores.utils import maximal_marginal_relevance | |
from pgvector.sqlalchemy import Vector | |
from sqlalchemy import delete | |
from sqlalchemy.orm import Session, declarative_base, relationship | |
class DistanceStrategy(str, enum.Enum): | |
"""Enumerator of the Distance strategies.""" | |
EUCLIDEAN = "l2" | |
COSINE = "cosine" | |
MAX_INNER_PRODUCT = "inner" | |
DEFAULT_DISTANCE_STRATEGY = DistanceStrategy.COSINE | |
Base = declarative_base() # type: Any | |
_LANGCHAIN_DEFAULT_COLLECTION_NAME = "langchain" | |
def _results_to_docs(docs_and_scores: Any) -> List[Document]: | |
"""Return docs from docs and scores.""" | |
return [doc for doc, _ in docs_and_scores] | |
class Article(Base): | |
"""Embedding store.""" | |
__tablename__ = "article" | |
id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True, nullable=False) | |
title = sqlalchemy.Column(sqlalchemy.String, nullable=True) | |
abstract = sqlalchemy.Column(sqlalchemy.String, nullable=True) | |
embedding: Vector = sqlalchemy.Column("abstract_embedding", Vector(None)) | |
doi = sqlalchemy.Column(sqlalchemy.String, nullable=True) | |
class CustomPGVector(VectorStore): | |
"""`Postgres`/`PGVector` vector store. | |
To use, you should have the ``pgvector`` python package installed. | |
Args: | |
connection_string: Postgres connection string. | |
embedding_function: Any embedding function implementing | |
`langchain.embeddings.base.Embeddings` interface. | |
table_name: The name of the collection to use. (default: langchain) | |
NOTE: This is not the name of the table, but the name of the collection. | |
The tables will be created when initializing the store (if not exists) | |
So, make sure the user has the right permissions to create tables. | |
distance_strategy: The distance strategy to use. (default: COSINE) | |
pre_delete_collection: If True, will delete the collection if it exists. | |
(default: False). Useful for testing. | |
Example: | |
.. code-block:: python | |
from langchain.vectorstores import PGVector | |
from langchain.embeddings.openai import OpenAIEmbeddings | |
CONNECTION_STRING = "postgresql+psycopg2://hwc@localhost:5432/test3" | |
COLLECTION_NAME = "state_of_the_union_test" | |
embeddings = OpenAIEmbeddings() | |
vectorestore = PGVector.from_documents( | |
embedding=embeddings, | |
documents=docs, | |
table_name=COLLECTION_NAME, | |
connection_string=CONNECTION_STRING, | |
) | |
""" | |
def __init__( | |
self, | |
connection_string: str, | |
embedding_function: Embeddings, | |
table_name: str, | |
column_name: str, | |
collection_metadata: Optional[dict] = None, | |
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, | |
pre_delete_collection: bool = False, | |
logger: Optional[logging.Logger] = None, | |
relevance_score_fn: Optional[Callable[[float], float]] = None, | |
) -> None: | |
self.connection_string = connection_string | |
self.embedding_function = embedding_function | |
self.table_name = table_name | |
self.column_name = column_name | |
self.collection_metadata = collection_metadata | |
self._distance_strategy = distance_strategy | |
self.pre_delete_collection = pre_delete_collection | |
self.logger = logger or logging.getLogger(__name__) | |
self.override_relevance_score_fn = relevance_score_fn | |
self.__post_init__() | |
def __post_init__( | |
self, | |
) -> None: | |
""" | |
Initialize the store. | |
""" | |
self._conn = self.connect() | |
self.create_vector_extension() | |
self.EmbeddingStore = Article | |
def embeddings(self) -> Embeddings: | |
return self.embedding_function | |
def connect(self) -> sqlalchemy.engine.Connection: | |
engine = sqlalchemy.create_engine(self.connection_string) | |
conn = engine.connect() | |
return conn | |
def create_vector_extension(self) -> None: | |
try: | |
with Session(self._conn) as session: | |
statement = sqlalchemy.text("CREATE EXTENSION IF NOT EXISTS vector") | |
session.execute(statement) | |
session.commit() | |
except Exception as e: | |
self.logger.exception(e) | |
def drop_tables(self) -> None: | |
with self._conn.begin(): | |
Base.metadata.drop_all(self._conn) | |
def _make_session(self) -> Generator[Session, None, None]: | |
"""Create a context manager for the session, bind to _conn string.""" | |
yield Session(self._conn) | |
def delete( | |
self, | |
ids: Optional[List[str]] = None, | |
**kwargs: Any, | |
) -> None: | |
"""Delete vectors by ids. | |
Args: | |
ids: List of ids to delete. | |
""" | |
with Session(self._conn) as session: | |
if ids is not None: | |
self.logger.debug( | |
"Trying to delete vectors by ids (represented by the model " | |
"using the custom ids field)" | |
) | |
stmt = delete(self.EmbeddingStore).where( | |
self.EmbeddingStore.custom_id.in_(ids) | |
) | |
session.execute(stmt) | |
session.commit() | |
def __from( | |
cls, | |
texts: List[str], | |
embeddings: List[List[float]], | |
embedding: Embeddings, | |
metadatas: Optional[List[dict]] = None, | |
ids: Optional[List[str]] = None, | |
table_name: str = "article", | |
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, | |
connection_string: Optional[str] = None, | |
pre_delete_collection: bool = False, | |
**kwargs: Any, | |
) -> CustomPGVector: | |
if not metadatas: | |
metadatas = [{} for _ in texts] | |
if connection_string is None: | |
connection_string = cls.get_connection_string(kwargs) | |
store = cls( | |
connection_string=connection_string, | |
table_name=table_name, | |
embedding_function=embedding, | |
distance_strategy=distance_strategy, | |
pre_delete_collection=pre_delete_collection, | |
**kwargs, | |
) | |
store.add_embeddings( | |
texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs | |
) | |
return store | |
def add_embeddings( | |
self, | |
texts: Iterable[str], | |
embeddings: List[List[float]], | |
metadatas: Optional[List[dict]] = None, | |
ids: Optional[List[str]] = None, | |
**kwargs: Any, | |
) -> List[str]: | |
"""Add embeddings to the vectorstore. | |
Args: | |
texts: Iterable of strings to add to the vectorstore. | |
embeddings: List of list of embedding vectors. | |
metadatas: List of metadatas associated with the texts. | |
kwargs: vectorstore specific parameters | |
""" | |
if not metadatas: | |
metadatas = [{} for _ in texts] | |
with Session(self._conn) as session: | |
# collection = self.get_collection(session) | |
# if not collection: | |
# raise ValueError("Collection not found") | |
for text, metadata, embedding, id in zip(texts, metadatas, embeddings, ids): | |
embedding_store = self.EmbeddingStore( | |
embedding=embedding, | |
document=text, | |
cmetadata=metadata, | |
custom_id=id, | |
) | |
session.add(embedding_store) | |
session.commit() | |
return ids | |
def add_texts( | |
self, | |
texts: Iterable[str], | |
metadatas: Optional[List[dict]] = None, | |
ids: Optional[List[str]] = None, | |
**kwargs: Any, | |
) -> List[str]: | |
"""Run more texts through the embeddings and add to the vectorstore. | |
Args: | |
texts: Iterable of strings to add to the vectorstore. | |
metadatas: Optional list of metadatas associated with the texts. | |
kwargs: vectorstore specific parameters | |
Returns: | |
List of ids from adding the texts into the vectorstore. | |
""" | |
embeddings = self.embedding_function.embed_documents(list(texts)) | |
return self.add_embeddings( | |
texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs | |
) | |
def similarity_search( | |
self, | |
query: str, | |
k: int = 4, | |
filter: Optional[dict] = None, | |
**kwargs: Any, | |
) -> List[Document]: | |
"""Run similarity search with PGVector with distance. | |
Args: | |
query (str): Query text to search for. | |
k (int): Number of results to return. Defaults to 4. | |
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None. | |
Returns: | |
List of Documents most similar to the query. | |
""" | |
embedding = self.embedding_function.embed_query(text=query) | |
return self.similarity_search_by_vector( | |
embedding=embedding, | |
k=k, | |
filter=filter, | |
) | |
def similarity_search_with_score( | |
self, | |
query: str, | |
k: int = 4, | |
filter: Optional[dict] = None, | |
) -> List[Tuple[Document, float]]: | |
"""Return docs most similar to query. | |
Args: | |
query: Text to look up documents similar to. | |
k: Number of Documents to return. Defaults to 4. | |
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None. | |
Returns: | |
List of Documents most similar to the query and score for each. | |
""" | |
embedding = self.embedding_function.embed_query(query) | |
docs = self.similarity_search_with_score_by_vector( | |
embedding=embedding, k=k, filter=filter | |
) | |
return docs | |
def distance_strategy(self) -> Any: | |
if self._distance_strategy == DistanceStrategy.EUCLIDEAN: | |
return self.EmbeddingStore.embedding.l2_distance | |
elif self._distance_strategy == DistanceStrategy.COSINE: | |
return self.EmbeddingStore.embedding.cosine_distance | |
elif self._distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT: | |
return self.EmbeddingStore.embedding.max_inner_product | |
else: | |
raise ValueError( | |
f"Got unexpected value for distance: {self._distance_strategy}. " | |
f"Should be one of {', '.join([ds.value for ds in DistanceStrategy])}." | |
) | |
def similarity_search_with_score_by_vector( | |
self, | |
embedding: List[float], | |
k: int = 4, | |
filter: Optional[dict] = None, | |
) -> List[Tuple[Document, float]]: | |
results = self.__query_collection(embedding=embedding, k=k, filter=filter) | |
return self._results_to_docs_and_scores(results) | |
def _results_to_docs_and_scores(self, results: Any) -> List[Tuple[Document, float]]: | |
"""Return docs and scores from results.""" | |
docs = [ | |
( | |
Document( | |
page_content=result.Article.abstract, | |
metadata={ | |
"id": result.Article.id, | |
"title": result.Article.title, | |
"doi": result.Article.doi, | |
}, | |
), | |
result.distance if self.embedding_function is not None else None, | |
) | |
for result in results | |
] | |
return docs | |
def __query_collection( | |
self, | |
embedding: List[float], | |
k: int = 4, | |
filter: Optional[Dict[str, str]] = None, | |
) -> List[Any]: | |
"""Query the collection.""" | |
with Session(self._conn) as session: | |
results: List[Any] = ( | |
session.query( | |
self.EmbeddingStore, | |
self.distance_strategy(embedding).label("distance"), # type: ignore | |
) | |
.order_by(sqlalchemy.asc("distance")) | |
.limit(k) | |
.all() | |
) | |
print(results) | |
return results | |
def similarity_search_by_vector( | |
self, | |
embedding: List[float], | |
k: int = 4, | |
filter: Optional[dict] = None, | |
**kwargs: Any, | |
) -> List[Document]: | |
"""Return docs most similar to embedding vector. | |
Args: | |
embedding: Embedding to look up documents similar to. | |
k: Number of Documents to return. Defaults to 4. | |
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None. | |
Returns: | |
List of Documents most similar to the query vector. | |
""" | |
docs_and_scores = self.similarity_search_with_score_by_vector( | |
embedding=embedding, k=k, filter=filter | |
) | |
return _results_to_docs(docs_and_scores) | |
def from_texts( | |
cls: Type[PGVector], | |
texts: List[str], | |
embedding: Embeddings, | |
metadatas: Optional[List[dict]] = None, | |
table_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME, | |
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, | |
ids: Optional[List[str]] = None, | |
pre_delete_collection: bool = False, | |
**kwargs: Any, | |
) -> PGVector: | |
""" | |
Return VectorStore initialized from texts and embeddings. | |
Postgres connection string is required | |
"Either pass it as a parameter | |
or set the PGVECTOR_CONNECTION_STRING environment variable. | |
""" | |
embeddings = embedding.embed_documents(list(texts)) | |
return cls.__from( | |
texts, | |
embeddings, | |
embedding, | |
metadatas=metadatas, | |
ids=ids, | |
table_name=table_name, | |
distance_strategy=distance_strategy, | |
pre_delete_collection=pre_delete_collection, | |
**kwargs, | |
) | |
def from_embeddings( | |
cls, | |
text_embeddings: List[Tuple[str, List[float]]], | |
embedding: Embeddings, | |
metadatas: Optional[List[dict]] = None, | |
table_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME, | |
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, | |
ids: Optional[List[str]] = None, | |
pre_delete_collection: bool = False, | |
**kwargs: Any, | |
) -> PGVector: | |
"""Construct PGVector wrapper from raw documents and pre- | |
generated embeddings. | |
Return VectorStore initialized from documents and embeddings. | |
Postgres connection string is required | |
"Either pass it as a parameter | |
or set the PGVECTOR_CONNECTION_STRING environment variable. | |
Example: | |
.. code-block:: python | |
from langchain.vectorstores import PGVector | |
from langchain.embeddings import OpenAIEmbeddings | |
embeddings = OpenAIEmbeddings() | |
text_embeddings = embeddings.embed_documents(texts) | |
text_embedding_pairs = list(zip(texts, text_embeddings)) | |
faiss = PGVector.from_embeddings(text_embedding_pairs, embeddings) | |
""" | |
texts = [t[0] for t in text_embeddings] | |
embeddings = [t[1] for t in text_embeddings] | |
return cls.__from( | |
texts, | |
embeddings, | |
embedding, | |
metadatas=metadatas, | |
ids=ids, | |
table_name=table_name, | |
distance_strategy=distance_strategy, | |
pre_delete_collection=pre_delete_collection, | |
**kwargs, | |
) | |
def from_existing_index( | |
cls: Type[PGVector], | |
embedding: Embeddings, | |
table_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME, | |
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, | |
pre_delete_collection: bool = False, | |
**kwargs: Any, | |
) -> PGVector: | |
""" | |
Get intsance of an existing PGVector store.This method will | |
return the instance of the store without inserting any new | |
embeddings | |
""" | |
connection_string = cls.get_connection_string(kwargs) | |
store = cls( | |
connection_string=connection_string, | |
table_name=table_name, | |
embedding_function=embedding, | |
distance_strategy=distance_strategy, | |
pre_delete_collection=pre_delete_collection, | |
) | |
return store | |
def get_connection_string(cls, kwargs: Dict[str, Any]) -> str: | |
connection_string: str = get_from_dict_or_env( | |
data=kwargs, | |
key="connection_string", | |
env_key="PGVECTOR_CONNECTION_STRING", | |
) | |
if not connection_string: | |
raise ValueError( | |
"Postgres connection string is required" | |
"Either pass it as a parameter" | |
"or set the PGVECTOR_CONNECTION_STRING environment variable." | |
) | |
return connection_string | |
def from_documents( | |
cls: Type[CustomPGVector], | |
documents: List[Document], | |
embedding: Embeddings, | |
table_name: str = "article", | |
column_name: str = "embeding", | |
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, | |
ids: Optional[List[str]] = None, | |
pre_delete_collection: bool = False, | |
**kwargs: Any, | |
) -> CustomPGVector: | |
""" | |
Return VectorStore initialized from documents and embeddings. | |
Postgres connection string is required | |
"Either pass it as a parameter | |
or set the PGVECTOR_CONNECTION_STRING environment variable. | |
""" | |
texts = [d.page_content for d in documents] | |
metadatas = [d.metadata for d in documents] | |
connection_string = cls.get_connection_string(kwargs) | |
kwargs["connection_string"] = connection_string | |
return cls.from_texts( | |
texts=texts, | |
pre_delete_collection=pre_delete_collection, | |
embedding=embedding, | |
distance_strategy=distance_strategy, | |
metadatas=metadatas, | |
ids=ids, | |
table_name=table_name, | |
column_name=column_name, | |
**kwargs, | |
) | |
def connection_string_from_db_params( | |
cls, | |
driver: str, | |
host: str, | |
port: int, | |
database: str, | |
user: str, | |
password: str, | |
) -> str: | |
"""Return connection string from database parameters.""" | |
return f"postgresql+{driver}://{user}:{password}@{host}:{port}/{database}" | |
def _select_relevance_score_fn(self) -> Callable[[float], float]: | |
""" | |
The 'correct' relevance function | |
may differ depending on a few things, including: | |
- the distance / similarity metric used by the VectorStore | |
- the scale of your embeddings (OpenAI's are unit normed. Many others are not!) | |
- embedding dimensionality | |
- etc. | |
""" | |
if self.override_relevance_score_fn is not None: | |
return self.override_relevance_score_fn | |
# Default strategy is to rely on distance strategy provided | |
# in vectorstore constructor | |
if self._distance_strategy == DistanceStrategy.COSINE: | |
return self._cosine_relevance_score_fn | |
elif self._distance_strategy == DistanceStrategy.EUCLIDEAN: | |
return self._euclidean_relevance_score_fn | |
elif self._distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT: | |
return self._max_inner_product_relevance_score_fn | |
else: | |
raise ValueError( | |
"No supported normalization function" | |
f" for distance_strategy of {self._distance_strategy}." | |
"Consider providing relevance_score_fn to PGVector constructor." | |
) | |
def max_marginal_relevance_search_with_score_by_vector( | |
self, | |
embedding: List[float], | |
k: int = 4, | |
fetch_k: int = 20, | |
lambda_mult: float = 0.5, | |
filter: Optional[Dict[str, str]] = None, | |
**kwargs: Any, | |
) -> List[Tuple[Document, float]]: | |
"""Return docs selected using the maximal marginal relevance with score | |
to embedding vector. | |
Maximal marginal relevance optimizes for similarity to query AND diversity | |
among selected documents. | |
Args: | |
embedding: Embedding to look up documents similar to. | |
k (int): Number of Documents to return. Defaults to 4. | |
fetch_k (int): Number of Documents to fetch to pass to MMR algorithm. | |
Defaults to 20. | |
lambda_mult (float): Number between 0 and 1 that determines the degree | |
of diversity among the results with 0 corresponding | |
to maximum diversity and 1 to minimum diversity. | |
Defaults to 0.5. | |
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None. | |
Returns: | |
List[Tuple[Document, float]]: List of Documents selected by maximal marginal | |
relevance to the query and score for each. | |
""" | |
results = self.__query_collection(embedding=embedding, k=fetch_k, filter=filter) | |
embedding_list = [result.EmbeddingStore.embedding for result in results] | |
mmr_selected = maximal_marginal_relevance( | |
np.array(embedding, dtype=np.float32), | |
embedding_list, | |
k=k, | |
lambda_mult=lambda_mult, | |
) | |
candidates = self._results_to_docs_and_scores(results) | |
return [r for i, r in enumerate(candidates) if i in mmr_selected] | |
def max_marginal_relevance_search( | |
self, | |
query: str, | |
k: int = 4, | |
fetch_k: int = 20, | |
lambda_mult: float = 0.5, | |
filter: Optional[Dict[str, str]] = None, | |
**kwargs: Any, | |
) -> List[Document]: | |
"""Return docs selected using the maximal marginal relevance. | |
Maximal marginal relevance optimizes for similarity to query AND diversity | |
among selected documents. | |
Args: | |
query (str): Text to look up documents similar to. | |
k (int): Number of Documents to return. Defaults to 4. | |
fetch_k (int): Number of Documents to fetch to pass to MMR algorithm. | |
Defaults to 20. | |
lambda_mult (float): Number between 0 and 1 that determines the degree | |
of diversity among the results with 0 corresponding | |
to maximum diversity and 1 to minimum diversity. | |
Defaults to 0.5. | |
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None. | |
Returns: | |
List[Document]: List of Documents selected by maximal marginal relevance. | |
""" | |
embedding = self.embedding_function.embed_query(query) | |
return self.max_marginal_relevance_search_by_vector( | |
embedding, | |
k=k, | |
fetch_k=fetch_k, | |
lambda_mult=lambda_mult, | |
**kwargs, | |
) | |
def max_marginal_relevance_search_with_score( | |
self, | |
query: str, | |
k: int = 4, | |
fetch_k: int = 20, | |
lambda_mult: float = 0.5, | |
filter: Optional[dict] = None, | |
**kwargs: Any, | |
) -> List[Tuple[Document, float]]: | |
"""Return docs selected using the maximal marginal relevance with score. | |
Maximal marginal relevance optimizes for similarity to query AND diversity | |
among selected documents. | |
Args: | |
query (str): Text to look up documents similar to. | |
k (int): Number of Documents to return. Defaults to 4. | |
fetch_k (int): Number of Documents to fetch to pass to MMR algorithm. | |
Defaults to 20. | |
lambda_mult (float): Number between 0 and 1 that determines the degree | |
of diversity among the results with 0 corresponding | |
to maximum diversity and 1 to minimum diversity. | |
Defaults to 0.5. | |
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None. | |
Returns: | |
List[Tuple[Document, float]]: List of Documents selected by maximal marginal | |
relevance to the query and score for each. | |
""" | |
embedding = self.embedding_function.embed_query(query) | |
docs = self.max_marginal_relevance_search_with_score_by_vector( | |
embedding=embedding, | |
k=k, | |
fetch_k=fetch_k, | |
lambda_mult=lambda_mult, | |
filter=filter, | |
**kwargs, | |
) | |
return docs | |
def max_marginal_relevance_search_by_vector( | |
self, | |
embedding: List[float], | |
k: int = 4, | |
fetch_k: int = 20, | |
lambda_mult: float = 0.5, | |
filter: Optional[Dict[str, str]] = None, | |
**kwargs: Any, | |
) -> List[Document]: | |
"""Return docs selected using the maximal marginal relevance | |
to embedding vector. | |
Maximal marginal relevance optimizes for similarity to query AND diversity | |
among selected documents. | |
Args: | |
embedding (str): Text to look up documents similar to. | |
k (int): Number of Documents to return. Defaults to 4. | |
fetch_k (int): Number of Documents to fetch to pass to MMR algorithm. | |
Defaults to 20. | |
lambda_mult (float): Number between 0 and 1 that determines the degree | |
of diversity among the results with 0 corresponding | |
to maximum diversity and 1 to minimum diversity. | |
Defaults to 0.5. | |
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None. | |
Returns: | |
List[Document]: List of Documents selected by maximal marginal relevance. | |
""" | |
docs_and_scores = self.max_marginal_relevance_search_with_score_by_vector( | |
embedding, | |
k=k, | |
fetch_k=fetch_k, | |
lambda_mult=lambda_mult, | |
filter=filter, | |
**kwargs, | |
) | |
return _results_to_docs(docs_and_scores) | |
async def amax_marginal_relevance_search_by_vector( | |
self, | |
embedding: List[float], | |
k: int = 4, | |
fetch_k: int = 20, | |
lambda_mult: float = 0.5, | |
filter: Optional[Dict[str, str]] = None, | |
**kwargs: Any, | |
) -> List[Document]: | |
"""Return docs selected using the maximal marginal relevance.""" | |
# This is a temporary workaround to make the similarity search | |
# asynchronous. The proper solution is to make the similarity search | |
# asynchronous in the vector store implementations. | |
func = partial( | |
self.max_marginal_relevance_search_by_vector, | |
embedding, | |
k=k, | |
fetch_k=fetch_k, | |
lambda_mult=lambda_mult, | |
filter=filter, | |
**kwargs, | |
) | |
return await asyncio.get_event_loop().run_in_executor(None, func) | |