Skip to content
This repository has been archived by the owner on Dec 10, 2021. It is now read-only.

Change index protobuf #97

Merged
merged 1 commit into from
Aug 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions cmd/blast/indexer_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"syscall"

"github.com/blevesearch/bleve/mapping"
"github.com/mosuka/blast/config"
"github.com/mosuka/blast/indexer"
"github.com/mosuka/blast/indexutils"
"github.com/mosuka/blast/logutils"
Expand Down Expand Up @@ -117,14 +116,7 @@ func indexerStart(c *cli.Context) error {
indexMapping = mapping.NewIndexMapping()
}

// create index config
indexConfig := &config.IndexConfig{
IndexMapping: indexMapping,
IndexType: indexType,
IndexStorageType: indexStorageType,
}

svr, err := indexer.NewServer(managerGRPCAddr, shardId, peerGRPCAddr, node, dataDir, raftStorageType, indexConfig, logger.Named(nodeId), grpcLogger.Named(nodeId), httpAccessLogger)
svr, err := indexer.NewServer(managerGRPCAddr, shardId, peerGRPCAddr, node, dataDir, raftStorageType, indexMapping, indexType, indexStorageType, logger.Named(nodeId), grpcLogger.Named(nodeId), httpAccessLogger)
if err != nil {
return err
}
Expand Down
10 changes: 1 addition & 9 deletions cmd/blast/manager_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"syscall"

"github.com/blevesearch/bleve/mapping"
"github.com/mosuka/blast/config"
"github.com/mosuka/blast/indexutils"
"github.com/mosuka/blast/logutils"
"github.com/mosuka/blast/manager"
Expand Down Expand Up @@ -115,14 +114,7 @@ func managerStart(c *cli.Context) error {
indexMapping = mapping.NewIndexMapping()
}

// create index config
indexConfig := &config.IndexConfig{
IndexMapping: indexMapping,
IndexType: indexType,
IndexStorageType: indexStorageType,
}

svr, err := manager.NewServer(peerGrpcAddr, node, dataDir, raftStorageType, indexConfig, logger.Named(nodeId), grpcLogger.Named(nodeId), httpLogger)
svr, err := manager.NewServer(peerGrpcAddr, node, dataDir, raftStorageType, indexMapping, indexType, indexStorageType, logger.Named(nodeId), grpcLogger.Named(nodeId), httpLogger)
if err != nil {
return err
}
Expand Down
66 changes: 0 additions & 66 deletions config/index_config.go

This file was deleted.

36 changes: 0 additions & 36 deletions config/index_config_test.go

This file was deleted.

56 changes: 38 additions & 18 deletions dispatcher/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"testing"
"time"

"github.com/mosuka/blast/indexutils"

"github.com/mosuka/blast/indexer"
"github.com/mosuka/blast/logutils"
"github.com/mosuka/blast/manager"
Expand Down Expand Up @@ -56,13 +58,15 @@ func TestServer_Start(t *testing.T) {
},
}

managerIndexConfig1, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb")
managerIndexMapping1, err := indexutils.NewIndexMappingFromFile(filepath.Join(curDir, "../example/wiki_index_mapping.json"))
if err != nil {
t.Fatalf("%v", err)
}
managerIndexType1 := "upside_down"
managerIndexStorageType1 := "boltdb"

