Skip to content

Commit

Permalink
Added support to upload segments in batch mode with METADATA upload type
Browse files Browse the repository at this point in the history
  • Loading branch information
rajagopr committed Jul 17, 2024
1 parent c79d97a commit 8a81fbf
Show file tree
Hide file tree
Showing 12 changed files with 1,265 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public static FileUploadType getDefaultUploadType() {
private static final String SCHEMA_PATH = "/schemas";
private static final String OLD_SEGMENT_PATH = "/segments";
private static final String SEGMENT_PATH = "/v2/segments";
private static final String SEGMENT_UPLOAD_BATCH_PATH = "/v3/segments";
private static final String TABLES_PATH = "/tables";
private static final String TYPE_DELIMITER = "type=";
private static final String START_REPLACE_SEGMENTS_PATH = "/startReplaceSegments";
Expand Down Expand Up @@ -365,6 +366,12 @@ public static URI getUploadSegmentURI(URI controllerURI)
return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(), SEGMENT_PATH);
}

public static URI getUploadSegmentBatchURI(URI controllerURI)
throws URISyntaxException {
return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(),
SEGMENT_UPLOAD_BATCH_PATH);
}

public static URI getStartReplaceSegmentsURI(URI controllerURI, String rawTableName, String tableType,
boolean forceCleanup)
throws URISyntaxException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.List;
import java.util.Random;
Expand Down Expand Up @@ -128,6 +130,14 @@ public void testSendFileWithJson()
}
}

@Test
public void testGetUploadSegmentListURI()
throws URISyntaxException {
URI controllerURI = new URI("https://myhost:9443");
URI uriWithEndpoint = FileUploadDownloadClient.getUploadSegmentBatchURI(controllerURI);
Assert.assertEquals(new URI("https://myhost:9443/v3/segments"), uriWithEndpoint);
}

@AfterClass
public void shutDown() {
_testServer.stop(0);
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.api.upload;

import java.io.File;
import java.net.URI;
import java.util.Objects;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.segment.spi.SegmentMetadata;


/**
* Data object used while adding or updating segments. It's comprised of the following fields:
* <ol>
* <li>segmentDownloadURIStr – The segment download URI persisted into the ZK metadata.</li>
* <li>sourceDownloadURIStr – The URI from where the segment could be downloaded.</li>
* <li>finalSegmentLocationURI – The final location of the segment in the deep-store.</li>
* <li>segmentSizeInBytes – The segment size in bytes.</li>
* <li>segmentMetadata – The segment metadata as defined in {@link org.apache.pinot.segment.spi.SegmentMetadata}.</li>
* <li>encryptionInfo – A pair consisting of the crypter class used to encrypt the segment, and the encrypted segment
* file.</li>
* <li>segmentMetadataZNRecord – The segment metadata represented as a helix
* {@link org.apache.helix.zookeeper.datamodel.ZNRecord}.</li>
* </ol>
*/
public class SegmentUploadMetadata {
private final String _segmentDownloadURIStr;
private final String _sourceDownloadURIStr;
private final URI _finalSegmentLocationURI;
// Segment size reported in bytes.
private final Long _segmentSizeInBytes;
// The segment met
private final SegmentMetadata _segmentMetadata;
// The crypter class used to encrypt the segment, The encrypted segment file.
private final Pair<String, File> _encryptionInfo;
private ZNRecord _segmentMetadataZNRecord;

public SegmentUploadMetadata(String segmentDownloadURIStr, String sourceDownloadURIStr, URI finalSegmentLocationURI,
Long segmentSizeInBytes, SegmentMetadata segmentMetadata, Pair<String, File> encryptionInfo) {
_segmentDownloadURIStr = segmentDownloadURIStr;
_sourceDownloadURIStr = sourceDownloadURIStr;
_segmentSizeInBytes = segmentSizeInBytes;
_segmentMetadata = segmentMetadata;
_encryptionInfo = encryptionInfo;
_finalSegmentLocationURI = finalSegmentLocationURI;
}

public String getSegmentDownloadURIStr() {
return _segmentDownloadURIStr;
}

public String getSourceDownloadURIStr() {
return _sourceDownloadURIStr;
}

public URI getFinalSegmentLocationURI() {
return _finalSegmentLocationURI;
}

public Long getSegmentSizeInBytes() {
return _segmentSizeInBytes;
}

public SegmentMetadata getSegmentMetadata() {
return _segmentMetadata;
}

public Pair<String, File> getEncryptionInfo() {
return _encryptionInfo;
}

public void setSegmentMetadataZNRecord(ZNRecord segmentMetadataZNRecord) {
_segmentMetadataZNRecord = segmentMetadataZNRecord;
}

public ZNRecord getSegmentMetadataZNRecord() {
return _segmentMetadataZNRecord;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SegmentUploadMetadata that = (SegmentUploadMetadata) o;
return Objects.equals(_segmentDownloadURIStr, that._segmentDownloadURIStr)
&& Objects.equals(_sourceDownloadURIStr, that._sourceDownloadURIStr)
&& Objects.equals(_finalSegmentLocationURI, that._finalSegmentLocationURI)
&& Objects.equals(_segmentSizeInBytes, that._segmentSizeInBytes)
&& Objects.equals(_segmentMetadata, that._segmentMetadata)
&& Objects.equals(_encryptionInfo, that._encryptionInfo)
&& Objects.equals(_segmentMetadataZNRecord, that._segmentMetadataZNRecord);
}

@Override
public int hashCode() {
return Objects.hash(_segmentDownloadURIStr, _sourceDownloadURIStr, _finalSegmentLocationURI,
_segmentSizeInBytes, _segmentMetadata, _encryptionInfo, _segmentMetadataZNRecord);
}
}
Loading

0 comments on commit 8a81fbf

Please sign in to comment.