Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use buffer write file simpl #3

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 33 additions & 27 deletions store/file/ods.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (

var _ eds.AccessorStreamer = (*ODSFile)(nil)

// writeBufferSize defines buffer size for optimized batched writes into the file system.
// TODO(@Wondertan): Consider making it configurable
const writeBufferSize = 64 << 10

// ErrEmptyFile signals that the ODS file is empty.
// This helps avoid storing empty block EDSes.
var ErrEmptyFile = errors.New("file is empty")
Expand Down Expand Up @@ -73,11 +77,19 @@ func CreateODSFile(
return nil, fmt.Errorf("file create: %w", err)
}

h, err := writeODSFile(f, eds, roots)
// buffering gives us ~4x speed up
buf := bufio.NewWriterSize(f, writeBufferSize)

h, err := writeODSFile(buf, eds, roots)
if err != nil {
return nil, fmt.Errorf("writing ODS file: %w", err)
}

err = buf.Flush()
if err != nil {
return nil, fmt.Errorf("flushing ODS file: %w", err)
}

err = f.Sync()
if err != nil {
return nil, fmt.Errorf("syncing file: %w", err)
Expand Down Expand Up @@ -110,49 +122,43 @@ func writeODSFile(w io.Writer, eds *rsmt2d.ExtendedDataSquare, axisRoots *share.
}

// write quadrants
err = writeQuadrant(w, eds, int(h.shareSize), 0)
err = writeQ1(w, eds)
if err != nil {
return nil, fmt.Errorf("writing Q1: %w", err)
}

return h, nil
}

// writeQuadrant writes the quadrant of the square to the writer. it writes the quadrant in row-major
// order. It uses buffer to write the shares in bulk (row by row), which improves the write performance.
func writeQuadrant(w io.Writer, eds *rsmt2d.ExtendedDataSquare, shareSize, quadrantIdx int) error {
fromRow := quadrantIdx / 2 * int(eds.Width()) / 2
toRow := fromRow + int(eds.Width())/2

fromCol := quadrantIdx % 2 * int(eds.Width()) / 2
toCol := fromCol + int(eds.Width())/2

buf := bufio.NewWriterSize(w, shareSize*int(eds.Width()))
for rowIdx := fromRow; rowIdx < toRow; rowIdx++ {
for colIdx := fromCol; colIdx < toCol; colIdx++ {
share := eds.GetCell(uint(rowIdx), uint(colIdx))
_, err := buf.Write(share)
// writeQ1 writes the first quadrant of the square to the writer. It writes the quadrant in row-major
// order
func writeQ1(w io.Writer, eds *rsmt2d.ExtendedDataSquare) error {
for i := range eds.Width() / 2 {
for j := range eds.Width() / 2 {
shr := eds.GetCell(i, j) // TODO: Avoid copying inside GetCell
_, err := w.Write(shr)
if err != nil {
return fmt.Errorf("writing shares: %w", err)
return fmt.Errorf("writing share: %w", err)
}
}
}
err := buf.Flush()
if err != nil {
return fmt.Errorf("flushing buffer: %w", err)
}
return nil
}

// writeAxisRoots writes RowRoots followed by ColumnRoots.
func writeAxisRoots(w io.Writer, roots *share.AxisRoots) error {
buf := make([]byte, 0, share.AxisRootSize*len(roots.RowRoots))
for _, roots := range [][][]byte{roots.RowRoots, roots.ColumnRoots} {
for _, root := range roots {
buf = append(buf, root...)
for _, root := range roots.RowRoots {
if _, err := w.Write(root); err != nil {
return fmt.Errorf("writing row roots: %w", err)
}
}
if _, err := w.Write(buf); err != nil {
return fmt.Errorf("writing axis roots: %w", err)

for _, root := range roots.ColumnRoots {
if _, err := w.Write(root); err != nil {
return fmt.Errorf("writing columm roots: %w", err)
}
}

return nil
}

Expand Down
32 changes: 31 additions & 1 deletion store/file/q1q4_file.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package file

import (
"bufio"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -39,16 +40,45 @@ func CreateQ1Q4File(path string, roots *share.AxisRoots, eds *rsmt2d.ExtendedDat
return nil, err
}

err = writeQuadrant(ods.fl, eds, int(ods.hdr.shareSize), 3)
// buffering gives us ~4x speed up
buf := bufio.NewWriterSize(ods.fl, writeBufferSize)

err = writeQ4(buf, eds)
if err != nil {
return nil, fmt.Errorf("writing Q4: %w", err)
}

err = buf.Flush()
if err != nil {
return nil, fmt.Errorf("flushing Q4: %w", err)
}

err = ods.fl.Sync()
if err != nil {
return nil, fmt.Errorf("syncing file: %w", err)
}

return &Q1Q4File{
ods: ods,
}, nil
}

// writeQ4 writes the frth quadrant of the square to the writer. iIt writes the quadrant in row-major
// order
func writeQ4(w io.Writer, eds *rsmt2d.ExtendedDataSquare) error {
half := eds.Width() / 2
for i := range half {
for j := range half {
shr := eds.GetCell(i+half, j+half) // TODO: Avoid copying inside GetCell
_, err := w.Write(shr)
if err != nil {
return fmt.Errorf("writing share: %w", err)
}
}
}
return nil
}

func (f *Q1Q4File) Size(ctx context.Context) int {
return f.ods.Size(ctx)
}
Expand Down
Loading