"""Hardened SearX web search and web fetch tools. Every external response is sanitized and wrapped in security delimiters before being returned to the LLM. See deerflow.security for the pipeline. Image fetching is intentionally NOT supported. Agents in this build are text-only researchers; image_search_tool was removed and web_fetch_tool refuses any response whose Content-Type is not a textual media type. If you need an image-aware agent, add a dedicated tool with explicit user review — do not lift these restrictions in place. """ from __future__ import annotations from urllib.parse import quote import httpx from langchain.tools import tool from deerflow.config import get_app_config from deerflow.security.content_delimiter import wrap_untrusted_content from deerflow.security.html_cleaner import extract_secure_text from deerflow.security.sanitizer import sanitizer DEFAULT_SEARX_URL = "http://localhost:8888" DEFAULT_TIMEOUT = 30.0 DEFAULT_USER_AGENT = "DeerFlow-Hardened/1.0 (+searx)" # Allowed Content-Type prefixes for web_fetch responses. Anything else # (image/*, audio/*, video/*, application/octet-stream, font/*, ...) is # rejected before its body is read into memory. ALLOWED_CONTENT_TYPE_PREFIXES = ( "text/", "application/json", "application/xml", "application/xhtml+xml", "application/ld+json", "application/atom+xml", "application/rss+xml", ) def _is_text_content_type(header_value: str) -> bool: """True if the Content-Type header is a textual media type we're willing to read.""" if not header_value: # No header at all → refuse: we don't speculate. return False media = header_value.split(";", 1)[0].strip().lower() return any(media == prefix.rstrip("/") or media.startswith(prefix) for prefix in ALLOWED_CONTENT_TYPE_PREFIXES) def _tool_extra(name: str) -> dict: """Read the model_extra dict for a tool config entry, defensively.""" cfg = get_app_config().get_tool_config(name) if cfg is None: return {} return getattr(cfg, "model_extra", {}) or {} def _searx_url(tool_name: str = "web_search") -> str: return _tool_extra(tool_name).get("searx_url", DEFAULT_SEARX_URL) def _http_get(url: str, params: dict, timeout: float = DEFAULT_TIMEOUT) -> dict: """GET a SearX endpoint and return parsed JSON. Raises on transport/HTTP error.""" with httpx.Client(headers={"User-Agent": DEFAULT_USER_AGENT}) as client: response = client.get(url, params=params, timeout=timeout) response.raise_for_status() return response.json() @tool("web_search", parse_docstring=True) def web_search_tool(query: str, max_results: int = 10) -> str: """Search the web via the private hardened SearX instance. All results are sanitized against prompt-injection vectors and wrapped in <<>> markers. Args: query: Search keywords. max_results: Maximum results to return (capped by config). """ extra = _tool_extra("web_search") cap = int(extra.get("max_results", 10)) searx_url = extra.get("searx_url", DEFAULT_SEARX_URL) limit = max(1, min(int(max_results), cap)) try: data = _http_get( f"{searx_url}/search", {"q": quote(query), "format": "json"}, ) except Exception as exc: return wrap_untrusted_content({"error": f"Search failed: {exc}"}) results = [] for item in data.get("results", [])[:limit]: results.append( { "title": sanitizer.sanitize(item.get("title", ""), max_length=200), "url": item.get("url", ""), "content": sanitizer.sanitize(item.get("content", ""), max_length=500), } ) return wrap_untrusted_content( { "query": query, "total_results": len(results), "results": results, } ) @tool("web_fetch", parse_docstring=True) async def web_fetch_tool(url: str, max_chars: int = 10000) -> str: """Fetch a web page and return sanitized visible text. Only textual responses are accepted (text/html, application/json, ...). Image, audio, video, and binary responses are refused before the body is read into memory — this build is text-only by policy. Dangerous HTML elements (script, style, iframe, form, ...) are stripped, invisible Unicode is removed, and the result is wrapped in security markers. Only call this for URLs returned by web_search or supplied directly by the user — do not invent URLs. Args: url: Absolute URL to fetch (must include scheme). max_chars: Maximum number of characters to return. """ extra = _tool_extra("web_fetch") cap = int(extra.get("max_chars", max_chars)) limit = max(256, min(int(max_chars), cap)) try: async with httpx.AsyncClient( headers={"User-Agent": DEFAULT_USER_AGENT}, follow_redirects=True, ) as client: # Stream so we can inspect headers BEFORE reading the body. # Refuses image/audio/video/binary responses without ever # touching their bytes. async with client.stream("GET", url, timeout=DEFAULT_TIMEOUT) as response: response.raise_for_status() content_type = response.headers.get("content-type", "") if not _is_text_content_type(content_type): return wrap_untrusted_content( { "error": "Refused: non-text response (this build does not fetch images, audio, video or binary content).", "url": url, "content_type": content_type or "", } ) # Read at most ~4x the char limit in bytes to bound memory. # extract_secure_text + sanitizer will trim further. max_bytes = max(4096, limit * 4) buf = bytearray() async for chunk in response.aiter_bytes(): buf.extend(chunk) if len(buf) >= max_bytes: break html = buf.decode(response.encoding or "utf-8", errors="replace") except Exception as exc: return wrap_untrusted_content({"error": f"Fetch failed: {exc}", "url": url}) raw_text = extract_secure_text(html) clean_text = sanitizer.sanitize(raw_text, max_length=limit) return wrap_untrusted_content({"url": url, "content": clean_text}) # image_search_tool was intentionally removed in this hardened build. # Agents are text-only researchers; image fetching has no business in the # pipeline and only widens the attack surface (data exfiltration via # rendered tags, server-side image content, ...). If you need to # bring it back, build a separate tool with explicit user-side allowlist # and a render-side proxy — do not just paste the old function back.