Skip to content

Commit

Permalink
fix #416. Improve the time it takes to drop an entire database
Browse files Browse the repository at this point in the history
  • Loading branch information
jvshahid committed Apr 7, 2014
1 parent 9dce95e commit b303a1d
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Features

- [Issue #310](https://github.com/influxdb/influxdb/issues/310). Request should support multiple timeseries
- [Issue #416](https://github.com/influxdb/influxdb/issues/416). Improve the time it takes to drop database

### Bugfixes

Expand Down
28 changes: 14 additions & 14 deletions src/datastore/leveldb_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,15 @@ func (self *LevelDbShard) Query(querySpec *parser.QuerySpec, processor cluster.Q
}

func (self *LevelDbShard) DropDatabase(database string) error {
wb := levigo.NewWriteBatch()
defer wb.Close()

seriesNames := self.getSeriesForDatabase(database)
for _, name := range seriesNames {
if err := self.dropSeries(database, name); err != nil {
log.Error("DropDatabase: ", err)
return err
}

}

return self.db.Write(self.writeOptions, wb)
self.compact()
return nil
}

func (self *LevelDbShard) IsClosed() bool {
Expand Down Expand Up @@ -364,7 +361,12 @@ func (self *LevelDbShard) executeDeleteQuery(querySpec *parser.QuerySpec, proces
func (self *LevelDbShard) executeDropSeriesQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error {
database := querySpec.Database()
series := querySpec.Query().DropSeriesQuery.GetTableName()
return self.dropSeries(database, series)
err := self.dropSeries(database, series)
if err != nil {
return err
}
self.compact()
return nil
}

func (self *LevelDbShard) dropSeries(database, series string) error {
Expand Down Expand Up @@ -415,15 +417,13 @@ func (self *LevelDbShard) deleteRangeOfSeriesCommon(database, series string, sta
ro := levigo.NewReadOptions()
defer ro.Close()
ro.SetFillCache(false)
rangesToCompact := make([]*levigo.Range, 0)
for _, field := range fields {
it := self.db.NewIterator(ro)
defer it.Close()
wb := levigo.NewWriteBatch()
defer wb.Close()

startKey := append(field.Id, startTimeBytes...)
endKey := startKey
it.Seek(startKey)
if it.Valid() {
if !bytes.Equal(it.Key()[:8], field.Id) {
Expand All @@ -449,20 +449,20 @@ func (self *LevelDbShard) deleteRangeOfSeriesCommon(database, series string, sta
count = 0
wb.Clear()
}
endKey = k
}
err = self.db.Write(self.writeOptions, wb)
if err != nil {
return err
}
rangesToCompact = append(rangesToCompact, &levigo.Range{startKey, endKey})
}
for _, r := range rangesToCompact {
self.db.CompactRange(*r)
}
return nil
}

func (self *LevelDbShard) compact() {
log.Info("Compacting shard")
self.db.CompactRange(levigo.Range{})
}

func (self *LevelDbShard) deleteRangeOfSeries(database, series string, startTime, endTime time.Time) error {
startTimeBytes, endTimeBytes := self.byteArraysForStartAndEndTimes(common.TimeToMicroseconds(startTime), common.TimeToMicroseconds(endTime))
return self.deleteRangeOfSeriesCommon(database, series, startTimeBytes, endTimeBytes)
Expand Down

0 comments on commit b303a1d

Please sign in to comment.