Fix log_parsing pipeline null output issue
yczhang-nv committed Oct 30, 2024
1 parent 6f89f83 commit 1a1c8a7
Showing 2 changed files with 13 additions and 8 deletions.
examples/log_parsing/inference.py (6 additions, 5 deletions)
@@ -140,7 +140,8 @@ def compute_schema(self, schema: StageSchema):
         schema.output_schema.set_type(ControlMessage)
 
     @staticmethod
-    def _convert_one_response(output: ControlMessage, inf: ControlMessage, res: TensorMemory) -> ControlMessage:
+    def _convert_one_response(output: ControlMessage, inf: ControlMessage, res: TensorMemory,
+                              batch_offset: int) -> ControlMessage:
         memory = output.tensors()
 
         out_seq_ids = memory.get_tensor('seq_ids')
@@ -153,17 +154,17 @@ def _convert_one_response(output: ControlMessage, inf: ControlMessage, res: TensorMemory,
         seq_offset = seq_ids[0, 0].item()
         seq_count = seq_ids[-1, 0].item() + 1 - seq_offset
 
-        input_ids[0:inf.tensors().count, :] = inf.tensors().get_tensor('input_ids')
-        out_seq_ids[0:inf.tensors().count, :] = seq_ids
+        input_ids[batch_offset:inf.tensors().count + batch_offset, :] = inf.tensors().get_tensor('input_ids')
+        out_seq_ids[batch_offset:inf.tensors().count + batch_offset, :] = seq_ids
 
         resp_confidences = res.get_tensor('confidences')
         resp_labels = res.get_tensor('labels')
 
         # Two scenarios:
         if (inf.payload().count == inf.tensors().count):
             assert seq_count == res.count
-            confidences[0:inf.tensors().count, :] = resp_confidences
-            labels[0:inf.tensors().count, :] = resp_labels
+            confidences[batch_offset:inf.tensors().count + batch_offset, :] = resp_confidences
+            labels[batch_offset:inf.tensors().count + batch_offset, :] = resp_labels
         else:
             assert inf.tensors().count == res.count
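The root cause fixed above: _convert_one_response always wrote each batch into rows [0:count] of the shared preallocated output tensors, so later batches overwrote earlier ones and the tail rows were never filled, surfacing as null output. A minimal standalone sketch of the failure mode and the running-offset fix (NumPy stand-ins with illustrative names, not the Morpheus API):

import numpy as np

# Three two-row batches share one preallocated six-row output buffer,
# mirroring how _convert_one_response writes into the shared output tensors.
batches = [np.full((2, 4), fill_value=i, dtype=float) for i in range(3)]

# Buggy pattern: every batch lands at rows [0:count], so each batch
# overwrites the previous one and rows 2..5 keep their initial null values.
output = np.full((6, 4), np.nan)
for batch in batches:
    output[0:batch.shape[0], :] = batch
assert np.isnan(output[2:]).all()  # tail of the buffer was never written

# Fixed pattern: a running batch_offset advances past the rows already
# written, so consecutive batches fill consecutive row ranges.
output = np.full((6, 4), np.nan)
batch_offset = 0
for batch in batches:
    count = batch.shape[0]
    output[batch_offset:count + batch_offset, :] = batch
    batch_offset += count
assert not np.isnan(output).any()  # every row is populated exactly once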
python/morpheus/morpheus/stages/inference/inference_stage.py (7 additions, 3 deletions)
@@ -233,15 +233,18 @@ def on_next(message: ControlMessage):
 
             fut_list = []
 
+            batch_offset = 0
+
             for batch in batches:
                 outstanding_requests += 1
 
                 completion_future = mrc.Future()
 
                 def set_output_fut(resp: TensorMemory, inner_batch, batch_future: mrc.Future):
                     nonlocal outstanding_requests
-                    mess = self._convert_one_response(output_message, inner_batch, resp)
-
+                    nonlocal batch_offset
+                    mess = self._convert_one_response(output_message, inner_batch, resp, batch_offset)
+                    batch_offset += inner_batch.tensors().count
                     outstanding_requests -= 1
 
                     batch_future.set_result(mess)
@@ -340,7 +343,8 @@ def _split_batches(msg: ControlMessage, max_batch_size: int) -> typing.List[ControlMessage]:
         return out_resp
 
     @staticmethod
-    def _convert_one_response(output: ControlMessage, inf: ControlMessage, res: TensorMemory):
+    def _convert_one_response(output: ControlMessage, inf: ControlMessage, res: TensorMemory,
+                              batch_offset: int) -> ControlMessage:  # pylint:disable=unused-argument
         # Make sure we have a continuous list
         # assert inf.mess_offset == saved_offset + saved_count
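On the inference_stage.py side, the offset is threaded through the per-batch completion callbacks with a nonlocal counter: each callback converts its batch at the current offset, then advances the offset by that batch's row count so the next batch appends after it. A stripped-down sketch of that closure pattern (hypothetical names, no mrc or Morpheus imports):

from typing import List, Tuple

def dispatch(batches: List[list]) -> List[Tuple[int, list]]:
    placements = []
    batch_offset = 0

    def on_complete(inner_batch: list) -> None:
        # nonlocal lets the closure mutate the shared counter, as
        # set_output_fut does with batch_offset in the diff above.
        nonlocal batch_offset
        placements.append((batch_offset, inner_batch))
        batch_offset += len(inner_batch)  # advance past the rows just written

    for batch in batches:
        on_complete(batch)
    return placements

print(dispatch([[1, 2], [3, 4, 5], [6]]))
# [(0, [1, 2]), (2, [3, 4, 5]), (5, [6])]

One property worth noting about this pattern: batch_offset is read at call time, so it relies on the callbacks firing in submission order.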
