Skip to content

Commit

Permalink
Check for empty data for add/delete/+async (#5242)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmao-denver authored and devinrsmith committed Mar 12, 2024
1 parent bebaafc commit 0893e61
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ public TableDefinition getTableDefinition() {
public void add(@NotNull final Table newData) throws IOException {
checkBlockingEditSafety();
PendingChange pendingChange = enqueueAddition(newData);
blockingContinuation(pendingChange);
if (pendingChange != null) {
blockingContinuation(pendingChange);
}
}

@Override
Expand All @@ -207,14 +209,21 @@ public void addAsync(
@NotNull final InputTableStatusListener listener) {
checkAsyncEditSafety(newData);
final PendingChange pendingChange = enqueueAddition(newData);
asynchronousContinuation(pendingChange, listener);
if (pendingChange != null) {
asynchronousContinuation(pendingChange, listener);
} else {
listener.onSuccess();
}
}

private PendingChange enqueueAddition(@NotNull final Table newData) {
validateAddOrModify(newData);
// we want to get a clean copy of the table; that can not change out from under us or result in long reads
// during our UGP run
final Table newDataSnapshot = snapshotData(newData);
if (newDataSnapshot.size() == 0) {
return null;
}
final PendingChange pendingChange;
synchronized (pendingChanges) {
pendingChange = new PendingChange(newDataSnapshot, false);
Expand All @@ -228,7 +237,9 @@ private PendingChange enqueueAddition(@NotNull final Table newData) {
public void delete(@NotNull final Table table) throws IOException {
checkBlockingEditSafety();
final PendingChange pendingChange = enqueueDeletion(table);
blockingContinuation(pendingChange);
if (pendingChange != null) {
blockingContinuation(pendingChange);
}
}

@Override
Expand All @@ -237,12 +248,19 @@ public void deleteAsync(
@NotNull final InputTableStatusListener listener) {
checkAsyncEditSafety(table);
final PendingChange pendingChange = enqueueDeletion(table);
asynchronousContinuation(pendingChange, listener);
if (pendingChange != null) {
asynchronousContinuation(pendingChange, listener);
} else {
listener.onSuccess();
}
}

private PendingChange enqueueDeletion(@NotNull final Table table) {
validateDelete(table);
final Table oldDataSnapshot = snapshotData(table);
if (oldDataSnapshot.size() == 0) {
return null;
}
final PendingChange pendingChange;
synchronized (pendingChanges) {
pendingChange = new PendingChange(oldDataSnapshot, true);
Expand Down
20 changes: 20 additions & 0 deletions py/server/tests/test_table_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,26 @@ def test_instant_array(self):
self.wait_ticking_table_update(t5, row_count=1, timeout=5)
self.assertEqual(t5.size, 1)

def test_input_table_empty_data(self):
from deephaven import update_graph as ugp
from deephaven import execution_context as ec

ug = ec.get_exec_ctx().update_graph
cm = ugp.exclusive_lock(ug)

with cm:
t = time_table("PT1s", blink_table=True)
it = input_table({c.name: c.data_type for c in t.columns}, key_cols="Timestamp")
it.add(t)
self.assertEqual(it.size, 0)
it.delete(t)
self.assertEqual(it.size, 0)

t = empty_table(0).update("Timestamp=nowSystem()")
it.add(t)
self.assertEqual(it.size, 0)
it.delete(t)
self.assertEqual(it.size, 0)

if __name__ == '__main__':
unittest.main()

0 comments on commit 0893e61

Please sign in to comment.