
[Commoncrawl pipeline] Add component download_commoncrawl_segments #273

Merged

Conversation

shayorshay (Contributor)

This PR adds the second component of the Commoncrawl pipeline. The component downloads the WARC segment files and extracts the webpage URLs and HTML content, returning them as a Dask dataframe.

@PhilippeMoussalli (Contributor) left a comment

Thanks Sharon :)
Looks good overall, but I have a small doubt regarding the current scalability of this approach.

BASE_URL = "https://data.commoncrawl.org/"


def get_records_from_warc_file(warc_file: str, n_records_to_download: int) -> List:
Contributor:

n_records_to_download should be an optional argument, and below you should only break if it is not None.

A list of webpages.
"""
logger.info(f"Processing WARC file from segment path: {warc_file}...")
records = []
Contributor:

Do we risk running into out-of-memory issues if the number of extracted records is too large? Right now it seems like we're collecting them all in a list and materializing them in memory before transforming them into a Dask dataframe.

Member:

Materializing a list and then converting it to a dataframe is the recommended approach, since appending to a dataframe is a lot more expensive. We might want to do this on a partition level though.
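
A minimal sketch of what partition-level materialization could look like, assuming a hypothetical read_warc_records helper that returns a list of (url, html) tuples for one WARC file (warc_files is also assumed):

    import dask.dataframe as dd
    import pandas as pd
    from dask import delayed


    @delayed
    def warc_to_partition(warc_file: str) -> pd.DataFrame:
        # materialize the records of a single WARC file in memory,
        # then hand them to Dask as one partition
        records = read_warc_records(warc_file)  # hypothetical helper
        return pd.DataFrame(records, columns=["url", "html"])


    dataframe = dd.from_delayed([warc_to_partition(f) for f in warc_files])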

Contributor:

does that mean that we need one partition per WARC file to keep the dataframes as small as possible?

Contributor:

I have tested this component and ran into the expected out-of-memory error.
I tried to download 1 segment with 30k records. Philippe's assumption seems to be correct: the records list didn't fit into memory in my case.

What do you think about using dask.delayed? Basically, create delayed objects of a fixed size (e.g. 1000 records) and afterwards initialise the dataframe from the delayed objects.

import dask.dataframe as dd
import requests
from dask import delayed
from warcio.archiveiterator import ArchiveIterator


def load_records(segment_path: str, offset: int, batch_size: int) -> list:
    # return one fixed-size batch of records, starting at record number `offset`
    records = []
    response = requests.get(segment_path, stream=True)
    for counter, record in enumerate(ArchiveIterator(response.raw, arc2warc=True)):
        if counter < offset:
            continue  # skip records that belong to earlier batches
        ...  # read the record content and append it to `records`
        if len(records) >= batch_size:
            break
    return records


delayed_data = []
batch_size = 1000
for segment in segments:
    total_number_of_records = xxx  # per-segment record count (known or estimated)
    delayed_data.extend(
        delayed(load_records)(segment, offset, batch_size)
        for offset in range(0, total_number_of_records, batch_size)
    )

# initialise the final dataframe; the downloads run when it is computed
dataframe = dd.from_delayed(delayed_data)

@satishjasthi (Contributor) Jul 25, 2023

@mrchtr That's a good idea. In fact, I'm also using chunking to create delayed objects for the load_from_files component to tackle the same issue.

url:
  type: string
html:
  type: binary
Contributor:

why is this not a string as well?


Returns:
A Dask DataFrame containing the downloaded webpages.
"""
segment_paths = df["segment_path"].to_bag()
Member:

What is the reason to use the bag interface here?

shayorshay (Contributor, Author) Jul 10, 2023

So we can use dask map to process each WARC in parallel. Using a list caused errors when flattening the results to a dask dataframe later.

Member:

Yes, but I mean compared to the dataframe API, where we have the apply function which behaves similarly.
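
For reference, a minimal sketch of the two patterns being compared, using get_records_from_warc_file from the diff above (the meta argument and the omission of its extra arguments are assumptions):

    # bag interface: one element per segment path, flattened into a bag of records
    records_bag = df["segment_path"].to_bag().map(get_records_from_warc_file).flatten()

    # dataframe interface: apply on the series, one call per row
    records_series = df["segment_path"].apply(
        get_records_from_warc_file,
        meta=("segment_path", "object"),  # meta assumed so Dask can infer the output type
    )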

for record in ArchiveIterator(response.raw, arc2warc=True):
if record.rec_type == "response":
url = record.rec_headers.get_header("WARC-Target-URI")
content = (
Contributor:

Do you think it would be possible to add an option to return plain text instead of the HTML?
We could add an additional parameter controlling whether HTML or plain text is returned. I think you don't need it for your use case, but for the large language model use case it would be super useful! :)

For the plain-text transformation you could use BeautifulSoup. Some time ago I did something similar using this function:

    from bs4 import BeautifulSoup


    def _convert_to_plain_text(html):
        """Convert an html body into plain text, making sure table rows are on separate lines."""
        soup = BeautifulSoup(html, "html.parser")
        body_content = soup.find("body")
        text = ""

        for e in body_content.descendants:
            if isinstance(e, str):
                text += e.strip()
            elif e.name in ['br', 'h1', 'h2', 'h3', 'h4', 'tr', 'th']:
                text += '\n'
            elif e.name == 'td':
                # separate table cells with a space to keep rows readable
                text += ' '
            elif e.name == 'p' and not any(parent.name == 'table' for parent in e.parents):
                text += '\n'
            elif e.name == 'li':
                text += '\n- '

        return text

Member:

Is soup.get_text() not sufficient?
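
For reference, a minimal sketch of the built-in alternative (the separator and strip arguments are optional refinements, not taken from the PR):

    from bs4 import BeautifulSoup


    def convert_to_plain_text(html: str) -> str:
        # get_text() concatenates all text nodes; separator/strip keep the output readable
        soup = BeautifulSoup(html, "html.parser")
        return soup.get_text(separator=" ", strip=True)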

@mrchtr (Contributor) Jul 11, 2023

Works as well. The custom code handles tables a bit differently; basically, it tries to keep the table structure line by line.

Contributor (Author):

I see the custom code keeps the CSS content in the <style> tag. I'm assuming you don't want this. Will soup.get_text() be OK for you @mrchtr? Otherwise, we'll have a long list of tags to parse.

Contributor:

soup.get_text() works! Even better than my custom code.

@shayorshay (Contributor, Author)

I updated the PR with the following changes:

  • option to convert HTML to plain text, as suggested by @mrchtr
  • use apply() to process files
  • add partition_size as a fix for the Dask scalability issue
  • HTTP and S3 clients for downloading files. We want to use S3 in the end, but for local testing the HTTP client is faster

@PhilippeMoussalli @RobbeSneyders

@RobbeSneyders (Member) left a comment

Thanks @shayorshay, left some more comments.

@PhilippeMoussalli can you follow up this PR?

use_s3:
  description: Whether to use S3 to download the commoncrawl segment file. Set to True if you are running this component on an AWS cluster.
  type: bool
  default: 'False'
Member:

Suggested change:
- default: 'False'
+ default: false

Aren't you passing a string here instead of a boolean? Same for the argument below.

shayorshay (Contributor, Author) Jul 27, 2023

component_spec.json currently allows only strings and integers as default values

Member:

Let's update the schema then :)


logger = logging.getLogger(__name__)

BASE_URL = "https://data.commoncrawl.org/"
Member:

I don't think this is used.



def get_warc_file_using_requests(warc_file: str) -> requests.Response:
    retry = 0
Member:



if __name__ == "__main__":
    component = DownloadCommoncrawlSegments.from_args()
Member:

This is outdated since #302. Can you rebase on main and update this?

def transform(
    self,
    df: dd.DataFrame,
    use_s3: Optional[bool] = False,
Member:

Since #302, arguments are passed to __init__ instead of transform. Can you rebase on main and update this?
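
A minimal sketch of what the updated signature might look like after the rebase (the base class, import path, and argument names here are assumptions, not taken from #302):

    from typing import Optional

    import dask.dataframe as dd
    from fondant.component import DaskTransformComponent  # import path assumed


    class DownloadCommoncrawlSegments(DaskTransformComponent):
        def __init__(self, *, use_s3: bool = False, partition_size: Optional[int] = None, **kwargs) -> None:
            # arguments move from transform() to __init__()
            self.use_s3 = use_s3
            self.partition_size = partition_size

        def transform(self, df: dd.DataFrame) -> dd.DataFrame:
            ...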

if partition_size:
    df = df.repartition(partition_size=f"{partition_size}MB")

df = df.reset_index(drop=True)
Member:

Why are you resetting the index here?

"webpage_html",
]

if partition_size:
Member:

I don't think we need an argument for this. This can be 250MB at all times. I think it's mainly the partition size before the apply that might be useful to provide an argument for. Then the user can define a smaller size so the data will still fit into memory after the apply.
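
A minimal sketch of that split (input_partition_size is a hypothetical name for the user-facing argument):

    # user-controlled partition size before the apply, so each partition still
    # fits in memory once the downloaded HTML is added
    if input_partition_size:
        df = df.repartition(partition_size=f"{input_partition_size}MB")

    # ... the apply that downloads and extracts the webpages goes here ...

    # fixed output partition size, no argument needed
    df = df.repartition(partition_size="250MB")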

"""
try:
soup = BeautifulSoup(html, "html.parser")
return soup.get_text()
Contributor:

Currently there seems to be a performance bottleneck in converting the page into plain text.
I have found a different library (specifically designed for this use case) that seems to be faster and more robust.
Can you check out trafilatura? https://trafilatura.readthedocs.io/en/latest/
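
A minimal sketch of what the swap could look like (the function name is illustrative; trafilatura.extract returns None when no main text can be found):

    import trafilatura


    def convert_to_plain_text(html: str):
        # extract the main text content from the raw HTML
        return trafilatura.extract(html)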

Contributor (Author):

done. I'll update the PR to reflect the change.

@shayorshay (Contributor, Author)

I updated the PR to reflect the following changes:

  • html_text package for extracting plain text, as suggested by @mrchtr. This is significantly faster than BeautifulSoup when we tested on Kubeflow.
  • reverted to using a dask bag for downloading webpages. Compared to dask.dataframe.apply(), this was much faster when I tested them. I think this has to do with the flatten step at the end where we apply pd.Series.
  • updated the retry for requests according to @RobbeSneyders' suggestion.
  • updated the component to use the new transform and executors.

@RobbeSneyders (Member) left a comment

Thanks @shayorshay. Two final comments and we can merge.

  • reverted to using a dask bag for downloading webpages. Compared to dask.dataframe.apply(), this was much faster when I tested them. I think this has to do with the flatten step at the end where we apply pd.Series.

Can you explain this a bit more? I wouldn't really expect a bag to be faster here. Maybe it's due to the different schedulers they use.

Member:

Can you pin all the versions here?

shayorshay (Contributor, Author) Aug 9, 2023

I retested both approaches since there have been updates on how we handle repartitioning, and you're right, the apply approach is faster than the bag. Here are the time comparisons:

  1. Without repartitioning, on 10k webpages: bag - 01:12, apply - 00:56
  2. With repartitioning, on 10k webpages: bag - 01:19, apply - 01:11

We can use the apply approach, so I'll update the PR.


dataframe = dataframe.reset_index(drop=True)

logger.info(f"Downloaded {len(dataframe)} webpages from Commoncrawl.")
Member:

len() triggers a compute here. We might be able to use .size here instead, which is evaluated lazily, but not sure how we can combine it with logging. Also fine for me to just remove this.

Contributor (Author):

Ok, I've removed it. I'll look into .size for local testing
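
A minimal sketch of the lazy alternative for local testing (note that turning the count into a log message still triggers a compute):

    # dataframe.size is a lazy Dask scalar (rows * columns); nothing runs until .compute()
    n_cells = dataframe.size
    n_rows = n_cells.compute() // len(dataframe.columns)  # the compute happens here
    logger.info(f"Downloaded {n_rows} webpages from Commoncrawl.")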

@mrchtr (Contributor) commented Aug 10, 2023

Thanks for the update @shayorshay!

I have tested the component in depth and tried to make some changes to improve the performance slightly.

  • We should use trafilatura. The library offers fast and powerful HTML extraction, including language filtering based on metadata, URL blocking, and local deduplication of repetitions within a single document.

  • The performance is already good. Nevertheless, the machine's cores are not used equally. I observed some recurring performance peaks: CPU usage goes very high for a few seconds, then drops almost to zero and stays there for several seconds.

Maybe we could merge this PR already. I think it would be easier to incorporate new changes and improve the performance stepwise.

@RobbeSneyders RobbeSneyders merged commit ed73f1b into ml6team:main Aug 10, 2023
5 checks passed
Hakimovich99 pushed a commit that referenced this pull request Oct 16, 2023