Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify tablet usage #358

Merged
merged 7 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,6 @@ private static void writeNonAlignedWithTablet(TsFileWriter tsFileWriter)
measurementSchemas.add(new MeasurementSchema(SENSOR_1, TSDataType.INT64, TSEncoding.RLE));
measurementSchemas.add(new MeasurementSchema(SENSOR_2, TSDataType.INT64, TSEncoding.RLE));
Tablet tablet = new Tablet(DEVICE_2, measurementSchemas);
long[] timestamps = tablet.timestamps;
Object[] values = tablet.values;
int rowNum = 100;
int sensorNum = measurementSchemas.size();
long timestamp = 1;
Expand All @@ -143,8 +141,7 @@ private static void writeNonAlignedWithTablet(TsFileWriter tsFileWriter)
int row = tablet.getRowSize();
tablet.addTimestamp(row, timestamp++);
for (int i = 0; i < sensorNum; i++) {
long[] sensor = (long[]) values[i];
sensor[row] = value;
tablet.addValue(row, i, value);
}
// write
if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,11 @@ private static void writeWithTablet(
long startValue)
throws IOException, WriteProcessException {
Tablet tablet = new Tablet(deviceId, schemas);
long[] timestamps = tablet.timestamps;
long sensorNum = schemas.size();

for (long r = 0; r < rowNum; r++, startValue++) {
int row = tablet.getRowSize();
timestamps[row] = startTime++;
tablet.addTimestamp(row, startTime++);
for (int i = 0; i < sensorNum; i++) {
tablet.addValue(
schemas.get(i).getMeasurementName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ protected Field getNonNullField(int columnIndex) {
}

protected Field getField(int columnIndex) {
if (columnIndex > this.columnNameToColumnIndexMap.size()) {
if (columnIndex > this.columnNameToColumnIndexMap.size() || columnIndex <= 0) {
throw new IndexOutOfBoundsException("column index " + columnIndex + " out of bound");
}
Field field;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,13 @@ public static void writeWithTablet(
boolean isAligned)
throws IOException, WriteProcessException {
Tablet tablet = new Tablet(deviceId, schemas);
long[] timestamps = tablet.timestamps;
Object[] values = tablet.values;
long sensorNum = schemas.size();

for (long r = 0; r < rowNum; r++, startValue++) {
int row = tablet.getRowSize();
tablet.addTimestamp(row, startTime++);
for (int i = 0; i < sensorNum; i++) {
long[] sensor = (long[]) values[i];
sensor[row] = startValue;
tablet.addValue(row, i, startValue);
}
// write
if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public int write(Tablet tablet, int startRowIndex, int endRowIndex)
// TODO: changing to a column-first style by calculating the remaining page space of each
// column firsts
for (int row = startRowIndex; row < endRowIndex; row++) {
long time = tablet.timestamps[row];
long time = tablet.getTimestamps()[row];
checkIsHistoryData(time);
for (int columnIndex = 0; columnIndex < tablet.getSchemas().size(); columnIndex++) {
if (tablet.getColumnTypes() != null
Expand All @@ -242,42 +242,43 @@ public int write(Tablet tablet, int startRowIndex, int endRowIndex)
}

boolean isNull =
tablet.bitMaps != null
&& tablet.bitMaps[columnIndex] != null
&& tablet.bitMaps[columnIndex].isMarked(row);
tablet.getBitMaps() != null
&& tablet.getBitMaps()[columnIndex] != null
&& tablet.getBitMaps()[columnIndex].isMarked(row);
// check isNull by bitMap in tablet
ValueChunkWriter valueChunkWriter =
tryToAddSeriesWriterInternal(measurementSchemas.get(columnIndex));
switch (measurementSchemas.get(columnIndex).getType()) {
case BOOLEAN:
valueChunkWriter.write(time, ((boolean[]) tablet.values[columnIndex])[row], isNull);
valueChunkWriter.write(
time, ((boolean[]) tablet.getValues()[columnIndex])[row], isNull);
break;
case INT32:
valueChunkWriter.write(time, ((int[]) tablet.values[columnIndex])[row], isNull);
valueChunkWriter.write(time, ((int[]) tablet.getValues()[columnIndex])[row], isNull);
break;
case DATE:
valueChunkWriter.write(
time,
isNull
? 0
: DateUtils.parseDateExpressionToInt(
((LocalDate[]) tablet.values[columnIndex])[row]),
((LocalDate[]) tablet.getValues()[columnIndex])[row]),
isNull);
break;
case INT64:
case TIMESTAMP:
valueChunkWriter.write(time, ((long[]) tablet.values[columnIndex])[row], isNull);
valueChunkWriter.write(time, ((long[]) tablet.getValues()[columnIndex])[row], isNull);
break;
case FLOAT:
valueChunkWriter.write(time, ((float[]) tablet.values[columnIndex])[row], isNull);
valueChunkWriter.write(time, ((float[]) tablet.getValues()[columnIndex])[row], isNull);
break;
case DOUBLE:
valueChunkWriter.write(time, ((double[]) tablet.values[columnIndex])[row], isNull);
valueChunkWriter.write(time, ((double[]) tablet.getValues()[columnIndex])[row], isNull);
break;
case TEXT:
case BLOB:
case STRING:
valueChunkWriter.write(time, ((Binary[]) tablet.values[columnIndex])[row], isNull);
valueChunkWriter.write(time, ((Binary[]) tablet.getValues()[columnIndex])[row], isNull);
break;
default:
throw new UnSupportedDataTypeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,42 +121,51 @@ public int write(Tablet tablet, int startRowIndex, int endRowIndex)
pointCount = 0;
for (int row = startRowIndex; row < endRowIndex; row++) {
// check isNull in tablet
if (tablet.bitMaps != null
&& tablet.bitMaps[column] != null
&& tablet.bitMaps[column].isMarked(row)) {
if (tablet.getBitMaps() != null
&& tablet.getBitMaps()[column] != null
&& tablet.getBitMaps()[column].isMarked(row)) {
continue;
}
long time = tablet.timestamps[row];
long time = tablet.getTimestamps()[row];
checkIsHistoryData(measurementId, time);
pointCount++;
switch (tsDataType) {
case INT32:
chunkWriters.get(measurementId).write(time, ((int[]) tablet.values[column])[row]);
chunkWriters.get(measurementId).write(time, ((int[]) tablet.getValues()[column])[row]);
break;
case DATE:
chunkWriters
.get(measurementId)
.write(
time,
DateUtils.parseDateExpressionToInt(((LocalDate[]) tablet.values[column])[row]));
DateUtils.parseDateExpressionToInt(
((LocalDate[]) tablet.getValues()[column])[row]));
break;
case INT64:
case TIMESTAMP:
chunkWriters.get(measurementId).write(time, ((long[]) tablet.values[column])[row]);
chunkWriters.get(measurementId).write(time, ((long[]) tablet.getValues()[column])[row]);
break;
case FLOAT:
chunkWriters.get(measurementId).write(time, ((float[]) tablet.values[column])[row]);
chunkWriters
.get(measurementId)
.write(time, ((float[]) tablet.getValues()[column])[row]);
break;
case DOUBLE:
chunkWriters.get(measurementId).write(time, ((double[]) tablet.values[column])[row]);
chunkWriters
.get(measurementId)
.write(time, ((double[]) tablet.getValues()[column])[row]);
break;
case BOOLEAN:
chunkWriters.get(measurementId).write(time, ((boolean[]) tablet.values[column])[row]);
chunkWriters
.get(measurementId)
.write(time, ((boolean[]) tablet.getValues()[column])[row]);
break;
case TEXT:
case BLOB:
case STRING:
chunkWriters.get(measurementId).write(time, ((Binary[]) tablet.values[column])[row]);
chunkWriters
.get(measurementId)
.write(time, ((Binary[]) tablet.getValues()[column])[row]);
break;
default:
throw new UnSupportedDataTypeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,11 @@ public class Tablet {
/** MeasurementId->indexOf({@link MeasurementSchema}) */
private final Map<String, Integer> measurementIndex;

/** Timestamps in this {@link Tablet} */
public long[] timestamps;
private long[] timestamps;

/** Each object is a primitive type array, which represents values of one measurement */
public Object[] values;
private Object[] values;

/** Each {@link BitMap} represents the existence of each value in the current column. */
public BitMap[] bitMaps;
private BitMap[] bitMaps;

/**
* For compatibility with the usage of directly modifying Tablet content through public fields.
Expand Down Expand Up @@ -292,6 +289,7 @@ public void initBitMaps() {
public void addTimestamp(int rowIndex, long timestamp) {
timestamps[rowIndex] = timestamp;
this.rowSize = Math.max(this.rowSize, rowIndex + 1);
initBitMapsWithApiUsage();
}

public void addValue(final String measurementId, final int rowIndex, final Object value) {
Expand Down Expand Up @@ -413,6 +411,10 @@ public void addValue(int rowIndex, String measurement, int val) {

@TsFileApi
public void addValue(int rowIndex, int columnIndex, int val) {
if (!(values[columnIndex] instanceof int[])) {
throw new IllegalArgumentException(
"The data type of column index " + columnIndex + " is not INT32");
}
final int[] sensor = (int[]) values[columnIndex];
sensor[rowIndex] = val;
updateBitMap(rowIndex, columnIndex, false);
Expand All @@ -426,6 +428,10 @@ public void addValue(int rowIndex, String measurement, long val) {

@TsFileApi
public void addValue(int rowIndex, int columnIndex, long val) {
if (!(values[columnIndex] instanceof long[])) {
throw new IllegalArgumentException(
"The data type of column index " + columnIndex + " is not INT64/TIMESTAMP");
}
final long[] sensor = (long[]) values[columnIndex];
sensor[rowIndex] = val;
updateBitMap(rowIndex, columnIndex, false);
Expand All @@ -439,6 +445,10 @@ public void addValue(int rowIndex, String measurement, float val) {

@TsFileApi
public void addValue(int rowIndex, int columnIndex, float val) {
if (!(values[columnIndex] instanceof float[])) {
throw new IllegalArgumentException(
"The data type of column index " + columnIndex + " is not FLOAT");
}
final float[] sensor = (float[]) values[columnIndex];
sensor[rowIndex] = val;
updateBitMap(rowIndex, columnIndex, false);
Expand All @@ -452,6 +462,10 @@ public void addValue(int rowIndex, String measurement, double val) {

@TsFileApi
public void addValue(int rowIndex, int columnIndex, double val) {
if (!(values[columnIndex] instanceof double[])) {
throw new IllegalArgumentException(
"The data type of column index " + columnIndex + " is not DOUBLE");
}
final double[] sensor = (double[]) values[columnIndex];
sensor[rowIndex] = val;
updateBitMap(rowIndex, columnIndex, false);
Expand All @@ -465,6 +479,10 @@ public void addValue(int rowIndex, String measurement, boolean val) {

@TsFileApi
public void addValue(int rowIndex, int columnIndex, boolean val) {
if (!(values[columnIndex] instanceof boolean[])) {
throw new IllegalArgumentException(
"The data type of column index " + columnIndex + " is not BOOLEAN");
}
final boolean[] sensor = (boolean[]) values[columnIndex];
sensor[rowIndex] = val;
updateBitMap(rowIndex, columnIndex, false);
Expand All @@ -478,6 +496,10 @@ public void addValue(int rowIndex, String measurement, String val) {

@TsFileApi
public void addValue(int rowIndex, int columnIndex, String val) {
if (!(values[columnIndex] instanceof Binary[])) {
throw new IllegalArgumentException(
"The data type of column index " + columnIndex + " is not TEXT/STRING/BLOB");
}
final Binary[] sensor = (Binary[]) values[columnIndex];
sensor[rowIndex] = new Binary(val, TSFileConfig.STRING_CHARSET);
updateBitMap(rowIndex, columnIndex, false);
Expand All @@ -491,6 +513,10 @@ public void addValue(int rowIndex, String measurement, byte[] val) {

@TsFileApi
public void addValue(int rowIndex, int columnIndex, byte[] val) {
if (!(values[columnIndex] instanceof Binary[])) {
throw new IllegalArgumentException(
"The data type of column index " + columnIndex + " is not TEXT/STRING/BLOB");
}
final Binary[] sensor = (Binary[]) values[columnIndex];
sensor[rowIndex] = new Binary(val);
updateBitMap(rowIndex, columnIndex, false);
Expand All @@ -504,6 +530,10 @@ public void addValue(int rowIndex, String measurement, LocalDate val) {

@TsFileApi
public void addValue(int rowIndex, int columnIndex, LocalDate val) {
if (!(values[columnIndex] instanceof LocalDate[])) {
throw new IllegalArgumentException(
"The data type of column index " + columnIndex + " is not DATE");
}
final LocalDate[] sensor = (LocalDate[]) values[columnIndex];
sensor[rowIndex] = val;
updateBitMap(rowIndex, columnIndex, false);
Expand All @@ -521,6 +551,15 @@ private int getColumnIndexByMeasurement(String measurement) {
}

private void updateBitMap(int rowIndex, int columnIndex, boolean mark) {
initBitMapsWithApiUsage();
if (mark) {
bitMaps[columnIndex].mark(rowIndex);
} else {
bitMaps[columnIndex].unmark(rowIndex);
}
}

private void initBitMapsWithApiUsage() {
if (bitMaps == null) {
initBitMaps();
}
Expand All @@ -530,11 +569,6 @@ private void updateBitMap(int rowIndex, int columnIndex, boolean mark) {
bitMap.markAll();
}
}
if (mark) {
bitMaps[columnIndex].mark(rowIndex);
} else {
bitMaps[columnIndex].unmark(rowIndex);
}
}

public List<IMeasurementSchema> getSchemas() {
Expand Down Expand Up @@ -1195,6 +1229,34 @@ public void setRowSize(int rowSize) {
this.rowSize = rowSize;
}

public long getTimestamp(int i) {
return timestamps[i];
}

public long[] getTimestamps() {
return timestamps;
}

public void setTimestamps(long[] timestamps) {
this.timestamps = timestamps;
}

public Object[] getValues() {
return values;
}

public void setValues(Object[] values) {
this.values = values;
}

public BitMap[] getBitMaps() {
return bitMaps;
}

public void setBitMaps(BitMap[] bitMaps) {
this.bitMaps = bitMaps;
}

public enum ColumnCategory {
TAG,
FIELD,
Expand Down Expand Up @@ -1247,4 +1309,13 @@ public void setTableName(String tableName) {
public List<ColumnCategory> getColumnTypes() {
return columnCategories;
}

public boolean isSorted() {
for (int i = 1; i < rowSize; i++) {
if (timestamps[i] < timestamps[i - 1]) {
return false;
}
}
return true;
}
}
Loading
Loading