__ __ __ __ _____ _ _ _____ _ _ _ | \/ | \ \ / / | __ \ (_) | | / ____| | | | | | \ / |_ __\ V / | |__) | __ ___ ____ _| |_ ___ | (___ | |__ ___| | | | |\/| | '__|> < | ___/ '__| \ \ / / _` | __/ _ \ \___ \| '_ \ / _ \ | | | | | | |_ / . \ | | | | | |\ V / (_| | || __/ ____) | | | | __/ | | |_| |_|_(_)_/ \_\ |_| |_| |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1 if you need WebShell for Seo everyday contact me on Telegram Telegram Address : @jackleetFor_More_Tools:
import os
import random
import threading
import time
import weakref
from collections import defaultdict
from datetime import datetime, timezone
from typing import TYPE_CHECKING
from sentry_sdk._batcher import Batcher
from sentry_sdk.envelope import Envelope, Item, PayloadRef
from sentry_sdk.utils import format_timestamp, serialize_attribute
if TYPE_CHECKING:
from typing import Any, Callable, Optional
from sentry_sdk._types import SpanJSON
class SpanBatcher(Batcher["SpanJSON"]):
    """Buffer finished spans per trace and send them in envelopes.

    Spans are bucketed by ``trace_id`` because every envelope carries a single
    shared ``trace`` header. A background flusher drains buckets that crossed
    a size threshold, and periodically flushes every bucket.
    NOTE(review): the flusher thread is presumably started by ``_ensure_thread``
    (defined in the ``Batcher`` base class) with ``_flush_loop`` as its target
    — confirm against the base class.
    """

    # MAX_BEFORE_FLUSH should be lower than MAX_BEFORE_DROP, so that there is
    # a bit of a buffer for spans that appear between the trigger to flush
    # and actually flushing the buffer.
    #
    # The max limits are all per trace (per bucket).
    MAX_ENVELOPE_SIZE = 1000  # spans
    MAX_BEFORE_FLUSH = 1000
    MAX_BEFORE_DROP = 2000
    MAX_BYTES_BEFORE_FLUSH = 5 * 1024 * 1024  # 5 MB
    FLUSH_WAIT_TIME = 5.0

    TYPE = "span"
    CONTENT_TYPE = "application/vnd.sentry.items.span.v2+json"

    def __init__(
        self,
        capture_func: "Callable[[Envelope], None]",
        record_lost_func: "Callable[..., None]",
    ) -> None:
        """Create an empty batcher.

        :param capture_func: Called with each ``Envelope`` built by ``_flush``.
        :param record_lost_func: Called with ``reason``, ``data_category`` and
            ``quantity`` keyword arguments when ``add`` drops a span because
            its trace bucket is full.
        """
        # Spans from different traces cannot be emitted in the same envelope
        # since the envelope contains a shared trace header. That's why we bucket
        # by trace_id, so that we can then send the buckets each in its own
        # envelope.
        # trace_id -> span buffer
        self._span_buffer: dict[str, list["SpanJSON"]] = defaultdict(list)
        # trace_id -> rough estimated serialized size of the buffered spans
        self._running_size: dict[str, int] = defaultdict(int)

        self._capture_func = capture_func
        self._record_lost_func = record_lost_func
        self._running = True
        self._lock = threading.Lock()

        # Per-thread re-entrancy guard; see add().
        self._active: "threading.local" = threading.local()
        self._last_full_flush: float = time.monotonic()  # drives time-based flushes
        self._flush_event = threading.Event()
        self._pending_flush: set[str] = set()  # buckets to be flushed
        self._flusher: "Optional[threading.Thread]" = None
        self._flusher_pid: "Optional[int]" = None

        # See https://github.com/getsentry/sentry-python/blob/051cc01640a29bfd64b1f1e2e3414c02f027dd1b/sentry_sdk/monitor.py#L41-L50
        if hasattr(os, "register_at_fork"):
            # Only a weak reference is captured, so the fork hook does not keep
            # this batcher alive.
            weak_reset = weakref.WeakMethod(self._reset_thread_state)

            def _reset_in_child() -> None:
                method = weak_reset()
                if method is not None:
                    method()

            os.register_at_fork(after_in_child=_reset_in_child)

    def _reset_thread_state(self) -> None:
        """Reinitialize all buffers, locks and thread state in a forked child.

        Registered as an ``after_in_child`` fork hook. Spans buffered by the
        parent are discarded, and the flusher thread reference is cleared
        because threads are not copied into a forked child.
        """
        self._span_buffer = defaultdict(list)
        self._running_size = defaultdict(int)
        self._running = True
        self._lock = threading.Lock()
        self._active = threading.local()
        self._last_full_flush = time.monotonic()
        self._flush_event = threading.Event()
        self._pending_flush = set()
        self._flusher = None
        self._flusher_pid = None

    def _flush_loop(self) -> None:
        """Run the flusher until ``self._running`` becomes false.

        Each iteration waits until ``add`` sets the flush event or until
        ``FLUSH_WAIT_TIME`` plus up to 10% random jitter elapses. It then
        flushes the buckets marked as pending, and flushes every bucket once
        that much time has passed since the last full flush.
        """
        # Mark this thread as batcher-internal so that an add() triggered from
        # inside a flush bails out instead of re-entering the batcher.
        self._active.flag = True
        while self._running:
            # Random jitter, presumably to avoid many flushers waking in lockstep.
            jitter = random.random() * self.FLUSH_WAIT_TIME * 0.1
            self._flush_event.wait(timeout=self.FLUSH_WAIT_TIME + jitter)
            self._flush_event.clear()

            self._flush(only_pending=True)

            if (
                time.monotonic() - self._last_full_flush
                >= self.FLUSH_WAIT_TIME + jitter
            ):
                self._flush()
                self._last_full_flush = time.monotonic()

    def add(self, span: "SpanJSON") -> None:
        """Buffer ``span`` in its trace bucket, or drop it if the bucket is full.

        A drop is reported via ``record_lost_func`` with reason
        ``"queue_overflow"``. When the bucket reaches ``MAX_BEFORE_FLUSH``
        spans or ``MAX_BYTES_BEFORE_FLUSH`` estimated size, it is marked as
        pending and the flush event is set to wake the flusher. Does nothing
        when ``_ensure_thread()`` reports failure or no flusher thread exists.
        """
        # Bail out if the current thread is already executing batcher code.
        # This prevents deadlocks when code running inside the batcher (e.g.
        # _add_to_envelope during flush, or _flush_event.wait/set) triggers
        # a GC-emitted warning that routes back through the logging
        # integration into add().
        if getattr(self._active, "flag", False):
            return None

        self._active.flag = True
        try:
            if not self._ensure_thread() or self._flusher is None:
                return None

            with self._lock:
                trace_id = span["trace_id"]
                bucket = self._span_buffer[trace_id]
                size = len(bucket)

                if size >= self.MAX_BEFORE_DROP:
                    self._record_lost_func(
                        reason="queue_overflow",
                        data_category="span",
                        quantity=1,
                    )
                    return None

                bucket.append(span)
                self._running_size[trace_id] += self._estimate_size(span)

                notify = (
                    size + 1 >= self.MAX_BEFORE_FLUSH
                    or self._running_size[trace_id] >= self.MAX_BYTES_BEFORE_FLUSH
                )
                if notify:
                    self._pending_flush.add(trace_id)

            # Wake the flusher only after releasing the lock.
            if notify:
                self._flush_event.set()
        finally:
            self._active.flag = False

    @staticmethod
    def _estimate_size(item: "SpanJSON") -> int:
        """Return a rough, quick-to-compute estimate of ``item``'s serialized size.

        The result is compared against ``MAX_BYTES_BEFORE_FLUSH``. It counts
        characters of ``str()`` output, so it is only a proxy for bytes.
        """
        # 210 is the rough size of the payload without attributes. Each
        # attribute adds a fixed 50 (presumably key plus JSON framing) plus
        # the length of its value's string form. For a str value, str(value)
        # has the same length, so one expression covers every type.
        estimate = 210
        for value in (item.get("attributes") or {}).values():
            estimate += 50 + len(str(value))
        return estimate

    @staticmethod
    def _to_transport_format(item: "SpanJSON") -> "Any":
        """Convert a buffered span into the dict placed in the envelope payload.

        The internal ``_segment_span`` entry is removed. Attribute values are
        passed through ``serialize_attribute``. An empty or missing
        ``attributes`` mapping is omitted from the result.
        """
        res = {k: v for k, v in item.items() if k != "_segment_span"}

        attributes = item.get("attributes")
        if attributes:
            res["attributes"] = {
                k: serialize_attribute(v) for k, v in attributes.items()
            }
        else:
            # pop() instead of del: a span without an "attributes" key must not
            # raise KeyError here, which would abort the whole flush.
            res.pop("attributes", None)

        return res

    def _flush(self, only_pending: bool = False) -> None:
        """Turn buffered spans into envelopes and pass them to ``capture_func``.

        :param only_pending: If true, flush only the buckets that ``add``
            marked as over their limits. Otherwise flush every bucket, e.g.
            when the SDK is shutting down.

        Envelopes are built while holding the lock. ``capture_func`` is called
        only after the lock has been released.
        """
        with self._lock:
            if only_pending:
                buckets = list(self._pending_flush)
            else:
                # flush whole buffer, e.g. if the SDK is shutting down
                buckets = list(self._span_buffer.keys())
            self._pending_flush.clear()

            if not buckets:
                return

            envelopes = []
            for bucket_id in buckets:
                spans = self._span_buffer.get(bucket_id)
                if not spans:
                    # Nothing buffered for this trace any more; skip it.
                    continue

                # All spans in a bucket share one trace_id, so the first span's
                # segment supplies the trace header for every envelope.
                dsc = spans[0]["_segment_span"]._dynamic_sampling_context()

                # At most MAX_ENVELOPE_SIZE spans fit in one envelope, so a
                # larger bucket is split across several envelopes.
                for start in range(0, len(spans), self.MAX_ENVELOPE_SIZE):
                    chunk = spans[start : start + self.MAX_ENVELOPE_SIZE]
                    envelope = Envelope(
                        headers={
                            "sent_at": format_timestamp(datetime.now(timezone.utc)),
                            "trace": dsc,
                        }
                    )
                    envelope.add_item(
                        Item(
                            type=self.TYPE,
                            content_type=self.CONTENT_TYPE,
                            headers={
                                "item_count": len(chunk),
                            },
                            payload=PayloadRef(
                                json={
                                    "version": 2,
                                    "items": [
                                        self._to_transport_format(span)
                                        for span in chunk
                                    ],
                                }
                            ),
                        )
                    )
                    envelopes.append(envelope)

                del self._span_buffer[bucket_id]
                self._running_size.pop(bucket_id, None)

        for envelope in envelopes:
            self._capture_func(envelope)
| Name | Type | Size | Permission | Actions |
|---|---|---|---|---|
| __pycache__ | Folder | 0755 |
|
|
| ai | Folder | 0755 |
|
|
| crons | Folder | 0755 |
|
|
| integrations | Folder | 0755 |
|
|
| profiler | Folder | 0755 |
|
|
| __init__.py | File | 1.46 KB | 0644 |
|
| _batcher.py | File | 5.7 KB | 0644 |
|
| _compat.py | File | 3 KB | 0644 |
|
| _init_implementation.py | File | 2.43 KB | 0644 |
|
| _log_batcher.py | File | 1.88 KB | 0644 |
|
| _lru_cache.py | File | 1.14 KB | 0644 |
|
| _metrics_batcher.py | File | 1.21 KB | 0644 |
|
| _queue.py | File | 10.98 KB | 0644 |
|
| _span_batcher.py | File | 8.12 KB | 0644 |
|
| _types.py | File | 13.16 KB | 0644 |
|
| _werkzeug.py | File | 3.85 KB | 0644 |
|
| api.py | File | 15.59 KB | 0644 |
|
| attachments.py | File | 2.95 KB | 0644 |
|
| client.py | File | 49.95 KB | 0644 |
|
| consts.py | File | 61.95 KB | 0644 |
|
| debug.py | File | 959 B | 0644 |
|
| envelope.py | File | 9.37 KB | 0644 |
|
| feature_flags.py | File | 2.5 KB | 0644 |
|
| hub.py | File | 24.54 KB | 0644 |
|
| logger.py | File | 2.6 KB | 0644 |
|
| metrics.py | File | 1.42 KB | 0644 |
|
| monitor.py | File | 4.47 KB | 0644 |
|
| py.typed | File | 0 B | 0644 |
|
| scope.py | File | 74.09 KB | 0644 |
|
| scrubber.py | File | 5.99 KB | 0644 |
|
| serializer.py | File | 12.82 KB | 0644 |
|
| session.py | File | 5.08 KB | 0644 |
|
| sessions.py | File | 8.59 KB | 0644 |
|
| spotlight.py | File | 11.85 KB | 0644 |
|
| traces.py | File | 25.08 KB | 0644 |
|
| tracing.py | File | 50.33 KB | 0644 |
|
| tracing_utils.py | File | 54.36 KB | 0644 |
|
| transport.py | File | 44.41 KB | 0644 |
|
| types.py | File | 1.24 KB | 0644 |
|
| utils.py | File | 65.96 KB | 0644 |
|
| worker.py | File | 10.91 KB | 0644 |
|