Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add S3Uploader to Flow Aggregator #4143

Merged
merged 2 commits into from
Sep 1, 2022
Merged

Conversation

heanlan
Copy link
Contributor

@heanlan heanlan commented Aug 22, 2022

This PR adds S3Uploader as a new exporter of Flow Aggregator. It
periodically exports expired flow records from Flow Aggregator
to AWS S3 storage bucket.

Signed-off-by: heanlan hanlan@vmware.com

@codecov
Copy link

codecov bot commented Aug 22, 2022

Codecov Report

Merging #4143 (58ee55f) into main (222c2b6) will decrease coverage by 6.24%.
The diff coverage is 67.41%.

❗ Current head 58ee55f differs from pull request most recent head 8a85ca8. Consider uploading reports for the commit 8a85ca8 to get more accurate results

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #4143      +/-   ##
==========================================
- Coverage   66.31%   60.06%   -6.25%     
==========================================
  Files         304      383      +79     
  Lines       46613    54256    +7643     
==========================================
+ Hits        30911    32589    +1678     
- Misses      13296    19222    +5926     
- Partials     2406     2445      +39     
Flag Coverage Δ
integration-tests 35.62% <ø> (+0.72%) ⬆️
kind-e2e-tests 48.66% <28.55%> (-0.59%) ⬇️
unit-tests 41.29% <67.41%> (-3.96%) ⬇️
Impacted Files Coverage Δ
pkg/flowaggregator/exporter/s3.go 0.00% <0.00%> (ø)
pkg/flowaggregator/options/options.go 31.25% <11.11%> (-5.49%) ⬇️
pkg/flowaggregator/flowaggregator.go 67.56% <42.50%> (-2.49%) ⬇️
pkg/flowaggregator/s3uploader/s3uploader.go 72.18% <72.18%> (ø)
pkg/flowaggregator/flowrecord/record.go 75.00% <75.00%> (ø)
pkg/config/flowaggregator/default.go 100.00% <100.00%> (+56.25%) ⬆️
...lowaggregator/clickhouseclient/clickhouseclient.go 73.59% <100.00%> (-9.13%) ⬇️
pkg/controller/ipam/antrea_ipam_controller.go 75.25% <0.00%> (-3.02%) ⬇️
pkg/agent/util/ipset/ipset.go 69.23% <0.00%> (-2.57%) ⬇️
pkg/controller/networkpolicy/store/addressgroup.go 88.37% <0.00%> (-2.33%) ⬇️
... and 145 more

@heanlan heanlan force-pushed the s3-uploader branch 2 times, most recently from 5c03a61 to 80a6123 Compare August 22, 2022 20:39
pkg/flowaggregator/exporter/s3.go Outdated Show resolved Hide resolved
pkg/flowaggregator/flowrecord/record.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/flowrecord/record.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Show resolved Hide resolved
@heanlan
Copy link
Contributor Author

heanlan commented Aug 24, 2022

image

I found an issue, and I haven't work out a solution. The .gz file generated by the latest commit CANNOT be opened by Mac's app Archive Utility ↑, but CAN be opened with command gunzip xxx.gz, also CAN be opened with Mac's app Console. But the previous commit works well with Archive Utility. Probably it's the same with a known issue: https://apple.stackexchange.com/questions/388759/archive-utility-cant-open-some-gzipped-text-files-based-on-their-contents , but I don't know how to verify it. I'm wondering do you believe it will cause any issue for consuming the file? @antoninbas @dreamtalen

Compare the latest commit - write record to the buffer one by one,
with the previous commit - write all records from the queue to the buffer at once,
I think the difference is how we use the gzipWriter, previously we create it before and close it after a batch writing. Now I put it in the struct field, before each upload, I close it, and after each upload, I reset it.

@antoninbas
Copy link
Contributor

antoninbas commented Aug 24, 2022

@heanlan I didn't take an in-depth look since you still need to make changes to the code, but maybe you need to call Flush on the gzip writer if you are going to use Reset and re-use it.

Note that I think it's simpler just to allocate a new Gzip writer each time.

@heanlan
Copy link
Contributor Author

heanlan commented Aug 26, 2022

I didn't take an in-depth look since you still need to make changes to the code, but maybe you need to call Flush on the gzip writer if you are going to use Reset and re-use it.

