Commit b461fa0: fix list_records

Signed-off-by: Ayush Kamat <ayush@latch.bio>
ayushkamat committed Dec 2, 2024
1 parent 91eb9e1 commit b461fa0
Showing 3 changed files with 69 additions and 53 deletions.
14 changes: 10 additions & 4 deletions CHANGELOG.md
@@ -16,15 +16,21 @@ Types of changes
 
 # Latch SDK Changelog
 
-## 2.54.4
+## 2.54.5 - 2024-12-02
 
-## Added
+### Fixed
+
+* Fix pagination in `TableUpdater.list_records` to not OOM when table size is too large
+
+## 2.54.4 - 2024-11-25
+
+### Added
 
 * Plots Artifact dataclasses
 
-## 2.54.3
+## 2.54.3 - 2024-11-25
 
-## Added
+### Added
 
 * `cache` parameter to `LPath` download method to allow for caching of downloaded files
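
Note: as context for the `LPath` entry above, a minimal usage sketch of the `cache` flag. The import path and exact signature are assumptions based on the SDK's public interface, not something shown in this diff.

```python
# Hypothetical sketch of the `cache` flag from the 2.54.3 entry; the
# import path and return value are assumptions, not part of this commit.
from latch.ldata.path import LPath

p = LPath("latch:///welcome/example.txt")  # hypothetical remote path
local = p.download(cache=True)  # repeat calls may reuse the cached copy
print(local)
```
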
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -12,7 +12,7 @@ include = ["src/**/*.py", "src/latch_cli/services/init/*"]
 
 [project]
 name = "latch"
-version = "2.54.4"
+version = "2.54.5"
 description = "The Latch SDK"
 authors = [{ name = "Kenny Workman", email = "kenny@latch.bio" }]
 maintainers = [
106 changes: 58 additions & 48 deletions src/latch/registry/table.py
@@ -245,70 +245,80 @@ def list_records(self, *, page_size: int = 100) -> Iterator[Dict[str, Record]]:
 
         cols = self.get_columns()
 
-        # todo(maximsmol): because allSamples returns each column as its own
-        # row, we can't paginate by samples because we don't know when a sample is finished
-        data = execute(
-            gql.gql("""
-                query TableQuery($id: BigInt!) {
-                    catalogExperiment(id: $id) {
-                        allSamples {
-                            nodes {
-                                sampleId
-                                sampleName
-                                sampleDataKey
-                                sampleDataValue
+        offset = 0
+        while True:
+            data = execute(
+                gql.gql("""
+                    query TableQuery(
+                        $id: BigInt!,
+                        $argLimit: BigInt!,
+                        $argOffset: BigInt!
+                    ) {
+                        catalogExperiment(id: $id) {
+                            allSamplesJoinInfoPaginated(
+                                argLimit: $argLimit,
+                                argOffset: $argOffset
+                            ) {
+                                nodes {
+                                    id
+                                    name
+                                    key
+                                    data
+                                }
                             }
                         }
                     }
-                }
-            """),
-            {"id": self.id},
-        )["catalogExperiment"]
+                """),
+                {"id": self.id, "argLimit": page_size, "argOffset": offset},
+            )["catalogExperiment"]
 
-        if data is None:
-            raise TableNotFoundError(
-                f"table does not exist or you lack permissions: id={self.id}"
-            )
+            if data is None:
+                raise TableNotFoundError(
+                    f"table does not exist or you lack permissions: id={self.id}"
+                )
 
-        nodes: List[_AllRecordsNode] = data["allSamples"]["nodes"]
+            nodes: List[_AllRecordsNode] = data["allSamplesJoinInfoPaginated"]["nodes"]
 
-        record_names: Dict[str, str] = {}
-        record_values: Dict[str, Dict[str, RecordValue]] = {}
+            record_names: Dict[str, str] = {}
+            record_values: Dict[str, Dict[str, RecordValue]] = {}
 
-        for node in nodes:
-            record_names[node["sampleId"]] = node["sampleName"]
-            vals = record_values.setdefault(node["sampleId"], {})
+            for node in nodes:
+                record_names[node["id"]] = node["name"]
+                vals = record_values.setdefault(node["id"], {})
 
-            col = cols.get(node["sampleDataKey"])
-            if col is None:
-                continue
+                col = cols.get(node["key"])
+                if col is None:
+                    continue
 
-            # todo(maximsmol): in the future, allow storing or yielding values that failed to parse
-            vals[col.key] = to_python_literal(
-                node["sampleDataValue"], col.upstream_type["type"]
-            )
+                # todo(maximsmol): in the future, allow storing or yielding values that failed to parse
+                vals[col.key] = to_python_literal(
+                    node["data"], col.upstream_type["type"]
+                )
 
-        page: Dict[str, Record] = {}
-        for id, values in record_values.items():
-            for col in cols.values():
-                if col.key in values:
-                    continue
+            page: Dict[str, Record] = {}
+            for id, values in record_values.items():
+                for col in cols.values():
+                    if col.key in values:
+                        continue
 
-                if not col.upstream_type["allowEmpty"]:
-                    values[col.key] = InvalidValue("")
+                    if not col.upstream_type["allowEmpty"]:
+                        values[col.key] = InvalidValue("")
 
-            cur = Record(id)
-            cur._cache.name = record_names[id]
-            cur._cache.values = values
-            cur._cache.columns = cols
-            page[id] = cur
+                cur = Record(id)
+                cur._cache.name = record_names[id]
+                cur._cache.values = values
+                cur._cache.columns = cols
+                page[id] = cur
 
-            if len(page) == page_size:
-                yield page
-                page = {}
-
-        if len(page) > 0:
-            yield page
+            if len(page) > 0:
+                yield page
+
+                if len(page) < page_size:
+                    break
+
+                offset += page_size
+            else:
+                break
 
     def get_dataframe(self):
         """Get a pandas DataFrame of all records in this table.
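
Note: for context on the change above, `list_records` now pages through `allSamplesJoinInfoPaginated` with limit/offset arguments and yields one page of records per fetch, so memory use is bounded by the page rather than the full table. A minimal consumer-side sketch follows; the table id is a hypothetical placeholder and the accessors are assumed from the SDK's public API.

```python
# Hypothetical usage sketch of the paginated iterator; the table id is
# a placeholder, not a real Registry table.
from latch.registry.table import Table

table = Table(id="1234")
for page in table.list_records(page_size=100):
    # each page is a Dict[str, Record] built from one limit/offset fetch
    for record_id, record in page.items():
        print(record_id, record.get_name())
```
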
