1,项目简介
WebThinker 是一个深度研究智能体,使 LRMs 能够在推理过程中自主搜索网络、导航网页,并撰写研究报告。这种技术的目标是革命性的:让用户通过简单的查询就能在互联网的海量信息中进行深度搜索、挖掘和整合,从而为知识密集型领域(如金融、科学、工程)的研究人员大幅降低信息收集的时间和成本。
项目地址:https://github.com/RUC-NLPIR/WebThinker
评价:质量较差,需要接入微软 Azure 的 Bing 搜索服务。这些都需要替换成国产服务,而且质量一般,不兼容图片等,远远不如字节的 DeerFlow 强大。
现有的开源深度搜索智能体通常采用检索增强生成(Retrieval-Augmented Generation, RAG)技术,依循预定义的工作流程,这限制了 LRM 探索更深层次网页信息的能力,也阻碍了 LRM 与搜索引擎之间的紧密交互。
- 传统 RAG:仅进行浅层搜索,缺乏思考深度和连贯性
- 进阶 RAG:使用预定义工作流,包括查询拆解、多轮 RAG 等,但仍缺乏灵活性
- WebThinker:在连续深思考过程中自主调用工具,实现端到端任务执行
WebThinker 使 LRM 能够在单次生成中自主执行操作,无需遵循预设的工作流程,从而实现真正的端到端任务执行。
2,替换原LLM
【替换Bing的检索】run_web_thinker.py & run_web_thinker_report.py
async def generate_response(
    client: AsyncOpenAI,
    prompt: str,
    semaphore: asyncio.Semaphore,
    generate_mode: str = "chat",
    temperature: float = 0.0,
    top_p: float = 1.0,
    max_tokens: int = 32768,
    repetition_penalty: float = 1.0,
    top_k: int = 1,
    min_p: float = 0.0,
    model_name: str = "QwQ-32B",
    stop: List[str] = [END_SEARCH_QUERY],
    retry_limit: int = 3,
    bad_words: List[str] = [f"{END_SEARCH_RESULT}\n\n{tokenizer.eos_token}"],
) -> Tuple[str, str]:
    """Generate a single response with retry logic.

    Original upstream implementation: sends a raw-completions request to a
    vLLM-style endpoint, returning ``(formatted_prompt, generated_text)`` or
    ``("", "")`` after ``retry_limit`` failures.

    NOTE(review): the list defaults (``stop``, ``bad_words``) are mutable and
    are evaluated at import time against module globals (``END_SEARCH_QUERY``,
    ``tokenizer`` — defined elsewhere in the file).
    """
    for attempt in range(retry_limit):
        try:
            async with semaphore:
                if generate_mode == "chat":
                    messages = [{"role": "user", "content": prompt}]
                    # Reasoning models use the main tokenizer's chat template;
                    # everything else uses the auxiliary model's template.
                    if 'qwq' in model_name.lower() or 'deepseek' in model_name.lower() or 'r1' in model_name.lower():
                        formatted_prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
                    else:
                        formatted_prompt = aux_tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
                    # DeepSeek-R1-style models expect an opening <think> tag.
                    if ('deepseek' in model_name.lower() or 'r1' in model_name.lower()) and "<think>\n" not in formatted_prompt:
                        formatted_prompt = formatted_prompt + "<think>\n"
                else:
                    # Non-chat mode: the caller has already formatted the prompt.
                    formatted_prompt = prompt
                response = await client.completions.create(
                    model=model_name,
                    prompt=formatted_prompt,
                    temperature=temperature,
                    top_p=top_p,
                    max_tokens=max_tokens,
                    stop=stop,
                    # vLLM-specific sampling knobs go through extra_body.
                    extra_body={
                        'top_k': top_k,
                        'include_stop_str_in_output': True,
                        'repetition_penalty': repetition_penalty,
                        # 'bad_words': bad_words,
                        # 'min_p': min_p
                    },
                    timeout=3600,
                )
                return formatted_prompt, response.choices[0].text
        except Exception as e:
            print(f"Generate Response Error occurred: {e}, Starting retry attempt {attempt + 1}")
            # print(prompt)
            if "maximum context length" in str(e).lower():
                # If length exceeds limit, reduce max_tokens by half
                max_tokens = max_tokens // 2
                print(f"Reducing max_tokens to {max_tokens}")
            if attempt == retry_limit - 1:
                print(f"Failed after {retry_limit} attempts: {e}")
                return "", ""
            # Linear backoff between retries.
            await asyncio.sleep(1 * (attempt + 1))
    return "", ""
