Skip to content

Commit

Permalink
Merge pull request #660 from bytedance/merge-memshell
Browse files Browse the repository at this point in the history
Merge memshell
  • Loading branch information
yoloyyh authored Jul 18, 2024
2 parents 6214ee8 + 35ce369 commit 57c6a9d
Show file tree
Hide file tree
Showing 32 changed files with 1,083 additions and 194 deletions.
105 changes: 104 additions & 1 deletion plugins/lib/rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use parking_lot::Mutex;
use protobuf::Message;
use signal_hook::consts::SIGTERM;
use std::{
env,
fs::File,
io::{BufReader, BufWriter, Error, Read, Write},
sync::Arc,
Expand All @@ -32,6 +33,7 @@ pub enum EncodeType {
}
#[derive(Clone)]
pub struct Client {
high_writer: Arc<Mutex<BufWriter<File>>>,
writer: Arc<Mutex<BufWriter<File>>>,
reader: Arc<Mutex<BufReader<File>>>,
}
Expand All @@ -43,9 +45,27 @@ const READ_PIPE_FD: i32 = 3;
const WRITE_PIPE_FD: i32 = 1;
#[cfg(not(feature = "debug"))]
const WRITE_PIPE_FD: i32 = 4;
const HIGH_PRIORIT_FD: i32 = 5;

impl Client {

pub fn can_use_high() -> bool {
match env::var("ELKEID_PLUGIN_HIGH_PRIORITY_PIPE") {
Ok(value) => {
if !value.is_empty() {
return true;
}

}
Err(_) => {
return false;
}

}
false
}
pub fn new(ignore_terminate: bool) -> Self {

let writer = Arc::new(Mutex::new(BufWriter::with_capacity(512 * 1024, unsafe {
#[cfg(target_family = "unix")]
{
Expand All @@ -58,6 +78,37 @@ impl Client {
File::from_raw_handle(raw_handle.0 as _)
}
})));
let mut high_writer = writer.clone();
if Self::can_use_high() {
high_writer = Arc::new(Mutex::new(BufWriter::with_capacity(512 * 1024, unsafe {
#[cfg(target_family = "unix")]
{
File::from_raw_fd(HIGH_PRIORIT_FD)
}

#[cfg(target_family = "windows")]
{
let raw_handle = GetStdHandle(STD_OUTPUT_HANDLE).unwrap();
File::from_raw_handle(raw_handle.0 as _)
}
})));

let high_writer_c = high_writer.clone();
thread::spawn(move || {
let ticker = tick(Duration::from_millis(200));
loop {
select! {
recv(ticker)->_=>{
let mut w = high_writer_c.lock();
if w.flush().is_err() {
break;
}
}
}
}
});
}

let reader = Arc::new(Mutex::new(BufReader::new(unsafe {
#[cfg(target_family = "unix")]
{
Expand All @@ -70,6 +121,7 @@ impl Client {
File::from_raw_handle(raw_handle.0 as _)
}
})));

let writer_c = writer.clone();
thread::spawn(move || {
let ticker = tick(Duration::from_millis(200));
Expand All @@ -93,14 +145,17 @@ impl Client {
info!("received signal: {:?}, wait 3 secs to exit", sig);
thread::sleep(Duration::from_secs(3));
unsafe {
if Self::can_use_high() {
libc::close(HIGH_PRIORIT_FD);
}
libc::close(WRITE_PIPE_FD);
libc::close(READ_PIPE_FD);
}
}
}
});
}
Self { writer, reader }
Self { high_writer, writer, reader }
}
pub fn send_record(&mut self, rec: &Record) -> Result<(), Error> {
let mut w = self.writer.lock();
Expand All @@ -120,6 +175,54 @@ impl Client {
w.write_all(b"}\n")
}
}
pub fn send_record_high_priority(&mut self, rec: &Record) -> Result<(), Error> {

let mut w = self.high_writer.lock();
#[cfg(not(feature = "debug"))]
{
w.write_all(&rec.compute_size().to_le_bytes()[..])?;
rec.write_to_writer(&mut (*w)).map_err(|err| err.into())
}
#[cfg(feature = "debug")]
{
w.write_all(b"{\"data_type\":")?;
w.write_all(rec.data_type.to_string().as_bytes())?;
w.write_all(b",\"timestamp\":")?;
w.write_all(rec.timestamp.to_string().as_bytes())?;
w.write_all(b",\"data\":")?;
serde_json::to_writer(w.by_ref(), rec.get_data().get_fields())?;
w.write_all(b"}\n")
}
}

pub fn send_records_high_priority(&mut self, recs: &Vec<Record>) -> Result<(), Error> {
let mut w = self.high_writer.lock();
#[cfg(not(feature = "debug"))]
{
for rec in recs.iter() {
println!("send: {:?}", rec);
w.write_all(&rec.compute_size().to_le_bytes()[..])?;
rec.write_to_writer(&mut (*w))
.map_err(|err| -> std::io::Error { err.into() })?;
}
Ok(())
}
#[cfg(feature = "debug")]
{

for rec in recs.iter() {
w.write_all(b"{\"data_type\":")?;
w.write_all(rec.data_type.to_string().as_bytes())?;
w.write_all(b",\"timestamp\":")?;
w.write_all(rec.timestamp.to_string().as_bytes())?;
w.write_all(b",\"data\":")?;
serde_json::to_writer(w.by_ref(), rec.get_data().get_fields())?;
w.write_all(b"}\n")
}
Ok(())
}
}

pub fn send_records(&mut self, recs: &Vec<Record>) -> Result<(), Error> {
let mut w = self.writer.lock();
#[cfg(not(feature = "debug"))]
Expand Down
12 changes: 6 additions & 6 deletions rasp/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ VCPKG_OVERLAY_PORTS ?= $(abspath overlay-ports)

.PHONY: all help install clean set-version agent-plugin nsenter pangolin jattach JVMAgent JVMProbe python-probe python-loader go-probe go-probe-ebpf node-probe php-probe librasp rasp-server NSMount

all: rasp-$(VERSION).tar.gz rasp-$(VERSION)-debug.tar.gz SHA256SUMS
all: rasp-linux-default-x86_64-$(VERSION).tar.gz rasp-linux-default-x86_64-$(VERSION)-debug.tar.gz SHA256SUMS


help:
Expand All @@ -27,19 +27,19 @@ install: | $(OUTPUT)


clean:
rm -rf $(OUTPUT) $(DEBUG_SYMBOLS) rasp-$(VERSION).tar.gz rasp-$(VERSION)-debug.tar.gz SHA256SUMS
rm -rf $(OUTPUT) $(DEBUG_SYMBOLS) rasp-linux-default-x86_64-$(VERSION).tar.gz rasp-linux-default-x86_64-$(VERSION)-debug.tar.gz SHA256SUMS


rasp-$(VERSION).tar.gz: rasp-$(VERSION)-debug.tar.gz
rasp-linux-default-x86_64-$(VERSION).tar.gz: rasp-linux-default-x86_64-$(VERSION)-debug.tar.gz
cd $(OUTPUT) && tar -czvf ../$@ ./*


rasp-$(VERSION)-debug.tar.gz: | $(DEBUG_SYMBOLS)
rasp-linux-default-x86_64-$(VERSION)-debug.tar.gz: | $(DEBUG_SYMBOLS)
tar -czvf $@ $(DEBUG_SYMBOLS)


SHA256SUMS: rasp-$(VERSION).tar.gz
sha256sum $(OUTPUT)/rasp rasp-$(VERSION).tar.gz > $@
SHA256SUMS: rasp-linux-default-x86_64-$(VERSION).tar.gz
sha256sum $(OUTPUT)/rasp rasp-linux-default-x86_64-$(VERSION).tar.gz > $@


set-version:
Expand Down
3 changes: 2 additions & 1 deletion rasp/golang/client/smith_client.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "smith_client.h"
#include "smith_probe.h"
#include <aio/ev/timer.h>
#include <aio/net/stream.h>
#include <zero/log.h>
Expand Down Expand Up @@ -80,7 +81,7 @@ startClient(const std::shared_ptr<aio::Context> &context) {
reason.code,
reason.message.c_str()
);

gProbe->discard_send++;
return zero::ptr::makeRef<aio::ev::Timer>(context)->setTimeout(1min);
});
});
Expand Down
5 changes: 4 additions & 1 deletion rasp/golang/client/smith_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ void to_json(nlohmann::json &j, const Heartbeat &heartbeat) {
j = {
{"filter", heartbeat.filter},
{"block", heartbeat.block},
{"limit", heartbeat.limit}
{"limit", heartbeat.limit},
{"discard_surplus", heartbeat.discard_surplus},
{"discard_post", heartbeat.discard_post},
{"discard_send", heartbeat.discard_send}
};
}

Expand Down
3 changes: 3 additions & 0 deletions rasp/golang/client/smith_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ struct Heartbeat {
std::string filter;
std::string block;
std::string limit;
int64_t discard_surplus;
int64_t discard_post;
int64_t discard_send;
};

struct Request {
Expand Down
11 changes: 9 additions & 2 deletions rasp/golang/client/smith_probe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ void startProbe() {
gProbe->quotas[classID][methodID] = it->second;
}

heartbeat->discard_surplus = gProbe->discard_surplus;
heartbeat->discard_post = gProbe->discard_post;
heartbeat->discard_send = gProbe->discard_send;
sender->trySend({HEARTBEAT, *heartbeat});
return true;
});
Expand Down Expand Up @@ -265,8 +268,12 @@ void startProbe() {
Trace trace = gProbe->buffer[*index];
gProbe->buffer.release(*index);

if (pass(trace, *filters))
sender->trySend({TRACE, gProbe->buffer[*index]});
if (pass(trace, *filters)) {
auto result = sender->trySend({TRACE, trace});
if (!result) {
gProbe->discard_send++;
}
}

P_CONTINUE(loop);
}
Expand Down
3 changes: 3 additions & 0 deletions rasp/golang/client/smith_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ struct Probe {
z_rwlock_t locks[CLASS_MAX][METHOD_MAX];
std::pair<size_t, Policy *> policies[CLASS_MAX][METHOD_MAX];
zero::atomic::CircularBuffer<Trace, TRACE_BUFFER_SIZE> buffer;
std::atomic<int64_t> discard_surplus;
std::atomic<int64_t> discard_post;
std::atomic<int64_t> discard_send;
};

void startProbe();
Expand Down
12 changes: 9 additions & 3 deletions rasp/golang/go/api/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,10 @@ struct APIEntry {

static bool handler(uintptr_t sp, uintptr_t g) {
if constexpr (ErrorIndex < 0) {
if (!surplus())
if (!surplus()) {
gProbe->discard_surplus++;
return true;
}
}

size_t FPSize = FLOAT_REGISTER * sizeof(double _Complex);
Expand Down Expand Up @@ -393,8 +395,10 @@ struct APIEntry {
return false;
}

if (!surplus())
if (!surplus()) {
gProbe->discard_surplus++;
return true;
}
}

post(trace);
Expand Down Expand Up @@ -481,8 +485,10 @@ struct APIEntry {
static void post(const Trace &trace) {
std::optional<size_t> index = gProbe->buffer.reserve();

if (!index)
if (!index) {
gProbe->discard_post++;
return;
}

gProbe->buffer[*index] = trace;
gProbe->buffer.commit(*index);
Expand Down
Loading

0 comments on commit 57c6a9d

Please sign in to comment.