Skip to content

Commit

Permalink
QATAPP-22544: Keep state between calls for stream API.
Browse files Browse the repository at this point in the history
Signed-off-by: Chengfei Zhu <chengfei.zhu@intel.com>
  • Loading branch information
Fei Xue authored and cfzhu committed Sep 2, 2021
1 parent 7f4bf27 commit 8c1ac44
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 9 deletions.
2 changes: 2 additions & 0 deletions src/qatzip_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ typedef struct QzStreamBuf_S {
unsigned char *in_buf;
unsigned char *out_buf;
unsigned int out_offset;
unsigned int in_offset;
unsigned int flush_more;
} QzStreamBuf_T;

typedef struct ThreadData_S {
Expand Down
60 changes: 51 additions & 9 deletions src/qatzip_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ int initStream(QzSession_T *sess, QzStream_T *strm)
}

stream_buf->out_offset = 0;
stream_buf->in_offset = 0;
stream_buf->flush_more = 0;
stream_buf->buf_len = qz_sess->sess_params.strm_buff_sz;
stream_buf->in_buf =
streamBufferAlloc(stream_buf->buf_len, NODE_0, PINNED_MEM);
Expand Down Expand Up @@ -359,6 +361,8 @@ int qzCompressStream(QzSession_T *sess, QzStream_T *strm, unsigned int last)
unsigned int copied_output = 0;
unsigned int copied_input = 0;
unsigned int copied_input_last = 0;
unsigned int copy_more = 1;
unsigned int inbuf_offset = 0;
unsigned int consumed = 0;
unsigned int produced = 0;
unsigned int strm_last = 0;
Expand Down Expand Up @@ -441,22 +445,37 @@ int qzCompressStream(QzSession_T *sess, QzStream_T *strm, unsigned int last)
0 == strm->pending_in &&
0 == strm->in_sz)) {
rc = QZ_OK;
/* When pending_out and pending_in are all greater than zero, we
* set the flush_more flag to indicate that we should not copy more
* input and should do the process(compression or decompression) */
if (strm->pending_in > 0) {
stream_buf->flush_more = 1;
}
goto done;
}
}

while (0 == strm->pending_out) {
copied_input_last = copied_input;
copied_input += copyStreamInput(strm, strm->in + consumed);

if (strm->pending_in < stream_buf->buf_len &&
last != 1) {
rc = QZ_OK;
goto done;
if (copy_more == 1 && stream_buf->flush_more != 1) {
copied_input_last = copied_input;
copied_input += copyStreamInput(strm, strm->in + consumed);

if (strm->pending_in < stream_buf->buf_len &&
last != 1) {
rc = QZ_OK;
goto done;
} else {
copy_more = 0;
}
}

input_len = strm->pending_in;
output_len = stream_buf->buf_len;
if (stream_buf->flush_more == 1) {
inbuf_offset = stream_buf->in_offset;
stream_buf->flush_more = 0;
}

strm_last = (0 == strm->in_sz && last) ? 1 : 0;
QZ_DEBUG("Before Call qzCompressCrc input_len %u output_len %u "
Expand All @@ -465,14 +484,16 @@ int qzCompressStream(QzSession_T *sess, QzStream_T *strm, unsigned int last)
input_len, output_len, strm->pending_in, strm->pending_out,
strm->in_sz, strm->out_sz);

rc = qzCompressCrc(sess, stream_buf->in_buf, &input_len,
rc = qzCompressCrc(sess, stream_buf->in_buf + inbuf_offset, &input_len,
stream_buf->out_buf, &output_len, strm_last, strm_crc);

strm->pending_in = 0;
strm->pending_in -= input_len;
strm->pending_out = output_len;
copied_output = copyStreamOutput(strm, strm->out + produced);
consumed += input_len;
produced += copied_output;
inbuf_offset += input_len;
stream_buf->in_offset = inbuf_offset;

QZ_DEBUG("After Call qzCompressCrc input_len %u output_len %u "
"stream->pending_in %u stream->pending_out %u "
Expand All @@ -498,6 +519,12 @@ int qzCompressStream(QzSession_T *sess, QzStream_T *strm, unsigned int last)
goto done;
}

if (0 == strm->pending_in) {
copy_more = 1;
inbuf_offset = 0;
}


if (0 == strm->pending_in && 0 == strm->in_sz) {
rc = QZ_OK;
goto done;
Expand Down Expand Up @@ -574,6 +601,12 @@ int qzDecompressStream(QzSession_T *sess, QzStream_T *strm, unsigned int last)
produced += copied_output;
if (0 == copied_output) {
rc = QZ_OK;
if (strm->pending_in > 0) {
/* We need to handle all the input that has left pending in the input buffer next time we are called.
* Otherwise we'd append additional bits to the pending data, violate the buffer's boundary and
* corrupt the memory behind the boundary. */
stream_buf->flush_more = 1;
}
QZ_DEBUG("No space for pending output...\n");
goto done;
}
Expand All @@ -582,7 +615,7 @@ int qzDecompressStream(QzSession_T *sess, QzStream_T *strm, unsigned int last)

while (0 == strm->pending_out) {

if (1 == copy_more) {
if (1 == copy_more && stream_buf->flush_more != 1) {
copied_input_last = copied_input;
copied_input += copyStreamInput(strm, strm->in + consumed);

Expand All @@ -596,6 +629,14 @@ int qzDecompressStream(QzSession_T *sess, QzStream_T *strm, unsigned int last)
}
}

if (stream_buf->flush_more == 1) {
/* We need to flush all the input that has left pending in the input buffer since the previous call to this function.
* Otherwise we'd append additional bits to the pending data, violate the buffer's boundary and
* corrupt the memory behind the boundary. */
stream_buf->flush_more = 0;
inbuf_offset = stream_buf->in_offset;
}

input_len = strm->pending_in;
output_len = stream_buf->buf_len;

Expand All @@ -614,6 +655,7 @@ int qzDecompressStream(QzSession_T *sess, QzStream_T *strm, unsigned int last)
}

inbuf_offset += input_len;
stream_buf->in_offset = inbuf_offset;
consumed += input_len;
strm->pending_in -= input_len;
strm->pending_out = output_len;
Expand Down

0 comments on commit 8c1ac44

Please sign in to comment.