替换为:
async def generate_response(
    client: AsyncOpenAI,
    prompt: str,
    semaphore: asyncio.Semaphore,
    generate_mode: str = "chat",
    temperature: float = 0.0,
    top_p: float = 1.0,
    max_tokens: int = 32768,
    repetition_penalty: float = 1.0,
    top_k: int = 1,
    min_p: float = 0.0,
    model_name: str = "QwQ-32B",
    stop: List[str] = [END_SEARCH_QUERY],
    retry_limit: int = 3,
    bad_words: List[str] = [f"{END_SEARCH_RESULT}\n\n{tokenizer.eos_token}"],
) -> Tuple[str, str]:
    """Generate a single response via the DeepSeek chat API, with retry logic.

    Drop-in replacement for the original vLLM completions call. Fixes over
    the first draft of this patch:
      * uses ``AsyncOpenAI`` + ``await`` — the previous sync ``OpenAI`` call
        inside this coroutine blocked the event loop for the whole request;
      * sends the raw prompt as a ``user`` message instead of pushing a
        locally chat-templated string into the ``system`` slot (the hosted
        API applies its own chat template, so templating twice corrupts it);
      * forwards ``temperature``/``top_p``/``max_tokens`` and keeps the
        context-length backoff path alive (it was dead code before because
        ``max_tokens`` was never sent).

    Args:
        client: Kept for signature compatibility with existing call sites;
            the DeepSeek client below is used instead.
        prompt: Raw user prompt (not pre-templated).
        semaphore: Concurrency limiter shared across in-flight requests.
        The remaining sampling knobs (``top_k``, ``min_p``,
        ``repetition_penalty``, ``stop``, ``bad_words``) are accepted for
        compatibility but are not supported by the hosted chat endpoint.

    Returns:
        Tuple[str, str]: ``(prompt_sent, generated_text)``, or ``("", "")``
        after ``retry_limit`` consecutive failures.
    """
    import os  # local import keeps this snippet self-contained

    # NOTE(review): never commit real keys — read from the environment.
    deepseek_client = AsyncOpenAI(
        api_key=os.environ.get("DEEPSEEK_API_KEY", "sk-*****"),
        base_url="https://api.deepseek.com/beta",
    )
    for attempt in range(retry_limit):
        try:
            async with semaphore:
                response = await deepseek_client.chat.completions.create(
                    model="deepseek-chat",
                    temperature=temperature,
                    top_p=top_p,
                    # DeepSeek rejects max_tokens outside [1, 8192].
                    max_tokens=max(1, min(max_tokens, 8192)),
                    messages=[{"role": "user", "content": prompt}],
                    stream=False,
                    timeout=3600,
                )
                return prompt, response.choices[0].message.content
        except Exception as e:
            print(f"Generate Response Error occurred: {e}, Starting retry attempt {attempt + 1}")
            if "maximum context length" in str(e).lower():
                # Back off by halving the requested completion budget.
                max_tokens = max_tokens // 2
                print(f"Reducing max_tokens to {max_tokens}")
            if attempt == retry_limit - 1:
                print(f"Failed after {retry_limit} attempts: {e}")
                return "", ""
            # Linear backoff between retries.
            await asyncio.sleep(1 * (attempt + 1))
    return "", ""
def extract_relevant_info(search_results):
    """Extract the fields we care about from a Bing Web Search API response.

    Args:
        search_results (dict): JSON payload from the Bing Web Search API.

    Returns:
        list: One dict per web-page hit carrying id, title, url, site_name,
        date, snippet, and a reserved 'context' field (populated later).
    """
    extracted = []
    if 'webPages' not in search_results or 'value' not in search_results['webPages']:
        return extracted
    for position, page in enumerate(search_results['webPages']['value'], start=1):
        published = page.get('datePublished', '')
        extracted.append({
            'id': position,                       # 1-based for easier downstream use
            'title': page.get('name', ''),
            'url': page.get('url', ''),
            'site_name': page.get('siteName', ''),
            'date': published.split('T')[0],      # keep only the date part
            'snippet': page.get('snippet', ''),   # may still contain HTML markup
            'context': '',                        # reserved, filled in later
        })
    return extracted
替换为:
def extract_relevant_info(search_results):
    """Extract relevant fields from a custom (non-Bing) search result payload.

    Args:
        search_results (dict): JSON response containing a top-level
            'results' list.

    Returns:
        list: One dict per hit with id, title, url, site_name, date,
        snippet, and a reserved 'context' field (populated later).
    """
    entries = search_results.get('results')
    if not isinstance(entries, list):
        # Missing or malformed payload — nothing usable.
        return []
    return [
        {
            'id': position,                          # 1-based index
            'title': item.get('title', ''),
            'url': item.get('url', ''),
            'site_name': '',                         # payload carries no site name
            'date': '',                              # payload carries no publish date
            'snippet': item.get('content', ''),      # 'content' plays the snippet role
            'context': '',                           # reserved, filled in later
        }
        for position, item in enumerate(entries, start=1)
    ]
