|
|
|
|
|
from __future__ import annotations |
|
|
|
import asyncio |
|
from typing import Any, Callable, Optional, Union |
|
|
|
from pydantic import TypeAdapter, model_validator |
|
|
|
from metagpt.actions import Action |
|
from metagpt.config2 import config |
|
from metagpt.logs import logger |
|
from metagpt.tools.search_engine import SearchEngine |
|
from metagpt.tools.web_browser_engine import WebBrowserEngine |
|
from metagpt.utils.common import OutputParser |
|
from metagpt.utils.text import generate_prompt_chunk, reduce_message_length |
|
|
|
# Suffix template asking the model to answer in a specific language; combined with
# RESEARCH_TOPIC_SYSTEM by get_research_system_text().
LANG_PROMPT = "Please respond in {language}."

# Default system prompt for WebBrowseAndSummarize.run() and ConductResearch.run().
RESEARCH_BASE_SYSTEM = """You are an AI critical thinker research assistant. Your sole purpose is to write well \
written, critically acclaimed, objective and structured reports on the given text."""

# System prompt template that pins the research topic; the default system text of CollectLinks.run().
RESEARCH_TOPIC_SYSTEM = "You are an AI researcher assistant, and your research topic is:\n#TOPIC#\n{topic}"

# First prompt sent by CollectLinks.run(): asks for a JSON list of search keywords.
SEARCH_TOPIC_PROMPT = """Please provide up to 2 necessary keywords related to your research topic for Google search. \
Your response must be in JSON format, for example: ["keyword1", "keyword2"]."""

# Second prompt sent by CollectLinks.run(): turns keyword search results into up to
# {decomposition_nums} follow-up queries, answered as a JSON list of strings.
SUMMARIZE_SEARCH_PROMPT = """### Requirements
1. The keywords related to your research topic and the search results are shown in the "Search Result Information" section.
2. Provide up to {decomposition_nums} queries related to your research topic base on the search results.
3. Please respond in the following JSON format: ["query1", "query2", "query3", ...].

### Search Result Information
{search_results}
"""

# Prompt used by CollectLinks._search_and_rank_urls(): {results} is a numbered list of search
# hits and the expected answer is a JSON list of the indices to keep, in ranked order.
COLLECT_AND_RANKURLS_PROMPT = """### Topic
{topic}
### Query
{query}

### The online search results
{results}

### Requirements
Please remove irrelevant search results that are not related to the query or topic. Then, sort the remaining search results \
based on the link credibility. If two results have equal credibility, prioritize them based on the relevance. Provide the
ranked results' indices in JSON format, like [0, 1, 3, 4, ...], without including other words.
"""

# Prompt used by WebBrowseAndSummarize.run() for each page chunk and for merging chunk summaries.
# The literal reply "Not relevant." is the sentinel the caller checks for to drop a chunk.
WEB_BROWSE_AND_SUMMARIZE_PROMPT = """### Requirements
1. Utilize the text in the "Reference Information" section to respond to the question "{query}".
2. If the question cannot be directly answered using the text, but the text is related to the research topic, please provide \
a comprehensive summary of the text.
3. If the text is entirely unrelated to the research topic, please reply with a simple text "Not relevant."
4. Include all relevant factual information, numbers, statistics, etc., if available.

### Reference Information
{content}
"""