Note that I think it's simpler just to allocate a new Gzip writer each time.

Thanks. I called Close, which includes Flush. And Reset does the same thing with NewWriter. I also tried with NewWriter, the issue still exists.

pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
@antoninbas
Copy link
Contributor

@heanlan
I was thinking about it more, and I think with a single queue, it will not be possible to achieve all the goals:

  • avoid copies
  • avoid holding a lock during IO
  • avoid blocking in CacheRecord
  • handle upload failures in a reasonable way

we probably need 2 queues:

  • 1 for buffers which are complete but for which we haven't attempted upload (no need to enforce a max length, it will typically have 0 or 1 buffers) only. Call it Q1.
  • 1 for buffers for which upload has failed (max length 5). Call it Q2.

we need a single lock to protect currentBuffer and Q1.

then in pseudo-code:

func CacheRecord(r) {
    lock()
    defer unlock()
    writeToCurrentBuffer(r)
    if currentBufferIsFull() {
        addCurrentBufferToQ1()
        resetCurrentBuffer()
    }
}

func Upload() {
    lock()
    newBuffers = getAllBuffersFromQ1()
    unlock()
    sendAllBuffers() // newBuffers + Q2 buffers
    foreach failedBuffer {
        addBufferToQ2(failedBuffer) // enforce max length for Q2
    }
}

func TimerFunction() {
    lock()
    addCurrentBufferToQ1()
    resetCurrentBuffer()
    unlock()
    Upload()
}

let me know what you think. Sorry for some of my comments that were misleading.

When reading from the buffers during upload, we need to wrap the *bytes.Buffer with bytes.Reader. Otherwise, data may be truncated as you pointed out before.

reader := `bytes.NewReader(buffer.Bytes())`

This doesn't make any copy.

@heanlan
Copy link
Contributor Author

heanlan commented Aug 26, 2022

Thanks @antoninbas . I have two questions:

  1. 1 for buffers which are complete but for which we haven't attempted upload (no need to enforce a max length, it will typically have 0 or 1 buffers) only. Call it Q1.

Why it will typically have <=1 buffers? Because we set the maxRecordPerFile big enough?

  1. I'm trying to understand why we need Q2. Is it only for avoid holding the lock while handling upload failures ⬇️ ?
    foreach failedBuffer {
        addBufferToQ2(failedBuffer) // enforce max length for Q2
    }

In the design, we are making actual CSV data copies for newBuffers, right? When upload fails, we can also grab the lock and append the failedBuffer to Q1?

@antoninbas
Copy link
Contributor

antoninbas commented Aug 26, 2022

  1. Why it will typically have <=1 buffers? Because we set the maxRecordPerFile big enough?

Yes, but we can also consider waking up the upload goroutine when we add a buffer to Q1, without waiting for the next timer to fire. This can be done with a signal channel for example.

  1. I'm trying to understand why we need Q2. Is it only for avoid holding the lock while handling upload failures

Yes, it it only to handle upload failures. If we didn't need to handle upload failures, we would be ok with a single queue.
We would: 1) pop the buffer from the queue (with lock), 2) upload to S3 (without lock), 3) ignore failures

In the design, we are making actual CSV data copies for newBuffers

No we don't make any copy. We remove the *byte.Buffer from Q1 (these are just pointers to the data) and we just add them to the list of buffers we need to send. One option is to add them to Q2 directly, drop as needed if the max length is exceeded, and then attempt to upload all buffers in Q2. If any upload fails, the buffer will stay in Q2. Can you clarify why you think we need a copy?

@heanlan
Copy link
Contributor Author

heanlan commented Aug 26, 2022

Can you clarify why you think we need a copy?

I didn't realize before that we are removing buffers from Q1. It looks good then. Thanks.

Signed-off-by: Antonin Bas <abas@vmware.com>
@heanlan
Copy link
Contributor Author

heanlan commented Aug 29, 2022

Hi @antoninbas , thanks for the suggestion. I've implemented it. Please help check.

Yes, but we can also consider waking up the upload goroutine when we add a buffer to Q1, without waiting for the next timer to fire. This can be done with a signal channel for example.

I didn't add this for now. I think it will bring possible concurrent read&write to Q2. And the second half of batchUploadAll is not protected by lock.