3,替换Bing搜索
目前 Bing 已关闭相关 API,原方式无法继续使用。修改 bing_search.py:
async def bing_web_search_async(query, subscription_key, endpoint, market='en-US', language='en', timeout=20):
    """
    Perform an asynchronous search using the Bing Web Search API.

    Args:
        query (str): Search query.
        subscription_key (str): Subscription key for the Bing Search API.
        endpoint (str): Endpoint for the Bing Search API.
        market (str): Market, e.g., "en-US" or "zh-CN".
        language (str): Language of the results, e.g., "en".
        timeout (int): Request timeout in seconds.

    Returns:
        dict: JSON response of the search results. Returns empty dict if all retries fail.
    """
    headers = {
        "Ocp-Apim-Subscription-Key": subscription_key
    }
    params = {
        "q": query,
        "mkt": market,
        "setLang": language,
        "textDecorations": True,
        "textFormat": "HTML"
    }
    max_retries = 5
    retry_count = 0
    while retry_count < max_retries:
        try:
            # NOTE(review): `session` is a module-level object defined elsewhere;
            # this is a blocking call inside an async function, so it stalls the
            # event loop while the HTTP request runs.
            response = session.get(endpoint, headers=headers, params=params, timeout=timeout)
            response.raise_for_status()
            search_results = response.json()
            return search_results
        except Exception as e:
            retry_count += 1
            if retry_count == max_retries:
                print(f"Bing Web Search Request Error occurred: {e} after {max_retries} retries")
                return {}
            print(f"Bing Web Search Request Error occurred, retrying ({retry_count}/{max_retries})...")
            time.sleep(1)  # Wait 1 second between retries
替换为:
async def bing_web_search_async(query, subscription_key, endpoint, market='en-US', language='en', timeout=20):
    """Search the web via Tavily (Bing replacement), keeping the old signature.

    The Bing-era parameters (``subscription_key``, ``endpoint``, ``market``,
    ``language``, ``timeout``) are accepted but ignored so existing call
    sites keep working unchanged.

    Bug fixed versus the first draft of this patch: ``TavilyClient.search()``
    already returns a parsed dict, not an HTTP response object, so calling
    ``.raise_for_status()`` / ``.json()`` on it raised AttributeError. The
    retry loop from the original Bing implementation is also restored.

    Args:
        query (str): Search query.

    Returns:
        dict: Tavily search results (contains a 'results' list), or {} if
        all retries fail.
    """
    import os  # local import keeps this snippet self-contained

    # NOTE(review): never commit real keys — read from the environment.
    client = TavilyClient(os.environ.get("TAVILY_API_KEY", "tvly-dev-*****"))
    max_retries = 5
    for attempt in range(1, max_retries + 1):
        try:
            # search() returns a plain dict — no .raise_for_status()/.json().
            search_results = client.search(query=query)
            print("=========", search_results)
            return search_results
        except Exception as e:
            if attempt == max_retries:
                print(f"Tavily Search Request Error occurred: {e} after {max_retries} retries")
                return {}
            print(f"Tavily Search Request Error occurred, retrying ({attempt}/{max_retries})...")
            await asyncio.sleep(1)  # non-blocking wait between retries
4,命令行启动
【问题解决模式】
# Problem-solving mode.
# NOTE: inline "# comments" after a backslash line-continuation are invalid
# in bash (they terminate the command mid-way), so the flag explanations
# are moved up here:
#   --bing_subscription_key : Bing Web Search API key used for retrieval augmentation (RAG)
#   --api_base_url          : base URL of the main model API, e.g. http://localhost:8000/v1
#   --model_name            : main model name (local or remotely hosted)
#   --tokenizer_path        : tokenizer path for the main model, e.g. ./tokenizers/qwq32b/
#   --aux_api_base_url      : base URL of the auxiliary model API
#   --aux_model_name        : auxiliary model used for comparison / augmentation / verification
#   --aux_tokenizer_path    : tokenizer path for the auxiliary model
python scripts/run_web_thinker.py \
    --single_question "What is OpenAI Deep Research?" \
    --bing_subscription_key "YOUR_BING_SUBSCRIPTION_KEY" \
    --api_base_url "YOUR_API_BASE_URL" \
    --model_name "QwQ-32B" \
    --tokenizer_path "PATH_TO_YOUR_TOKENIZER" \
    --aux_api_base_url "YOUR_AUX_API_BASE_URL" \
    --aux_model_name "Qwen2.5-32B-Instruct" \
    --aux_tokenizer_path "PATH_TO_YOUR_AUX_TOKENIZER"
【报告生成模式】
# Report-generation mode: same flags as the problem-solving mode,
# but runs the report-writing pipeline instead.
python scripts/run_web_thinker_report.py \
    --single_question "What are the models of OpenAI and what are the differences?" \
    --bing_subscription_key "YOUR_BING_SUBSCRIPTION_KEY" \
    --api_base_url "YOUR_API_BASE_URL" \
    --model_name "QwQ-32B" \
    --aux_api_base_url "YOUR_AUX_API_BASE_URL" \
    --aux_model_name "Qwen2.5-32B-Instruct" \
    --tokenizer_path "PATH_TO_YOUR_TOKENIZER" \
    --aux_tokenizer_path "PATH_TO_YOUR_AUX_TOKENIZER"
5,web 启动
# Launch the Streamlit web demo.
# Fix: the two commands were fused onto one line ("cd demo streamlit run ..."),
# which passes "streamlit" as an argument to cd; they must be separated.
cd demo && streamlit run run_demo.py
【报错】There is no current event loop in thread 'ScriptRunner.scriptThread'.
【解决】修改 bing_search.py 第 457 行:关闭加锁(注意:置为 None 后,所有对该锁的 async with 用法也需一并移除,否则会在 None 上加锁报错)。
self.lock = asyncio.Lock() 👇 self.lock = None
【报错】Generate Response Error occurred: Connection error
【解决】修改 settings.py _load_client()
def _load_client(self, api_base_url, aux_api_base_url): self.client = AsyncOpenAI( api_key="empty", base_url=api_base_url, ) self.aux_client = AsyncOpenAI( api_key="empty", base_url=aux_api_base_url, ) 👇 def _load_client(self, api_base_url, aux_api_base_url): client = OpenAI(api_key="sk-***", base_url="https://api.deepseek.com/beta") self.client = client self.aux_client = client
# bing_search.py use_model_name='QwQ-32B', aux_model_name='Qwen2.5-72B-Instruct', 👇 use_model_name='deepseek-chat', aux_model_name='deepseek-chat',
【报错】Invalid max_tokens value, the valid range of max_tokens is [1, 8192]
【解决】修改 bing_search.py
response = await client.completions.create( model=env.aux_model_name, max_tokens=4096, prompt=prompt, timeout=3600, ) 👇 response = client.chat.completions.create( model=env.aux_model_name, messages=[ {"role": "system", "content": prompt}, ], )
【报错】Generate Response Error occurred: Error code: 400 - {'error': {'message': 'Invalid max_tokens value, the valid range of max_tokens is [1, 8192]', 'type': 'invalid_request_error', 'param': None, 'code': 'invalid_request_error'}}, Starting retry attempt 1
【解决】修改 run_login.py generate_response()
async def generate_response(
    client: AsyncOpenAI,
    prompt: str,
    temperature: float = 0.0,
    top_p: float = 1.0,
    max_tokens: int = 4096,
    repetition_penalty: float = 1.0,
    top_k: int = 1,
    min_p: float = 0.0,
    model_name: str = "QwQ-32B",
    stop: List[str] = ["<|end_search_query|>"],
    retry_limit: int = 3,
):
    """Generate a streaming response with retry logic.

    NOTE(review): this draft is broken, as recorded by the next error in this
    document — the sync ``OpenAI`` client's ``create(..., stream=True)``
    returns a sync ``Stream`` with no ``__aiter__``, so ``async for`` raises
    TypeError; streamed chunks also expose ``.delta``, not ``.message``.
    Kept here verbatim as the before-state of the follow-up fix.
    """
    for attempt in range(retry_limit):
        try:
            # Shadows the injected AsyncOpenAI client with a sync DeepSeek one.
            client = OpenAI(api_key="sk-****", base_url="https://api.deepseek.com/beta")
            response = client.chat.completions.create(
                model="deepseek-chat",
                temperature=temperature,
                messages=[
                    {"role": "system", "content": prompt},
                ],
                stream=True
            )
            # BUG: sync Stream is not async-iterable (see error below).
            async for chunk in response:
                if chunk.choices[0].message.content:
                    yield chunk.choices[0].message.content
            return
        except Exception as e:
            print(f"Generate Response Error occurred: {e}, Starting retry attempt {attempt + 1}")
            if attempt == retry_limit - 1:
                print(f"Failed after {retry_limit} attempts: {e}")
            await asyncio.sleep(0.5 * (attempt + 1))
    yield ""
【报错】Generate Response Error occurred: 'async for' requires an object with __aiter__ method, got Stream, Starting
【解决】DeepSeek 同步客户端返回的 Stream 不是异步可迭代对象,修改 run_login.py(前文文件名,原文此处误作 run_logit.py)的 generate_response():
async for chunk in response: if chunk.choices[0].message.content: yield chunk.choices[0].message.content return 👇 yield response.choices[0].message.content