Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

Commit

Permalink
Move proto-dag reader to separate file and change name
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protonmail.ch>
  • Loading branch information
Kubuxu committed Nov 21, 2016
1 parent b736dc1 commit 72df8b2
Show file tree
Hide file tree
Showing 3 changed files with 264 additions and 250 deletions.
2 changes: 1 addition & 1 deletion archive/tar/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (w *Writer) writeFile(nd *mdag.ProtoNode, pb *upb.Data, fpath string) error
return err
}

dagr := uio.NewDataFileReader(w.ctx, nd, pb, w.Dag)
dagr := uio.NewPBFileReader(w.ctx, nd, pb, w.Dag)
if _, err := dagr.WriteTo(w.TarW); err != nil {
return err
}
Expand Down
250 changes: 1 addition & 249 deletions io/dagreader.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package io

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"

mdag "github.com/ipfs/go-ipfs/merkledag"
ft "github.com/ipfs/go-ipfs/unixfs"
Expand All @@ -27,36 +25,6 @@ type DagReader interface {
Offset() int64
}

// DagReader provides a way to easily read the data contained in a dag.
type pbDagReader struct {
serv mdag.DAGService

// the node being read
node *mdag.ProtoNode

// cached protobuf structure from node.Data
pbdata *ftpb.Data

// the current data buffer to be read from
// will either be a bytes.Reader or a child DagReader
buf ReadSeekCloser

// NodeGetters for each of 'nodes' child links
promises []mdag.NodeGetter

// the index of the child link currently being read from
linkPosition int

// current offset for the read head within the 'file'
offset int64

// Our context
ctx context.Context

// context cancel for children
cancel func()
}

