Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(raw): Merge all operations into one enum #4977

Merged
merged 1 commit into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions core/src/layers/await_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
use await_tree::InstrumentAwait;
use futures::Future;
use futures::FutureExt;
use oio::ListOperation;
use oio::ReadOperation;
use oio::WriteOperation;

use crate::raw::*;
use crate::*;
Expand Down Expand Up @@ -182,7 +179,7 @@ impl<R: oio::Read> oio::Read for AwaitTreeWrapper<R> {
async fn read(&mut self) -> Result<Buffer> {
self.inner
.read()
.instrument_await(format!("opendal::{}", ReadOperation::Read))
.instrument_await(format!("opendal::{}", Operation::ReaderRead))
.await
}
}
Expand All @@ -197,19 +194,19 @@ impl<R: oio::Write> oio::Write for AwaitTreeWrapper<R> {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend {
self.inner
.write(bs)
.instrument_await(format!("opendal::{}", WriteOperation::Write.into_static()))
.instrument_await(format!("opendal::{}", Operation::WriterWrite.into_static()))
}

fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
self.inner
.abort()
.instrument_await(format!("opendal::{}", WriteOperation::Abort.into_static()))
.instrument_await(format!("opendal::{}", Operation::WriterAbort.into_static()))
}

fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
self.inner
.close()
.instrument_await(format!("opendal::{}", WriteOperation::Close.into_static()))
.instrument_await(format!("opendal::{}", Operation::WriterClose.into_static()))
}
}

Expand All @@ -227,7 +224,7 @@ impl<R: oio::List> oio::List for AwaitTreeWrapper<R> {
async fn next(&mut self) -> Result<Option<oio::Entry>> {
self.inner
.next()
.instrument_await(format!("opendal::{}", ListOperation::Next))
.instrument_await(format!("opendal::{}", Operation::ListerNext))
.await
}
}
Expand Down
21 changes: 9 additions & 12 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ use std::sync::Arc;

use futures::TryFutureExt;

