Skip to content
This repository has been archived by the owner on Dec 10, 2021. It is now read-only.

Commit

Permalink
Change index protobuf (#97)
Browse files Browse the repository at this point in the history
  • Loading branch information
mosuka authored Aug 6, 2019
1 parent ea1e45d commit 11881db
Show file tree
Hide file tree
Showing 19 changed files with 641 additions and 431 deletions.
10 changes: 1 addition & 9 deletions cmd/blast/indexer_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"syscall"

"github.com/blevesearch/bleve/mapping"
"github.com/mosuka/blast/config"
"github.com/mosuka/blast/indexer"
"github.com/mosuka/blast/indexutils"
"github.com/mosuka/blast/logutils"
Expand Down Expand Up @@ -117,14 +116,7 @@ func indexerStart(c *cli.Context) error {
indexMapping = mapping.NewIndexMapping()
}

// create index config
indexConfig := &config.IndexConfig{
IndexMapping: indexMapping,
IndexType: indexType,
IndexStorageType: indexStorageType,
}

svr, err := indexer.NewServer(managerGRPCAddr, shardId, peerGRPCAddr, node, dataDir, raftStorageType, indexConfig, logger.Named(nodeId), grpcLogger.Named(nodeId), httpAccessLogger)
svr, err := indexer.NewServer(managerGRPCAddr, shardId, peerGRPCAddr, node, dataDir, raftStorageType, indexMapping, indexType, indexStorageType, logger.Named(nodeId), grpcLogger.Named(nodeId), httpAccessLogger)
if err != nil {
return err
}
Expand Down
10 changes: 1 addition & 9 deletions cmd/blast/manager_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"syscall"

"github.com/blevesearch/bleve/mapping"
"github.com/mosuka/blast/config"
"github.com/mosuka/blast/indexutils"
"github.com/mosuka/blast/logutils"
"github.com/mosuka/blast/manager"
Expand Down Expand Up @@ -115,14 +114,7 @@ func managerStart(c *cli.Context) error {
indexMapping = mapping.NewIndexMapping()
}

// create index config
indexConfig := &config.IndexConfig{
IndexMapping: indexMapping,
IndexType: indexType,
IndexStorageType: indexStorageType,
}

svr, err := manager.NewServer(peerGrpcAddr, node, dataDir, raftStorageType, indexConfig, logger.Named(nodeId), grpcLogger.Named(nodeId), httpLogger)
svr, err := manager.NewServer(peerGrpcAddr, node, dataDir, raftStorageType, indexMapping, indexType, indexStorageType, logger.Named(nodeId), grpcLogger.Named(nodeId), httpLogger)
if err != nil {
return err
}
Expand Down
66 changes: 0 additions & 66 deletions config/index_config.go

This file was deleted.

36 changes: 0 additions & 36 deletions config/index_config_test.go

This file was deleted.

56 changes: 38 additions & 18 deletions dispatcher/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"testing"
"time"

"github.com/mosuka/blast/indexutils"

"github.com/mosuka/blast/indexer"
"github.com/mosuka/blast/logutils"
"github.com/mosuka/blast/manager"
Expand Down Expand Up @@ -56,13 +58,15 @@ func TestServer_Start(t *testing.T) {
},
}

managerIndexConfig1, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb")
managerIndexMapping1, err := indexutils.NewIndexMappingFromFile(filepath.Join(curDir, "../example/wiki_index_mapping.json"))
if err != nil {
t.Fatalf("%v", err)
}
managerIndexType1 := "upside_down"
managerIndexStorageType1 := "boltdb"