// create server
managerServer1, err := manager.NewServer(managerPeerGrpcAddress1, managerNode1, managerDataDir1, managerRaftStorageType1, managerIndexConfig1, logger, grpcLogger, httpAccessLogger)
managerServer1, err := manager.NewServer(managerPeerGrpcAddress1, managerNode1, managerDataDir1, managerRaftStorageType1, managerIndexMapping1, managerIndexType1, managerIndexStorageType1, logger, grpcLogger, httpAccessLogger)
defer func() {
if managerServer1 != nil {
managerServer1.Stop()
Expand Down Expand Up @@ -93,13 +97,15 @@ func TestServer_Start(t *testing.T) {
},
}

managerIndexConfig2, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb")
managerIndexMapping2, err := indexutils.NewIndexMappingFromFile(filepath.Join(curDir, "../example/wiki_index_mapping.json"))
if err != nil {
t.Fatalf("%v", err)
}
managerIndexType2 := "upside_down"
managerIndexStorageType2 := "boltdb"

// create server
managerServer2, err := manager.NewServer(managerPeerGrpcAddress2, managerNode2, managerDataDir2, managerRaftStorageType2, managerIndexConfig2, logger, grpcLogger, httpAccessLogger)
managerServer2, err := manager.NewServer(managerPeerGrpcAddress2, managerNode2, managerDataDir2, managerRaftStorageType2, managerIndexMapping2, managerIndexType2, managerIndexStorageType2, logger, grpcLogger, httpAccessLogger)
defer func() {
if managerServer2 != nil {
managerServer2.Stop()
Expand Down Expand Up @@ -130,13 +136,15 @@ func TestServer_Start(t *testing.T) {
},
}

managerIndexConfig3, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb")
managerIndexMapping3, err := indexutils.NewIndexMappingFromFile(filepath.Join(curDir, "../example/wiki_index_mapping.json"))
if err != nil {
t.Fatalf("%v", err)
}
managerIndexType3 := "upside_down"
managerIndexStorageType3 := "boltdb"

// create server
managerServer3, err := manager.NewServer(managerPeerGrpcAddress3, managerNode3, managerDataDir3, managerRaftStorageType3, managerIndexConfig3, logger, grpcLogger, httpAccessLogger)
managerServer3, err := manager.NewServer(managerPeerGrpcAddress3, managerNode3, managerDataDir3, managerRaftStorageType3, managerIndexMapping3, managerIndexType3, managerIndexStorageType3, logger, grpcLogger, httpAccessLogger)
defer func() {
if managerServer3 != nil {
managerServer3.Stop()
Expand Down Expand Up @@ -226,11 +234,13 @@ func TestServer_Start(t *testing.T) {
HttpAddress: indexerHttpAddress1,
},
}
indexConfig1, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb")
indexerIndexMapping1, err := indexutils.NewIndexMappingFromFile(filepath.Join(curDir, "../example/wiki_index_mapping.json"))
if err != nil {
t.Fatalf("%v", err)
}
indexerServer1, err := indexer.NewServer(indexerManagerGrpcAddress1, indexerShardId1, indexerPeerGrpcAddress1, indexerNode1, indexerDataDir1, indexerRaftStorageType1, indexConfig1, logger, grpcLogger, httpAccessLogger)
indexerIndexType1 := "upside_down"
indexerIndexStorageType1 := "boltdb"
indexerServer1, err := indexer.NewServer(indexerManagerGrpcAddress1, indexerShardId1, indexerPeerGrpcAddress1, indexerNode1, indexerDataDir1, indexerRaftStorageType1, indexerIndexMapping1, indexerIndexType1, indexerIndexStorageType1, logger, grpcLogger, httpAccessLogger)
defer func() {
indexerServer1.Stop()
}()
Expand Down Expand Up @@ -264,11 +274,13 @@ func TestServer_Start(t *testing.T) {
HttpAddress: indexerHttpAddress2,
},
}
indexConfig2, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb")
indexerIndexMapping2, err := indexutils.NewIndexMappingFromFile(filepath.Join(curDir, "../example/wiki_index_mapping.json"))
if err != nil {
t.Fatalf("%v", err)
}
indexerServer2, err := indexer.NewServer(indexerManagerGrpcAddress2, indexerShardId2, indexerPeerGrpcAddress2, indexerNode2, indexerDataDir2, indexerRaftStorageType2, indexConfig2, logger, grpcLogger, httpAccessLogger)
indexerIndexType2 := "upside_down"
indexerIndexStorageType2 := "boltdb"
indexerServer2, err := indexer.NewServer(indexerManagerGrpcAddress2, indexerShardId2, indexerPeerGrpcAddress2, indexerNode2, indexerDataDir2, indexerRaftStorageType2, indexerIndexMapping2, indexerIndexType2, indexerIndexStorageType2, logger, grpcLogger, httpAccessLogger)
defer func() {
indexerServer2.Stop()
}()
Expand Down Expand Up @@ -302,11 +314,13 @@ func TestServer_Start(t *testing.T) {
HttpAddress: indexerHttpAddress3,
},
}
indexConfig3, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb")
indexerIndexMapping3, err := indexutils.NewIndexMappingFromFile(filepath.Join(curDir, "../example/wiki_index_mapping.json"))
if err != nil {
t.Fatalf("%v", err)
}
indexerServer3, err := indexer.NewServer(indexerManagerGrpcAddress3, indexerShardId3, indexerPeerGrpcAddress3, indexerNode3, indexerDataDir3, indexerRaftStorageType3, indexConfig3, logger, grpcLogger, httpAccessLogger)
indexerIndexType3 := "upside_down"
indexerIndexStorageType3 := "boltdb"
indexerServer3, err := indexer.NewServer(indexerManagerGrpcAddress3, indexerShardId3, indexerPeerGrpcAddress3, indexerNode3, indexerDataDir3, indexerRaftStorageType3, indexerIndexMapping3, indexerIndexType3, indexerIndexStorageType3, logger, grpcLogger, httpAccessLogger)
defer func() {
indexerServer3.Stop()
}()
Expand Down Expand Up @@ -392,11 +406,13 @@ func TestServer_Start(t *testing.T) {
HttpAddress: indexerHttpAddress4,
},
}
indexConfig4, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb")
indexerIndexMapping4, err := indexutils.NewIndexMappingFromFile(filepath.Join(curDir, "../example/wiki_index_mapping.json"))
if err != nil {
t.Fatalf("%v", err)
}
indexerServer4, err := indexer.NewServer(indexerManagerGrpcAddress4, indexerShardId4, indexerPeerGrpcAddress4, indexerNode4, indexerDataDir4, indexerRaftStorageType4, indexConfig4, logger, grpcLogger, httpAccessLogger)
indexerIndexType4 := "upside_down"
indexerIndexStorageType4 := "boltdb"
indexerServer4, err := indexer.NewServer(indexerManagerGrpcAddress4, indexerShardId4, indexerPeerGrpcAddress4, indexerNode4, indexerDataDir4, indexerRaftStorageType4, indexerIndexMapping4, indexerIndexType4, indexerIndexStorageType4, logger, grpcLogger, httpAccessLogger)
defer func() {
indexerServer4.Stop()
}()
Expand Down Expand Up @@ -430,11 +446,13 @@ func TestServer_Start(t *testing.T) {
HttpAddress: indexerHttpAddress5,
},
}
indexConfig5, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb")
indexerIndexMapping5, err := indexutils.NewIndexMappingFromFile(filepath.Join(curDir, "../example/wiki_index_mapping.json"))
if err != nil {
t.Fatalf("%v", err)
}
indexerServer5, err := indexer.NewServer(indexerManagerGrpcAddress5, indexerShardId5, indexerPeerGrpcAddress5, indexerNode5, indexerDataDir5, indexerRaftStorageType5, indexConfig5, logger, grpcLogger, httpAccessLogger)
indexerIndexType5 := "upside_down"
indexerIndexStorageType5 := "boltdb"
indexerServer5, err := indexer.NewServer(indexerManagerGrpcAddress5, indexerShardId5, indexerPeerGrpcAddress5, indexerNode5, indexerDataDir5, indexerRaftStorageType5, indexerIndexMapping5, indexerIndexType5, indexerIndexStorageType5, logger, grpcLogger, httpAccessLogger)
defer func() {
indexerServer5.Stop()
}()
Expand Down Expand Up @@ -468,11 +486,13 @@ func TestServer_Start(t *testing.T) {
HttpAddress: indexerHttpAddress6,
},
}
indexConfig6, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb")
indexerIndexMapping6, err := indexutils.NewIndexMappingFromFile(filepath.Join(curDir, "../example/wiki_index_mapping.json"))
if err != nil {
t.Fatalf("%v", err)
}
indexerServer6, err := indexer.NewServer(indexerManagerGrpcAddress6, indexerShardId6, indexerPeerGrpcAddress6, indexerNode6, indexerDataDir6, indexerRaftStorageType6, indexConfig6, logger, grpcLogger, httpAccessLogger)
indexerIndexType6 := "upside_down"
indexerIndexStorageType6 := "boltdb"
indexerServer6, err := indexer.NewServer(indexerManagerGrpcAddress6, indexerShardId6, indexerPeerGrpcAddress6, indexerNode6, indexerDataDir6, indexerRaftStorageType6, indexerIndexMapping6, indexerIndexType6, indexerIndexStorageType6, logger, grpcLogger, httpAccessLogger)
defer func() {
indexerServer6.Stop()
}()
Expand Down
13 changes: 7 additions & 6 deletions indexer/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,17 +306,20 @@ func (c *GRPCClient) GetIndexConfig(opts ...grpc.CallOption) (map[string]interfa
resp, err := c.client.GetIndexConfig(c.ctx, &empty.Empty{}, opts...)
if err != nil {
st, _ := status.FromError(err)

return nil, errors.New(st.Message())
}

indexConfigIntr, err := protobuf.MarshalAny(resp.IndexConfig)
indexMapping, err := protobuf.MarshalAny(resp.IndexConfig.IndexMapping)
if err != nil {
st, _ := status.FromError(err)

return nil, errors.New(st.Message())
}
indexConfig := *indexConfigIntr.(*map[string]interface{})

indexConfig := map[string]interface{}{
"index_mapping": indexMapping,
"index_type": resp.IndexConfig.IndexType,
"index_storage_type": resp.IndexConfig.IndexStorageType,
}

return indexConfig, nil
}
Expand All @@ -325,14 +328,12 @@ func (c *GRPCClient) GetIndexStats(opts ...grpc.CallOption) (map[string]interfac
resp, err := c.client.GetIndexStats(c.ctx, &empty.Empty{}, opts...)
if err != nil {
st, _ := status.FromError(err)

return nil, errors.New(st.Message())
}

indexStatsIntr, err := protobuf.MarshalAny(resp.IndexStats)
if err != nil {
st, _ := status.FromError(err)

return nil, errors.New(st.Message())
}
indexStats := *indexStatsIntr.(*map[string]interface{})
Expand Down
Loading