use crate::raw::oio::ListOperation;
use crate::raw::oio::ReadOperation;
use crate::raw::oio::WriteOperation;
use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -358,7 +355,7 @@ impl<T: oio::Read> oio::Read for ErrorContextWrapper<T> {
bs
})
.map_err(|err| {
err.with_operation(ReadOperation::Read)
err.with_operation(Operation::ReaderRead)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("range", self.range.to_string())
Expand All @@ -376,7 +373,7 @@ impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> {
bs
})
.map_err(|err| {
err.with_operation(ReadOperation::BlockingRead)
err.with_operation(Operation::BlockingReaderRead)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("range", self.range.to_string())
Expand All @@ -395,7 +392,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
self.processed += size as u64;
})
.map_err(|err| {
err.with_operation(WriteOperation::Write)
err.with_operation(Operation::WriterWrite)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("size", size.to_string())
Expand All @@ -405,7 +402,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {

async fn close(&mut self) -> Result<()> {
self.inner.close().await.map_err(|err| {
err.with_operation(WriteOperation::Close)
err.with_operation(Operation::WriterClose)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("written", self.processed.to_string())
Expand All @@ -414,7 +411,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {

async fn abort(&mut self) -> Result<()> {
self.inner.abort().await.map_err(|err| {
err.with_operation(WriteOperation::Abort)
err.with_operation(Operation::WriterAbort)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("processed", self.processed.to_string())
Expand All @@ -431,7 +428,7 @@ impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
self.processed += size as u64;
})
.map_err(|err| {
err.with_operation(WriteOperation::BlockingWrite)
err.with_operation(Operation::BlockingWriterWrite)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("size", size.to_string())
Expand All @@ -441,7 +438,7 @@ impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {

fn close(&mut self) -> Result<()> {
self.inner.close().map_err(|err| {
err.with_operation(WriteOperation::BlockingClose)
err.with_operation(Operation::BlockingWriterClose)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("written", self.processed.to_string())
Expand All @@ -459,7 +456,7 @@ impl<T: oio::List> oio::List for ErrorContextWrapper<T> {
bs
})
.map_err(|err| {
err.with_operation(ListOperation::Next)
err.with_operation(Operation::ListerNext)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("listed", self.processed.to_string())
Expand All @@ -476,7 +473,7 @@ impl<T: oio::BlockingList> oio::BlockingList for ErrorContextWrapper<T> {
bs
})
.map_err(|err| {
err.with_operation(ListOperation::BlockingNext)
err.with_operation(Operation::BlockingListerNext)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("listed", self.processed.to_string())
Expand Down
19 changes: 9 additions & 10 deletions core/src/layers/fastrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ use std::sync::Arc;
use fastrace::prelude::*;
use futures::FutureExt;

use crate::raw::oio::ListOperation;
use crate::raw::oio::ReadOperation;
use crate::raw::oio::WriteOperation;
use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -301,41 +298,43 @@ impl<R: oio::Read> oio::Read for FastraceWrapper<R> {
impl<R: oio::BlockingRead> oio::BlockingRead for FastraceWrapper<R> {
fn read(&mut self) -> Result<Buffer> {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(ReadOperation::BlockingRead.into_static());
let _span = LocalSpan::enter_with_local_parent(Operation::BlockingReaderRead.into_static());
self.inner.read()
}
}

impl<R: oio::Write> oio::Write for FastraceWrapper<R> {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::Write.into_static());
let _span = LocalSpan::enter_with_local_parent(Operation::WriterWrite.into_static());
self.inner.write(bs)
}

fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::Abort.into_static());
let _span = LocalSpan::enter_with_local_parent(Operation::WriterAbort.into_static());
self.inner.abort()
}

fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::Close.into_static());
let _span = LocalSpan::enter_with_local_parent(Operation::WriterClose.into_static());
self.inner.close()
}
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for FastraceWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<()> {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static());
let _span =
LocalSpan::enter_with_local_parent(Operation::BlockingWriterWrite.into_static());
self.inner.write(bs)
}

fn close(&mut self) -> Result<()> {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingClose.into_static());
let _span =
LocalSpan::enter_with_local_parent(Operation::BlockingWriterClose.into_static());
self.inner.close()
}
}
Expand All @@ -350,7 +349,7 @@ impl<R: oio::List> oio::List for FastraceWrapper<R> {
impl<R: oio::BlockingList> oio::BlockingList for FastraceWrapper<R> {
fn next(&mut self) -> Result<Option<oio::Entry>> {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(ListOperation::BlockingNext.into_static());
let _span = LocalSpan::enter_with_local_parent(Operation::BlockingListerNext.into_static());
self.inner.next()
}
}
26 changes: 12 additions & 14 deletions core/src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ use log::log;
use log::trace;
use log::Level;

use crate::raw::oio::ReadOperation;
use crate::raw::oio::WriteOperation;
use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -988,7 +986,7 @@ impl<R: oio::Read> oio::Read for LoggingReader<R> {
target: LOGGING_TARGET,
"service={} operation={} path={} read={} -> read returns {}B",
self.ctx.scheme,
ReadOperation::Read,
Operation::ReaderRead,
self.path,
self.read.load(Ordering::Relaxed),
bs.remaining()
Expand All @@ -1002,7 +1000,7 @@ impl<R: oio::Read> oio::Read for LoggingReader<R> {
lvl,
"service={} operation={} path={} read={} -> read failed: {}",
self.ctx.scheme,
ReadOperation::Read,
Operation::ReaderRead,
self.path,
self.read.load(Ordering::Relaxed),
self.ctx.error_print(&err),
Expand All @@ -1024,7 +1022,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for LoggingReader<R> {
target: LOGGING_TARGET,
"service={} operation={} path={} read={} -> read returns {}B",
self.ctx.scheme,
ReadOperation::BlockingRead,
Operation::BlockingReaderRead,
self.path,
self.read.load(Ordering::Relaxed),
bs.remaining()
Expand All @@ -1038,7 +1036,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for LoggingReader<R> {
lvl,
"service={} operation={} path={} read={} -> read failed: {}",
self.ctx.scheme,
ReadOperation::BlockingRead,
Operation::BlockingReaderRead,
self.path,
self.read.load(Ordering::Relaxed),
self.ctx.error_print(&err),
Expand Down Expand Up @@ -1081,7 +1079,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
target: LOGGING_TARGET,
"service={} operation={} path={} written={}B -> data write {}B",
self.ctx.scheme,
WriteOperation::Write,
Operation::WriterWrite,
self.path,
self.written,
size,
Expand All @@ -1095,7 +1093,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
lvl,
"service={} operation={} path={} written={}B -> data write failed: {}",
self.ctx.scheme,
WriteOperation::Write,
Operation::WriterWrite,
self.path,
self.written,
self.ctx.error_print(&err),
Expand All @@ -1113,7 +1111,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
target: LOGGING_TARGET,
"service={} operation={} path={} written={}B -> abort writer",
self.ctx.scheme,
WriteOperation::Abort,
Operation::WriterAbort,
self.path,
self.written,
);
Expand All @@ -1126,7 +1124,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
lvl,
"service={} operation={} path={} written={}B -> abort writer failed: {}",
self.ctx.scheme,
WriteOperation::Abort,
Operation::WriterAbort,
self.path,
self.written,
self.ctx.error_print(&err),
Expand Down Expand Up @@ -1157,7 +1155,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
lvl,
"service={} operation={} path={} written={}B -> data close failed: {}",
self.ctx.scheme,
WriteOperation::Close,
Operation::WriterClose,
self.path,
self.written,
self.ctx.error_print(&err),
Expand All @@ -1177,7 +1175,7 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
target: LOGGING_TARGET,
"service={} operation={} path={} written={}B -> data write {}B",
self.ctx.scheme,
WriteOperation::BlockingWrite,
Operation::BlockingWriterWrite,
self.path,
self.written,
bs.len(),
Expand All @@ -1191,7 +1189,7 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
lvl,
"service={} operation={} path={} written={}B -> data write failed: {}",
self.ctx.scheme,
WriteOperation::BlockingWrite,
Operation::BlockingWriterWrite,
self.path,
self.written,
self.ctx.error_print(&err),
Expand Down Expand Up @@ -1222,7 +1220,7 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
lvl,
"service={} operation={} path={} written={}B -> data close failed: {}",
self.ctx.scheme,
WriteOperation::BlockingClose,
Operation::BlockingWriterClose,
self.path,
self.written,
self.ctx.error_print(&err),
Expand Down
14 changes: 6 additions & 8 deletions core/src/layers/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ use prometheus::register_int_counter_vec_with_registry;
use prometheus::HistogramVec;
use prometheus::Registry;

use crate::raw::oio::ReadOperation;
use crate::raw::oio::WriteOperation;
use crate::raw::Access;
use crate::raw::*;
use crate::*;
Expand Down Expand Up @@ -681,7 +679,7 @@ impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
async fn read(&mut self) -> Result<Buffer> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
ReadOperation::Read.into_static(),
Operation::ReaderRead.into_static(),
&self.path,
);

Expand Down Expand Up @@ -713,7 +711,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
fn read(&mut self) -> Result<Buffer> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
ReadOperation::BlockingRead.into_static(),
Operation::BlockingReaderRead.into_static(),
&self.path,
);

Expand Down Expand Up @@ -747,7 +745,7 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {

let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
WriteOperation::Write.into_static(),
Operation::WriterWrite.into_static(),
&self.path,
);

Expand Down Expand Up @@ -777,7 +775,7 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
async fn abort(&mut self) -> Result<()> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
WriteOperation::Abort.into_static(),
Operation::WriterAbort.into_static(),
&self.path,
);

Expand All @@ -801,7 +799,7 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
async fn close(&mut self) -> Result<()> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
WriteOperation::Close.into_static(),
Operation::WriterClose.into_static(),
&self.path,
);

Expand Down Expand Up @@ -859,7 +857,7 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
fn close(&mut self) -> Result<()> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
WriteOperation::BlockingClose.into_static(),
Operation::BlockingWriterClose.into_static(),
&self.path,
);

Expand Down
Loading
Loading