// create server
managerServer1, err := manager.NewServer(managerPeerGrpcAddress1, managerNode1, managerDataDir1, managerRaftStorageType1, managerIndexConfig1, logger, grpcLogger, httpAccessLogger)
managerServer1, err := manager.NewServer(managerPeerGrpcAddress1, managerNode1, managerDataDir1, managerRaftStorageType1, managerIndexMapping1, managerIndexType1, managerIndexStorageType1, logger, grpcLogger, httpAccessLogger)
defer func() {
if managerServer1 != nil {
managerServer1.Stop()
Expand Down Expand Up @@ -93,13 +97,15 @@ func TestServer_Start(t *testing.T) {
},
}

managerIndexConfig2, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb")
managerIndexMapping2, err := indexutils.NewIndexMappingFromFile(filepath.Join(curDir, "../example/wiki_index_mapping.json"))
if err != nil {
t.Fatalf("%v", err)
}
managerIndexType2 := "upside_down"
managerIndexStorageType2 := "boltdb"

// create server
managerServer2, err := manager.NewServer(managerPeerGrpcAddress2, managerNode2, managerDataDir2, managerRaftStorageType2, managerIndexConfig2, logger, grpcLogger, httpAccessLogger)
managerServer2, err := manager.NewServer(managerPeerGrpcAddress2, managerNode2, managerDataDir2, managerRaftStorageType2, managerIndexMapping2, managerIndexType2, managerIndexStorageType2, logger, grpcLogger, httpAccessLogger)
defer func() {
if managerServer2 != nil {
managerServer2.Stop()
Expand Down Expand Up @@ -130,13 +136,15 @@ func TestServer_Start(t *testing.T) {
},
}

managerIndexConfig3, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb")
managerIndexMapping3, err := indexutils.NewIndexMappingFromFile(filepath.Join(curDir, "../example/wiki_index_mapping.json"))
if err != nil {
t.Fatalf("%v", err)
}
managerIndexType3 := "upside_down"
managerIndexStorageType3 := "boltdb"

// create server
managerServer3, err := manager.NewServer(managerPeerGrpcAddress3, managerNode3, managerDataDir3, managerRaftStorageType3, managerIndexConfig3, logger, grpcLogger, httpAccessLogger)
managerServer3, err := manager.NewServer(managerPeerGrpcAddress3, managerNode3, managerDataDir3, managerRaftStorageType3, managerIndexMapping3, managerIndexType3, managerIndexStorageType3, logger, grpcLogger, httpAccessLogger)
defer func() {
if managerServer3 != nil {
managerServer3.Stop()
Expand Down Expand Up @@ -226,11 +234,13 @@ func TestServer_Start(t *testing.T) {
HttpAddress: indexerHttpAddress1,
},
}
indexConfig1, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb")
indexerIndexMapping1, err := indexutils.NewIndexMappingFromFile(filepath.Join(curDir, "../example/wiki_index_mapping.json"))
if err != nil {
t.Fatalf("%v", err)
}
indexerServer1, err := indexer.NewServer(indexerManagerGrpcAddress1, indexerShardId1, indexerPeerGrpcAddress1, indexerNode1, indexerDataDir1, indexerRaftStorageType1, indexConfig1, logger, grpcLogger, httpAccessLogger)
indexerIndexType1 := "upside_down"
indexerIndexStorageType1 := "boltdb"
indexerServer1, err := indexer.NewServer(indexerManagerGrpcAddress1, indexerShardId1, indexerPeerGrpcAddress1, indexerNode1, indexerDataDir1, indexerRaftStorageType1, indexerIndexMapping1, indexerIndexType1, indexerIndexStorageType1, logger, grpcLogger, httpAccessLogger)
defer func() {
indexerServer1.Stop()
}()
Expand Down Expand Up @@ -264,11 +274,13 @@ func TestServer_Start(t *testing.T) {
HttpAddress: indexerHttpAddress2,
},
}
indexConfig2, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb")
indexerIndexMapping2, err := indexutils.NewIndexMappingFromFile(filepath.Join(curDir, "../example/wiki_index_mapping.json"))
if err != nil {
t.Fatalf("%v", err)
}
indexerServer2, err := indexer.NewServer(indexerManagerGrpcAddress2, indexerShardId2, indexerPeerGrpcAddress2, indexerNode2, indexerDataDir2, indexerRaftStorageType2, indexConfig2, logger, grpcLogger, httpAccessLogger)
indexerIndexType2 := "upside_down"
indexerIndexStorageType2 := "boltdb"
indexerServer2, err := indexer.NewServer(indexerManagerGrpcAddress2, indexerShardId2, indexerPeerGrpcAddress2, indexerNode2, indexerDataDir2, indexerRaftStorageType2, indexerIndexMapping2, indexerIndexType2, indexerIndexStorageType2, logger, grpcLogger, httpAccessLogger)
defer func() {
indexerServer2.Stop()
}()
Expand Down Expand Up @@ -302,11 +314,13 @@ func TestServer_Start(t *testing.T) {
HttpAddress: indexerHttpAddress3,
},
}
indexConfig3, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb")
indexerIndexMapping3, err := indexutils.NewIndexMappingFromFile(filepath.Join(curDir, "../example/wiki_index_mapping.json"))
if err != nil {
t.Fatalf("%v", err)
}
indexerServer3, err := indexer.NewServer(indexerManagerGrpcAddress3, indexerShardId3, indexerPeerGrpcAddress3, indexerNode3, indexerDataDir3, indexerRaftStorageType3, indexConfig3, logger, grpcLogger, httpAccessLogger)
indexerIndexType3 := "upside_down"
indexerIndexStorageType3 := "boltdb"
indexerServer3, err := indexer.NewServer(indexerManagerGrpcAddress3, indexerShardId3, indexerPeerGrpcAddress3, indexerNode3, indexerDataDir3, indexerRaftStorageType3, indexerIndexMapping3, indexerIndexType3, indexerIndexStorageType3, logger, grpcLogger, httpAccessLogger)
defer func() {
indexerServer3.Stop()
}()
Expand Down Expand Up @@ -392,11 +406,13 @@ func TestServer_Start(t *testing.T) {
HttpAddress: indexerHttpAddress4,
},
}
indexConfig4, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb")
indexerIndexMapping4, err := indexutils.NewIndexMappingFromFile(filepath.Join(curDir, "../example/wiki_index_mapping.json"))
if err != nil {
t.Fatalf("%v", err)
}
indexerServer4, err := indexer.NewServer(indexerManagerGrpcAddress4, indexerShardId4, indexerPeerGrpcAddress4, indexerNode4, indexerDataDir4, indexerRaftStorageType4, indexConfig4, logger, grpcLogger, httpAccessLogger)
indexerIndexType4 := "upside_down"
indexerIndexStorageType4 := "boltdb"
indexerServer4, err := indexer.NewServer(indexerManagerGrpcAddress4, indexerShardId4, indexerPeerGrpcAddress4, indexerNode4, indexerDataDir4, indexerRaftStorageType4, indexerIndexMapping4, indexerIndexType4, indexerIndexStorageType4, logger, grpcLogger, httpAccessLogger)
defer func() {
indexerServer4.Stop()
}()
Expand Down Expand Up @@ -430,11 +446,13 @@ func TestServer_Start(t *testing.T) {
HttpAddress: indexerHttpAddress5,
},
}
indexConfig5, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb")
indexerIndexMapping5, err := indexutils.NewIndexMappingFromFile(filepath.Join(curDir, "../example/wiki_index_mapping.json"))
if err != nil {
t.Fatalf("%v", err)
}
indexerServer5, err := indexer.NewServer(indexerManagerGrpcAddress5, indexerShardId5, indexerPeerGrpcAddress5, indexerNode5, indexerDataDir5, indexerRaftStorageType5, indexConfig5, logger, grpcLogger, httpAccessLogger)
indexerIndexType5 := "upside_down"
indexerIndexStorageType5 := "boltdb"
indexerServer5, err := indexer.NewServer(indexerManagerGrpcAddress5, indexerShardId5, indexerPeerGrpcAddress5, indexerNode5, indexerDataDir5, indexerRaftStorageType5, indexerIndexMapping5, indexerIndexType5, indexerIndexStorageType5, logger, grpcLogger, httpAccessLogger)
defer func() {
indexerServer5.Stop()
}()
Expand Down Expand Up @@ -468,11 +486,13 @@ func TestServer_Start(t *testing.T) {
HttpAddress: indexerHttpAddress6,
},
}
indexConfig6, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb")
indexerIndexMapping6, err := indexutils.NewIndexMappingFromFile(filepath.Join(curDir, "../example/wiki_index_mapping.json"))
if err != nil {
t.Fatalf("%v", err)
}
indexerServer6, err := indexer.NewServer(indexerManagerGrpcAddress6, indexerShardId6, indexerPeerGrpcAddress6, indexerNode6, indexerDataDir6, indexerRaftStorageType6, indexConfig6, logger, grpcLogger, httpAccessLogger)
indexerIndexType6 := "upside_down"
indexerIndexStorageType6 := "boltdb"
indexerServer6, err := indexer.NewServer(indexerManagerGrpcAddress6, indexerShardId6, indexerPeerGrpcAddress6, indexerNode6, indexerDataDir6, indexerRaftStorageType6, indexerIndexMapping6, indexerIndexType6, indexerIndexStorageType6, logger, grpcLogger, httpAccessLogger)
defer func() {
indexerServer6.Stop()
}()
Expand Down
13 changes: 7 additions & 6 deletions indexer/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,17 +306,20 @@ func (c *GRPCClient) GetIndexConfig(opts ...grpc.CallOption) (map[string]interfa
resp, err := c.client.GetIndexConfig(c.ctx, &empty.Empty{}, opts...)
if err != nil {
st, _ := status.FromError(err)

return nil, errors.New(st.Message())
}

indexConfigIntr, err := protobuf.MarshalAny(resp.IndexConfig)
indexMapping, err := protobuf.MarshalAny(resp.IndexConfig.IndexMapping)
if err != nil {
st, _ := status.FromError(err)

return nil, errors.New(st.Message())
}
indexConfig := *indexConfigIntr.(*map[string]interface{})

indexConfig := map[string]interface{}{
"index_mapping": indexMapping,
"index_type": resp.IndexConfig.IndexType,
"index_storage_type": resp.IndexConfig.IndexStorageType,
}

return indexConfig, nil
}
Expand All @@ -325,14 +328,12 @@ func (c *GRPCClient) GetIndexStats(opts ...grpc.CallOption) (map[string]interfac
resp, err := c.client.GetIndexStats(c.ctx, &empty.Empty{}, opts...)
if err != nil {
st, _ := status.FromError(err)

return nil, errors.New(st.Message())
}

indexStatsIntr, err := protobuf.MarshalAny(resp.IndexStats)
if err != nil {
st, _ := status.FromError(err)

return nil, errors.New(st.Message())
}
indexStats := *indexStatsIntr.(*map[string]interface{})
Expand Down
Loading

0 comments on commit 11881db

Please sign in to comment.