Skip to content

Commit

Permalink
checking if file exists in add methods
Browse files Browse the repository at this point in the history
  • Loading branch information
ericm-db committed Jun 13, 2024
1 parent 2360f6d commit 9127c53
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,32 @@ class OperatorStateMetadataLog(
case "v2" => OperatorStateMetadataV2.deserialize(bufferedReader)
}
}


/**
* Store the metadata for the specified batchId and return `true` if successful. If the batchId's
* metadata has already been stored, this method will return `false`.
*/
override def add(batchId: Long, metadata: OperatorStateMetadata): Boolean = {
require(metadata != null, "'null' metadata cannot written to a metadata log")
val batchMetadataFile = batchIdToPath(batchId)
if (fileManager.exists(batchMetadataFile)) {
fileManager.delete(batchMetadataFile)
}
val res = addNewBatchByStream(batchId) { output => serialize(metadata, output) }
if (metadataCacheEnabled && res) batchCache.put(batchId, metadata)
res
}

override def addNewBatchByStream(batchId: Long)(fn: OutputStream => Unit): Boolean = {
val batchMetadataFile = batchIdToPath(batchId)

if (metadataCacheEnabled && batchCache.containsKey(batchId)) {
false
} else {
write(batchMetadataFile, fn)
true
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,26 @@ class StateSchemaV3File(
val json = buf.toString()
JsonMethods.parse(json)
}

override def add(batchId: Long, metadata: JValue): Boolean = {
require(metadata != null, "'null' metadata cannot written to a metadata log")
val batchMetadataFile = batchIdToPath(batchId)
if (fileManager.exists(batchMetadataFile)) {
fileManager.delete(batchMetadataFile)
}
val res = addNewBatchByStream(batchId) { output => serialize(metadata, output) }
if (metadataCacheEnabled && res) batchCache.put(batchId, metadata)
res
}

override def addNewBatchByStream(batchId: Long)(fn: OutputStream => Unit): Boolean = {
val batchMetadataFile = batchIdToPath(batchId)

if (metadataCacheEnabled && batchCache.containsKey(batchId)) {
false
} else {
write(batchMetadataFile, fn)
true
}
}
}

0 comments on commit 9127c53

Please sign in to comment.