Skip to content

Commit

Permalink
adds pooling enhancements, pooling state is propagated to derived dat…
Browse files Browse the repository at this point in the history
…a frame, pools are rebuilt when enabled on df with data
  • Loading branch information
vmzakharov committed Jan 28, 2025
1 parent a77459d commit 5e21731
Show file tree
Hide file tree
Showing 4 changed files with 292 additions and 39 deletions.
10 changes: 3 additions & 7 deletions docs/POOLING.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,8 @@ To enabling pooling at the data frame level, call `enablePooling()` on a data fr

### Data Frame Transformation and Pooling

By default, the data frames created as a result of extracting data from an existing data frame (using methods such as `select`, `reject`, `partition`) will **not** have pooling enabled.
By default, the data frames created as a result of extracting data from an existing data frame (using methods such as `select`, `reject`, `partition`) will retain the pooling setting of the source data frame.

Sometimes one may wish to have pooling enabled for the data frames that are produced by these operations, like in scenarios when derived data frames will have data added to them.
Note that if the source data frame was created with pooling enabled, regardless of its current pooling status, the derived data frames will benefit from that. That is, the total number of value instances will not increase as no new instances will be created. So you only need pooling on derived data frames if you are planning to add data to them.

To achieve this, set the `poolingStatusInherited` property on the source data frame to `true`. If the source data frame has pooling enabled, the derived data frame(s) will have pooling enabled as well.

If the `poolingStatusInherited` property is set to `false` (this is the default) then the derived data frames will not have pooling enabled regardless of the pooling status of the source data frame.

Note that if the source data frame was created with pooling, the derived data frames will benefit from that (the total number of value instances will not increase as no new instances will be created). You only need to set pooling on the derived data frames if you are planning to add data to them.
If the source data frame has pooling enabled, and you don't need pooling for a derived data frame as you are not planning to add any data to it, call `disablePooling` on it to avoid the overhead of having an active pool.
Original file line number Diff line number Diff line change
Expand Up @@ -358,17 +358,6 @@ private void attachColumn(DfColumn newColumn)
}
}

/**
 * Turns on pooling for this data frame: records the setting on the data frame
 * and enables pooling on each of its columns.
 */
public void enablePooling()
{
    this.poolingEnabled = true;
    for (DfColumn column : this.columns)
    {
        column.enablePooling();
    }
}

/**
* Reports whether pooling is currently turned on for this data frame.
* @return the current value of this data frame's pooling flag
*/
public boolean isPoolingEnabled()
{
return this.poolingEnabled;
}

public DfColumn getColumnNamed(String columnName)
{
DfColumn column = this.columnsByName.get(columnName);
Expand Down Expand Up @@ -837,12 +826,35 @@ public DataFrame seal()
return this;
}

private void disablePooling()
/**
 * Switches off pooling of object values for this data frame and drops the object pools.
 * This is the counterpart of {@link #enablePooling()}.
 */
public void disablePooling()
{
    this.poolingEnabled = false;
    for (DfColumn column : this.columns)
    {
        column.disablePooling();
    }
}

/**
 * Turns on pooling of object values, which can result in memory savings as the data frame is populated with data.
 * If this data frame already contains data when pooling is enabled, the existing data will be pooled as well.
 * <p>
 * While pooling is enabled, data frames produced from this one via filtering or union operations will also have
 * pooling enabled. Retaining pooling is useful if rows will be added to the resulting data frame; otherwise either
 * call {@link #disablePooling()} on this data frame before performing these operations, or call
 * {@link #disablePooling()} on the derived data frame.
 */
public void enablePooling()
{
    this.poolingEnabled = true;
    for (DfColumn column : this.columns)
    {
        column.enablePooling();
    }
}

/**
* Reports whether pooling of object values is currently turned on for this data frame.
* @return the current value of this data frame's pooling flag
*/
public boolean isPoolingEnabled()
{
return this.poolingEnabled;
}

private void determineRowCount()
{
MutableIntList storedColumnsSizes = this.columns.select(DfColumn::isStored).collectInt(DfColumn::getSize);
Expand Down Expand Up @@ -1227,7 +1239,10 @@ private DataFrame uniqueRowsForColumns(ListIterable<DfColumn> uniqueColumns)
public Twin<DataFrame> partition(String filterExpressionString)
{
DataFrame selected = this.cloneStructure(this.name + "-selected");
this.inheritPoolingSettings(selected);

DataFrame rejected = this.cloneStructure(this.name + "-rejected");
this.inheritPoolingSettings(rejected);

Expression filterExpression = ExpressionParserHelper.DEFAULT.toExpression(filterExpressionString);

Expand All @@ -1245,8 +1260,8 @@ public Twin<DataFrame> partition(String filterExpressionString)
}
}

selected.seal();
rejected.seal();
this.sealMindingPooling(selected);
this.sealMindingPooling(rejected);

