Skip to content

Commit

Permalink
Addressing issues raised in PR
Browse files Browse the repository at this point in the history
  • Loading branch information
kellrott committed Sep 11, 2019
1 parent cd0ed62 commit c9b3e09
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 14 deletions.
2 changes: 1 addition & 1 deletion elastic/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (es *Graph) AddVertex(vertices []*gripql.Vertex) error {
}

func (es *Graph) BulkAdd(stream <-chan *gripql.GraphElement) error {
return util.SteamBatch(stream, es.AddVertex, es.AddEdge)
return util.SteamBatch(stream, 50, es.AddVertex, es.AddEdge)
}

// DelEdge deletes edge `eid`
Expand Down
7 changes: 5 additions & 2 deletions kvgraph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,14 @@ func insertEdge(tx kvi.KVBulkWrite, idx *kvindex.KVIndex, graph string, edge *gr
// in the graph, it is replaced
func (kgdb *KVInterfaceGDB) AddEdge(edges []*gripql.Edge) error {
err := kgdb.kvg.kv.BulkWrite(func(tx kvi.KVBulkWrite) error {
var anyErr error
for _, edge := range edges {
insertEdge(tx, kgdb.kvg.idx, kgdb.graph, edge)
if err := insertEdge(tx, kgdb.kvg.idx, kgdb.graph, edge); err != nil {
anyErr = err
}
}
kgdb.kvg.ts.Touch(kgdb.graph)
return nil
return anyErr
})
return err
}
Expand Down
2 changes: 1 addition & 1 deletion mongo/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (mg *Graph) AddEdge(edges []*gripql.Edge) error {
}

func (mg *Graph) BulkAdd(stream <-chan *gripql.GraphElement) error {
return util.SteamBatch(stream, mg.AddVertex, mg.AddEdge)
return util.SteamBatch(stream, 50, mg.AddVertex, mg.AddEdge)
}

// deleteConnectedEdges deletes edges where `from` or `to` equal `key`
Expand Down
2 changes: 1 addition & 1 deletion psql/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (g *Graph) AddEdge(edges []*gripql.Edge) error {
}

func (g *Graph) BulkAdd(stream <-chan *gripql.GraphElement) error {
return util.SteamBatch(stream, g.AddVertex, g.AddEdge)
return util.SteamBatch(stream, 50, g.AddVertex, g.AddEdge)
}

// DelVertex is not implemented in the SQL driver
Expand Down
16 changes: 7 additions & 9 deletions util/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ func newGraphElementArray(vertexBufSize, edgeBufSize int) *graphElementArray {

// SteamBatch a stream of inputs and loads them into the graph
// This function assumes incoming stream is GraphElemnts from a single graph
func SteamBatch(stream <-chan *gripql.GraphElement, vertexAdd func([]*gripql.Vertex) error, edgeAdd func([]*gripql.Edge) error) error {
vertexBatchSize := 50
edgeBatchSize := 50
func SteamBatch(stream <-chan *gripql.GraphElement, batchSize int, vertexAdd func([]*gripql.Vertex) error, edgeAdd func([]*gripql.Edge) error) error {

vertCount := 0
edgeCount := 0
Expand Down Expand Up @@ -60,14 +58,14 @@ func SteamBatch(stream <-chan *gripql.GraphElement, vertexAdd func([]*gripql.Ver
closeChan <- true
}()

vertexBatch := newGraphElementArray(vertexBatchSize, 0)
edgeBatch := newGraphElementArray(0, edgeBatchSize)
vertexBatch := newGraphElementArray(batchSize, 0)
edgeBatch := newGraphElementArray(0, batchSize)
var loopErr error
for element := range stream {
if element.Vertex != nil {
if len(vertexBatch.vertices) >= vertexBatchSize {
if len(vertexBatch.vertices) >= batchSize {
vertexBatchChan <- vertexBatch
vertexBatch = newGraphElementArray(vertexBatchSize, 0)
vertexBatch = newGraphElementArray(batchSize, 0)
}
vertex := element.Vertex
err := vertex.Validate()
Expand All @@ -77,9 +75,9 @@ func SteamBatch(stream <-chan *gripql.GraphElement, vertexAdd func([]*gripql.Ver
vertexBatch.vertices = append(vertexBatch.vertices, vertex)
vertCount++
} else if element.Edge != nil {
if len(edgeBatch.edges) >= edgeBatchSize {
if len(edgeBatch.edges) >= batchSize {
edgeBatchChan <- edgeBatch
edgeBatch = newGraphElementArray(0, edgeBatchSize)
edgeBatch = newGraphElementArray(0, batchSize)
}
edge := element.Edge
if edge.Gid == "" {
Expand Down

0 comments on commit c9b3e09

Please sign in to comment.