# Prompt used by ConductResearch.run() to produce the final report from the collected {content}.
CONDUCT_RESEARCH_PROMPT = """### Reference Information
{content}

### Requirements
Please provide a detailed research report in response to the following topic: "{topic}", using the information provided \
above. The report must meet the following requirements:

- Focus on directly addressing the chosen topic.
- Ensure a well-structured and in-depth presentation, incorporating relevant facts and figures where available.
- Present data and findings in an intuitive manner, utilizing feature comparative tables, if applicable.
- The report should have a minimum word count of 2,000 and be formatted with Markdown syntax following APA style guidelines.
- Include all source URLs in APA format at the end of the report.
"""
|
|
|
|
|
class CollectLinks(Action):
    """Action class to collect links from a search engine.

    The action asks the LLM for search keywords, searches them, asks the LLM to
    derive follow-up queries from those results, and finally searches each query
    and keeps the best-ranked URLs.
    """

    name: str = "CollectLinks"
    i_context: Optional[str] = None
    desc: str = "Collect links from a search engine."
    # Not read anywhere in this class.
    search_func: Optional[Any] = None
    # Built from the action's config by the validator below when not supplied.
    search_engine: Optional[SearchEngine] = None
    # Optional post-ranking hook; its return value replaces the ranked result list.
    rank_func: Optional[Callable[[list[str]], None]] = None

    @model_validator(mode="after")
    def validate_engine_and_run_func(self):
        """Create a default search engine from the config when none was provided."""
        if self.search_engine is None:
            self.search_engine = SearchEngine.from_search_config(self.config.search, proxy=self.config.proxy)
        return self

    async def run(
        self,
        topic: str,
        decomposition_nums: int = 4,
        url_per_query: int = 4,
        system_text: str | None = None,
    ) -> dict[str, list[str]]:
        """Run the action to collect links.

        Args:
            topic: The research topic.
            decomposition_nums: The number of search questions to generate.
            url_per_query: The number of URLs to collect per search question.
            system_text: The system text. Defaults to RESEARCH_TOPIC_SYSTEM formatted with the topic.

        Returns:
            A dictionary containing the search questions as keys and the collected URLs as values.
        """
        system_text = system_text or RESEARCH_TOPIC_SYSTEM.format(topic=topic)
        keywords = await self._aask(SEARCH_TOPIC_PROMPT, [system_text])
        try:
            keywords = OutputParser.extract_struct(keywords, list)
            keywords = TypeAdapter(list[str]).validate_python(keywords)
        except Exception as e:
            # Best effort: an unparsable answer falls back to searching the topic itself.
            logger.exception(f"fail to get keywords related to the research topic '{topic}' for {e}")
            keywords = [topic]
        if not keywords:
            # A syntactically valid but empty list would leave nothing to search for.
            keywords = [topic]
        results = await asyncio.gather(*(self.search_engine.run(i, as_string=False) for i in keywords))

        def gen_msg():
            """Yield progressively shorter prompts by trimming the longest result list."""
            while True:
                search_results = "\n".join(
                    f"#### Keyword: {i}\n Search Result: {j}\n" for (i, j) in zip(keywords, results)
                )
                prompt = SUMMARIZE_SEARCH_PROMPT.format(
                    decomposition_nums=decomposition_nums, search_results=search_results
                )
                yield prompt
                # Drop one hit from the longest list before the next attempt. Stop once
                # nothing is left to trim (also covers "no results at all", which would
                # otherwise raise from max()/pop()).
                longest = max(results, key=len, default=None)
                if not longest:
                    break
                longest.pop()
                if not longest:
                    break

        model_name = config.llm.model
        prompt = reduce_message_length(gen_msg(), model_name, system_text, config.llm.max_token)
        logger.debug(prompt)
        queries = await self._aask(prompt, [system_text])
        try:
            queries = OutputParser.extract_struct(queries, list)
            queries = TypeAdapter(list[str]).validate_python(queries)
        except Exception as e:
            # Best effort: reuse the keywords as queries when the answer cannot be parsed.
            logger.exception(f"fail to break down the research question due to {e}")
            queries = keywords
        ret = {}
        for query in queries:
            ret[query] = await self._search_and_rank_urls(topic, query, url_per_query)
        return ret

    async def _search_and_rank_urls(self, topic: str, query: str, num_results: int = 4) -> list[str]:
        """Search and rank URLs based on a query.

        Args:
            topic: The research topic.
            query: The search query.
            num_results: The number of URLs to collect.

        Returns:
            A list of ranked URLs, at most ``num_results`` long.
        """
        max_results = max(num_results * 2, 6)
        results = await self.search_engine.run(query, max_results=max_results, as_string=False)
        if not results:
            return []
        # Only the first `max_results` hits are shown to the LLM, so only those indices are valid.
        shown = min(max_results, len(results))
        _results = "\n".join(f"{i}: {results[i]}" for i in range(shown))
        prompt = COLLECT_AND_RANKURLS_PROMPT.format(topic=topic, query=query, results=_results)
        logger.debug(prompt)
        indices = await self._aask(prompt)
        try:
            indices = OutputParser.extract_struct(indices, list)
            # Explicit check instead of `assert`, which is stripped under `python -O`.
            if not all(isinstance(i, int) and not isinstance(i, bool) for i in indices):
                raise ValueError("ranked indices must all be integers")
        except Exception as e:
            logger.exception(f"fail to rank results for {e}")
            # Keep the engine's own order, limited to the hits that actually exist.
            indices = list(range(shown))
        # Ignore indices the LLM invented (out of range or negative) and repeated ones,
        # so a sloppy answer cannot raise IndexError or duplicate a URL.
        seen = set()
        ranked = []
        for i in indices:
            if 0 <= i < shown and i not in seen:
                seen.add(i)
                ranked.append(results[i])
        results = ranked
        if self.rank_func:
            results = self.rank_func(results)
        return [i["link"] for i in results[:num_results]]
