[SUPPORT] s3 list cost increases exponentially when using COW table #11742
Comments
resolved and answered on slack |
Thanks for responding @soumilshah1995 |
@ankit0811 does S3 have more detailed logs showing which type of file the cost comes from? |
@ankit0811 like an audit log? |
@KnightChess unfortunately no. |
Hi there! It's fantastic to hear that the cost has dropped by 10%. Could you clarify the current cost? Are we looking at something in the hundreds of dollars, or is it less than $100? We also need a bit more detail to assist you better: |
Hey, since this is just a POC, it's less than $100.
We have our own local clusters and deploy Spark using spark-operator (k8s).
Hudi version: 0.15.0
Spark version: 3.4.3
This is the hoodie writer config:

Dataset<Row> df = sparkSession
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-host-port")
    .option("subscribe", "topicName")
    .option("auto.offset.reset", "latest")
    .option("failOnDataLoss", "false")
    .load();

df = df
    .selectExpr("CAST(value AS STRING) as value")
    .select(
        col("value"),
        from_json(col("value"), schema).alias("__rows"))
    .select(
        col("__rows.time_stamp").alias("time_stamp"),
        col("..."),
        col("..."),
        col("..."),
        explode(col("__rows.nestedColumn")).alias("__data")
    )
    .select(
        col("time_stamp"),
        col("...."),
        col("...."),
        col("...."));
DataStreamWriter<Row> streamWriter = df.writeStream()
.format("hudi")
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
.option("hoodie.delete.shuffle.parallelism", "2")
.option(EMBEDDED_TIMELINE_SERVER_ENABLE.key(), "true")
.option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "col1, col2, col3, col4")
.option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "time_stamp")
// Clustering + Compaction config
.option(ASYNC_CLUSTERING_ENABLE.key(), "true")
.option("hoodie.clustering.plan.strategy.max.bytes.per.group", "524288000")
// Metadata Config
.option(ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true")
.option(ASYNC_CLEAN.key(), "true")
.option(HoodieWriteConfig.TBL_NAME.key(), newTName)
.option(HoodieTableConfig.TYPE.key(), HoodieTableType.COPY_ON_WRITE.name())
.option("hoodie.datasource.write.operation", WriteOperationType.INSERT.value())
.option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "ts_date")
.option("checkpointLocation", newCheckPoint)
.outputMode(OutputMode.Append());
streamWriter.trigger(new ProcessingTimeTrigger(120000)).start(newBasePath).awaitTermination();
I have pasted the relevant piece of code only. Let me know if this helps.
The current volume for the above topic is ~11 GB of compressed data per day. (FYI this is only dev data and we expect at least 50 times that in prod.) We plan to partition this by date. As of now we haven't configured any indexing and are using the default options only (post your suggestion). |
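As a side note on the date partitioning mentioned above, here is a minimal sketch of deriving a ts_date partition column from time_stamp, assuming static imports from org.apache.spark.sql.functions as in the snippet above and assuming the epoch is in nanoseconds (as the sample data later in the thread suggests); the derivation is not shown in the pasted code, so treat it as illustrative:

// Illustrative only: convert a nanosecond epoch to a yyyy-MM-dd partition value
df = df.withColumn(
    "ts_date",
    to_date(from_unixtime(col("time_stamp").divide(1_000_000_000L).cast("long"))));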
To provide you with the best suggestions, we'll need a sample schema or some mock data. This will help us recommend the most suitable indexing options and the optimal approach for achieving the fastest writes (UPSERT). Whether you require uniqueness across all partitions or within a single partition, we can fine-tune the script to ensure maximum performance for both reads and writes. For instance, if you need fast UPSERTs on a primary key, a record-level index might be ideal. Alternatively, a bucket index can offer faster writes since it avoids index lookups. However, the best option also depends on the query engine you're using. For example, Athena does not support metadata and data skipping. To give you the best possible guidance, please share more details about your workload, data, and schema. This information will help us recommend the most effective settings and options available. |
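A minimal sketch of the two index options described above, extending the writeStream config pasted earlier; the config keys assume Hudi 0.14+ and the bucket count is illustrative, not a recommendation:

DataStreamWriter<Row> writer = df.writeStream()
    .format("hudi")
    // Option A: record-level index kept in the metadata table, for fast upsert key lookups
    .option("hoodie.metadata.enable", "true")
    .option("hoodie.metadata.record.index.enable", "true")
    .option("hoodie.index.type", "RECORD_INDEX");
    // Option B: bucket index, which skips index lookups but needs a fixed bucket count
    // .option("hoodie.index.type", "BUCKET")
    // .option("hoodie.bucket.index.num.buckets", "32")

Only one index type should be set at a time.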
Sure. Please find the schema (due to security reasons, I am not allowed to share the actual schema, but this is the closest representation without giving away much detail). The workload is approx 11 GB (compressed) of streamed data. This is a consistent stream and we do not expect spiky traffic. Also curious, how will the schema dictate the listing cost?

StructType nestedCol1 = new StructType(new StructField[] {
DataTypes.createStructField("col1", DataTypes.StringType, true),
DataTypes.createStructField("col2", DataTypes.StringType, true),
DataTypes.createStructField("col3", DataTypes.StringType, true),
DataTypes.createStructField("col4", DataTypes.StringType, true),
DataTypes.createStructField("col5", DataTypes.StringType, true),
DataTypes.createStructField("col6", DataTypes.StringType, true),
DataTypes.createStructField("col7", DataTypes.StringType, true),
DataTypes.createStructField("col8", DataTypes.StringType, true),
DataTypes.createStructField("nested_metric1", DataTypes.IntegerType, true),
DataTypes.createStructField("nested_metric2", DataTypes.IntegerType, true)
});
StructType schema = new StructType(new StructField[] {
DataTypes.createStructField("time_stamp", DataTypes.DoubleType, false),
DataTypes.createStructField("id_part1", DataTypes.StringType, false),
DataTypes.createStructField("id_part2", DataTypes.StringType, false),
DataTypes.createStructField("id_part3", DataTypes.StringType, false),
DataTypes.createStructField("id_part4", DataTypes.StringType, false),
DataTypes.createStructField("id_part5", DataTypes.StringType, false),
DataTypes.createStructField("attribute1", DataTypes.StringType, true),
DataTypes.createStructField("attribute2", DataTypes.StringType, true),
DataTypes.createStructField("attribute3", DataTypes.StringType, true),
DataTypes.createStructField("attribute4", DataTypes.StringType, true),
DataTypes.createStructField("attribute5", DataTypes.BooleanType, true),
DataTypes.createStructField("metric1", DataTypes.DoubleType, true),
DataTypes.createStructField("metric2", DataTypes.DoubleType, true),
DataTypes.createStructField("metric3", DataTypes.DoubleType, true),
DataTypes.createStructField("nested_col1", DataTypes.createArrayType(nestedCol1), true)
});

Sample data:

{
"id_part1": "4a7545a1-9f0d-4038-a8c0-444b55a579fe",
"id_part2": "cf7755cb-faaf-4805-986c-69f1cd625754",
"time_stamp": 1723250086660005600,
"id_part3": "7fbf2e0d-0567-475e-bc16-cf115f2af435",
"id_part4": "327da669-a0e9-442b-b423-0d0481688b00",
"id_part5": "device_type1",
"attribute1": "device_attribute1",
"attribute2": "device_attribute2",
"attribute3": "device_attribute3",
"attribute4": "device_attribute4",
"attribute5": false,
"metric1": 10234.24,
"metric2": 1.99,
"metric3": -156.98,
"etl_out": 1723250087328.89,
"nested_col1": [
{
"col0": "b9e102ac-d00c-410e-b5c1-d2ada5f53933",
"col1": "74db6f8b-d99e-4483-b82f-9efaebaf17e8",
"col2": "478c6f35-e2ac-410b-a390-517f308d8a43",
"col3": "d6bf71d0-ee70-42df-937e-d9f26280a66a",
"col4": "45c94e8f-3ac0-4cbd-8284-13808a65846c",
"col5": "653256d6-bccd-49ac-9256-bae53219db3f",
"col6": "c953409e-9bf0-43fd-af2f-22751cdbeeb6",
"col7": "6371c7a4-6b97-4925-8f37-5e0dd474a8b7",
"nested_metric1": "0",
"nested_metric2": "-190"
},
{
"col0": "98d52ad0-1482-4ed8-be7b-3f6d81eac4de",
"col1": "d1022ef4-100b-4c7e-96f5-19f60426a73a",
"col2": "067a5291-7038-492f-9776-5fb87f673f6c",
"col3": "601ac625-f7ee-487d-b473-535c09103018",
"col4": "21a07cf2-633e-478f-a218-9e3ce0564f84",
"col5": "d7b66f65-388b-45a8-b1ca-ff9ad3a90750",
"col6": "bfc026d2-f5f4-4a61-9015-113e6464f630",
"col7": "5870169b-0f2f-4989-b42c-d470e3912277",
"nested_metric1": "-90",
"nested_metric2": "-190"
}
]
} |
I'll review the schema and get back to you on this GitHub issue. What are you planning to use as the primary key? Are you considering a composite key? Also, where is the data being ingested from? Are you pulling it from Kafka topics? And are you following a medallion architecture? |
Hi @ankit0811 If I were you, I would create a baseline with this.
And play with "ProcessingTimeTrigger(120000)". |
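For reference, a minimal sketch of lengthening the trigger interval, using the public Trigger.ProcessingTime API instead of the internal ProcessingTimeTrigger class; the 10-minute value is illustrative. Fewer micro-batches mean fewer commits, and therefore fewer timeline and file listings against S3 per day.

import java.util.concurrent.TimeUnit;
import org.apache.spark.sql.streaming.Trigger;

// Longer micro-batch interval => fewer commits per day => fewer S3 LIST calls
streamWriter
    .trigger(Trigger.ProcessingTime(10, TimeUnit.MINUTES))
    .start(newBasePath)
    .awaitTermination();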
@Gatsby-Lee thanks for the suggestions. I have disabled the metadata, clustering and cleaner services atm. Will let it run for a day or so and update with the changes, if any. If this doesn't help, will play with the trigger time. @soumilshah1995 the source is a Kafka stream, and right now, since this is a POC, we haven't finalized the architecture yet. All we are trying to do is create a Hudi table from our raw Kafka stream and take it forward from there. |
Update: FYI, this cost is in the tens of dollars per day. The storage cost is also in the same range. So I wanted to understand if this is a constant cost independent of the storage volume, and whether you see the same kind of cost behavior for your prod workflows? @Gatsby-Lee |
Hi @ankit0811 that's great news that the cost dropped. (BTW, you need the cleaner at minimum.) In short, I always think this way: I don't use clustering since it can add extra complexity. |
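A minimal sketch of keeping the cleaner on with bounded retention, extending the streamWriter config from earlier in the thread; the key names are standard Hudi 0.x configs and the retained-commit count is illustrative:

streamWriter = streamWriter
    // Keep cleaning enabled so old file slices don't pile up and inflate listings
    .option("hoodie.clean.async", "true")
    .option("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS")
    .option("hoodie.cleaner.commits.retained", "10");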
Would you mind disabling only cleaning? This reminds me of this issue with the same exponential listing behavior: #6373 |
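A minimal sketch of switching cleaning off for this experiment only, so its contribution to the LIST cost can be isolated; this is not a recommended steady-state setting:

streamWriter = streamWriter
    // Temporarily disable automatic and async cleaning for the test run
    .option("hoodie.clean.automatic", "false")
    .option("hoodie.clean.async", "false");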
@parisni Sure will try that as well. Will target experimenting further with cleaner service in a few weeks and keep the thread updated |
@parisni I did disable the cleaner, but it did not result in any further decrease in list cost. One thing still not clear to me: wasn't it the metadata service's role to avoid such an increase, by using it for listing instead of going to the file system?
Thanks, can you confirm the usage pattern on this table is write-only? So the listings are related to the writer, and not to select queries? True, the goal of the metadata table is to drastically reduce listing, which happens mostly during reads, but AFAIK it is also used during the write path to identify the files to copy on write.
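A minimal sketch of making metadata-table-based listing explicit on both paths; hoodie.metadata.enable defaults to true for writers in recent releases, and the reader-side option matters only if select queries are run against the table:

// Writer side (extends the streamWriter config pasted earlier)
streamWriter = streamWriter.option("hoodie.metadata.enable", "true");

// Reader side, for any select queries against the table
Dataset<Row> snapshot = sparkSession.read()
    .format("hudi")
    .option("hoodie.metadata.enable", "true")
    .load(newBasePath);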
You need the cleaner at min. |
We started ingesting data from Kafka using Spark (Java) and wanted to understand the storage cost associated with creating a table.
Interestingly, we see that our S3 listing cost increases exponentially, so we wanted to understand if we are missing something that could help us reduce this cost.
We do have the metadata table enabled, so the assumption was that the S3 cost would be reduced, but we don't see that.
When the Spark job starts, we do see that the embedded timeline server starts and then errors out after processing of the first micro batch. (Not sure if this is causing the listing cost; I believe the embedded timeline server was disabled as per this issue.)
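For completeness, a minimal sketch of setting the embedded timeline server via its string key (the writer config pasted earlier in the thread already sets EMBEDDED_TIMELINE_SERVER_ENABLE to true); the server caches file-system views for the writer, so keeping it running can reduce S3 LIST calls:

streamWriter = streamWriter.option("hoodie.embed.timeline.server", "true");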
Environment Description
Hudi version : 0.15.0
Spark version : 3.4.3
Storage (HDFS/S3/GCS..) : S3
Stacktrace
Hudi config