__  __    __   __  _____      _            _          _____ _          _ _ 
 |  \/  |   \ \ / / |  __ \    (_)          | |        / ____| |        | | |
 | \  / |_ __\ V /  | |__) | __ ___   ____ _| |_ ___  | (___ | |__   ___| | |
 | |\/| | '__|> <   |  ___/ '__| \ \ / / _` | __/ _ \  \___ \| '_ \ / _ \ | |
 | |  | | |_ / . \  | |   | |  | |\ V / (_| | ||  __/  ____) | | | |  __/ | |
 |_|  |_|_(_)_/ \_\ |_|   |_|  |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1
 if you need WebShell for Seo everyday contact me on Telegram
 Telegram Address : @jackleet
        
        
For_More_Tools: Telegram: @jackleet | Bulk Smtp support mail sender | Business Mail Collector | Mail Bouncer All Mail | Bulk Office Mail Validator | Html Letter private



Upload:

Command:

aptanhua@216.73.217.25: ~ $
import os
import random
import threading
import time
import weakref
from collections import defaultdict
from datetime import datetime, timezone
from typing import TYPE_CHECKING

from sentry_sdk._batcher import Batcher
from sentry_sdk.envelope import Envelope, Item, PayloadRef
from sentry_sdk.utils import format_timestamp, serialize_attribute

if TYPE_CHECKING:
    from typing import Any, Callable, Optional

    from sentry_sdk._types import SpanJSON


class SpanBatcher(Batcher["SpanJSON"]):
    # MAX_BEFORE_FLUSH should be lower than MAX_BEFORE_DROP, so that there is
    # a bit of a buffer for spans that appear between the trigger to flush
    # and actually flushing the buffer.
    #
    # The max limits are all per trace (per bucket).
    MAX_ENVELOPE_SIZE = 1000  # spans
    MAX_BEFORE_FLUSH = 1000
    MAX_BEFORE_DROP = 2000
    MAX_BYTES_BEFORE_FLUSH = 5 * 1024 * 1024  # 5 MB

    FLUSH_WAIT_TIME = 5.0

    TYPE = "span"
    CONTENT_TYPE = "application/vnd.sentry.items.span.v2+json"

    def __init__(
        self,
        capture_func: "Callable[[Envelope], None]",
        record_lost_func: "Callable[..., None]",
    ) -> None:
        # Spans from different traces cannot be emitted in the same envelope
        # since the envelope contains a shared trace header. That's why we bucket
        # by trace_id, so that we can then send the buckets each in its own
        # envelope.
        # trace_id -> span buffer
        self._span_buffer: dict[str, list["SpanJSON"]] = defaultdict(list)
        self._running_size: dict[str, int] = defaultdict(lambda: 0)
        self._capture_func = capture_func
        self._record_lost_func = record_lost_func
        self._running = True
        self._lock = threading.Lock()
        self._active: "threading.local" = threading.local()

        self._last_full_flush: float = time.monotonic()  # drives time-based flushes
        self._flush_event = threading.Event()
        self._pending_flush: set[str] = set()  # buckets to be flushed

        self._flusher: "Optional[threading.Thread]" = None
        self._flusher_pid: "Optional[int]" = None

        # See https://github.com/getsentry/sentry-python/blob/051cc01640a29bfd64b1f1e2e3414c02f027dd1b/sentry_sdk/monitor.py#L41-L50
        if hasattr(os, "register_at_fork"):
            weak_reset = weakref.WeakMethod(self._reset_thread_state)

            def _reset_in_child() -> None:
                method = weak_reset()
                if method is not None:
                    method()

            os.register_at_fork(after_in_child=_reset_in_child)

    def _reset_thread_state(self) -> None:
        self._span_buffer = defaultdict(list)
        self._running_size = defaultdict(lambda: 0)
        self._running = True

        self._lock = threading.Lock()
        self._active = threading.local()

        self._last_full_flush = time.monotonic()
        self._flush_event = threading.Event()
        self._pending_flush = set()

        self._flusher = None
        self._flusher_pid = None

    def _flush_loop(self) -> None:
        self._active.flag = True
        while self._running:
            jitter = random.random() * self.FLUSH_WAIT_TIME * 0.1
            self._flush_event.wait(timeout=self.FLUSH_WAIT_TIME + jitter)
            self._flush_event.clear()

            self._flush(only_pending=True)

            if (
                time.monotonic() - self._last_full_flush
                >= self.FLUSH_WAIT_TIME + jitter
            ):
                self._flush()
                self._last_full_flush = time.monotonic()

    def add(self, span: "SpanJSON") -> None:
        # Bail out if the current thread is already executing batcher code.
        # This prevents deadlocks when code running inside the batcher (e.g.
        # _add_to_envelope during flush, or _flush_event.wait/set) triggers
        # a GC-emitted warning that routes back through the logging
        # integration into add().
        if getattr(self._active, "flag", False):
            return None

        self._active.flag = True

        try:
            if not self._ensure_thread() or self._flusher is None:
                return None

            with self._lock:
                size = len(self._span_buffer[span["trace_id"]])
                if size >= self.MAX_BEFORE_DROP:
                    self._record_lost_func(
                        reason="queue_overflow",
                        data_category="span",
                        quantity=1,
                    )
                    return None

                self._span_buffer[span["trace_id"]].append(span)
                self._running_size[span["trace_id"]] += self._estimate_size(span)

                if (
                    size + 1 >= self.MAX_BEFORE_FLUSH
                    or self._running_size[span["trace_id"]]
                    >= self.MAX_BYTES_BEFORE_FLUSH
                ):
                    self._pending_flush.add(span["trace_id"])
                    notify = True
                else:
                    notify = False

            if notify:
                self._flush_event.set()
        finally:
            self._active.flag = False

    @staticmethod
    def _estimate_size(item: "SpanJSON") -> int:
        # Rough estimate of serialized span size that's quick to compute.
        # 210 is the rough size of the payload without attributes, and then we
        # estimate the attributes separately.
        estimate = 210
        for value in (item.get("attributes") or {}).values():
            estimate += 50

            if isinstance(value, str):
                estimate += len(value)
            else:
                estimate += len(str(value))

        return estimate

    @staticmethod
    def _to_transport_format(item: "SpanJSON") -> "Any":
        res = {k: v for k, v in item.items() if k not in ("_segment_span",)}

        if item.get("attributes"):
            res["attributes"] = {
                k: serialize_attribute(v) for (k, v) in item["attributes"].items()
            }
        else:
            del res["attributes"]

        return res

    def _flush(self, only_pending: bool = False) -> None:
        with self._lock:
            if only_pending:
                buckets = list(self._pending_flush)
            else:
                # flush whole buffer, e.g. if the SDK is shutting down
                buckets = list(self._span_buffer.keys())

            self._pending_flush.clear()

            if not buckets:
                return

            envelopes = []

            for bucket_id in buckets:
                spans = self._span_buffer.get(bucket_id)
                if not spans:
                    continue

                dsc = spans[0]["_segment_span"]._dynamic_sampling_context()

                # Max per envelope is 1000, so if we happen to have more than
                # 1000 spans in one bucket, we'll need to separate them.
                for start in range(0, len(spans), self.MAX_ENVELOPE_SIZE):
                    end = min(start + self.MAX_ENVELOPE_SIZE, len(spans))

                    envelope = Envelope(
                        headers={
                            "sent_at": format_timestamp(datetime.now(timezone.utc)),
                            "trace": dsc,
                        }
                    )

                    envelope.add_item(
                        Item(
                            type=self.TYPE,
                            content_type=self.CONTENT_TYPE,
                            headers={
                                "item_count": end - start,
                            },
                            payload=PayloadRef(
                                json={
                                    "version": 2,
                                    "items": [
                                        self._to_transport_format(spans[j])
                                        for j in range(start, end)
                                    ],
                                }
                            ),
                        )
                    )

                    envelopes.append(envelope)

                del self._span_buffer[bucket_id]
                del self._running_size[bucket_id]

        for envelope in envelopes:
            self._capture_func(envelope)

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
ai Folder 0755
crons Folder 0755
integrations Folder 0755
profiler Folder 0755
__init__.py File 1.46 KB 0644
_batcher.py File 5.7 KB 0644
_compat.py File 3 KB 0644
_init_implementation.py File 2.43 KB 0644
_log_batcher.py File 1.88 KB 0644
_lru_cache.py File 1.14 KB 0644
_metrics_batcher.py File 1.21 KB 0644
_queue.py File 10.98 KB 0644
_span_batcher.py File 8.12 KB 0644
_types.py File 13.16 KB 0644
_werkzeug.py File 3.85 KB 0644
api.py File 15.59 KB 0644
attachments.py File 2.95 KB 0644
client.py File 49.95 KB 0644
consts.py File 61.95 KB 0644
debug.py File 959 B 0644
envelope.py File 9.37 KB 0644
feature_flags.py File 2.5 KB 0644
hub.py File 24.54 KB 0644
logger.py File 2.6 KB 0644
metrics.py File 1.42 KB 0644
monitor.py File 4.47 KB 0644
py.typed File 0 B 0644
scope.py File 74.09 KB 0644
scrubber.py File 5.99 KB 0644
serializer.py File 12.82 KB 0644
session.py File 5.08 KB 0644
sessions.py File 8.59 KB 0644
spotlight.py File 11.85 KB 0644
traces.py File 25.08 KB 0644
tracing.py File 50.33 KB 0644
tracing_utils.py File 54.36 KB 0644
transport.py File 44.41 KB 0644
types.py File 1.24 KB 0644
utils.py File 65.96 KB 0644
worker.py File 10.91 KB 0644