Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metadata): remove useless fields from s3streamobject #1573

Merged
merged 1 commit into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public CompletableFuture<List<S3ObjectMetadata>> getStreamObjects(long streamId,
long committedTimeInMs = objectMetadata.getTimestamp();
long objectSize = objectMetadata.getObjectSize();
int attributes = objectMetadata.getAttributes();
return new S3ObjectMetadata(object.objectId(), object.objectType(), List.of(object.streamOffsetRange()), object.dataTimeInMs(),
return new S3ObjectMetadata(object.objectId(), object.objectType(), List.of(object.streamOffsetRange()), objectMetadata.getTimestamp(),
committedTimeInMs, objectSize, S3StreamConstant.INVALID_ORDER_ID, attributes);
}).collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ private ControllerResult<CommitStreamSetObjectResponseData> generateStreamObject
return ControllerResult.of(Collections.emptyList(), resp);
}
records.addAll(streamObjectCommitResult.records());
records.add(new S3StreamObject(streamObject.objectId(), streamObject.streamId(), streamObject.startOffset(), streamObject.endOffset(), committedTs).toRecord());
records.add(new S3StreamObject(streamObject.objectId(), streamObject.streamId(), streamObject.startOffset(), streamObject.endOffset()).toRecord(featureControlManager.autoMQVersion()));
} else {
log.info("stream already deleted, then fast delete the stream object from compaction. streamId={}, streamObject={}, streamSetObjectId={}, nodeId={}, nodeEpoch={}",
streamObject.streamId(), streamObject, req.objectId(), req.nodeId(), req.nodeEpoch());
Expand Down Expand Up @@ -769,6 +769,8 @@ private ControllerResult<CommitStreamSetObjectResponseData> verifyStreamContinuo
* </ul>
*/
public ControllerResult<CommitStreamObjectResponseData> commitStreamObject(CommitStreamObjectRequestData data) {
AutoMQVersion version = featureControlManager.autoMQVersion();

int nodeId = data.nodeId();
long nodeEpoch = data.nodeEpoch();
long streamObjectId = data.objectId();
Expand Down Expand Up @@ -815,7 +817,6 @@ public ControllerResult<CommitStreamObjectResponseData> commitStreamObject(Commi
}
List<ApiMessageAndVersion> records = new ArrayList<>(commitResult.records());

long dataTs = committedTs;
// mark destroy compacted object
if (sourceObjectIds != null && !sourceObjectIds.isEmpty()) {
List<CompactOperations> operations;
Expand All @@ -832,12 +833,6 @@ public ControllerResult<CommitStreamObjectResponseData> commitStreamObject(Commi
return ControllerResult.of(Collections.emptyList(), resp);
}
records.addAll(destroyResult.records());
// update dataTs to the min compacted object's dataTs
//noinspection OptionalGetWithoutIsPresent
dataTs = sourceObjectIds.stream()
.map(id -> this.streamsMetadata.get(streamId).streamObjects().get(id))
.map(S3StreamObject::dataTimeInMs)
.min(Long::compareTo).get();
}

if (streamObjectId != NOOP_OBJECT_ID) {
Expand All @@ -846,8 +841,7 @@ public ControllerResult<CommitStreamObjectResponseData> commitStreamObject(Commi
.setObjectId(streamObjectId)
.setStreamId(streamId)
.setStartOffset(startOffset)
.setEndOffset(endOffset)
.setDataTimeInMs(dataTs), (short) 0));
.setEndOffset(endOffset), version.streamObjectRecordVersion()));
}

