This repository has been archived by the owner on Dec 26, 2024. It is now read-only.

fix: copy Attributes on Splitting Result Sets #147

Merged 3 commits on Jan 8, 2024
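In short: when the processor splits a query result set into multiple FlowFiles (driven by RECORDS_PER_FLOWFILE), each newly created FlowFile previously started with no attributes; this change copies the attributes of the incoming FlowFile onto every split. A minimal, self-contained sketch of that pattern, using the same NiFi ProcessSession calls as the diff below (the class and method names here are illustrative only, not part of the PR):

import java.util.ArrayList;
import java.util.List;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;

// Sketch only: mirrors the attribute-copying pattern added in this PR.
class AttributeCopyingSplitSketch {

    List<FlowFile> split(final ProcessSession session, final FlowFile incoming, final int count) {
        final List<FlowFile> splits = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            FlowFile part = session.create();
            // Copy every attribute of the incoming FlowFile onto the new FlowFile,
            // so attributes set upstream are not lost when the result set is split.
            part = session.putAllAttributes(part, incoming.getAttributes());
            splits.add(part);
        }
        return splits;
    }
}

NiFi's session.create(parent) variant would inherit the parent's attributes automatically; the processor keeps the no-argument session.create() and copies the attributes explicitly, which is what the two QueryProcessor hunks below add.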
CHANGELOG.md (3 changes: 3 additions & 0 deletions)
@@ -1,5 +1,8 @@
## v1.27.0 [unreleased]

### Bug Fixes
1. [#147](https://github.com/influxdata/nifi-influxdb-bundle/pull/147): Copy Attributes on Splitting Result Sets

## v1.26.0 [2023-12-05]

### Others

@@ -288,9 +288,14 @@ private QueryProcessor(final String org,
if (recordsPerFlowFile == -1 || createdFlowFile) {
this.flowFile = flowFile;
} else {
- this.flowFile = session.create();
+ FlowFile newflowFile = session.create();
+ if (!createdFlowFile) {
+     newflowFile = session.putAllAttributes(newflowFile, flowFile.getAttributes());
+ }
+ this.flowFile = newflowFile;
this.flowFiles.add(this.flowFile);
}

this.org = org;
this.recordsPerFlowFile = recordsPerFlowFile;
this.query = query;
@@ -375,7 +380,9 @@ private void beforeOnResponse() {
recordIndex++;
if (recordsPerFlowFile != -1 && recordIndex > recordsPerFlowFile) {
closeRecordWriter();
- flowFile = session.create();
+ FlowFile newflowFile = session.create();
+ newflowFile = session.putAllAttributes(newflowFile, flowFile.getAttributes());
+ flowFile = newflowFile;
flowFiles.add(flowFile);
recordIndex = 1;
}

@@ -23,12 +23,17 @@

import com.influxdb.query.FluxRecord;

import java.util.Map;
import java.util.HashMap;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Assert;
import org.junit.Test;

@@ -82,6 +87,45 @@ public void success() {
Assert.assertEquals("InfluxDB rocks!", nifiRecord.getValue("string_value"));
}


@Test
public void singleFlowFileAttributes() {

// Create input flowfile
Map<String, String> inputAttributes = new HashMap<>();
inputAttributes.put("Testattribute", "testvalue");
runner.enqueue("someContent", inputAttributes);

// Add FluxRecords to queryOnResponseRecords
FluxRecord fluxRecord1 = new FluxRecord(0);
fluxRecord1.getValues().put("value", 1L);

queryOnResponseRecords.add(fluxRecord1);

runner.run();

// Verify that the flowfile has the expected attributes
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
Assert.assertEquals(1, writer.getRecordsWritten().size());

for (FlowFile outputFlowFile: runner.getFlowFilesForRelationship(REL_SUCCESS))
{
Map<String, String> outputAttributes = outputFlowFile.getAttributes();

for (Map.Entry<String, String> entry : inputAttributes.entrySet()) {
Assert.assertTrue(outputAttributes.containsKey(entry.getKey()));
Assert.assertEquals(entry.getValue(), outputAttributes.get(entry.getKey()));
}
}

Record nifiRecord = writer.getRecordsWritten().get(0);
RecordSchema schema = nifiRecord.getSchema();

Assert.assertEquals(1, schema.getFieldCount());
Assert.assertEquals(RecordFieldType.LONG, schema.getField("value").get().getDataType().getFieldType());
Assert.assertEquals(1L, nifiRecord.getValue("value"));
}

@Test
public void moreRecords() {

@@ -197,6 +241,69 @@ public void useMoreFlowFiles() {

}

@Test
public void useMoreFlowFilesWithAttributes() {

FluxRecord fluxRecord1 = new FluxRecord(0);
fluxRecord1.getValues().put("value", 1L);

FluxRecord fluxRecord2 = new FluxRecord(0);
fluxRecord2.getValues().put("value", 2L);

FluxRecord fluxRecord3 = new FluxRecord(0);
fluxRecord3.getValues().put("value", 3L);

FluxRecord fluxRecord4 = new FluxRecord(0);
fluxRecord4.getValues().put("value", 4L);

FluxRecord fluxRecord5 = new FluxRecord(0);
fluxRecord5.getValues().put("value", 5L);

FluxRecord fluxRecord6 = new FluxRecord(0);
fluxRecord6.getValues().put("value", 6L);

queryOnResponseRecords.add(fluxRecord1);
queryOnResponseRecords.add(fluxRecord2);
queryOnResponseRecords.add(fluxRecord3);
queryOnResponseRecords.add(fluxRecord4);
queryOnResponseRecords.add(fluxRecord5);
queryOnResponseRecords.add(fluxRecord6);

// Create input flowfile
Map<String, String> inputAttributes = new HashMap<>();
inputAttributes.put("Testattribute", "testvalue");
runner.enqueue("someContent", inputAttributes);

runner.setProperty(GetInfluxDatabase_2.RECORDS_PER_FLOWFILE, "2");

runner.enqueue("");
runner.run();

runner.assertAllFlowFilesTransferred(REL_SUCCESS, 4);
Assert.assertEquals(6, writer.getRecordsWritten().size());

List<MockFlowFile> outputFlowFiles = runner.getFlowFilesForRelationship(REL_SUCCESS);

for (FlowFile outputFlowFile: runner.getFlowFilesForRelationship(REL_SUCCESS))
{
Map<String, String> outputAttributes = outputFlowFile.getAttributes();

for (Map.Entry<String, String> entry : inputAttributes.entrySet()) {
Assert.assertTrue(outputAttributes.containsKey(entry.getKey()));
Assert.assertEquals(entry.getValue(), outputAttributes.get(entry.getKey()));
}
}

for (int i = 0; i < 6; i++) {
Record nifiRecord = writer.getRecordsWritten().get(i);
RecordSchema schema = nifiRecord.getSchema();

Assert.assertEquals(RecordFieldType.LONG, schema.getField("value").get().getDataType().getFieldType());
Assert.assertEquals((long) i + 1, nifiRecord.getValue("value"));
}

}

@Test
public void containsProvenanceReport() {
