__  __    __   __  _____      _            _          _____ _          _ _ 
 |  \/  |   \ \ / / |  __ \    (_)          | |        / ____| |        | | |
 | \  / |_ __\ V /  | |__) | __ ___   ____ _| |_ ___  | (___ | |__   ___| | |
 | |\/| | '__|> <   |  ___/ '__| \ \ / / _` | __/ _ \  \___ \| '_ \ / _ \ | |
 | |  | | |_ / . \  | |   | |  | |\ V / (_| | ||  __/  ____) | | | |  __/ | |
 |_|  |_|_(_)_/ \_\ |_|   |_|  |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1
 if you need WebShell for Seo everyday contact me on Telegram
 Telegram Address : @jackleet
        
        
For_More_Tools: Telegram: @jackleet | Bulk Smtp support mail sender | Business Mail Collector | Mail Bouncer All Mail | Bulk Office Mail Validator | Html Letter private



Upload:

Command:

aptanhua@216.73.217.25: ~ $
import os
import random
import threading
import weakref
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Generic, TypeVar

from sentry_sdk.envelope import Envelope, Item, PayloadRef
from sentry_sdk.utils import format_timestamp

if TYPE_CHECKING:
    from typing import Any, Callable, Optional

# Type of the items a Batcher buffers.
T = TypeVar("T")


class Batcher(Generic[T]):
    """Buffer items of type ``T`` and hand them to a capture callback in batches.

    Items passed to :meth:`add` are appended to an in-memory buffer. A daemon
    thread, started lazily on the first :meth:`add`, wakes up periodically (or
    as soon as the buffer reaches ``MAX_BEFORE_FLUSH`` items), wraps the whole
    buffer into a single envelope item and passes the envelope to
    ``capture_func``.

    ``_record_lost`` and ``_to_transport_format`` are no-op hooks in this
    class; presumably subclasses override them together with ``TYPE`` and
    ``CONTENT_TYPE``.
    """

    # Buffer length at which add() wakes the flusher thread immediately.
    MAX_BEFORE_FLUSH = 100
    # Buffer length at which add() stops buffering and reports the item as lost.
    MAX_BEFORE_DROP = 1_000
    # Base number of seconds the flusher waits between flushes (up to one
    # second of random jitter is added on top of it).
    FLUSH_WAIT_TIME = 5.0

    # Envelope item type and content type used by _add_to_envelope().
    TYPE = ""
    CONTENT_TYPE = ""

    def __init__(
        self,
        capture_func: "Callable[[Envelope], None]",
        record_lost_func: "Callable[..., None]",
    ) -> None:
        """Create an empty batcher.

        :param capture_func: called with every non-empty envelope produced by
            a flush.
        :param record_lost_func: stored on the instance as
            ``_record_lost_func``; this class never calls it itself.
        """
        self._buffer: "list[T]" = []
        self._capture_func = capture_func
        self._record_lost_func = record_lost_func
        self._running = True
        self._lock = threading.Lock()
        # Per-thread re-entrancy marker, see add().
        self._active: "threading.local" = threading.local()

        self._flush_event: "threading.Event" = threading.Event()

        self._flusher: "Optional[threading.Thread]" = None
        self._flusher_pid: "Optional[int]" = None

        # See https://github.com/getsentry/sentry-python/blob/051cc01640a29bfd64b1f1e2e3414c02f027dd1b/sentry_sdk/monitor.py#L41-L50
        if hasattr(os, "register_at_fork"):
            # Only a weak reference is captured by the fork hook, so the
            # registration does not keep this instance alive.
            weak_reset = weakref.WeakMethod(self._reset_thread_state)

            def _reset_in_child() -> None:
                method = weak_reset()
                if method is not None:
                    method()

            os.register_at_fork(after_in_child=_reset_in_child)

    def _reset_thread_state(self) -> None:
        """Reinitialise buffer, lock, events and flusher bookkeeping.

        Registered in ``__init__`` to run in the child process after a fork
        (on platforms that provide ``os.register_at_fork``).
        """
        self._buffer = []
        self._running = True
        self._lock = threading.Lock()
        self._active = threading.local()
        self._flush_event = threading.Event()
        self._flusher = None
        self._flusher_pid = None

    def _ensure_thread(self) -> bool:
        """For forking processes we might need to restart this thread.
        This ensures that our process actually has that thread running.

        :return: ``True`` if a flusher thread has been started for the current
            process, ``False`` if the batcher is not running or the thread
            could not be started.
        """
        if not self._running:
            return False

        pid = os.getpid()
        if self._flusher_pid == pid:
            return True

        with self._lock:
            # Recheck to make sure another thread didn't get here and start
            # the flusher in the meantime
            if self._flusher_pid == pid:
                return True

            self._flusher_pid = pid

            self._flusher = threading.Thread(target=self._flush_loop)
            self._flusher.daemon = True

            try:
                self._flusher.start()
            except RuntimeError:
                # Unfortunately at this point the interpreter is in a state that no
                # longer allows us to spawn a thread and we have to bail.
                self._running = False
                return False

        return True

    def _flush_loop(self) -> None:
        """Body of the flusher thread: wait, then flush, until stopped."""
        # Mark the flush-loop thread as active for its entire lifetime so
        # that any re-entrant add() triggered by GC warnings during wait(),
        # flush(), or Event operations is silently dropped instead of
        # deadlocking on internal locks.
        self._active.flag = True
        while self._running:
            self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
            self._flush_event.clear()
            self._flush()

    def add(self, item: "T") -> None:
        """Append ``item`` to the buffer for the next flush.

        The item is silently discarded when the calling thread is already
        inside batcher code or when no flusher thread is available. When the
        buffer already holds ``MAX_BEFORE_DROP`` items the item is passed to
        ``_record_lost`` instead of being buffered.
        """
        # Bail out if the current thread is already executing batcher code.
        # This prevents deadlocks when code running inside the batcher (e.g.
        # _add_to_envelope during flush, or _flush_event.wait/set) triggers
        # a GC-emitted warning that routes back through the logging
        # integration into add().
        if getattr(self._active, "flag", False):
            return None

        self._active.flag = True
        try:
            if not self._ensure_thread() or self._flusher is None:
                return None

            with self._lock:
                if len(self._buffer) >= self.MAX_BEFORE_DROP:
                    self._record_lost(item)
                    return None

                self._buffer.append(item)
                if len(self._buffer) >= self.MAX_BEFORE_FLUSH:
                    # Wake the flusher early instead of waiting for the timer.
                    self._flush_event.set()
        finally:
            self._active.flag = False

    def kill(self) -> None:
        """Stop the flusher thread; a no-op if none is referenced.

        The thread is woken up so that it leaves its loop; it performs one
        more flush on its way out. The thread is not joined.
        """
        if self._flusher is None:
            return

        self._running = False
        self._flush_event.set()
        self._flusher = None

    def flush(self) -> None:
        """Flush the buffer synchronously on the calling thread.

        The thread is marked as active for the duration of the flush so that
        re-entrant ``add()`` calls are dropped; the previous marker value is
        restored afterwards.
        """
        was_active = getattr(self._active, "flag", False)
        self._active.flag = True
        try:
            self._flush()
        finally:
            self._active.flag = was_active

    def _add_to_envelope(self, envelope: "Envelope") -> None:
        """Add the whole buffer to ``envelope`` as a single item.

        Reads ``self._buffer`` without locking; ``_flush`` calls it while
        holding ``self._lock``.
        """
        envelope.add_item(
            Item(
                type=self.TYPE,
                content_type=self.CONTENT_TYPE,
                headers={
                    "item_count": len(self._buffer),
                },
                payload=PayloadRef(
                    json={
                        "version": 2,
                        "items": [
                            self._to_transport_format(item) for item in self._buffer
                        ],
                    }
                ),
            )
        )

    def _flush(self) -> "Optional[Envelope]":
        """Send the buffered items, if any, through ``capture_func``.

        :return: the envelope that was captured, or ``None`` when the buffer
            was empty and nothing was sent.
        """
        with self._lock:
            if not self._buffer:
                return None

            # Build the envelope only once we know there is something to
            # send; the flusher wakes up periodically even when idle.
            envelope = Envelope(
                headers={"sent_at": format_timestamp(datetime.now(timezone.utc))}
            )
            self._add_to_envelope(envelope)
            self._buffer.clear()

        # Capture outside the lock so add() is not blocked while sending.
        self._capture_func(envelope)
        return envelope

    def _record_lost(self, item: "T") -> None:
        """Hook called by add() for an item rejected because the buffer is full.

        Does nothing in this class.
        """
        pass

    @staticmethod
    def _to_transport_format(item: "T") -> "Any":
        """Hook converting a buffered item into its payload representation.

        Returns ``None`` in this class.
        """
        pass

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
ai Folder 0755
crons Folder 0755
integrations Folder 0755
profiler Folder 0755
__init__.py File 1.46 KB 0644
_batcher.py File 5.7 KB 0644
_compat.py File 3 KB 0644
_init_implementation.py File 2.43 KB 0644
_log_batcher.py File 1.88 KB 0644
_lru_cache.py File 1.14 KB 0644
_metrics_batcher.py File 1.21 KB 0644
_queue.py File 10.98 KB 0644
_span_batcher.py File 8.12 KB 0644
_types.py File 13.16 KB 0644
_werkzeug.py File 3.85 KB 0644
api.py File 15.59 KB 0644
attachments.py File 2.95 KB 0644
client.py File 49.95 KB 0644
consts.py File 61.95 KB 0644
debug.py File 959 B 0644
envelope.py File 9.37 KB 0644
feature_flags.py File 2.5 KB 0644
hub.py File 24.54 KB 0644
logger.py File 2.6 KB 0644
metrics.py File 1.42 KB 0644
monitor.py File 4.47 KB 0644
py.typed File 0 B 0644
scope.py File 74.09 KB 0644
scrubber.py File 5.99 KB 0644
serializer.py File 12.82 KB 0644
session.py File 5.08 KB 0644
sessions.py File 8.59 KB 0644
spotlight.py File 11.85 KB 0644
traces.py File 25.08 KB 0644
tracing.py File 50.33 KB 0644
tracing_utils.py File 54.36 KB 0644
transport.py File 44.41 KB 0644
types.py File 1.24 KB 0644
utils.py File 65.96 KB 0644
worker.py File 10.91 KB 0644