Skip to content
This repository has been archived by the owner on Dec 10, 2021. It is now read-only.

Commit

Permalink
Add index stats (#37)
Browse files Browse the repository at this point in the history
* Add index stats

* Update example

* Fix typo

* Update README.md

* Update CHANGES.md
  • Loading branch information
mosuka authored Mar 18, 2019
1 parent 525ac6d commit e6b42da
Show file tree
Hide file tree
Showing 13 changed files with 349 additions and 78 deletions.
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Added

- Add index stats #37
- Add Wikipedia example #35
- Support cznicb and leveldb #34
- Add logging #33
Expand All @@ -23,7 +24,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- update Makefile #31


## [0.4.0] - 2019-03-13
## [0.4.0] - 2019-03-14

### Changed

Expand Down
14 changes: 8 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -585,18 +585,18 @@ $ docker exec -it blast-index1 blast-index node --grpc-addr=:5050
This section explain how to index Wikipedia dump to Blast.


### Download wikipedia dump
### Install wikiextractor

```bash
$ curl -o ~/tmp/enwiki-20190101-pages-articles.xml.bz2 https://dumps.wikimedia.org/enwiki/20190101/enwiki-20190101-pages-articles.xml.bz2
$ cd ${HOME}
$ git clone git@github.com:attardi/wikiextractor.git
```


### Install wikiextractor
### Download wikipedia dump

```bash
$ cd ${HOME}
$ git clone git@github.com:attardi/wikiextractor.git
$ curl -o ~/tmp/enwiki-20190101-pages-articles.xml.bz2 https://dumps.wikimedia.org/enwiki/20190101/enwiki-20190101-pages-articles.xml.bz2
```


Expand All @@ -613,10 +613,12 @@ $ ./WikiExtractor.py -o ~/tmp/enwiki --json ~/tmp/enwiki-20190101-pages-articles
```bash
$ for FILE in $(find ~/tmp/enwiki -type f -name '*' | sort)
do
echo "${FILE}"
cat ${FILE} | while read -r LINE; do
TIMESTAMP=$(date -u "+%Y-%m-%dT%H:%M:%SZ")
ID=$(echo ${LINE} | jq -r .id)
FIELDS=$(echo ${LINE} | jq -c -r '{url: .url, title_en: .title, text_en: .text, timestamp: "'${TIMESTAMP}'"}')
FIELDS=$(echo "${LINE}" | jq -c -r '{url: .url, title_en: .title, text_en: .text, timestamp: "'${TIMESTAMP}'"}')
echo "- ${ID} ${FIELDS}"
curl -X PUT "http://127.0.0.1:8080/documents/${ID}" -d "${FIELDS}"
done
done
Expand Down
15 changes: 13 additions & 2 deletions cmd/blast-index/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ func main() {
Usage: "gRPC address to connect to",
},
},
//ArgsUsage: "[id]",
Action: execCluster,
},
{
Expand Down Expand Up @@ -244,13 +243,25 @@ func main() {
Flags: []cli.Flag{
cli.StringFlag{
Name: "grpc-addr, g",
Value: "gRPC :5050",
Value: ":5050",
Usage: "address to connect to",
},
},
ArgsUsage: "[id]",
Action: execDelete,
},
{
Name: "stats",
Usage: "Get a index stats",
Flags: []cli.Flag{
cli.StringFlag{
Name: "grpc-addr, g",
Value: ":5050",
Usage: "address to connect to",
},
},
Action: execStats,
},
}

cli.HelpFlag = cli.BoolFlag{
Expand Down
67 changes: 67 additions & 0 deletions cmd/blast-index/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) 2019 Minoru Osuka
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"encoding/json"
"errors"
"fmt"
"os"

"github.com/mosuka/blast/index"
"github.com/mosuka/blast/protobuf"
"github.com/urfave/cli"
)

func execStats(c *cli.Context) error {
grpcAddr := c.String("grpc-addr")

client, err := index.NewGRPCClient(grpcAddr)
if err != nil {
return err
}
defer func() {
err := client.Close()
if err != nil {
fmt.Fprintln(os.Stderr, err)
}
}()

resp, err := client.GetIndexStats()
if err != nil {
return err
}

// Any -> map[string]interface{}
var statsMap *map[string]interface{}
statsInstance, err := protobuf.MarshalAny(resp.Stats)
if err != nil {
return err
}
if statsInstance == nil {
return errors.New("nil")
}
statsMap = statsInstance.(*map[string]interface{})

// map[string]interface -> []byte
fieldsBytes, err := json.MarshalIndent(statsMap, "", " ")
if err != nil {
return err
}

fmt.Fprintln(os.Stdout, fmt.Sprintf("%v\n", string(fieldsBytes)))

return nil
}
11 changes: 11 additions & 0 deletions index/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,14 @@ func (c *GRPCClient) Delete(doc *index.Document, opts ...grpc.CallOption) error

return nil
}

func (c *GRPCClient) GetIndexStats(opts ...grpc.CallOption) (*index.Stats, error) {
resp, err := c.client.GetStats(c.ctx, &empty.Empty{}, opts...)
if err != nil {
st, _ := status.FromError(err)

return nil, errors.New(st.Message())
}

return resp.Stats, nil
}
18 changes: 18 additions & 0 deletions index/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,21 @@ func (s *GRPCService) Delete(ctx context.Context, req *index.DeleteRequest) (*em

return resp, nil
}

func (s *GRPCService) GetStats(ctx context.Context, req *empty.Empty) (*index.GetStatsResponse, error) {
start := time.Now()
defer RecordMetrics(start, "stats")

resp := &index.GetStatsResponse{}

s.logger.Printf("[INFO] stats %v", req)

stats, err := s.raftServer.Stats()
if err != nil {
return resp, status.Error(codes.Internal, err.Error())
}

resp.Stats = stats

return resp, nil
}
11 changes: 11 additions & 0 deletions index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,17 @@ func (b *Index) Delete(id string) error {
return nil
}

func (b *Index) Stats() (map[string]interface{}, error) {
start := time.Now()
defer func() {
b.logger.Printf("[DEBUG] stats %f", float64(time.Since(start))/float64(time.Second))
}()

stats := b.index.StatsMap()

return stats, nil
}

func (b *Index) SnapshotItems() <-chan *pbindex.Document {
ch := make(chan *pbindex.Document, 1024)

Expand Down
9 changes: 9 additions & 0 deletions index/raft_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,15 @@ func (f *RaftFSM) Apply(l *raft.Log) interface{} {
}
}

func (f *RaftFSM) Stats() (map[string]interface{}, error) {
stats, err := f.index.Stats()
if err != nil {
return nil, err
}

return stats, nil
}

func (f *RaftFSM) Snapshot() (raft.FSMSnapshot, error) {
return &IndexFSMSnapshot{
index: f.index,
Expand Down
20 changes: 20 additions & 0 deletions index/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,3 +626,23 @@ func (s *RaftServer) Delete(doc *index.Document) error {

return nil
}

func (s *RaftServer) Stats() (*index.Stats, error) {
statsMap, err := s.fsm.Stats()
if err != nil {
return nil, err
}

// map[string]interface{} -> Any
statsAny := &any.Any{}
err = protobuf.UnmarshalAny(statsMap, statsAny)
if err != nil {
return nil, err
}

indexStats := &index.Stats{
Stats: statsAny,
}

return indexStats, nil
}
Loading

0 comments on commit e6b42da

Please sign in to comment.