return Tuples.twin(selected, rejected);
}
Expand Down Expand Up @@ -1280,26 +1295,52 @@ public DataFrame rejectBy(String filterExpressionString)
return this.filterBy(filterExpressionString, "rejected", false);
}

/**
 * Propagates this data frame's pooling setting to a derived data frame: pooling is enabled on
 * {@code dataFrame} when it is enabled here; otherwise {@code dataFrame} is left untouched.
 *
 * @param dataFrame the derived data frame that should pick up this data frame's pooling setting
 */
private void inheritPoolingSettings(DataFrame dataFrame)
{
    if (!this.isPoolingEnabled())
    {
        return;
    }

    dataFrame.enablePooling();
}

/**
 * Finalizes a derived data frame after it has been populated, taking this data frame's pooling
 * setting into account. Without pooling the derived data frame is simply sealed; with pooling
 * enabled only its row count is determined and its bitmap reset.
 *
 * @param dataFrame the derived data frame to finalize
 */
private void sealMindingPooling(DataFrame dataFrame)
{
    if (!this.isPoolingEnabled())
    {
        dataFrame.seal();
        return;
    }

    // NOTE(review): seal() is deliberately bypassed here, presumably because it would turn
    // pooling off on the derived data frame - confirm against seal()
    dataFrame.determineRowCount();
    dataFrame.resetBitmap();
}

private DataFrame filterBy(String filterExpressionString, String operation, boolean select)
{
DataFrame filtered = this.cloneStructure(this.getName() + "-" + operation);
this.inheritPoolingSettings(filtered);

Expression filterExpression = ExpressionParserHelper.DEFAULT.toExpression(filterExpressionString);
for (int i = 0; i < this.rowCount; i++)
{
this.getEvalContext().setRowIndex(i);
this.getEvalContext()
.setRowIndex(i);
Value filterValue = filterExpression.evaluate(this.getEvalVisitor());
if (((BooleanValue) filterValue).is(select))
{
filtered.copyRowFrom(this, i);
}
}
filtered.seal();

this.sealMindingPooling(filtered);
return filtered;
}

private DataFrame selectByMarkValue(IntPredicate flaggedAtIndex, String description)
{
DataFrame filtered = this.cloneStructure(this.getName() + "-" + description);
this.inheritPoolingSettings(filtered);

for (int i = 0; i < this.rowCount; i++)
{
Expand All @@ -1308,7 +1349,8 @@ private DataFrame selectByMarkValue(IntPredicate flaggedAtIndex, String descript
filtered.copyRowFrom(this, this.rowIndexMap(i));
}
}
filtered.seal();

this.sealMindingPooling(filtered);

return filtered;
}
Expand Down Expand Up @@ -1494,23 +1536,29 @@ public DataFrame union(DataFrame other)
}

DataFrame dfUnion = new DataFrame("union");
this.inheritPoolingSettings(dfUnion);

this.columns.forEach(
col -> col.mergeWithInto(other.getColumnNamed(col.getName()), dfUnion)
);

dfUnion.seal();
this.sealMindingPooling(dfUnion);
return dfUnion;
}

/**
* enables flagging the rows as true or false - effectively creating a bitmap of the data frame
* Enables flagging the rows as true or false - effectively creating a bitmap of the data frame rows.
* This method initializes the bitmap so that no flags are set.
*/
public void resetBitmap()
{
// replace any existing bitmap with a new one holding one unset flag per row (sized by rowCount)
this.bitmap = BooleanArrayList.newWithNValues(this.rowCount, false);
}

/**
* Flags a row in the data frame. The data frame can be filtered based on the flagged rows.
* @param rowIndex the index of the row to flag
*/
public void setFlag(int rowIndex)
{
this.ensureBitmapCapacity();
Expand All @@ -1528,6 +1576,11 @@ private void ensureBitmapCapacity()
}
}

/**
* Checks if a row is flagged.
* @param rowIndex the index of the row to check
* @return {@code true} if the row at {@code rowIndex} is flagged, {@code false} otherwise
*/
public boolean isFlagged(int rowIndex)
{
this.ensureBitmapCapacity();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,15 @@ protected void addMyType(T anObject)
@Override
public void enablePooling()
{
this.pool = new UnifiedSet<>();
if (this.pool == null)
{
this.pool = new UnifiedSet<>();
for (int i = 0; i < this.values.size(); i++)
{
T value = this.values.get(i);
this.values.set(i, this.pool.put(value));
}
}
}

@Override
Expand Down Expand Up @@ -122,6 +130,13 @@ public void ensureInitialCapacity(int newCapacity)
@Override
protected void addAllItems(ListIterable<T> items)
{
this.values.addAllIterable(items);
if (this.pool == null)
{
this.values.addAllIterable(items);
}
else
{
items.forEach(each -> this.values.add(this.pool.put(each)));
}
}
}
Loading

0 comments on commit 5e21731

Please sign in to comment.