
Stream multipart PUT operation #626

Closed
postalC opened this issue Feb 13, 2018 · 3 comments · Fixed by #703

Comments


postalC commented Feb 13, 2018

When I PUT a huge file using minio-py, it goes through

_stream_put_object

which is fine for small files but not so good for large ones, since it reads the entire data stream into memory as parts before the parallel upload. That can be an issue when the application runs on a small instance.

Shouldn't it read the data stream part by part while uploading? That would avoid unnecessary memory piling up in the application while parts wait to be sent.

So what I did is the following:

  1. Override _stream_put_object.
  2. Decouple (remove) the part reading/generation.
  3. Make it a generator that returns the part_data in batches; parallel_run then calls it each time to get the next value.

Read data in batches:

# -- Batch Data Read --
def data_batch(bucket_name, object_name, upload_id,
               data, total_parts_count, part_size, last_part_size):
    """
    Read Raw Data by Batch
    :param bucket_name: Bucket to retrieve object from
    :param object_name: Name of object to retrieve
    :param upload_id: Unique Id for upload
    :param data: Raw data request stream
    :param total_parts_count: Number of part
    :param part_size: Size of each part
    :param last_part_size:  Size of last part
    :return: part number, part_data
    """
    for part_number in range(1, total_parts_count + 1):
        current_part_size = (part_size if part_number < total_parts_count
                             else last_part_size)
        part_data = read_full(data, current_part_size)
        yield bucket_name, object_name, upload_id, part_number, part_data
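
For illustration (not part of the proposed change), here is a rough way to exercise data_batch against an in-memory stream; read_full below is a simplified stand-in for minio.helpers.read_full, and the bucket/object/upload-id values are made up:

# Rough usage sketch. Assumes data_batch from above is in scope; read_full
# here is a stand-in for minio.helpers.read_full, which reads exactly the
# requested number of bytes from the stream.
import io

def read_full(data, size):
    return data.read(size)

payload = io.BytesIO(b'x' * 25)       # a 25-byte "object"
part_size, last_part_size = 10, 5     # parts: 10 + 10 + 5 bytes
total_parts_count = 3

parts = data_batch('my-bucket', 'my-object', 'fake-upload-id',
                   payload, total_parts_count, part_size, last_part_size)

# Each part is read from the stream lazily, only when the consumer asks.
for _bucket, _object, _upload_id, part_number, part_data in parts:
    print(part_number, len(part_data))    # -> 1 10, 2 10, 3 5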

In the original code, just call the existing parallel_run and pass it the data_batch generator:

        # Stream parts upload in parallel <= current_part_size until
        # part_number reaches total_parts_count calculated for the
        # given size. Additionally part_manager() also provides
        # md5digest and sha256digest for the partitioned data.
        try:
            pool.parallel_run(self._upload_part_routine,
                              data_batch(bucket_name, object_name, upload_id,
                                         data, total_parts_count,
                                         part_size, last_part_size))
        except:
            # Any exception that occurs sends an abort on the
            # on-going multipart operation.
            self._remove_incomplete_upload(bucket_name,
                                           object_name,
                                           upload_id)
            raise

Does this make sense? It could also be useful to others. Your input and feedback would be much appreciated. Thanks.

@vadmeste
Member

Yeah, using yield is a good idea to reduce memory consumption. Can you send a PR for that, @postalC?
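
For context, here is a minimal illustration (plain Python, not minio code) of why yielding helps: a list materializes every part up front, while a generator keeps roughly one part in memory at a time:

# Illustration only, not minio code.
import io

stream = io.BytesIO(b'a' * 30)

# Eager: all three 10-byte parts sit in memory at the same time.
all_parts = [stream.read(10) for _ in range(3)]

stream.seek(0)

# Lazy: each part is read only when the consumer asks for the next one,
# so at most one part is held here at any moment.
lazy_parts = (stream.read(10) for _ in range(3))
for part in lazy_parts:
    print(len(part))    # 10, 10, 10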


cugu commented May 29, 2018

This is not enough for real streaming. In ThreadPool.parallel_run(), all data is buffered in the tasks_queue (https://github.com/minio/minio-py/blob/master/minio/thread_pool.py#L74:L75). This results in errors when you send files that are larger than memory.
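
For illustration only (Python 3, and not the patch below, which takes a shared locked-iterator approach instead), one way to bound that buffering is a queue with maxsize, so the producer blocks instead of loading every part:

# A bounded queue blocks the producer once `maxsize` parts are waiting, so
# only a few parts are ever buffered in memory regardless of object size.
import queue
import threading

def producer(parts_iter, tasks):
    for part in parts_iter:
        tasks.put(part)           # blocks while the queue is full
    tasks.put(None)               # sentinel: no more parts

def consumer(tasks, upload_part):
    while True:
        part = tasks.get()
        if part is None:
            tasks.put(None)       # pass the sentinel on to the next worker
            break
        upload_part(part)

tasks = queue.Queue(maxsize=3)               # at most 3 parts buffered
parts = (b'x' * 10 for _ in range(100))      # stand-in for data_batch(...)

workers = [threading.Thread(target=consumer, args=(tasks, lambda part: None))
           for _ in range(3)]
for w in workers:
    w.start()
producer(parts, tasks)    # runs on the main thread, throttled by the queue
for w in workers:
    w.join()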


cugu commented May 29, 2018

A working, but unpolished solution is:

api.py (excerpt)

        # Instantiate a thread pool with 3 worker threads
        pool = ThreadPool(_PARALLEL_UPLOADERS)

        # Run parts upload in parallel
        try:
            pool.parallel_run(self._upload_part_routine,
                              bucket_name, object_name, upload_id, data,
                              total_parts_count, part_size, last_part_size)
        except:
            # Any exception that occurs sends an abort on the
            # on-going multipart operation.
            self._remove_incomplete_upload(bucket_name,
                                           object_name,
                                           upload_id)
            raise

thread_pool.py

from threading import Thread, Lock
from .compat import queue
from .helpers import read_full

class LockedIterator:
    """ Wrap an iterator so that concurrent next() calls are serialized. """

    def __init__(self, it):
        self.lock = Lock()
        self.it = it.__iter__()

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            return self.it.__next__()

def data_batch(bucket_name, object_name, upload_id,
               data, total_parts_count, part_size, last_part_size):
    """
    Read Raw Data by Batch
    :param bucket_name: Bucket to retrieve object from
    :param object_name: Name of object to retrieve
    :param upload_id: Unique Id for upload
    :param data: Raw data request stream
    :param total_parts_count: Number of part
    :param part_size: Size of each part
    :param last_part_size:  Size of last part
    :return: part number, part_data
    """
    for part_number in range(1, total_parts_count + 1):
        current_part_size = (part_size if part_number < total_parts_count
                             else last_part_size)
        part_data = read_full(data, current_part_size)
        yield bucket_name, object_name, upload_id, part_number, part_data

class Worker(Thread):
    """ Thread executing tasks pulled from a shared, locked iterator """

    def __init__(self, func, it, results_queue, exceptions_queue):
        Thread.__init__(self)
        self.func = func
        self.it = it
        self.results_queue = results_queue
        self.exceptions_queue = exceptions_queue
        self.daemon = True
        self.start()

    def run(self):
        fast_quit = False
        for job in self.it:
            if not fast_quit:
                try:
                    result = self.func(job)
                    self.results_queue.put(result)
                except Exception as e:
                    self.exceptions_queue.put(e)
                    # Keep draining the shared iterator, but stop doing any
                    # real work once an exception has been recorded.
                    fast_quit = True

class ThreadPool:
    """ Pool of threads consuming parts from a shared iterator """

    def __init__(self, num_threads):
        self.results_queue = queue()
        self.exceptions_queue = queue()
        self.num_threads = num_threads

    def parallel_run(self, func, bucket_name, object_name, upload_id,
                     data, total_parts_count, part_size, last_part_size):
        # One generator shared by every worker; LockedIterator serializes
        # the next() calls so each part is read from the stream exactly
        # once, and only on demand.
        it = LockedIterator(data_batch(bucket_name, object_name, upload_id,
                                       data, total_parts_count,
                                       part_size, last_part_size))
        workers = [Worker(func, it, self.results_queue, self.exceptions_queue)
                   for _ in range(self.num_threads)]

        # Wait for all workers to finish
        for worker in workers:
            worker.join()

        # If one of the threads raised an exception, re-raise it here
        # in the caller
        if not self.exceptions_queue.empty():
            raise self.exceptions_queue.get()

    def result(self):
        """ Return the result of all called tasks """
        return self.results_queue
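
The key design choice above is the shared LockedIterator: every worker pulls its next part straight from the generator under a lock, so at most num_threads parts exist in memory at once and the stream is only read on demand. A stripped-down, stand-alone illustration of that pattern (plain Python 3, not minio code; it only assumes the LockedIterator class from the snippet above):

# Several threads share one generator through LockedIterator; a part is
# produced only when some worker asks for it.
import io
import threading

def fake_parts(stream, count, size):
    # Stand-in for data_batch(): yields (part_number, part_data) tuples.
    for part_number in range(1, count + 1):
        yield part_number, stream.read(size)

stream = io.BytesIO(b'z' * 50)
shared = LockedIterator(fake_parts(stream, count=5, size=10))

def upload(it):
    # Each worker keeps pulling parts until the shared generator runs dry.
    for part_number, part_data in it:
        print('uploading part', part_number, len(part_data))

workers = [threading.Thread(target=upload, args=(shared,)) for _ in range(3)]
for w in workers:
    w.start()
for w in workers:
    w.join()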
