Skip to content

Commit

Permalink
[ISSUE #1228]🚀Implement Java MessageDecoder class encodeUniquely and …
Browse files Browse the repository at this point in the history
…encode method (#1229)
  • Loading branch information
mxsm authored Nov 19, 2024
1 parent f06b4ff commit c2281c4
Show file tree
Hide file tree
Showing 7 changed files with 356 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ use rocketmq_rust::ArcMut;
use tokio::runtime::Handle;
use tokio::sync::RwLock;
use tokio::sync::Semaphore;
use tokio_util::bytes::Bytes;
use tracing::warn;

use crate::base::client_config::ClientConfig;
Expand Down Expand Up @@ -1246,7 +1245,7 @@ impl DefaultMQProducerImpl {
.compress(body, self.producer_config.compress_level());
if let Ok(data) = data {
//store the compressed data
msg.set_compressed_body_mut(Bytes::from(data));
msg.set_compressed_body_mut(data);
return true;
}
}
Expand Down
16 changes: 8 additions & 8 deletions rocketmq-common/src/common/compression/compression_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,37 +49,37 @@ impl CompressionType {
}
}

pub fn compression(&self, data: &Bytes) -> Bytes {
pub fn compression(&self, data: &[u8]) -> Bytes {
match self {
CompressionType::LZ4 => {
let compressed = compress_prepend_size(data.chunk());
let compressed = compress_prepend_size(data);
Bytes::from(compressed)
}
CompressionType::Zstd => {
let compressed = zstd::encode_all(data.clone().reader(), 5).unwrap();
let compressed = zstd::encode_all(data.reader(), 5).unwrap();
Bytes::from(compressed)
}
CompressionType::Zlib => {
let mut zlib_encoder = ZlibEncoder::new(Vec::new(), Compression::default());
let _ = zlib_encoder.write_all(data.chunk());
let _ = zlib_encoder.write_all(data);
let result = zlib_encoder.finish().unwrap();
Bytes::from(result)
}
}
}

pub fn decompression(&self, data: &Bytes) -> Bytes {
pub fn decompression(&self, data: &[u8]) -> Bytes {
match self {
CompressionType::LZ4 => {
let compressed = decompress_size_prepended(data.chunk()).unwrap();
let compressed = decompress_size_prepended(data).unwrap();
Bytes::from(compressed)
}
CompressionType::Zstd => {
let compressed = zstd::decode_all(data.clone().reader()).unwrap();
let compressed = zstd::decode_all(data.reader()).unwrap();
Bytes::from(compressed)
}
CompressionType::Zlib => {
let mut zlib_encoder = ZlibDecoder::new(data.clone().reader());
let mut zlib_encoder = ZlibDecoder::new(data.reader());
let mut decompressed_data = Vec::new();
zlib_encoder.read_to_end(&mut decompressed_data).unwrap();
Bytes::from(decompressed_data)
Expand Down
8 changes: 6 additions & 2 deletions rocketmq-common/src/common/compression/compressor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use bytes::Bytes;

use crate::Result;

pub trait Compressor {
/// Compress message by different compressor.
///
Expand All @@ -25,7 +29,7 @@ pub trait Compressor {
/// # Returns
///
/// Compressed byte data or an `std::io::Error`.
fn compress(&self, src: &[u8], level: i32) -> Result<Vec<u8>, std::io::Error>;
fn compress(&self, src: &[u8], level: i32) -> Result<Bytes>;

/// Decompress message by different compressor.
///
Expand All @@ -36,5 +40,5 @@ pub trait Compressor {
/// # Returns
///
/// Decompressed byte data or an `std::io::Error`.
fn decompress(&self, src: &[u8]) -> Result<Vec<u8>, std::io::Error>;
fn decompress(&self, src: &[u8]) -> Result<Bytes>;
}
12 changes: 7 additions & 5 deletions rocketmq-common/src/common/compression/lz4_compressor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::io::Error;
use bytes::Bytes;

use crate::common::compression::compression_type::CompressionType;
use crate::common::compression::compressor::Compressor;
use crate::Result;

pub struct Lz4Compressor;

impl Compressor for Lz4Compressor {
fn compress(&self, src: &[u8], level: i32) -> Result<Vec<u8>, Error> {
todo!()
fn compress(&self, src: &[u8], level: i32) -> Result<Bytes> {
Ok(CompressionType::LZ4.compression(src))
}

fn decompress(&self, src: &[u8]) -> Result<Vec<u8>, Error> {
todo!()
fn decompress(&self, src: &[u8]) -> Result<Bytes> {
Ok(CompressionType::LZ4.decompression(src))
}
}
12 changes: 8 additions & 4 deletions rocketmq-common/src/common/compression/zlib_compressor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@
*/
use std::io::Error;

use bytes::Bytes;

use crate::common::compression::compression_type::CompressionType;
use crate::common::compression::compressor::Compressor;
use crate::Result;

pub struct ZlibCompressor;

impl Compressor for ZlibCompressor {
fn compress(&self, src: &[u8], level: i32) -> Result<Vec<u8>, Error> {
todo!()
fn compress(&self, src: &[u8], level: i32) -> Result<Bytes> {
Ok(CompressionType::Zlib.compression(src))
}

fn decompress(&self, src: &[u8]) -> Result<Vec<u8>, Error> {
todo!()
fn decompress(&self, src: &[u8]) -> Result<Bytes> {
Ok(CompressionType::Zlib.decompression(src))
}
}
12 changes: 8 additions & 4 deletions rocketmq-common/src/common/compression/zstd_compressor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@
*/
use std::io::Error;

use bytes::Bytes;

use crate::common::compression::compression_type::CompressionType;
use crate::common::compression::compressor::Compressor;
use crate::Result;

pub struct ZstdCompressor;

impl Compressor for ZstdCompressor {
fn compress(&self, src: &[u8], level: i32) -> Result<Vec<u8>, Error> {
todo!()
fn compress(&self, src: &[u8], level: i32) -> Result<Bytes> {
Ok(CompressionType::Zstd.compression(src))
}

fn decompress(&self, src: &[u8]) -> Result<Vec<u8>, Error> {
todo!()
fn decompress(&self, src: &[u8]) -> Result<Bytes> {
Ok(CompressionType::Zstd.decompression(src))
}
}
Loading

0 comments on commit c2281c4

Please sign in to comment.