// generate compacted objects' remove record
Expand Down Expand Up @@ -1303,15 +1297,14 @@ public void replay(S3StreamObjectRecord record) {
long streamId = record.streamId();
long startOffset = record.startOffset();
long endOffset = record.endOffset();
long dataTs = record.dataTimeInMs();

S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
if (streamMetadata == null) {
// should not happen
log.error("streamId={} not exist when replay stream object record {}", streamId, record);
return;
}
streamMetadata.streamObjects().put(objectId, new S3StreamObject(objectId, streamId, startOffset, endOffset, dataTs));
streamMetadata.streamObjects().put(objectId, new S3StreamObject(objectId, streamId, startOffset, endOffset));
// the offset continuous is ensured by the process layer
// when replay from checkpoint, the record may be out of order, so we need to update the end offset to the largest end offset.
streamMetadata.endOffset(endOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void write(ImageWriter writer, ImageWriterOptions options) {
}
writer.write(version.streamRecordVersion(), record);
ranges.forEach(rangeMetadata -> writer.write(rangeMetadata.toRecord()));
streamObjectsMap.forEach((id, obj) -> writer.write(obj.toRecord()));
streamObjectsMap.forEach((id, obj) -> writer.write(obj.toRecord(version)));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,20 @@
import com.automq.stream.s3.metadata.StreamOffsetRange;
import org.apache.kafka.common.metadata.S3StreamObjectRecord;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.automq.AutoMQVersion;

public class S3StreamObject {

private final long objectId;
private final long dataTimeInMs;
private final long streamId;
private final long startOffset;
private final long endOffset;

public S3StreamObject(long objectId, long streamId, long startOffset, long endOffset, long dataTimeInMs) {
public S3StreamObject(long objectId, long streamId, long startOffset, long endOffset) {
this.objectId = objectId;
this.streamId = streamId;
this.startOffset = startOffset;
this.endOffset = endOffset;
this.dataTimeInMs = dataTimeInMs;
}

public StreamOffsetRange streamOffsetRange() {
Expand All @@ -66,28 +65,20 @@ public S3ObjectType objectType() {
return S3ObjectType.STREAM;
}

public long dataTimeInMs() {
return dataTimeInMs;
}

public ApiMessageAndVersion toRecord() {
public ApiMessageAndVersion toRecord(AutoMQVersion version) {
return new ApiMessageAndVersion(new S3StreamObjectRecord()
.setObjectId(objectId)
.setStreamId(streamId)
.setStartOffset(startOffset)
.setEndOffset(endOffset)
.setDataTimeInMs(dataTimeInMs), (short) 0);
.setEndOffset(endOffset), version.streamObjectRecordVersion());
}

public S3ObjectMetadata toMetadata() {
return new S3ObjectMetadata(objectId, S3ObjectType.STREAM, List.of(streamOffsetRange()), dataTimeInMs);
return new S3ObjectMetadata(objectId, S3ObjectType.STREAM, List.of(streamOffsetRange()), 0L);
}

public static S3StreamObject of(S3StreamObjectRecord record) {
S3StreamObject s3StreamObject = new S3StreamObject(
record.objectId(), record.streamId(),
record.startOffset(), record.endOffset(), record.dataTimeInMs());
return s3StreamObject;
return new S3StreamObject(record.objectId(), record.streamId(), record.startOffset(), record.endOffset());
}

@Override
Expand All @@ -111,7 +102,6 @@ public int hashCode() {
public String toString() {
return "S3StreamObject{" +
"objectId=" + objectId +
", dataTimeInMs=" + dataTimeInMs +
", streamId=" + streamId +
", startOffset=" + startOffset +
", endOffset=" + endOffset +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"apiKey": 505,
"type": "metadata",
"name": "S3StreamObjectRecord",
"validVersions": "0",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{
Expand Down Expand Up @@ -47,7 +47,7 @@
{
"name": "DataTimeInMs",
"type": "int64",
"versions": "0+",
"versions": "0",
"about": "The data time of the S3 object"
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,6 @@ public void testCommitStreamObject() {
ControllerResult<CommitStreamSetObjectResponseData> result0 = manager.commitStreamSetObject(commitRequest0);
assertEquals(Errors.NONE.code(), result0.response().errorCode());
replay(manager, result0.records());
long object0DataTs = manager.streamsMetadata().get(STREAM1).streamObjects().get(1L).dataTimeInMs();

// 3. commit a wal with stream_0 and a stream object with stream_1 that is split out from wal
List<ObjectStreamRange> streamRanges1 = List.of(
Expand All @@ -982,7 +981,6 @@ public void testCommitStreamObject() {
ControllerResult<CommitStreamSetObjectResponseData> result1 = manager.commitStreamSetObject(commitRequest1);
assertEquals(Errors.NONE.code(), result1.response().errorCode());
replay(manager, result1.records());
long object1DataTs = manager.streamsMetadata().get(STREAM1).streamObjects().get(3L).dataTimeInMs();

// 4. compact these two stream objects
CommitStreamObjectRequestData streamObjectRequest = new CommitStreamObjectRequestData()
Expand Down Expand Up @@ -1023,7 +1021,6 @@ public void testCommitStreamObject() {

// 7. verify stream objects
assertEquals(1, manager.streamsMetadata().get(STREAM1).streamObjects().size());
assertEquals(object0DataTs, manager.streamsMetadata().get(STREAM1).streamObjects().get(4L).dataTimeInMs());
assertEquals(4L, manager.streamsMetadata().get(STREAM1).streamObjects().get(4L).objectId());
assertEquals(0L, manager.streamsMetadata().get(STREAM1).streamObjects().get(4L).streamOffsetRange().startOffset());
assertEquals(400L, manager.streamsMetadata().get(STREAM1).streamObjects().get(4L).streamOffsetRange().endOffset());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,18 @@ public void testStreamObjects() {
.setObjectId(0L)
.setStreamId(STREAM0)
.setStartOffset(0L)
.setEndOffset(100L), (short) 0));
.setEndOffset(100L), AutoMQVersion.LATEST.streamObjectRecordVersion()));
delta0Records.add(new ApiMessageAndVersion(new S3StreamObjectRecord()
.setObjectId(1L)
.setStreamId(STREAM0)
.setStartOffset(100L)
.setEndOffset(200L), (short) 0));
.setEndOffset(200L), AutoMQVersion.LATEST.streamObjectRecordVersion()));
RecordTestUtils.replayAll(delta0, delta0Records);
// verify delta and check image's write
S3StreamMetadataImage image1 = new S3StreamMetadataImage(
STREAM0, 0L, StreamState.OPENED, 0L, Collections.emptyList(), List.of(
new S3StreamObject(0L, 999, STREAM0, 0L, 100L),
new S3StreamObject(1L, 999, STREAM0, 100L, 200L)));
new S3StreamObject(0L, 999, STREAM0, 0L),
new S3StreamObject(1L, 999, STREAM0, 100L)));
assertEquals(image1, delta0.apply());
testToImageAndBack(image1);

Expand All @@ -176,7 +176,7 @@ public void testStreamObjects() {
// verify delta and check image's write
S3StreamMetadataImage image2 = new S3StreamMetadataImage(
STREAM0, 0L, StreamState.OPENED, 0L, Collections.emptyList(), List.of(
new S3StreamObject(1L, 999, STREAM0, 100L, 200L)));
new S3StreamObject(1L, 999, STREAM0, 100L)));
assertEquals(image2, delta1.apply());
testToImageAndBack(image2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,9 @@ public void testGetObjects(boolean isHugeCluster) throws ExecutionException, Int
new RangeMetadata(STREAM0, 3L, 3, 420L, 520L, BROKER1),
new RangeMetadata(STREAM0, 4L, 4, 520L, 600L, BROKER0));
List<S3StreamObject> streamObjects = List.of(
new S3StreamObject(8, STREAM0, 10L, 100L, S3StreamConstant.INVALID_TS),
new S3StreamObject(9, STREAM0, 200L, 300L, S3StreamConstant.INVALID_TS),
new S3StreamObject(10, STREAM0, 300L, 400L, S3StreamConstant.INVALID_TS));
new S3StreamObject(8, STREAM0, 10L, 100L),
new S3StreamObject(9, STREAM0, 200L, 300L),
new S3StreamObject(10, STREAM0, 300L, 400L));
S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, StreamState.OPENED, 10, ranges, streamObjects);
S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, RegistryRef.NOOP, DeltaMap.of(STREAM0, streamImage),
DeltaMap.of(BROKER0, broker0WALMetadataImage, BROKER1, broker1WALMetadataImage), new DeltaMap<>(), new DeltaMap<>(), new TimelineHashMap<>(RegistryRef.NOOP.registry(), 0));
Expand Down Expand Up @@ -273,8 +273,8 @@ public void testGetObjectsWithFirstStreamObject() throws ExecutionException, Int
new RangeMetadata(STREAM0, 0L, 0, 10L, 40L, BROKER0),
new RangeMetadata(STREAM0, 2L, 2, 40L, 60L, BROKER0));
List<S3StreamObject> streamObjects = List.of(
new S3StreamObject(8, STREAM0, 10L, 20L, S3StreamConstant.INVALID_TS),
new S3StreamObject(8, STREAM0, 40L, 60L, S3StreamConstant.INVALID_TS));
new S3StreamObject(8, STREAM0, 10L, 20L),
new S3StreamObject(8, STREAM0, 40L, 60L));
S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, StreamState.OPENED, 10, ranges, streamObjects);
S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, RegistryRef.NOOP, DeltaMap.of(STREAM0, streamImage),
DeltaMap.of(BROKER0, broker0WALMetadataImage), new DeltaMap<>(), new DeltaMap<>(), new TimelineHashMap<>(RegistryRef.NOOP.registry(), 0));
Expand All @@ -300,7 +300,7 @@ private S3StreamsMetadataImage createStreamImage() {
new RangeMetadata(STREAM0, 0L, 0, 10L, 40L, BROKER0),
new RangeMetadata(STREAM0, 2L, 2, 40L, 60L, BROKER0));
List<S3StreamObject> streamObjects = List.of(
new S3StreamObject(8, STREAM0, 20L, 40L, S3StreamConstant.INVALID_TS));
new S3StreamObject(8, STREAM0, 20L, 40L));
S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, StreamState.OPENED, 10, ranges, streamObjects);
S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, RegistryRef.NOOP, DeltaMap.of(STREAM0, streamImage),
DeltaMap.of(BROKER0, broker0WALMetadataImage), new DeltaMap<>(), new DeltaMap<>(), new TimelineHashMap<>(RegistryRef.NOOP.registry(), 0));
Expand All @@ -320,7 +320,7 @@ private S3StreamsMetadataImage generateStreamImage(long streamId, Range<Long> st
ranges.add(new RangeMetadata(streamId, 0L, rangeIndex,
i, i + step, BROKER0));

streamObjects.add(new S3StreamObject(objectId, streamId, i, i + step, S3StreamConstant.INVALID_TS));
streamObjects.add(new S3StreamObject(objectId, streamId, i, i + step));
rangeIndex++;
objectId++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ public short streamSetObjectRecordVersion() {
}
}

public short streamObjectRecordVersion() {
if (isAtLeast(V2)) {
return 1;
} else {
return 0;
}
}

public Version s3streamVersion() {
return s3streamVersion;
}
Expand Down
Loading