Toggle dynamic batcher for add_samples(), others #3923

Merged · 11 commits · Dec 20, 2023
34 changes: 32 additions & 2 deletions docs/source/user_guide/config.rst
@@ -55,7 +55,29 @@ FiftyOne supports the configuration options described below:
+-------------------------------+-------------------------------------+-------------------------------+----------------------------------------------------------------------------------------+
| `default_batch_size` | `FIFTYONE_DEFAULT_BATCH_SIZE` | `None` | A default batch size to use when :ref:`applying models to datasets <model-zoo-apply>`. |
+-------------------------------+-------------------------------------+-------------------------------+----------------------------------------------------------------------------------------+
| `bulk_write_batch_size` | `FIFTYONE_BULK_WRITE_BATCH_SIZE` | `100_000` | Batch size to use for bulk writing MongoDB operations; cannot be > 100,000 |
| `bulk_write_batch_size` | `FIFTYONE_BULK_WRITE_BATCH_SIZE` | `100_000` | Batch size to use for bulk writing MongoDB operations; cannot be > 100,000. |
| | | | |
| | | | Default changes to `10_000` for |
| | | | :ref:`Teams SDK in API connection mode <teams-api-connection>`. |
+-------------------------------+-------------------------------------+-------------------------------+----------------------------------------------------------------------------------------+
| `default_batcher` | `FIFTYONE_DEFAULT_BATCHER` | `latency` | Batching implementation to use in some batched database operations such as |
| | | | :meth:`add_samples() <fiftyone.core.dataset.Dataset.add_samples>`. Supported values |
| | | | are `latency`, `size`, and `static`. |
| | | | |
| | | | `latency` is the default, which uses a dynamic batch size to achieve a target latency |
| | | | of `batcher_target_latency` between calls. `size` is the default for the |
| | | | FiftyOne Teams SDK, which targets a content size of `batcher_target_size_bytes` for |
| | | | each call. `static` uses a fixed batch size of `batcher_static_size`. |
| | | | |
| | | | Default changes to `size` for |
| | | | :ref:`Teams SDK in API connection mode <teams-api-connection>`. |
+-------------------------------+-------------------------------------+-------------------------------+----------------------------------------------------------------------------------------+
| `batcher_static_size` | `FIFTYONE_BATCHER_STATIC_SIZE` | `100` | Fixed size of batches. Ignored if `default_batcher` is not `static`. |
+-------------------------------+-------------------------------------+-------------------------------+----------------------------------------------------------------------------------------+
| `batcher_target_size_bytes` | `FIFTYONE_BATCHER_TARGET_SIZE_BYTES`| `2 ** 20` | Target content size of batches, in bytes. Ignored if `default_batcher` is not `size`. |
+-------------------------------+-------------------------------------+-------------------------------+----------------------------------------------------------------------------------------+
| `batcher_target_latency` | `FIFTYONE_BATCHER_TARGET_LATENCY` | `0.2` | Target latency between batches, in seconds. Ignored if `default_batcher` is not |
| | | | `latency`. |
+-------------------------------+-------------------------------------+-------------------------------+----------------------------------------------------------------------------------------+
| `default_sequence_idx` | `FIFTYONE_DEFAULT_SEQUENCE_IDX` | `%06d` | The default numeric string pattern to use when writing sequential lists of |
| | | | files. |
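
For example, a minimal sketch of opting into fixed-size batching via these settings (assuming the runtime `fo.config` accessors behave like other FiftyOne config fields):

    import fiftyone as fo

    # Use fixed batches of 1,000 samples instead of latency-based batching
    fo.config.default_batcher = "static"
    fo.config.batcher_static_size = 1000

    # Equivalently, via environment variables before importing fiftyone:
    #   export FIFTYONE_DEFAULT_BATCHER=static
    #   export FIFTYONE_BATCHER_STATIC_SIZE=1000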
@@ -138,6 +160,9 @@ and the CLI:
.. code-block:: text

{
"batcher_static_size": 100,
"batcher_target_latency": 0.2,
"batcher_target_size": 1048576,
"bulk_write_batch_size": 100000,
"database_admin": true,
"database_dir": "~/.fiftyone/var/lib/mongo",
@@ -149,6 +174,7 @@ and the CLI:
"default_app_address": null,
"default_app_port": 5151,
"default_batch_size": null,
"default_batcher": "latency",
"default_dataset_dir": "~/fiftyone",
"default_image_ext": ".jpg",
"default_ml_backend": "torch",
@@ -185,6 +211,9 @@ and the CLI:
.. code-block:: text

{
"batcher_static_size": 100,
"batcher_target_latency": 0.2,
"batcher_target_size": 1048576,
"bulk_write_batch_size": 100000,
"database_admin": true,
"database_dir": "~/.fiftyone/var/lib/mongo",
@@ -196,6 +225,7 @@ and the CLI:
"default_app_address": null,
"default_app_port": 5151,
"default_batch_size": null,
"default_batcher": "latency",
"default_dataset_dir": "~/fiftyone",
"default_image_ext": ".jpg",
"default_ml_backend": "torch",
@@ -975,4 +1005,4 @@ code snippet shows how to configure the proxy based on your instance.
# before launching the App, configure a proxy_url
fo.app_config.proxy_url = "https://<myinstance>.notebook.<region>.sagemaker.aws/proxy/<port>/"

session = fo.launch_app(port=<port>)
session = fo.launch_app(port=<port>)
24 changes: 24 additions & 0 deletions fiftyone/core/config.py
@@ -160,6 +160,30 @@ def __init__(self, d=None):
env_var="FIFTYONE_BULK_WRITE_BATCH_SIZE",
default=100_000, # mongodb limit
)
self.default_batcher = self.parse_string(
d,
"default_batcher",
env_var="FIFTYONE_DEFAULT_BATCHER",
default="latency",
)
self.batcher_static_size = self.parse_int(
d,
"batcher_static_size",
env_var="FIFTYONE_BATCHER_STATIC_SIZE",
default=100,
)
self.batcher_target_size_bytes = self.parse_int(
d,
"batcher_target_size_bytes",
env_var="FIFTYONE_BATCHER_TARGET_SIZE_BYTES",
default=2**20,
)
self.batcher_target_latency = self.parse_number(
d,
"batcher_target_latency",
env_var="FIFTYONE_BATCHER_TARGET_LATENCY",
default=0.2,
)
self.default_sequence_idx = self.parse_string(
d,
"default_sequence_idx",
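
A quick way to confirm the new fields and their defaults load as expected (a sketch; printed values assume an unmodified config):

    import fiftyone as fo

    print(fo.config.default_batcher)            # "latency"
    print(fo.config.batcher_static_size)        # 100
    print(fo.config.batcher_target_size_bytes)  # 1048576 == 2 ** 20
    print(fo.config.batcher_target_latency)     # 0.2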
41 changes: 18 additions & 23 deletions fiftyone/core/dataset.py
@@ -2472,22 +2472,15 @@ def add_samples(
except:
pass

# Dynamically size batches so that they are as large as possible while
# still achieving a nice frame rate on the progress bar
batcher = fou.DynamicBatcher(
samples,
target_latency=0.2,
init_batch_size=1,
max_batch_beta=2.0,
progress=True,
total=num_samples,
batcher = fou.get_default_batcher(
samples, progress=True, total=num_samples
)

sample_ids = []
with batcher:
for batch in batcher:
_ids = self._add_samples_batch(
batch, expand_schema, dynamic, validate
batch, expand_schema, dynamic, validate, batcher=batcher
)
sample_ids.extend(_ids)
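
The `fou.get_default_batcher()` factory itself is outside this diff; below is a plausible sketch of its dispatch, assuming the config fields added above. The `ContentSizeBatcher` and `StaticBatcher` class names are assumptions for illustration, not the PR's actual classes; only the `DynamicBatcher` call mirrors the code removed here.

    import fiftyone as fo
    import fiftyone.core.utils as fou

    def get_default_batcher(iterable, progress=False, total=None):
        """Sketch: pick a batcher based on ``fo.config.default_batcher``."""
        batcher = fo.config.default_batcher
        if batcher == "latency":
            # Dynamically sized batches targeting a fixed latency per call
            return fou.DynamicBatcher(
                iterable,
                target_latency=fo.config.batcher_target_latency,
                init_batch_size=1,
                max_batch_beta=2.0,
                progress=progress,
                total=total,
            )
        if batcher == "size":
            # Hypothetical: batches targeting a serialized content size;
            # callers must report writes back via apply_backpressure()
            return ContentSizeBatcher(
                iterable,
                target_size=fo.config.batcher_target_size_bytes,
                progress=progress,
                total=total,
            )
        if batcher == "static":
            # Hypothetical: fixed-size batches
            return StaticBatcher(
                iterable,
                batch_size=fo.config.batcher_static_size,
                progress=progress,
                total=total,
            )
        raise ValueError("Invalid default_batcher: %s" % batcher)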

@@ -2510,7 +2503,7 @@ def add_collection(
samples refer to the same source media.

Args:
samples: a :class:`fiftyone.core.collections.SampleCollection`
sample_collection: a :class:`fiftyone.core.collections.SampleCollection`
include_info (True): whether to merge dataset-level information
such as ``info`` and ``classes``
overwrite_info (False): whether to overwrite existing dataset-level
@@ -2540,7 +2533,9 @@ def add_collection(
)
return self.skip(num_samples).values("id")

def _add_samples_batch(self, samples, expand_schema, dynamic, validate):
def _add_samples_batch(
self, samples, expand_schema, dynamic, validate, batcher=None
):
samples = [s.copy() if s._in_db else s for s in samples]

if self.media_type is None and samples:
@@ -2567,6 +2562,9 @@ def _add_samples_batch(self, samples, expand_schema, dynamic, validate):
if sample.media_type == fom.VIDEO:
sample.frames.save()

if batcher is not None and batcher.manual_backpressure:
batcher.apply_backpressure(dicts)

return [str(d["_id"]) for d in dicts]

def _upsert_samples(
@@ -2583,23 +2581,17 @@ def _upsert_samples(
except:
pass

# Dynamically size batches so that they are as large as possible while
# still achieving a nice frame rate on the progress bar
batcher = fou.DynamicBatcher(
samples,
target_latency=0.2,
init_batch_size=1,
max_batch_beta=2.0,
progress=True,
)
batcher = fou.get_default_batcher(samples, progress=True)

with batcher:
for batch in batcher:
self._upsert_samples_batch(
batch, expand_schema, dynamic, validate
batch, expand_schema, dynamic, validate, batcher=batcher
)

def _upsert_samples_batch(self, samples, expand_schema, dynamic, validate):
def _upsert_samples_batch(
self, samples, expand_schema, dynamic, validate, batcher=None
):
if self.media_type is None and samples:
self.media_type = _get_media_type(samples[0])

@@ -2630,6 +2622,9 @@ def _upsert_samples_batch(self, samples, expand_schema, dynamic, validate):
if sample.media_type == fom.VIDEO:
sample.frames.save()

if batcher is not None and batcher.manual_backpressure:
batcher.apply_backpressure(dicts)

def _make_dict(self, sample, include_id=False):
d = sample.to_mongo_dict(include_id=include_id)

14 changes: 4 additions & 10 deletions fiftyone/core/odm/database.py
@@ -755,23 +755,17 @@ def insert_documents(docs, coll, ordered=False, progress=False, num_docs=None):
a list of IDs of the inserted documents
"""
ids = []
batcher = fou.get_default_batcher(docs, progress=progress, total=num_docs)

try:
batcher = fou.DynamicBatcher(
docs,
target_latency=0.2,
init_batch_size=1,
max_batch_beta=2.0,
max_batch_size=100000, # mongodb limit
progress=progress,
total=num_docs,
)

with batcher:
for batch in batcher:
batch = list(batch)
coll.insert_many(batch, ordered=ordered)
ids.extend(b["_id"] for b in batch)
if batcher.manual_backpressure:
batcher.apply_backpressure(batch)

except BulkWriteError as bwe:
msg = bwe.details["writeErrors"][0]["errmsg"]
raise ValueError(msg) from bwe
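
The `manual_backpressure` protocol exists because some batchers (e.g., size-based ones) cannot observe what was actually written; callers report each batch back via `apply_backpressure()` so the batcher can tune subsequent batch sizes. A minimal caller-side sketch mirroring the pattern used throughout this diff:

    import fiftyone.core.utils as fou

    def write_docs(docs, coll):
        # Batch using the strategy configured via `default_batcher`
        batcher = fou.get_default_batcher(docs, progress=True)
        with batcher:
            for batch in batcher:
                batch = list(batch)
                coll.insert_many(batch, ordered=False)
                if batcher.manual_backpressure:
                    # Feed the written batch back so size-aware batchers
                    # can adjust the next batch toward their target
                    batcher.apply_backpressure(batch)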