Skip to content

Commit

Permalink
fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup!…
Browse files Browse the repository at this point in the history
… fixup! fixup! fixup! clean up orphaned blobs
  • Loading branch information
jvansanten committed Feb 16, 2024
1 parent 64f3f43 commit cf3ffc1
Showing 1 changed file with 37 additions and 43 deletions.
80 changes: 37 additions & 43 deletions migrations/02_add_archive_count.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,43 +46,37 @@
values={"refcount": sa.select([sa.func.count(Alert.c.avro_archive_id)])},
)

if False:
# fill in count
for row in connection.execute(
Archive.select(Archive.c.count == None)
).fetchall():
key = get_key_for_url(bucket, row["uri"])
count = len(list(fastavro.reader(get_stream(bucket, key))))

connection.execute(update_count, id=row["avro_archive_id"], count=count)

print(row, count)

if False:
# fill in refcount
target = (
sa.select(
[
Alert.c.avro_archive_id,
sa.func.count(Alert.c.avro_archive_id).label("refcount"),
]
)
.where(
Alert.c.avro_archive_id.in_(
sa.select([Archive.c.avro_archive_id]).where(
Archive.c.refcount == 0
)
)
log.info("filling in missing avro_archive.count")
for row in connection.execute(Archive.select(Archive.c.count == None)).fetchall():
key = get_key_for_url(bucket, row["uri"])
count = len(list(fastavro.reader(get_stream(bucket, key))))

connection.execute(update_count, id=row["avro_archive_id"], count=count)

print(row, count)

log.info("filling in missing avro_archive.refcount")
target = (
sa.select(
[
Alert.c.avro_archive_id,
sa.func.count(Alert.c.avro_archive_id).label("refcount"),
]
)
.where(
Alert.c.avro_archive_id.in_(
sa.select([Archive.c.avro_archive_id]).where(Archive.c.refcount == 0)
)
.group_by(Alert.c.avro_archive_id)
).alias("target")
connection.execute(
Archive.update()
.values(refcount=target.c.refcount)
.where(Archive.c.avro_archive_id == target.c.avro_archive_id)
)
.group_by(Alert.c.avro_archive_id)
).alias("target")
connection.execute(
Archive.update()
.values(refcount=target.c.refcount)
.where(Archive.c.avro_archive_id == target.c.avro_archive_id)
)

# clean up blobs that are not referenced by any alert
log.info("cleaning up blobs not referenced by any alert")
with connection.begin() as transaction:

i = 0
Expand All @@ -107,16 +101,16 @@
):
key = get_key_for_url(bucket, row["uri"])
if args.dry_run:
print(bucket.Object(key).delete())
log.info(bucket.Object(key).delete())
else:
print(f"would delete {key}")
if i > 0:
print(f"Deleted {i} objects not referenced by any alert row")
log.info(f"would delete {key}")
log.info(f"Deleted {i} objects not referenced by any alert row")
if args.dry_run:
transaction.rollback()
else:
transaction.commit()

log.info("finding blobs not referenced in avro_archive")
in_bucket = {summary.key: summary.size for summary in bucket.objects.all()}
print(
f"{bucket.name}: {sum(in_bucket.values()) / 2**30:.1f} GB in {len(in_bucket)} items"
Expand All @@ -133,12 +127,12 @@
in_archive[key] for key in set(in_archive.keys()).difference(in_bucket)
)
missing_in_archive = [key for key in set(in_bucket).difference(in_archive)]
print(f"{len(missing_in_bucket)} missing in bucket: {missing_in_bucket}")
print(f"{len(missing_in_archive)} missing in archive {missing_in_archive}")
log.info(f"{len(missing_in_bucket)} missing in bucket: {missing_in_bucket}")
log.info(f"{len(missing_in_archive)} missing in archive {missing_in_archive}")

for key in missing_in_archive:
if args.dry_run:
print(f"would delete {key}")
log.info(f"would delete {key}")
else:
print(bucket.Object(key).delete())
print(f"Deleted {i} objects not referenced by any avro_archive row")
log.info(bucket.Object(key).delete())
log.info(f"Deleted {len(missing_in_archive)} objects not referenced by any avro_archive row")

0 comments on commit cf3ffc1

Please sign in to comment.