Commit
add iceberg and delta
flarco committed Aug 26, 2024
1 parent 1d0e44c commit f1a479b
Showing 73 changed files with 1,660 additions and 142 deletions.
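This commit wires Apache Iceberg and Delta Lake support through the filesys layer: FileStreamConfig gains a Format field, GetDatastream accepts an optional config, and both table formats are scanned via DuckDB rather than per-file readers. A minimal usage sketch, assuming import paths from this repository's layout (the table URI is hypothetical):

package main

import (
	"log"

	"github.com/slingdata-io/sling-cli/core/dbio"
	"github.com/slingdata-io/sling-cli/core/dbio/filesys"
)

func main() {
	fs, err := filesys.NewFileSysClient(dbio.TypeFileLocal)
	if err != nil {
		log.Fatal(err)
	}

	// Declaring the format up front routes the read through DuckDB's
	// delta extension instead of listing and parsing individual files.
	df, err := fs.ReadDataflow("file:///tmp/orders_delta", filesys.FileStreamConfig{
		Format: filesys.FileTypeDelta, // or filesys.FileTypeIceberg
		Limit:  100,
	})
	if err != nil {
		log.Fatal(err)
	}
	_ = df // consume the dataflow as usual
}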
152 changes: 57 additions & 95 deletions core/dbio/filesys/fs.go
@@ -39,7 +39,7 @@ type FileSysClient interface {
FsType() dbio.Type
GetReader(path string) (reader io.Reader, err error)
GetReaders(paths ...string) (readers []io.Reader, err error)
GetDatastream(path string) (ds *iop.Datastream, err error)
GetDatastream(path string, cfg ...FileStreamConfig) (ds *iop.Datastream, err error)
GetWriter(path string) (writer io.Writer, err error)
Buckets() (paths []string, err error)
List(path string) (paths FileNodes, err error)
@@ -393,83 +393,12 @@ func (fs *BaseFileSysClient) Prefix(suffix ...string) string {

// Query queries the file system via duckdb
func (fs *BaseFileSysClient) Query(uri, sql string) (data iop.Dataset, err error) {
duck := iop.NewDuckDb(fs.context.Ctx)
props := g.MapToKVArr(map[string]string{"fs_props": g.Marshal(fs.Props())})
duck := iop.NewDuckDb(fs.context.Ctx, props...)
duck.AddExtension("iceberg")
duck.AddExtension("delta")

err = duck.Open()
if err != nil {
return data, g.Error(err, "could not open duckdb")
}

// add credentials to the query

// map from sling to duckdb
var secretKeyMap map[string]string
secretProps := []string{}
scopeScheme := fs.FsType().String()

switch fs.FsType() {
case dbio.TypeFileS3:
secretKeyMap = map[string]string{
"ACCESS_KEY_ID": "KEY_ID",
"SECRET_ACCESS_KEY": "SECRET",
"BUCKET": "SCOPE",
"REGION": "REGION",
"SESSION_TOKEN": "SESSION_TOKEN",
"ENDPOINT": "ENDPOINT",
}

if strings.Contains(fs.GetProp("endpoint"), "r2.cloudflarestorage.com") {
accountID := strings.Split(fs.GetProp("endpoint"), ".")[0]
secretProps = append(secretProps, "ACCOUNT_ID "+accountID)
secretProps = append(secretProps, "TYPE R2")
scopeScheme = "r2"
sql = strings.ReplaceAll(sql, "s3://", "r2://")
} else {
secretProps = append(secretProps, "TYPE S3")
}

case dbio.TypeFileGoogle:
secretKeyMap = map[string]string{
"ACCESS_KEY_ID": "KEY_ID",
"SECRET_ACCESS_KEY": "SECRET",
}
secretProps = append(secretProps, "TYPE GCS")
scopeScheme = "gcs"
sql = strings.ReplaceAll(sql, "gs://", "gcs://")

case dbio.TypeFileAzure:
secretKeyMap = map[string]string{
"CONN_STR": "CONNECTION_STRING",
"ACCOUNT": "ACCOUNT_NAME",
}
secretProps = append(secretProps, "TYPE AZURE")

case dbio.TypeFileLocal:
// nothing to do

default:
return data, g.Error("unknown file system type for querying")
}

// populate secret props and make secret sql
if len(secretProps) > 0 {
for slingKey, duckdbKey := range secretKeyMap {
if val := fs.GetProp(slingKey); val != "" {
if duckdbKey == "SCOPE" {
val = scopeScheme + "://" + val
}
duckdbVal := "'" + val + "'" // add quotes
secretProps = append(secretProps, g.F("%s %s", duckdbKey, duckdbVal))
}
}
secretSQL := g.R(
"create secret {name} ({key_vals})",
"name", strings.ToUpper(fs.GetProp("name")),
"key_vals", strings.Join(secretProps, ",\n "),
)

sql = secretSQL + sql
}
_ = duck.PrepareFsSecretAndURI(uri)

data, err = duck.Query(sql)
if err != nil {
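The hand-rolled credential block deleted above translated sling connection properties into a DuckDB create secret statement; that logic is now consolidated behind duck.PrepareFsSecretAndURI. For reference, the removed S3 branch assembled SQL of roughly this shape (a sketch; all identifiers and values are placeholders):

// Shape of the secret SQL the removed branch generated for an S3 connection.
// For Cloudflare R2 endpoints it used TYPE R2, added ACCOUNT_ID, and rewrote
// s3:// URIs to r2://.
const exampleSecretSQL = `create secret MY_S3_CONN (TYPE S3,
  KEY_ID 'AKIA_PLACEHOLDER',
  SECRET 'SECRET_PLACEHOLDER',
  REGION 'us-east-1',
  SCOPE 's3://my-bucket')`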
@@ -530,17 +459,21 @@ func (fs *BaseFileSysClient) GetRefTs() time.Time {
}

// GetDatastream returns a datastream for the given path
func (fs *BaseFileSysClient) GetDatastream(urlStr string) (ds *iop.Datastream, err error) {
func (fs *BaseFileSysClient) GetDatastream(uri string, cfg ...FileStreamConfig) (ds *iop.Datastream, err error) {
Cfg := FileStreamConfig{} // zero value: no limit, no select
if len(cfg) > 0 {
Cfg = cfg[0]
}

ds = iop.NewDatastreamContext(fs.Context().Ctx, nil)
ds.SafeInference = true
ds.SetMetadata(fs.GetProp("METADATA"))
ds.Metadata.StreamURL.Value = urlStr
ds.Metadata.StreamURL.Value = uri
ds.SetConfig(fs.Props())

fileFormat := FileType(cast.ToString(fs.GetProp("FORMAT")))
if string(fileFormat) == "" {
fileFormat = InferFileFormat(urlStr)
fileFormat = InferFileFormat(uri)
}

go func() {
@@ -556,8 +489,24 @@ func (fs *BaseFileSysClient) GetDatastream(urlStr string) (ds *iop.Datastream, err error) {
defer fs.Context().Wg.Read.Done()
fs.Context().Wg.Read.Add()

g.Debug("reading datastream from %s [format=%s]", urlStr, fileFormat)
reader, err := fs.Self().GetReader(urlStr)
g.Debug("reading datastream from %s [format=%s]", uri, fileFormat)

// no reader needed for iceberg/delta; duckdb will handle it
if g.In(fileFormat, FileTypeIceberg, FileTypeDelta) {
switch fileFormat {
case FileTypeIceberg:
err = ds.ConsumeIcebergReader(uri, Cfg.Select, cast.ToUint64(Cfg.Limit), fs.Props())
case FileTypeDelta:
err = ds.ConsumeDeltaReader(uri, Cfg.Select, cast.ToUint64(Cfg.Limit), fs.Props())
}

if err != nil {
ds.Context.CaptureErr(g.Error(err, "Error consuming reader for %s", uri))
}
return
}

reader, err := fs.Self().GetReader(uri)
if err != nil {
ds.Context.CaptureErr(g.Error(err, "error getting reader"))
return
@@ -598,7 +547,7 @@ func (fs *BaseFileSysClient) GetDatastream(urlStr string) (ds *iop.Datastream, err error) {
}

if err != nil {
ds.Context.CaptureErr(g.Error(err, "Error consuming reader for %s", urlStr))
ds.Context.CaptureErr(g.Error(err, "Error consuming reader for %s", uri))
}

}()
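For the two table formats no Go-side reader is opened: DuckDB's iceberg and delta extensions resolve the table's manifests and data files themselves. The actual statements are built inside ConsumeIcebergReader/ConsumeDeltaReader (not shown in this commit); a hedged sketch of the equivalent direct calls, reusing the duck helper seen in Query above, with a placeholder URI:

package main

import (
	"context"
	"log"

	"github.com/slingdata-io/sling-cli/core/dbio/iop"
)

func main() {
	duck := iop.NewDuckDb(context.Background())
	duck.AddExtension("iceberg")
	if err := duck.Open(); err != nil {
		log.Fatal(err)
	}

	// iceberg_scan/delta_scan are the extensions' table functions; the
	// URI and option here are illustrative, not taken from this commit.
	data, err := duck.Query(`select * from iceberg_scan('s3://lake/warehouse/orders', allow_moved_paths = true)`)
	if err != nil {
		log.Fatal(err)
	}
	_ = data
}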
@@ -613,6 +562,10 @@ func (fs *BaseFileSysClient) ReadDataflow(url string, cfg ...FileStreamConfig) (df *iop.Dataflow, err error) {
Cfg = cfg[0]
}

if Cfg.Format == FileTypeNone {
Cfg.Format = FileType(strings.ToLower(cast.ToString(fs.GetProp("FORMAT"))))
}

if strings.HasSuffix(strings.ToLower(url), ".zip") {
localFs, err := NewFileSysClient(dbio.TypeFileLocal)
if err != nil {
@@ -651,12 +604,18 @@ func (fs *BaseFileSysClient) ReadDataflow(url string, cfg ...FileStreamConfig) (df *iop.Dataflow, err error) {
return df, nil
}

g.Trace("listing path: %s", url)
nodes, err := fs.Self().ListRecursive(url)
if err != nil {
err = g.Error(err, "Error getting paths")
return
var nodes FileNodes
if g.In(Cfg.Format, FileTypeIceberg, FileTypeDelta) {
nodes = FileNodes{FileNode{URI: url}}
} else {
g.Trace("listing path: %s", url)
nodes, err = fs.Self().ListRecursive(url)
if err != nil {
err = g.Error(err, "Error getting paths")
return
}
}

df, err = GetDataflow(fs.Self(), nodes, Cfg)
if err != nil {
err = g.Error(err, "error getting dataflow")
@@ -1002,11 +961,15 @@ func Delete(fs FileSysClient, uri string) (err error) {
type FileStreamConfig struct {
Limit int
Select []string
Format FileType
}

// GetDataflow returns a dataflow from specified paths in specified FileSysClient
func GetDataflow(fs FileSysClient, nodes FileNodes, cfg FileStreamConfig) (df *iop.Dataflow, err error) {
fileFormat := FileType(strings.ToLower(cast.ToString(fs.GetProp("FORMAT"))))
if cfg.Format == FileTypeNone {
cfg.Format = FileType(strings.ToLower(cast.ToString(fs.GetProp("FORMAT"))))
}

if len(nodes) == 0 {
err = g.Error("Provided 0 files for: %#v", nodes)
return
@@ -1015,7 +978,6 @@ func GetDataflow(fs FileSysClient, nodes FileNodes, cfg FileStreamConfig) (df *iop.Dataflow, err error) {
df = iop.NewDataflowContext(fs.Context().Ctx, cfg.Limit)
dsCh := make(chan *iop.Datastream)
fs.setDf(df)
fs.SetProp("selectFields", g.Marshal(cfg.Select))

go func() {
defer close(dsCh)
@@ -1024,7 +986,7 @@ func GetDataflow(fs FileSysClient, nodes FileNodes, cfg FileStreamConfig) (df *iop.Dataflow, err error) {

pushDatastream := func(ds *iop.Datastream) {
// use selected fields only when not parquet
skipSelect := g.In(fs.GetProp("FORMAT"), string(FileTypeParquet))
skipSelect := g.In(cfg.Format, FileTypeParquet, FileTypeIceberg, FileTypeDelta)
if len(cfg.Select) > 1 && !skipSelect {
cols := iop.NewColumnsFromFields(cfg.Select...)
fm := ds.Columns.FieldMap(true)
@@ -1045,7 +1007,7 @@ func GetDataflow(fs FileSysClient, nodes FileNodes, cfg FileStreamConfig) (df *iop.Dataflow, err error) {
}
}

if allowMerging && (fileFormat.IsJson() || isFiletype(FileTypeJson, nodes.URIs()...) || isFiletype(FileTypeJsonLines, nodes.URIs()...)) {
if allowMerging && (cfg.Format.IsJson() || isFiletype(FileTypeJson, nodes.URIs()...) || isFiletype(FileTypeJsonLines, nodes.URIs()...)) {
ds, err := MergeReaders(fs, FileTypeJson, nodes, cfg.Limit)
if err != nil {
df.Context.CaptureErr(g.Error(err, "Unable to merge paths at %s", fs.GetProp("url")))
@@ -1056,7 +1018,7 @@ func GetDataflow(fs FileSysClient, nodes FileNodes, cfg FileStreamConfig) (df *iop.Dataflow, err error) {
return // done
}

if allowMerging && (fileFormat == FileTypeXml || isFiletype(FileTypeXml, nodes.URIs()...)) {
if allowMerging && (cfg.Format == FileTypeXml || isFiletype(FileTypeXml, nodes.URIs()...)) {
ds, err := MergeReaders(fs, FileTypeXml, nodes, cfg.Limit)
if err != nil {
df.Context.CaptureErr(g.Error(err, "Unable to merge paths at %s", fs.GetProp("url")))
@@ -1068,7 +1030,7 @@ func GetDataflow(fs FileSysClient, nodes FileNodes, cfg FileStreamConfig) (df *iop.Dataflow, err error) {
}

// csvs
if allowMerging && (fileFormat == FileTypeCsv || isFiletype(FileTypeCsv, nodes.URIs()...)) {
if allowMerging && (cfg.Format == FileTypeCsv || isFiletype(FileTypeCsv, nodes.URIs()...)) {
ds, err := MergeReaders(fs, FileTypeCsv, nodes, cfg.Limit)
if err != nil {
df.Context.CaptureErr(g.Error(err, "Unable to merge paths at %s", fs.GetProp("url")))
Expand All @@ -1084,7 +1046,7 @@ func GetDataflow(fs FileSysClient, nodes FileNodes, cfg FileStreamConfig) (df *i
continue
}

ds, err := fs.GetDatastream(path)
ds, err := fs.GetDatastream(path, cfg)
if err != nil {
df.Context.CaptureErr(g.Error(err, "Unable to process "+path))
return
31 changes: 25 additions & 6 deletions core/dbio/filesys/fs_local.go
@@ -96,7 +96,12 @@ func (fs *LocalFileSysClient) GetReader(uri string) (reader io.Reader, err error) {
}

// GetDatastream returns a datastream for the given path
func (fs *LocalFileSysClient) GetDatastream(uri string) (ds *iop.Datastream, err error) {
func (fs *LocalFileSysClient) GetDatastream(uri string, cfg ...FileStreamConfig) (ds *iop.Datastream, err error) {
Cfg := FileStreamConfig{}
if len(cfg) > 0 {
Cfg = cfg[0]
}

path, err := fs.GetPath(uri)
if err != nil {
err = g.Error(err, "Error Parsing url: "+uri)
@@ -116,12 +121,10 @@ func (fs *LocalFileSysClient) GetDatastream(uri string) (ds *iop.Datastream, err error) {
ds.SetConfig(fs.Props())

// set selectFields for pruning at source
selectFields := []string{}
g.Unmarshal(fs.GetProp("selectFields"), &selectFields)
ds.Columns = iop.NewColumnsFromFields(selectFields...)
ds.Columns = iop.NewColumnsFromFields(Cfg.Select...)

fileFormat := FileType(cast.ToString(fs.GetProp("FORMAT")))
if string(fileFormat) == "" {
fileFormat := Cfg.Format
if fileFormat == FileTypeNone {
fileFormat = InferFileFormat(path)
}

@@ -140,6 +143,22 @@ func (fs *LocalFileSysClient) GetDatastream(uri string) (ds *iop.Datastream, err error) {

g.Debug("reading datastream from %s [format=%s]", path, fileFormat)

// no reader needed for iceberg/delta; duckdb will handle it
if g.In(fileFormat, FileTypeIceberg, FileTypeDelta) {
file.Close() // no need to keep the file open

switch fileFormat {
case FileTypeIceberg:
err = ds.ConsumeIcebergReader("file://"+path, Cfg.Select, cast.ToUint64(Cfg.Limit), fs.Props())
case FileTypeDelta:
err = ds.ConsumeDeltaReader("file://"+path, Cfg.Select, cast.ToUint64(Cfg.Limit), fs.Props())
}
if err != nil {
ds.Context.CaptureErr(g.Error(err, "Error consuming reader for %s", path))
}
return
}

switch fileFormat {
case FileTypeJson, FileTypeJsonLines:
err = ds.ConsumeJsonReader(bufio.NewReader(file))
(remaining 71 changed files not shown)
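The local client mirrors the remote branch: the already-opened file handle is closed and the plain path is upgraded to a file:// URI, since DuckDB opens the table directory itself. A usage sketch with a hypothetical local Iceberg path, showing Select pushed down to the engine scan (see the skipSelect logic in GetDataflow above):

package main

import (
	"log"

	"github.com/slingdata-io/sling-cli/core/dbio"
	"github.com/slingdata-io/sling-cli/core/dbio/filesys"
)

func main() {
	fs, err := filesys.NewFileSysClient(dbio.TypeFileLocal)
	if err != nil {
		log.Fatal(err)
	}

	// For iceberg/delta, Select is handed to the engine scan rather than
	// re-projected row-by-row in Go.
	ds, err := fs.GetDatastream("/data/warehouse/events_iceberg", filesys.FileStreamConfig{
		Format: filesys.FileTypeIceberg,
		Select: []string{"id", "created_at"},
	})
	if err != nil {
		log.Fatal(err)
	}
	_ = ds
}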
