Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated fix for refs/heads/otel_segfault #201

Open
wants to merge 1 commit into
base: otel_segfault
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/core/tsi/fake_transport_security.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,21 @@

#include "src/core/tsi/fake_transport_security.h"

#include <grpc/support/alloc.h>
#include <grpc/support/port_platform.h>
#include <stdlib.h>
#include <string.h>

#include "absl/log/check.h"
#include "absl/log/log.h"

#include <grpc/support/alloc.h>
#include <grpc/support/port_platform.h>

#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/dump_args.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/tsi/transport_security_grpc.h"
#include "src/core/tsi/transport_security_interface.h"
#include "absl/log/check.h"
#include "absl/log/log.h"

// --- Constants. ---
#define TSI_FAKE_FRAME_HEADER_SIZE 4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,10 @@ def __init__(
generic_method_attribute_filter: Optional[Callable[[str], bool]] = None,
):
new_options = list(plugin_options) + [CsmOpenTelemetryPluginOption()]
import sys; sys.stderr.write(f"[xuanwn_testing] init_CsmOpenTelemetryPlugin\n"); sys.stderr.flush()
import sys

sys.stderr.write(f"[xuanwn_testing] init_CsmOpenTelemetryPlugin\n")
sys.stderr.flush()
super().__init__(
plugin_options=new_options,
meter_provider=meter_provider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ class _OpenTelemetryPlugin:
identifier: str

def __init__(self, plugin: OpenTelemetryPlugin):
import sys; sys.stderr.write(f"[xuanwn_testing] init__OpenTelemetryPlugin\n"); sys.stderr.flush()
import sys

sys.stderr.write(f"[xuanwn_testing] init__OpenTelemetryPlugin\n")
sys.stderr.flush()
self._plugin = plugin
self._metric_to_recorder = dict()
self.identifier = str(id(self))
Expand All @@ -93,7 +96,10 @@ def __init__(self, plugin: OpenTelemetryPlugin):
self._metric_to_recorder = self._register_metrics(
meter, enabled_metrics
)
import sys; sys.stderr.write(f"[xuanwn_testing] after_init__OpenTelemetryPlugin\n"); sys.stderr.flush()
import sys

sys.stderr.write(f"[xuanwn_testing] after_init__OpenTelemetryPlugin\n")
sys.stderr.flush()

def _should_record(self, stats_data: StatsData) -> bool:
# Decide if this plugin should record the stats_data.
Expand Down Expand Up @@ -377,22 +383,34 @@ def __init__(
self._server_option_activated = False

def observability_init(self):
import sys; sys.stderr.write(f"[xuanwn_testing] observability_init\n"); sys.stderr.flush()
import sys

sys.stderr.write(f"[xuanwn_testing] observability_init\n")
sys.stderr.flush()
try:
_cyobservability.activate_stats()
self.set_stats(True)
except Exception as e: # pylint: disable=broad-except
raise ValueError(f"Activate observability metrics failed with: {e}")
import sys; sys.stderr.write(f"[xuanwn_testing] after_activate_stats\n"); sys.stderr.flush()
import sys

sys.stderr.write(f"[xuanwn_testing] after_activate_stats\n")
sys.stderr.flush()
try:
_cyobservability.cyobservability_init(self._exporter)
# TODO(xuanwn): Use specific exceptons
except Exception as e: # pylint: disable=broad-except
_LOGGER.exception("Initiate observability failed with: %s", e)
import sys; sys.stderr.write(f"[xuanwn_testing] after_cyobservability_init\n"); sys.stderr.flush()
import sys

sys.stderr.write(f"[xuanwn_testing] after_cyobservability_init\n")
sys.stderr.flush()

grpc._observability.observability_init(self)
import sys; sys.stderr.write(f"[xuanwn_testing] after_observability_init\n"); sys.stderr.flush()
import sys

sys.stderr.write(f"[xuanwn_testing] after_observability_init\n")
sys.stderr.flush()

def observability_deinit(self) -> None:
# Sleep so we don't loss any data. If we shutdown export thread
Expand Down Expand Up @@ -512,7 +530,10 @@ def get_enabled_optional_labels(self) -> List[OptionalLabelType]:
def _start_open_telemetry_observability(
otel_o11y: OpenTelemetryObservability,
) -> None:
import sys; sys.stderr.write(f"[xuanwn_testing] _start_open_telemetry_observability\n"); sys.stderr.flush()
import sys

sys.stderr.write(f"[xuanwn_testing] _start_open_telemetry_observability\n")
sys.stderr.flush()
global _OPEN_TELEMETRY_OBSERVABILITY # pylint: disable=global-statement
with _observability_lock:
if _OPEN_TELEMETRY_OBSERVABILITY is None:
Expand All @@ -522,7 +543,13 @@ def _start_open_telemetry_observability(
raise RuntimeError(
"gPRC Python observability was already initialized!"
)
import sys; sys.stderr.write(f"[xuanwn_testing] after_start_open_telemetry_observability\n"); sys.stderr.flush()
import sys

sys.stderr.write(
f"[xuanwn_testing] after_start_open_telemetry_observability\n"
)
sys.stderr.flush()


def _end_open_telemetry_observability() -> None:
global _OPEN_TELEMETRY_OBSERVABILITY # pylint: disable=global-statement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ def __init__(
Return True means the original method name will be used, False means method name will
be replaced with "other".
"""
import sys; sys.stderr.write(f"[xuanwn_testing] init_OpenTelemetryPlugin\n"); sys.stderr.flush()
import sys

sys.stderr.write(f"[xuanwn_testing] init_OpenTelemetryPlugin\n")
sys.stderr.flush()
self.plugin_options = plugin_options
self.meter_provider = meter_provider
self.target_attribute_filter = target_attribute_filter or (
Expand All @@ -147,7 +150,12 @@ def register_global(self) -> None:
Raises:
RuntimeError: If a global plugin was already registered.
"""
import sys; sys.stderr.write(f"[xuanwn_testing] OpenTelemetryPlugin.register_global\n"); sys.stderr.flush()
import sys

sys.stderr.write(
f"[xuanwn_testing] OpenTelemetryPlugin.register_global\n"
)
sys.stderr.flush()
_open_telemetry_observability.start_open_telemetry_observability(
plugins=self._plugins
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ void PythonOpenCensusServerCallTracer::RecordSendInitialMetadata(

void PythonOpenCensusServerCallTracer::RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata) {
LOG(INFO) << "[xuanwn_testing] [Server] calling RecordReceivedInitialMetadata";
LOG(INFO)
<< "[xuanwn_testing] [Server] calling RecordReceivedInitialMetadata";
ServerO11yMetadata som;
GetO11yMetadata(recv_initial_metadata, &som);
path_ = std::move(som.path);
Expand Down Expand Up @@ -182,7 +183,8 @@ void PythonOpenCensusServerCallTracer::RecordReceivedMessage(

void PythonOpenCensusServerCallTracer::RecordReceivedDecompressedMessage(
const grpc_core::SliceBuffer& recv_decompressed_message) {
LOG(INFO) << "[xuanwn_testing] [Server] calling RecordReceivedDecompressedMessage";
LOG(INFO)
<< "[xuanwn_testing] [Server] calling RecordReceivedDecompressedMessage";
RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
recv_decompressed_message.Length()));
LOG(INFO) << "[xuanwn_testing] [Server] calling end";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,11 +533,22 @@ def _run(
logger.info("Starting python xDS Interop Client.")
csm_plugin = None
if args.enable_csm_observability:
import sys; sys.stderr.write(f"[xuanwn_testing] calling__prepare_csm_observability_plugin\n"); sys.stderr.flush()
import sys

sys.stderr.write(
f"[xuanwn_testing] calling__prepare_csm_observability_plugin\n"
)
sys.stderr.flush()
csm_plugin = _prepare_csm_observability_plugin()
import sys; sys.stderr.write(f"[xuanwn_testing] register_global\n"); sys.stderr.flush()
import sys

sys.stderr.write(f"[xuanwn_testing] register_global\n")
sys.stderr.flush()
csm_plugin.register_global()
import sys; sys.stderr.write(f"[xuanwn_testing] register_global\n"); sys.stderr.flush()
import sys

sys.stderr.write(f"[xuanwn_testing] register_global\n")
sys.stderr.flush()
global _global_server # pylint: disable=global-statement
method_handles = []
channel_configs = {}
Expand All @@ -559,7 +570,10 @@ def _run(
)
channel_configs[method] = channel_config
method_handles.append(_MethodHandle(args.num_channels, channel_config))
import sys; sys.stderr.write(f"[xuanwn_testing] create_global_server\n"); sys.stderr.flush()
import sys

sys.stderr.write(f"[xuanwn_testing] create_global_server\n")
sys.stderr.flush()
_global_server = grpc.server(concurrent.futures.ThreadPoolExecutor())
_global_server.add_insecure_port(f"0.0.0.0:{args.stats_port}")
test_pb2_grpc.add_LoadBalancerStatsServiceServicer_to_server(
Expand All @@ -570,7 +584,10 @@ def _run(
_global_server,
)
grpc_admin.add_admin_servicers(_global_server)
import sys; sys.stderr.write(f"[xuanwn_testing] start_global_server\n"); sys.stderr.flush()
import sys

sys.stderr.write(f"[xuanwn_testing] start_global_server\n")
sys.stderr.flush()
_global_server.start()
_global_server.wait_for_termination()
for method_handle in method_handles:
Expand Down