type ReadSeekCloser interface {
io.Reader
io.Seeker
Expand All @@ -83,7 +51,7 @@ func NewDagReader(ctx context.Context, n node.Node, serv mdag.DAGService) (DagRe
// Dont allow reading directories
return nil, ErrIsDir
case ftpb.Data_File, ftpb.Data_Raw:
return NewDataFileReader(ctx, n, pb, serv), nil
return NewPBFileReader(ctx, n, pb, serv), nil
case ftpb.Data_Metadata:
if len(n.Links()) == 0 {
return nil, errors.New("incorrectly formatted metadata object")
Expand All @@ -107,219 +75,3 @@ func NewDagReader(ctx context.Context, n node.Node, serv mdag.DAGService) (DagRe
return nil, fmt.Errorf("unrecognized node type")
}
}

func NewDataFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv mdag.DAGService) *pbDagReader {
fctx, cancel := context.WithCancel(ctx)
promises := mdag.GetDAG(fctx, serv, n)
return &pbDagReader{
node: n,
serv: serv,
buf: NewRSNCFromBytes(pb.GetData()),
promises: promises,
ctx: fctx,
cancel: cancel,
pbdata: pb,
}
}

// precalcNextBuf follows the next link in line and loads it from the
// DAGService, setting the next buffer to read from
func (dr *pbDagReader) precalcNextBuf(ctx context.Context) error {
dr.buf.Close() // Just to make sure
if dr.linkPosition >= len(dr.promises) {
return io.EOF
}

nxt, err := dr.promises[dr.linkPosition].Get(ctx)
if err != nil {
return err
}
dr.linkPosition++

switch nxt := nxt.(type) {
case *mdag.ProtoNode:
pb := new(ftpb.Data)
err = proto.Unmarshal(nxt.Data(), pb)
if err != nil {
return fmt.Errorf("incorrectly formatted protobuf: %s", err)
}

switch pb.GetType() {
case ftpb.Data_Directory:
// A directory should not exist within a file
return ft.ErrInvalidDirLocation
case ftpb.Data_File:
dr.buf = NewDataFileReader(dr.ctx, nxt, pb, dr.serv)
return nil
case ftpb.Data_Raw:
dr.buf = NewRSNCFromBytes(pb.GetData())
return nil
case ftpb.Data_Metadata:
return errors.New("shouldnt have had metadata object inside file")
case ftpb.Data_Symlink:
return errors.New("shouldnt have had symlink inside file")
default:
return ft.ErrUnrecognizedType
}
case *mdag.RawNode:
dr.buf = NewRSNCFromBytes(nxt.RawData())
return nil
default:
return errors.New("unrecognized node type in pbDagReader")
}
}

// Size return the total length of the data from the DAG structured file.
func (dr *pbDagReader) Size() uint64 {
return dr.pbdata.GetFilesize()
}

// Read reads data from the DAG structured file
func (dr *pbDagReader) Read(b []byte) (int, error) {
return dr.CtxReadFull(dr.ctx, b)
}

// CtxReadFull reads data from the DAG structured file
func (dr *pbDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
// If no cached buffer, load one
total := 0
for {
// Attempt to fill bytes from cached buffer
n, err := dr.buf.Read(b[total:])
total += n
dr.offset += int64(n)
if err != nil {
// EOF is expected
if err != io.EOF {
return total, err
}
}

// If weve read enough bytes, return
if total == len(b) {
return total, nil
}

// Otherwise, load up the next block
err = dr.precalcNextBuf(ctx)
if err != nil {
return total, err
}
}
}

func (dr *pbDagReader) WriteTo(w io.Writer) (int64, error) {
// If no cached buffer, load one
total := int64(0)
for {
// Attempt to write bytes from cached buffer
n, err := dr.buf.WriteTo(w)
total += n
dr.offset += n
if err != nil {
if err != io.EOF {
return total, err
}
}

// Otherwise, load up the next block
err = dr.precalcNextBuf(dr.ctx)
if err != nil {
if err == io.EOF {
return total, nil
}
return total, err
}
}
}

func (dr *pbDagReader) Close() error {
dr.cancel()
return nil
}

func (dr *pbDagReader) Offset() int64 {
return dr.offset
}

// Seek implements io.Seeker, and will seek to a given offset in the file
// interface matches standard unix seek
// TODO: check if we can do relative seeks, to reduce the amount of dagreader
// recreations that need to happen.
func (dr *pbDagReader) Seek(offset int64, whence int) (int64, error) {
switch whence {
case os.SEEK_SET:
if offset < 0 {
return -1, errors.New("Invalid offset")
}

// Grab cached protobuf object (solely to make code look cleaner)
pb := dr.pbdata

// left represents the number of bytes remaining to seek to (from beginning)
left := offset
if int64(len(pb.Data)) >= offset {
// Close current buf to close potential child dagreader
dr.buf.Close()
dr.buf = NewRSNCFromBytes(pb.GetData()[offset:])

// start reading links from the beginning
dr.linkPosition = 0
dr.offset = offset
return offset, nil
} else {
// skip past root block data
left -= int64(len(pb.Data))
}

// iterate through links and find where we need to be
for i := 0; i < len(pb.Blocksizes); i++ {
if pb.Blocksizes[i] > uint64(left) {
dr.linkPosition = i
break
} else {
left -= int64(pb.Blocksizes[i])
}
}

// start sub-block request
err := dr.precalcNextBuf(dr.ctx)
if err != nil {
return 0, err
}

// set proper offset within child readseeker
n, err := dr.buf.Seek(left, os.SEEK_SET)
if err != nil {
return -1, err
}

// sanity
left -= n
if left != 0 {
return -1, errors.New("failed to seek properly")
}
dr.offset = offset
return offset, nil
case os.SEEK_CUR:
// TODO: be smarter here
noffset := dr.offset + offset
return dr.Seek(noffset, os.SEEK_SET)
case os.SEEK_END:
noffset := int64(dr.pbdata.GetFilesize()) - offset
return dr.Seek(noffset, os.SEEK_SET)
default:
return 0, errors.New("invalid whence")
}
}

// readSeekNopCloser wraps a bytes.Reader to implement ReadSeekCloser
type readSeekNopCloser struct {
*bytes.Reader
}

func NewRSNCFromBytes(b []byte) ReadSeekCloser {
return &readSeekNopCloser{bytes.NewReader(b)}
}

func (r *readSeekNopCloser) Close() error { return nil }
Loading

0 comments on commit 72df8b2

Please sign in to comment.