|
|
|
|
|
class WebBrowseAndSummarize(Action):
    """Action class to explore the web and provide summaries of articles and webpages."""

    name: str = "WebBrowseAndSummarize"
    i_context: Optional[str] = None
    desc: str = "Explore the web and provide summaries of articles and webpages."
    # Forwarded to WebBrowserEngine.from_browser_config() when the engine is auto-created.
    browse_func: Union[Callable[[list[str]], None], None] = None
    # Built from the action's config by the validator below when not supplied.
    web_browser_engine: Optional[WebBrowserEngine] = None

    @model_validator(mode="after")
    def validate_engine_and_run_func(self):
        """Create a default web browser engine from the config when none was provided."""
        if self.web_browser_engine is None:
            self.web_browser_engine = WebBrowserEngine.from_browser_config(
                self.config.browser,
                browse_func=self.browse_func,
                proxy=self.config.proxy,
            )
        return self

    @staticmethod
    def _is_not_relevant(summary: str) -> bool:
        """Return True when an LLM reply is the "Not relevant." sentinel.

        The comparison tolerates surrounding whitespace or quotes, a missing
        trailing period and different letter case, since LLM output rarely
        matches a literal byte-for-byte.
        """
        normalized = summary.strip().strip("\"'").strip().rstrip(".").lower()
        return normalized == "not relevant"

    async def run(
        self,
        url: str,
        *urls: str,
        query: str,
        system_text: str = RESEARCH_BASE_SYSTEM,
    ) -> dict[str, Optional[str]]:
        """Run the action to browse the web and provide summaries.

        Args:
            url: The main URL to browse.
            urls: Additional URLs to browse.
            query: The research question.
            system_text: The system text.

        Returns:
            A dictionary containing the URLs as keys and their summaries as values.
            The value is None for a page where every chunk was judged not relevant.
        """
        contents = await self.web_browser_engine.run(url, *urls)
        if not urls:
            # With a single URL the engine result is treated as one page, not a list.
            contents = [contents]

        summaries = {}
        # The "{}" placeholder is left in place for generate_prompt_chunk to fill per chunk.
        # NOTE(review): if that helper fills it with str.format, a query containing
        # braces would break the template - confirm against its implementation.
        prompt_template = WEB_BROWSE_AND_SUMMARIZE_PROMPT.format(query=query, content="{}")
        for u, page in zip([url, *urls], contents):
            page_text = page.inner_text
            chunk_summaries = []
            for prompt in generate_prompt_chunk(page_text, prompt_template, self.llm.model, system_text, 4096):
                logger.debug(prompt)
                summary = await self._aask(prompt, [system_text])
                if self._is_not_relevant(summary):
                    continue
                chunk_summaries.append(summary)

            if not chunk_summaries:
                summaries[u] = None
                continue

            if len(chunk_summaries) == 1:
                summaries[u] = chunk_summaries[0]
                continue

            # Several relevant chunks: ask the LLM once more to merge them into one summary.
            merged = "\n".join(chunk_summaries)
            prompt = WEB_BROWSE_AND_SUMMARIZE_PROMPT.format(query=query, content=merged)
            summaries[u] = await self._aask(prompt, [system_text])
        return summaries
|
|
|
|
|
class ConductResearch(Action):
    """Action that turns collected reference material into a research report."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    async def run(
        self,
        topic: str,
        content: str,
        system_text: str = RESEARCH_BASE_SYSTEM,
    ) -> str:
        """Ask the LLM to write a research report on a topic.

        Args:
            topic: The research topic.
            content: The reference material the report is based on.
            system_text: The system text sent along with the prompt.

        Returns:
            The LLM's answer, i.e. the generated research report.
        """
        report_prompt = CONDUCT_RESEARCH_PROMPT.format(content=content, topic=topic)
        logger.debug(report_prompt)
        # Flag is set on the action's LLM before asking; presumably lets the LLM size
        # its own output budget - confirm against the LLM implementation.
        self.llm.auto_max_tokens = True
        report = await self._aask(report_prompt, [system_text])
        return report
|
|
|
|
|
def get_research_system_text(topic: str, language: str):
    """Build the system text used when conducting research.

    Args:
        topic: The research topic inserted into RESEARCH_TOPIC_SYSTEM.
        language: The language inserted into LANG_PROMPT.

    Returns:
        The formatted topic prompt and language prompt, separated by one space.
    """
    topic_part = RESEARCH_TOPIC_SYSTEM.format(topic=topic)
    language_part = LANG_PROMPT.format(language=language)
    return f"{topic_part} {language_part}"
|
|