__  __    __   __  _____      _            _          _____ _          _ _ 
 |  \/  |   \ \ / / |  __ \    (_)          | |        / ____| |        | | |
 | \  / |_ __\ V /  | |__) | __ ___   ____ _| |_ ___  | (___ | |__   ___| | |
 | |\/| | '__|> <   |  ___/ '__| \ \ / / _` | __/ _ \  \___ \| '_ \ / _ \ | |
 | |  | | |_ / . \  | |   | |  | |\ V / (_| | ||  __/  ____) | | | |  __/ | |
 |_|  |_|_(_)_/ \_\ |_|   |_|  |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1
 if you need WebShell for Seo everyday contact me on Telegram
 Telegram Address : @jackleet
        
        
For_More_Tools: Telegram: @jackleet | Bulk Smtp support mail sender | Business Mail Collector | Mail Bouncer All Mail | Bulk Office Mail Validator | Html Letter private



Upload:

Command:

aptanhua@216.73.216.200: ~ $
import itertools
import json
import sys
import warnings
from collections import OrderedDict
from functools import wraps
from typing import TYPE_CHECKING

import sentry_sdk
from sentry_sdk.ai.utils import (
    GEN_AI_ALLOWED_MESSAGE_ROLES,
    get_start_span_function,
    normalize_message_roles,
    set_data_normalized,
    transform_content_part,
    truncate_and_annotate_messages,
)
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.scope import should_send_default_pii
from sentry_sdk.traces import StreamedSpan
from sentry_sdk.tracing_utils import (
    _get_value,
    has_span_streaming_enabled,
    should_truncate_gen_ai_input,
)
from sentry_sdk.utils import capture_internal_exceptions, logger

if TYPE_CHECKING:
    from typing import (
        Any,
        AsyncIterator,
        Callable,
        Dict,
        Iterator,
        List,
        Optional,
        Union,
    )
    from uuid import UUID

    from sentry_sdk._types import TextPart
    from sentry_sdk.tracing import Span


try:
    from langchain_core.agents import AgentFinish
    from langchain_core.callbacks import (
        BaseCallbackHandler,
        BaseCallbackManager,
        Callbacks,
        manager,
    )
    from langchain_core.messages import BaseMessage
    from langchain_core.outputs import LLMResult

except ImportError:
    raise DidNotEnable("langchain not installed")


try:
    # >=v1
    from langchain_classic.agents import AgentExecutor  # type: ignore[import-not-found]
except ImportError:
    try:
        # <v1
        from langchain.agents import AgentExecutor
    # Catch TypeError due to changes in type hint evaluation order: https://github.com/pydantic/pydantic/issues/13036
    except (ImportError, TypeError):
        AgentExecutor = None


# Conditional imports for embeddings providers
try:
    from langchain_openai import OpenAIEmbeddings  # type: ignore[import-not-found]
except ImportError:
    OpenAIEmbeddings = None

try:
    from langchain_openai import AzureOpenAIEmbeddings
except ImportError:
    AzureOpenAIEmbeddings = None

try:
    from langchain_google_vertexai import (  # type: ignore[import-not-found]
        VertexAIEmbeddings,
    )
except ImportError:
    VertexAIEmbeddings = None

try:
    from langchain_aws import BedrockEmbeddings  # type: ignore[import-not-found]
except ImportError:
    BedrockEmbeddings = None

try:
    from langchain_cohere import CohereEmbeddings  # type: ignore[import-not-found]
except ImportError:
    CohereEmbeddings = None

try:
    from langchain_mistralai import (  # type: ignore[import-not-found]
        MistralAIEmbeddings,
    )
except ImportError:
    MistralAIEmbeddings = None

try:
    from langchain_huggingface import (  # type: ignore[import-not-found]
        HuggingFaceEmbeddings,
    )
except ImportError:
    HuggingFaceEmbeddings = None

try:
    from langchain_ollama import OllamaEmbeddings  # type: ignore[import-not-found]
except ImportError:
    OllamaEmbeddings = None


def _get_ai_system(all_params: "Dict[str, Any]") -> "Optional[str]":
    ai_type = all_params.get("_type")

    if not ai_type or not isinstance(ai_type, str):
        return None

    return ai_type


DATA_FIELDS = {
    "frequency_penalty": SPANDATA.GEN_AI_REQUEST_FREQUENCY_PENALTY,
    "function_call": SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS,
    "max_tokens": SPANDATA.GEN_AI_REQUEST_MAX_TOKENS,
    "presence_penalty": SPANDATA.GEN_AI_REQUEST_PRESENCE_PENALTY,
    "temperature": SPANDATA.GEN_AI_REQUEST_TEMPERATURE,
    "tool_calls": SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS,
    "top_k": SPANDATA.GEN_AI_REQUEST_TOP_K,
    "top_p": SPANDATA.GEN_AI_REQUEST_TOP_P,
}


def _transform_langchain_content_block(
    content_block: "Dict[str, Any]",
) -> "Dict[str, Any]":
    """
    Transform a LangChain content block using the shared transform_content_part function.

    Returns the original content block if transformation is not applicable
    (e.g., for text blocks or unrecognized formats).
    """
    result = transform_content_part(content_block)
    return result if result is not None else content_block


def _transform_langchain_message_content(content: "Any") -> "Any":
    """
    Transform LangChain message content, handling both string content and
    list of content blocks.
    """
    if isinstance(content, str):
        return content

    if isinstance(content, (list, tuple)):
        transformed = []
        for block in content:
            if isinstance(block, dict):
                transformed.append(_transform_langchain_content_block(block))
            else:
                transformed.append(block)
        return transformed

    return content


def _get_system_instructions(messages: "List[List[BaseMessage]]") -> "List[str]":
    system_instructions = []

    for list_ in messages:
        for message in list_:
            # type of content: str | list[str | dict] | None
            if message.type == "system" and isinstance(message.content, str):
                system_instructions.append(message.content)

            elif message.type == "system" and isinstance(message.content, list):
                for item in message.content:
                    if isinstance(item, str):
                        system_instructions.append(item)

                    elif isinstance(item, dict) and item.get("type") == "text":
                        instruction = item.get("text")
                        if isinstance(instruction, str):
                            system_instructions.append(instruction)

    return system_instructions


def _transform_system_instructions(
    system_instructions: "List[str]",
) -> "List[TextPart]":
    return [
        {
            "type": "text",
            "content": instruction,
        }
        for instruction in system_instructions
    ]


class LangchainIntegration(Integration):
    identifier = "langchain"
    origin = f"auto.ai.{identifier}"

    def __init__(
        self: "LangchainIntegration",
        include_prompts: bool = True,
        max_spans: "Optional[int]" = None,
    ) -> None:
        self.include_prompts = include_prompts
        self.max_spans = max_spans

        if max_spans is not None:
            warnings.warn(
                "The `max_spans` parameter of `LangchainIntegration` is "
                "deprecated and will be removed in version 3.0 of sentry-sdk.",
                DeprecationWarning,
                stacklevel=2,
            )

    @staticmethod
    def setup_once() -> None:
        manager._configure = _wrap_configure(manager._configure)

        if AgentExecutor is not None:
            AgentExecutor.invoke = _wrap_agent_executor_invoke(AgentExecutor.invoke)
            AgentExecutor.stream = _wrap_agent_executor_stream(AgentExecutor.stream)

        # Patch embeddings providers
        _patch_embeddings_provider(OpenAIEmbeddings)
        _patch_embeddings_provider(AzureOpenAIEmbeddings)
        _patch_embeddings_provider(VertexAIEmbeddings)
        _patch_embeddings_provider(BedrockEmbeddings)
        _patch_embeddings_provider(CohereEmbeddings)
        _patch_embeddings_provider(MistralAIEmbeddings)
        _patch_embeddings_provider(HuggingFaceEmbeddings)
        _patch_embeddings_provider(OllamaEmbeddings)


class SentryLangchainCallback(BaseCallbackHandler):  # type: ignore[misc]
    """Callback handler that creates Sentry spans."""

    def __init__(
        self, max_span_map_size: "Optional[int]", include_prompts: bool
    ) -> None:
        self.span_map: "OrderedDict[UUID, Union[sentry_sdk.tracing.Span, StreamedSpan]]" = OrderedDict()
        self.max_span_map_size = max_span_map_size
        self.include_prompts = include_prompts

    def gc_span_map(self) -> None:
        if self.max_span_map_size is not None:
            while len(self.span_map) > self.max_span_map_size:
                run_id, span = self.span_map.popitem(last=False)
                self._exit_span(span, run_id)

    def _handle_error(self, run_id: "UUID", error: "Any") -> None:
        with capture_internal_exceptions():
            if not run_id or run_id not in self.span_map:
                return

            span = self.span_map[run_id]

            sentry_sdk.capture_exception(
                error, span._scope if isinstance(span, StreamedSpan) else span.scope
            )

            span.__exit__(type(error), error, error.__traceback__)
            del self.span_map[run_id]

    def _normalize_langchain_message(self, message: "BaseMessage") -> "Any":
        # Transform content to handle multimodal data (images, audio, video, files)
        transformed_content = _transform_langchain_message_content(message.content)
        parsed = {"role": message.type, "content": transformed_content}
        parsed.update(message.additional_kwargs)
        return parsed

    def _create_span(
        self: "SentryLangchainCallback",
        run_id: "UUID",
        parent_id: "Optional[Any]",
        op: str,
        name: str,
        origin: str,
    ) -> "Union[sentry_sdk.tracing.Span, StreamedSpan]":
        span = None
        if parent_id:
            parent_span: "Optional[Union[sentry_sdk.tracing.Span, StreamedSpan]]" = (
                self.span_map.get(parent_id)
            )
            if parent_span:
                span = (
                    sentry_sdk.traces.start_span(
                        parent_span=parent_span,
                        name=name,
                        attributes={
                            "sentry.op": op,
                            "sentry.origin": origin,
                        },
                    )
                    if isinstance(parent_span, StreamedSpan)
                    else parent_span.start_child(op=op, name=name, origin=origin)
                )

        if span is None:
            span_streaming = has_span_streaming_enabled(sentry_sdk.get_client().options)
            span = (
                sentry_sdk.traces.start_span(
                    name=name,
                    attributes={
                        "sentry.op": op,
                        "sentry.origin": origin,
                    },
                )
                if span_streaming
                else sentry_sdk.start_span(op=op, name=name, origin=origin)
            )

        span.__enter__()
        self.span_map[run_id] = span
        self.gc_span_map()
        return span

    def _exit_span(
        self: "SentryLangchainCallback",
        span: "Union[sentry_sdk.tracing.Span, StreamedSpan]",
        run_id: "UUID",
    ) -> None:
        span.__exit__(None, None, None)
        del self.span_map[run_id]

    def on_llm_start(
        self: "SentryLangchainCallback",
        serialized: "Dict[str, Any]",
        prompts: "List[str]",
        *,
        run_id: "UUID",
        tags: "Optional[List[str]]" = None,
        parent_run_id: "Optional[UUID]" = None,
        metadata: "Optional[Dict[str, Any]]" = None,
        **kwargs: "Any",
    ) -> "Any":
        with capture_internal_exceptions():
            if not run_id:
                return

            all_params = kwargs.get("invocation_params", {})
            all_params.update(serialized.get("kwargs", {}))

            model = (
                all_params.get("model")
                or all_params.get("model_name")
                or all_params.get("model_id")
                or ""
            )

            span = self._create_span(
                run_id,
                parent_run_id,
                op=OP.GEN_AI_TEXT_COMPLETION,
                name=f"text_completion {model}".strip(),
                origin=LangchainIntegration.origin,
            )

            set_on_span = (
                span.set_attribute if isinstance(span, StreamedSpan) else span.set_data
            )
            set_on_span(SPANDATA.GEN_AI_OPERATION_NAME, "text_completion")

            run_name = kwargs.get("name")
            if run_name:
                set_on_span(SPANDATA.GEN_AI_FUNCTION_ID, run_name)

            if model:
                set_on_span(
                    SPANDATA.GEN_AI_REQUEST_MODEL,
                    model,
                )

            ai_system = _get_ai_system(all_params)
            if ai_system:
                set_on_span(SPANDATA.GEN_AI_SYSTEM, ai_system)

            for key, attribute in DATA_FIELDS.items():
                if key in all_params and all_params[key] is not None:
                    set_data_normalized(span, attribute, all_params[key], unpack=False)

            _set_tools_on_span(span, all_params.get("tools"))

            if should_send_default_pii() and self.include_prompts:
                normalized_messages = [
                    {
                        "role": GEN_AI_ALLOWED_MESSAGE_ROLES.USER,
                        "content": {"type": "text", "text": prompt},
                    }
                    for prompt in prompts
                ]

                client = sentry_sdk.get_client()
                scope = sentry_sdk.get_current_scope()
                messages_data = (
                    truncate_and_annotate_messages(normalized_messages, span, scope)
                    if should_truncate_gen_ai_input(client.options)
                    else normalized_messages
                )
                if messages_data is not None:
                    set_data_normalized(
                        span,
                        SPANDATA.GEN_AI_REQUEST_MESSAGES,
                        messages_data,
                        unpack=False,
                    )

    def on_chat_model_start(
        self: "SentryLangchainCallback",
        serialized: "Dict[str, Any]",
        messages: "List[List[BaseMessage]]",
        *,
        run_id: "UUID",
        **kwargs: "Any",
    ) -> "Any":
        """Run when Chat Model starts running."""
        with capture_internal_exceptions():
            if not run_id:
                return

            all_params = kwargs.get("invocation_params", {})
            all_params.update(serialized.get("kwargs", {}))

            model = (
                all_params.get("model")
                or all_params.get("model_name")
                or all_params.get("model_id")
                or ""
            )

            span = self._create_span(
                run_id,
                kwargs.get("parent_run_id"),
                op=OP.GEN_AI_CHAT,
                name=f"chat {model}".strip(),
                origin=LangchainIntegration.origin,
            )

            set_on_span = (
                span.set_attribute if isinstance(span, StreamedSpan) else span.set_data
            )
            set_on_span(SPANDATA.GEN_AI_OPERATION_NAME, "chat")
            if model:
                set_on_span(SPANDATA.GEN_AI_REQUEST_MODEL, model)

            ai_system = _get_ai_system(all_params)
            if ai_system:
                set_on_span(SPANDATA.GEN_AI_SYSTEM, ai_system)

            agent_metadata = kwargs.get("metadata")
            if isinstance(agent_metadata, dict) and "lc_agent_name" in agent_metadata:
                set_on_span(SPANDATA.GEN_AI_AGENT_NAME, agent_metadata["lc_agent_name"])

            run_name = kwargs.get("name")
            if run_name:
                set_on_span(
                    SPANDATA.GEN_AI_FUNCTION_ID,
                    run_name,
                )

            for key, attribute in DATA_FIELDS.items():
                if key in all_params and all_params[key] is not None:
                    set_data_normalized(span, attribute, all_params[key], unpack=False)

            _set_tools_on_span(span, all_params.get("tools"))

            if should_send_default_pii() and self.include_prompts:
                system_instructions = _get_system_instructions(messages)
                if len(system_instructions) > 0:
                    set_on_span(
                        SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS,
                        json.dumps(_transform_system_instructions(system_instructions)),
                    )

                normalized_messages = []
                for list_ in messages:
                    for message in list_:
                        if message.type == "system":
                            continue

                        normalized_messages.append(
                            self._normalize_langchain_message(message)
                        )
                normalized_messages = normalize_message_roles(normalized_messages)

                client = sentry_sdk.get_client()
                scope = sentry_sdk.get_current_scope()
                messages_data = (
                    truncate_and_annotate_messages(normalized_messages, span, scope)
                    if should_truncate_gen_ai_input(client.options)
                    else normalized_messages
                )
                if messages_data is not None:
                    set_data_normalized(
                        span,
                        SPANDATA.GEN_AI_REQUEST_MESSAGES,
                        messages_data,
                        unpack=False,
                    )

    def on_chat_model_end(
        self: "SentryLangchainCallback",
        response: "LLMResult",
        *,
        run_id: "UUID",
        **kwargs: "Any",
    ) -> "Any":
        """Run when Chat Model ends running."""
        with capture_internal_exceptions():
            if not run_id or run_id not in self.span_map:
                return

            span = self.span_map[run_id]

            if should_send_default_pii() and self.include_prompts:
                set_data_normalized(
                    span,
                    SPANDATA.GEN_AI_RESPONSE_TEXT,
                    [[x.text for x in list_] for list_ in response.generations],
                )

            _record_token_usage(span, response)
            self._exit_span(span, run_id)

    def on_llm_end(
        self: "SentryLangchainCallback",
        response: "LLMResult",
        *,
        run_id: "UUID",
        **kwargs: "Any",
    ) -> "Any":
        """Run when LLM ends running."""
        with capture_internal_exceptions():
            if not run_id or run_id not in self.span_map:
                return

            span = self.span_map[run_id]

            try:
                generation = response.generations[0][0]
            except IndexError:
                generation = None

            if generation is not None:
                set_on_span = (
                    span.set_attribute
                    if isinstance(span, StreamedSpan)
                    else span.set_data
                )

                try:
                    response_model = generation.message.response_metadata.get(
                        "model_name"
                    )
                    if response_model is not None:
                        set_on_span(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model)
                except AttributeError:
                    pass

                try:
                    finish_reason = generation.generation_info.get("finish_reason")
                    if finish_reason is not None:
                        set_on_span(
                            SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS,
                            [finish_reason],
                        )
                except AttributeError:
                    pass

                try:
                    if should_send_default_pii() and self.include_prompts:
                        tool_calls = getattr(generation.message, "tool_calls", None)
                        if tool_calls is not None and tool_calls != []:
                            set_data_normalized(
                                span,
                                SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS,
                                tool_calls,
                                unpack=False,
                            )
                except AttributeError:
                    pass

            if should_send_default_pii() and self.include_prompts:
                set_data_normalized(
                    span,
                    SPANDATA.GEN_AI_RESPONSE_TEXT,
                    [[x.text for x in list_] for list_ in response.generations],
                )

            _record_token_usage(span, response)
            self._exit_span(span, run_id)

    def on_llm_error(
        self: "SentryLangchainCallback",
        error: "Union[Exception, KeyboardInterrupt]",
        *,
        run_id: "UUID",
        **kwargs: "Any",
    ) -> "Any":
        """Run when LLM errors."""
        self._handle_error(run_id, error)

    def on_chat_model_error(
        self: "SentryLangchainCallback",
        error: "Union[Exception, KeyboardInterrupt]",
        *,
        run_id: "UUID",
        **kwargs: "Any",
    ) -> "Any":
        """Run when Chat Model errors."""
        self._handle_error(run_id, error)

    def on_agent_finish(
        self: "SentryLangchainCallback",
        finish: "AgentFinish",
        *,
        run_id: "UUID",
        **kwargs: "Any",
    ) -> "Any":
        with capture_internal_exceptions():
            if not run_id or run_id not in self.span_map:
                return

            span = self.span_map[run_id]

            if should_send_default_pii() and self.include_prompts:
                set_data_normalized(
                    span, SPANDATA.GEN_AI_RESPONSE_TEXT, finish.return_values.items()
                )

            self._exit_span(span, run_id)

    def on_tool_start(
        self: "SentryLangchainCallback",
        serialized: "Dict[str, Any]",
        input_str: str,
        *,
        run_id: "UUID",
        **kwargs: "Any",
    ) -> "Any":
        """Run when tool starts running."""
        with capture_internal_exceptions():
            if not run_id:
                return

            tool_name = serialized.get("name") or kwargs.get("name") or ""

            span = self._create_span(
                run_id,
                kwargs.get("parent_run_id"),
                op=OP.GEN_AI_EXECUTE_TOOL,
                name=f"execute_tool {tool_name}".strip(),
                origin=LangchainIntegration.origin,
            )

            set_on_span = (
                span.set_attribute if isinstance(span, StreamedSpan) else span.set_data
            )

            set_on_span(SPANDATA.GEN_AI_OPERATION_NAME, "execute_tool")
            set_on_span(SPANDATA.GEN_AI_TOOL_NAME, tool_name)

            tool_description = serialized.get("description")
            if tool_description is not None:
                set_on_span(SPANDATA.GEN_AI_TOOL_DESCRIPTION, tool_description)

            agent_metadata = kwargs.get("metadata")
            if isinstance(agent_metadata, dict) and "lc_agent_name" in agent_metadata:
                set_on_span(SPANDATA.GEN_AI_AGENT_NAME, agent_metadata["lc_agent_name"])

            run_name = kwargs.get("name")
            if run_name:
                set_on_span(
                    SPANDATA.GEN_AI_FUNCTION_ID,
                    run_name,
                )

            if should_send_default_pii() and self.include_prompts:
                set_data_normalized(
                    span,
                    SPANDATA.GEN_AI_TOOL_INPUT,
                    kwargs.get("inputs", [input_str]),
                )

    def on_tool_end(
        self: "SentryLangchainCallback", output: str, *, run_id: "UUID", **kwargs: "Any"
    ) -> "Any":
        """Run when tool ends running."""
        with capture_internal_exceptions():
            if not run_id or run_id not in self.span_map:
                return

            span = self.span_map[run_id]

            if should_send_default_pii() and self.include_prompts:
                set_data_normalized(span, SPANDATA.GEN_AI_TOOL_OUTPUT, output)

            self._exit_span(span, run_id)

    def on_tool_error(
        self,
        error: "SentryLangchainCallback",
        *args: "Union[Exception, KeyboardInterrupt]",
        run_id: "UUID",
        **kwargs: "Any",
    ) -> "Any":
        """Run when tool errors."""
        self._handle_error(run_id, error)


def _extract_tokens(
    token_usage: "Any",
) -> "tuple[Optional[int], Optional[int], Optional[int]]":
    if not token_usage:
        return None, None, None

    input_tokens = _get_value(token_usage, "prompt_tokens") or _get_value(
        token_usage, "input_tokens"
    )
    output_tokens = _get_value(token_usage, "completion_tokens") or _get_value(
        token_usage, "output_tokens"
    )
    total_tokens = _get_value(token_usage, "total_tokens")

    return input_tokens, output_tokens, total_tokens


def _extract_tokens_from_generations(
    generations: "Any",
) -> "tuple[Optional[int], Optional[int], Optional[int]]":
    """Extract token usage from response.generations structure."""
    if not generations:
        return None, None, None

    total_input = 0
    total_output = 0
    total_total = 0

    for gen_list in generations:
        for gen in gen_list:
            token_usage = _get_token_usage(gen)
            input_tokens, output_tokens, total_tokens = _extract_tokens(token_usage)
            total_input += input_tokens if input_tokens is not None else 0
            total_output += output_tokens if output_tokens is not None else 0
            total_total += total_tokens if total_tokens is not None else 0

    return (
        total_input if total_input > 0 else None,
        total_output if total_output > 0 else None,
        total_total if total_total > 0 else None,
    )


def _get_token_usage(obj: "Any") -> "Optional[Dict[str, Any]]":
    """
    Check multiple paths to extract token usage from different objects.
    """
    possible_names = ("usage", "token_usage", "usage_metadata")

    message = _get_value(obj, "message")
    if message is not None:
        for name in possible_names:
            usage = _get_value(message, name)
            if usage is not None:
                return usage

    llm_output = _get_value(obj, "llm_output")
    if llm_output is not None:
        for name in possible_names:
            usage = _get_value(llm_output, name)
            if usage is not None:
                return usage

    for name in possible_names:
        usage = _get_value(obj, name)
        if usage is not None:
            return usage

    return None


def _record_token_usage(span: "Union[Span, StreamedSpan]", response: "Any") -> None:
    token_usage = _get_token_usage(response)
    if token_usage:
        input_tokens, output_tokens, total_tokens = _extract_tokens(token_usage)
    else:
        input_tokens, output_tokens, total_tokens = _extract_tokens_from_generations(
            response.generations
        )

    set_on_span = (
        span.set_attribute if isinstance(span, StreamedSpan) else span.set_data
    )

    if input_tokens is not None:
        set_on_span(SPANDATA.GEN_AI_USAGE_INPUT_TOKENS, input_tokens)

    if output_tokens is not None:
        set_on_span(SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens)

    if total_tokens is not None:
        set_on_span(SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS, total_tokens)


def _get_request_data(
    obj: "Any", args: "Any", kwargs: "Any"
) -> "tuple[Optional[str], Optional[List[Any]]]":
    """
    Get the agent name and available tools for the agent.
    """
    agent = getattr(obj, "agent", None)
    runnable = getattr(agent, "runnable", None)
    runnable_config = getattr(runnable, "config", {})
    tools = (
        getattr(obj, "tools", None)
        or getattr(agent, "tools", None)
        or runnable_config.get("tools")
        or runnable_config.get("available_tools")
    )
    tools = tools if tools and len(tools) > 0 else None

    try:
        agent_name = None
        if len(args) > 1:
            agent_name = args[1].get("run_name")
        if agent_name is None:
            agent_name = runnable_config.get("run_name")
    except Exception:
        pass

    return (agent_name, tools)


def _simplify_langchain_tools(tools: "Any") -> "Optional[List[Any]]":
    """Parse and simplify tools into a cleaner format."""
    if not tools:
        return None

    if not isinstance(tools, (list, tuple)):
        return None

    simplified_tools = []
    for tool in tools:
        try:
            if isinstance(tool, dict):
                if "function" in tool and isinstance(tool["function"], dict):
                    func = tool["function"]
                    simplified_tool = {
                        "name": func.get("name"),
                        "description": func.get("description"),
                    }
                    if simplified_tool["name"]:
                        simplified_tools.append(simplified_tool)
                elif "name" in tool:
                    simplified_tool = {
                        "name": tool.get("name"),
                        "description": tool.get("description"),
                    }
                    simplified_tools.append(simplified_tool)
                else:
                    name = (
                        tool.get("name")
                        or tool.get("tool_name")
                        or tool.get("function_name")
                    )
                    if name:
                        simplified_tools.append(
                            {
                                "name": name,
                                "description": tool.get("description")
                                or tool.get("desc"),
                            }
                        )
            elif hasattr(tool, "name"):
                simplified_tool = {
                    "name": getattr(tool, "name", None),
                    "description": getattr(tool, "description", None)
                    or getattr(tool, "desc", None),
                }
                if simplified_tool["name"]:
                    simplified_tools.append(simplified_tool)
            elif hasattr(tool, "__name__"):
                simplified_tools.append(
                    {
                        "name": tool.__name__,
                        "description": getattr(tool, "__doc__", None),
                    }
                )
            else:
                tool_str = str(tool)
                if tool_str and tool_str != "":
                    simplified_tools.append({"name": tool_str, "description": None})
        except Exception:
            continue

    return simplified_tools if simplified_tools else None


def _set_tools_on_span(span: "Union[Span, StreamedSpan]", tools: "Any") -> None:
    """Set available tools data on a span if tools are provided."""
    if tools is not None:
        simplified_tools = _simplify_langchain_tools(tools)
        if simplified_tools:
            set_data_normalized(
                span,
                SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS,
                simplified_tools,
                unpack=False,
            )


def _wrap_configure(f: "Callable[..., Any]") -> "Callable[..., Any]":
    @wraps(f)
    def new_configure(
        callback_manager_cls: type,
        inheritable_callbacks: "Callbacks" = None,
        local_callbacks: "Callbacks" = None,
        *args: "Any",
        **kwargs: "Any",
    ) -> "Any":
        integration = sentry_sdk.get_client().get_integration(LangchainIntegration)
        if integration is None:
            return f(
                callback_manager_cls,
                inheritable_callbacks,
                local_callbacks,
                *args,
                **kwargs,
            )

        local_callbacks = local_callbacks or []

        # Handle each possible type of local_callbacks. For each type, we
        # extract the list of callbacks to check for SentryLangchainCallback,
        # and define a function that would add the SentryLangchainCallback
        # to the existing callbacks list.
        if isinstance(local_callbacks, BaseCallbackManager):
            callbacks_list = local_callbacks.handlers
        elif isinstance(local_callbacks, BaseCallbackHandler):
            callbacks_list = [local_callbacks]
        elif isinstance(local_callbacks, list):
            callbacks_list = local_callbacks
        else:
            logger.debug("Unknown callback type: %s", local_callbacks)
            # Just proceed with original function call
            return f(
                callback_manager_cls,
                inheritable_callbacks,
                local_callbacks,
                *args,
                **kwargs,
            )

        # Handle each possible type of inheritable_callbacks.
        if isinstance(inheritable_callbacks, BaseCallbackManager):
            inheritable_callbacks_list = inheritable_callbacks.handlers
        elif isinstance(inheritable_callbacks, list):
            inheritable_callbacks_list = inheritable_callbacks
        else:
            inheritable_callbacks_list = []

        if not any(
            isinstance(cb, SentryLangchainCallback)
            for cb in itertools.chain(callbacks_list, inheritable_callbacks_list)
        ):
            sentry_handler = SentryLangchainCallback(
                integration.max_spans,
                integration.include_prompts,
            )
            if isinstance(local_callbacks, BaseCallbackManager):
                local_callbacks = local_callbacks.copy()
                local_callbacks.handlers = [
                    *local_callbacks.handlers,
                    sentry_handler,
                ]
            elif isinstance(local_callbacks, BaseCallbackHandler):
                local_callbacks = [local_callbacks, sentry_handler]
            else:
                local_callbacks = [*local_callbacks, sentry_handler]

        return f(
            callback_manager_cls,
            inheritable_callbacks,
            local_callbacks,
            *args,
            **kwargs,
        )

    return new_configure


def _wrap_agent_executor_invoke(f: "Callable[..., Any]") -> "Callable[..., Any]":
    @wraps(f)
    def new_invoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
        client = sentry_sdk.get_client()
        integration = client.get_integration(LangchainIntegration)
        if integration is None:
            return f(self, *args, **kwargs)

        run_name, tools = _get_request_data(self, args, kwargs)

        if has_span_streaming_enabled(client.options):
            with sentry_sdk.traces.start_span(
                name=f"invoke_agent {run_name}" if run_name else "invoke_agent",
                attributes={
                    "sentry.op": OP.GEN_AI_INVOKE_AGENT,
                    "sentry.origin": LangchainIntegration.origin,
                    SPANDATA.GEN_AI_OPERATION_NAME: "invoke_agent",
                    SPANDATA.GEN_AI_RESPONSE_STREAMING: False,
                },
            ) as span:
                if run_name:
                    span.set_attribute(SPANDATA.GEN_AI_FUNCTION_ID, run_name)

                _set_tools_on_span(span, tools)

                # Run the agent
                result = f(self, *args, **kwargs)

                input = result.get("input")
                if (
                    input is not None
                    and should_send_default_pii()
                    and integration.include_prompts
                ):
                    normalized_messages = normalize_message_roles([input])

                    client = sentry_sdk.get_client()
                    scope = sentry_sdk.get_current_scope()
                    messages_data = (
                        truncate_and_annotate_messages(normalized_messages, span, scope)
                        if should_truncate_gen_ai_input(client.options)
                        else normalized_messages
                    )
                    if messages_data is not None:
                        set_data_normalized(
                            span,
                            SPANDATA.GEN_AI_REQUEST_MESSAGES,
                            messages_data,
                            unpack=False,
                        )

                output = result.get("output")
                if (
                    output is not None
                    and should_send_default_pii()
                    and integration.include_prompts
                ):
                    set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output)

                return result
        else:
            start_span_function = get_start_span_function()

            with start_span_function(
                op=OP.GEN_AI_INVOKE_AGENT,
                name=f"invoke_agent {run_name}" if run_name else "invoke_agent",
                origin=LangchainIntegration.origin,
            ) as span:
                if run_name:
                    span.set_data(SPANDATA.GEN_AI_FUNCTION_ID, run_name)

                span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent")
                span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, False)

                _set_tools_on_span(span, tools)

                # Run the agent
                result = f(self, *args, **kwargs)

                input = result.get("input")
                if (
                    input is not None
                    and should_send_default_pii()
                    and integration.include_prompts
                ):
                    normalized_messages = normalize_message_roles([input])

                    client = sentry_sdk.get_client()
                    scope = sentry_sdk.get_current_scope()
                    messages_data = (
                        truncate_and_annotate_messages(normalized_messages, span, scope)
                        if should_truncate_gen_ai_input(client.options)
                        else normalized_messages
                    )
                    if messages_data is not None:
                        set_data_normalized(
                            span,
                            SPANDATA.GEN_AI_REQUEST_MESSAGES,
                            messages_data,
                            unpack=False,
                        )

                output = result.get("output")
                if (
                    output is not None
                    and should_send_default_pii()
                    and integration.include_prompts
                ):
                    set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output)

                return result

    return new_invoke


def _wrap_agent_executor_stream(f: "Callable[..., Any]") -> "Callable[..., Any]":
    @wraps(f)
    def new_stream(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
        client = sentry_sdk.get_client()
        integration = client.get_integration(LangchainIntegration)
        if integration is None:
            return f(self, *args, **kwargs)

        run_name, tools = _get_request_data(self, args, kwargs)

        if has_span_streaming_enabled(client.options):
            span = sentry_sdk.traces.start_span(
                name=f"invoke_agent {run_name}" if run_name else "invoke_agent",
                attributes={
                    "sentry.op": OP.GEN_AI_INVOKE_AGENT,
                    "sentry.origin": LangchainIntegration.origin,
                    SPANDATA.GEN_AI_OPERATION_NAME: "invoke_agent",
                    SPANDATA.GEN_AI_RESPONSE_STREAMING: True,
                },
            )

            if run_name:
                span.set_attribute(SPANDATA.GEN_AI_FUNCTION_ID, run_name)
        else:
            start_span_function = get_start_span_function()

            span = start_span_function(
                op=OP.GEN_AI_INVOKE_AGENT,
                name=f"invoke_agent {run_name}" if run_name else "invoke_agent",
                origin=LangchainIntegration.origin,
            )
            span.__enter__()

            span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent")
            span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)

            if run_name:
                span.set_data(SPANDATA.GEN_AI_FUNCTION_ID, run_name)

        _set_tools_on_span(span, tools)

        input = args[0].get("input") if len(args) >= 1 else None
        if (
            input is not None
            and should_send_default_pii()
            and integration.include_prompts
        ):
            normalized_messages = normalize_message_roles([input])

            client = sentry_sdk.get_client()
            scope = sentry_sdk.get_current_scope()
            messages_data = (
                truncate_and_annotate_messages(normalized_messages, span, scope)
                if should_truncate_gen_ai_input(client.options)
                else normalized_messages
            )
            if messages_data is not None:
                set_data_normalized(
                    span,
                    SPANDATA.GEN_AI_REQUEST_MESSAGES,
                    messages_data,
                    unpack=False,
                )

        # Run the agent
        result = f(self, *args, **kwargs)

        old_iterator = result

        def new_iterator() -> "Iterator[Any]":
            exc_info: "tuple[Any, Any, Any]" = (None, None, None)
            try:
                for event in old_iterator:
                    yield event

                try:
                    output = event.get("output")
                except Exception:
                    output = None

                if (
                    output is not None
                    and should_send_default_pii()
                    and integration.include_prompts
                ):
                    set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output)

                span.__exit__(None, None, None)
            except Exception:
                exc_info = sys.exc_info()
                with capture_internal_exceptions():
                    span.__exit__(*exc_info)
                raise

        async def new_iterator_async() -> "AsyncIterator[Any]":
            exc_info: "tuple[Any, Any, Any]" = (None, None, None)
            try:
                async for event in old_iterator:
                    yield event

                try:
                    output = event.get("output")
                except Exception:
                    output = None

                if (
                    output is not None
                    and should_send_default_pii()
                    and integration.include_prompts
                ):
                    set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output)

                span.__exit__(None, None, None)
            except Exception:
                exc_info = sys.exc_info()
                with capture_internal_exceptions():
                    span.__exit__(*exc_info)
                raise

        if str(type(result)) == "<class 'async_generator'>":
            result = new_iterator_async()
        else:
            result = new_iterator()

        return result

    return new_stream


def _patch_embeddings_provider(provider_class: "Any") -> None:
    """Patch an embeddings provider class with monitoring wrappers."""
    if provider_class is None:
        return

    if hasattr(provider_class, "embed_documents"):
        provider_class.embed_documents = _wrap_embedding_method(
            provider_class.embed_documents
        )
    if hasattr(provider_class, "embed_query"):
        provider_class.embed_query = _wrap_embedding_method(provider_class.embed_query)
    if hasattr(provider_class, "aembed_documents"):
        provider_class.aembed_documents = _wrap_async_embedding_method(
            provider_class.aembed_documents
        )
    if hasattr(provider_class, "aembed_query"):
        provider_class.aembed_query = _wrap_async_embedding_method(
            provider_class.aembed_query
        )


def _wrap_embedding_method(f: "Callable[..., Any]") -> "Callable[..., Any]":
    """Wrap sync embedding methods (embed_documents and embed_query)."""

    @wraps(f)
    def new_embedding_method(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
        client = sentry_sdk.get_client()
        integration = client.get_integration(LangchainIntegration)
        if integration is None:
            return f(self, *args, **kwargs)

        model_name = getattr(self, "model", None) or getattr(self, "model_name", None)

        if has_span_streaming_enabled(client.options):
            with sentry_sdk.traces.start_span(
                name=f"embeddings {model_name}" if model_name else "embeddings",
                attributes={
                    "sentry.op": OP.GEN_AI_EMBEDDINGS,
                    "sentry.origin": LangchainIntegration.origin,
                    SPANDATA.GEN_AI_OPERATION_NAME: "embeddings",
                },
            ) as span:
                if model_name:
                    span.set_attribute(SPANDATA.GEN_AI_REQUEST_MODEL, model_name)

                # Capture input if PII is allowed
                if (
                    should_send_default_pii()
                    and integration.include_prompts
                    and len(args) > 0
                ):
                    input_data = args[0]
                    # Normalize to list format
                    texts = input_data if isinstance(input_data, list) else [input_data]
                    set_data_normalized(
                        span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False
                    )

                result = f(self, *args, **kwargs)
                return result
        else:
            with sentry_sdk.start_span(
                op=OP.GEN_AI_EMBEDDINGS,
                name=f"embeddings {model_name}" if model_name else "embeddings",
                origin=LangchainIntegration.origin,
            ) as span:
                span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "embeddings")
                if model_name:
                    span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model_name)

                # Capture input if PII is allowed
                if (
                    should_send_default_pii()
                    and integration.include_prompts
                    and len(args) > 0
                ):
                    input_data = args[0]
                    # Normalize to list format
                    texts = input_data if isinstance(input_data, list) else [input_data]
                    set_data_normalized(
                        span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False
                    )

                result = f(self, *args, **kwargs)
                return result

    return new_embedding_method


def _wrap_async_embedding_method(f: "Callable[..., Any]") -> "Callable[..., Any]":
    """Wrap async embedding methods (aembed_documents and aembed_query)."""

    @wraps(f)
    async def new_async_embedding_method(
        self: "Any", *args: "Any", **kwargs: "Any"
    ) -> "Any":
        client = sentry_sdk.get_client()
        integration = client.get_integration(LangchainIntegration)
        if integration is None:
            return await f(self, *args, **kwargs)

        model_name = getattr(self, "model", None) or getattr(self, "model_name", None)

        if has_span_streaming_enabled(client.options):
            with sentry_sdk.traces.start_span(
                name=f"embeddings {model_name}" if model_name else "embeddings",
                attributes={
                    "sentry.op": OP.GEN_AI_EMBEDDINGS,
                    "sentry.origin": LangchainIntegration.origin,
                    SPANDATA.GEN_AI_OPERATION_NAME: "embeddings",
                },
            ) as span:
                if model_name:
                    span.set_attribute(SPANDATA.GEN_AI_REQUEST_MODEL, model_name)

                # Capture input if PII is allowed
                if (
                    should_send_default_pii()
                    and integration.include_prompts
                    and len(args) > 0
                ):
                    input_data = args[0]
                    # Normalize to list format
                    texts = input_data if isinstance(input_data, list) else [input_data]
                    set_data_normalized(
                        span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False
                    )

                result = await f(self, *args, **kwargs)
                return result
        else:
            with sentry_sdk.start_span(
                op=OP.GEN_AI_EMBEDDINGS,
                name=f"embeddings {model_name}" if model_name else "embeddings",
                origin=LangchainIntegration.origin,
            ) as span:
                span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "embeddings")
                if model_name:
                    span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model_name)

                # Capture input if PII is allowed
                if (
                    should_send_default_pii()
                    and integration.include_prompts
                    and len(args) > 0
                ):
                    input_data = args[0]
                    # Normalize to list format
                    texts = input_data if isinstance(input_data, list) else [input_data]
                    set_data_normalized(
                        span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False
                    )

                result = await f(self, *args, **kwargs)
                return result

    return new_async_embedding_method

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
celery Folder 0755
django Folder 0755
google_genai Folder 0755
grpc Folder 0755
openai_agents Folder 0755
opentelemetry Folder 0755
pydantic_ai Folder 0755
redis Folder 0755
spark Folder 0755
__init__.py File 12.51 KB 0644
_asgi_common.py File 4 KB 0644
_wsgi_common.py File 7.28 KB 0644
aiohttp.py File 19.28 KB 0644
aiomysql.py File 9.09 KB 0644
anthropic.py File 39 KB 0644
argv.py File 876 B 0644
ariadne.py File 5.7 KB 0644
arq.py File 9.23 KB 0644
asgi.py File 20.06 KB 0644
asyncio.py File 9.28 KB 0644
asyncpg.py File 9.68 KB 0644
atexit.py File 1.51 KB 0644
aws_lambda.py File 17.41 KB 0644
beam.py File 4.91 KB 0644
boto3.py File 6.2 KB 0644
bottle.py File 7.21 KB 0644
chalice.py File 4.51 KB 0644
clickhouse_driver.py File 5.85 KB 0644
cloud_resource_context.py File 7.49 KB 0644
cohere.py File 10.44 KB 0644
dedupe.py File 1.86 KB 0644
dramatiq.py File 8.02 KB 0644
excepthook.py File 2.25 KB 0644
executing.py File 1.93 KB 0644
falcon.py File 9.04 KB 0644
fastapi.py File 5.28 KB 0644
flask.py File 8.27 KB 0644
gcp.py File 10.57 KB 0644
gnu_backtrace.py File 2.72 KB 0644
gql.py File 4.93 KB 0644
graphene.py File 5.71 KB 0644
httpx.py File 9.79 KB 0644
httpx2.py File 9.8 KB 0644
huey.py File 8.19 KB 0644
huggingface_hub.py File 15.28 KB 0644
langchain.py File 48.31 KB 0644
langgraph.py File 18.13 KB 0644
launchdarkly.py File 1.87 KB 0644
litellm.py File 13.03 KB 0644
litestar.py File 11.46 KB 0644
logging.py File 15.69 KB 0644
loguru.py File 6.35 KB 0644
mcp.py File 23.12 KB 0644
modules.py File 787 B 0644
openai.py File 53.38 KB 0644
openfeature.py File 1.08 KB 0644
otlp.py File 7.99 KB 0644
pure_eval.py File 4.41 KB 0644
pymongo.py File 8.21 KB 0644
pyramid.py File 7.42 KB 0644
pyreqwest.py File 6.82 KB 0644
quart.py File 7.32 KB 0644
ray.py File 5.75 KB 0644
rq.py File 7.81 KB 0644
rust_tracing.py File 9.44 KB 0644
sanic.py File 15.25 KB 0644
serverless.py File 1.58 KB 0644
socket.py File 5.02 KB 0644
sqlalchemy.py File 5.24 KB 0644
starlette.py File 27.93 KB 0644
starlite.py File 11.04 KB 0644
statsig.py File 1.19 KB 0644
stdlib.py File 14.01 KB 0644
strawberry.py File 17.39 KB 0644
sys_exit.py File 2.35 KB 0644
threading.py File 6.88 KB 0644
tornado.py File 10.79 KB 0644
trytond.py File 1.67 KB 0644
typer.py File 1.72 KB 0644
unleash.py File 1.02 KB 0644
unraisablehook.py File 1.65 KB 0644
wsgi.py File 15.03 KB 0644