I didn't add the logging for number of flow records has been uploaded either, just for simplicity.

Please let me know if you believe we do need any of these two.

Copy link
Contributor

@antoninbas antoninbas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you still have the same issue with the generated gzip files? because I don't remember having this issue when I prototyped that code in the past.

pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
@antoninbas
Copy link
Contributor

I didn't add this for now. I think it will bring possible concurrent read&write to Q2. And the second half of batchUploadAll is not protected by lock.

I don't think so. All the accesses to Q2 would still happen in flowRecordPeriodicCommit which runs in a single goroutine. But there would be one more select case, so that the goroutine can be notified when a buffer is full and has been moved to Q1. That being said, I agree that it's not necessary to implement this now.

@heanlan
Copy link
Contributor Author

heanlan commented Aug 29, 2022

do you still have the same issue with the generated gzip files? because I don't remember having this issue when I prototyped that code in the past.

Yes, I have. Yes, your poc code doesn't have this issue. And my first commit doesn't have this issue. Both of them creates the gzipWriter, write the file at one time, and close it. The current version write the file every time it caches a record. I suppose it should be the known issue with Archive Utility: https://apple.stackexchange.com/questions/388759/archive-utility-cant-open-some-gzipped-text-files-based-on-their-contents

@heanlan
Copy link
Contributor Author

heanlan commented Aug 29, 2022

I don't think so. All the accesses to Q2 would still happen in flowRecordPeriodicCommit which runs in a single goroutine. But there would be one more select case, so that the goroutine can be notified when a buffer is full and has been moved to Q1. That being said, I agree that it's not necessary to implement this now.

Yes, you are right. I was thinking wrong. I have implemented a uploadCh for receiving the "buffer-full" signal in the initial commit, but without the Q1 to temporarily store the full currentBuffer, and reset it to let it continue caching new records. So the new records won't be cached until currentBuffer has been reset.

Yes, it should work well with the current Q1&Q2 approach. We can add it later if we need it.

@antoninbas
Copy link
Contributor

https://apple.stackexchange.com/questions/388759/archive-utility-cant-open-some-gzipped-text-files-based-on-their-contents

could you check that you do get Missing type keyword in mtree specification when you run the Archive Utility in the terminal?

@heanlan
Copy link
Contributor Author

heanlan commented Aug 29, 2022

could you check that you do get Missing type keyword in mtree specification when you run the Archive Utility in the terminal?

I wanted to check that. But I didn't find a way to run the app in terminal. I don't know how they did it. That's why I say I don't know how to verify it is the cause.

I've tried the following, they still pop out the UI window.

➜ Applications open /Users/hanlan/Downloads/records-qfm1xxnkxyf1.csv.gz -b com.apple.archiveutility
➜ Archive Utility.app open /Users/hanlan/Downloads/records-qfm1xxnkxyf1.csv.gz
➜ Downloads open records-qfm1xxnkxyf1.csv.gz

build/charts/flow-aggregator/README.md Outdated Show resolved Hide resolved
build/images/flow-aggregator/Dockerfile Outdated Show resolved Hide resolved
pkg/flowaggregator/options/options.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader.go Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader_test.go Outdated Show resolved Hide resolved
pkg/flowaggregator/s3uploader/s3uploader_test.go Outdated Show resolved Hide resolved
antoninbas
antoninbas previously approved these changes Aug 31, 2022
Copy link
Contributor

@antoninbas antoninbas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

This PR adds S3Uploader as a new exporter of Flow Aggregator. It
periodically exports expired flow records from Flow Aggregator
to AWS S3 storage bucket.

Signed-off-by: heanlan <hanlan@vmware.com>
@heanlan
Copy link
Contributor Author

heanlan commented Aug 31, 2022

/test-all

Copy link
Contributor

@antoninbas antoninbas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approving again after squash

@antoninbas
Copy link
Contributor

@dreamtalen any more comments on this?

@dreamtalen
Copy link
Contributor

@dreamtalen any more comments on this?

No, LGTM

@antoninbas antoninbas merged commit dfc4b7d into antrea-io:main Sep 1, 2022
@heanlan heanlan deleted the s3-uploader branch September 1, 2022 17:32
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants