Skip to content
This repository has been archived by the owner on May 11, 2022. It is now read-only.

Commit

Permalink
fix #177: implement read input command
Browse files Browse the repository at this point in the history
  • Loading branch information
kamilsk committed Oct 17, 2018
1 parent 77f75df commit ed50e1e
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 22 deletions.
49 changes: 49 additions & 0 deletions cmd/grpc_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
"os"
"reflect"
"strings"
"time"

pb "github.com/kamilsk/form-api/pkg/server/grpc"
kit "github.com/kamilsk/go-kit/pkg/strings"

"github.com/kamilsk/form-api/pkg/config"
"github.com/kamilsk/form-api/pkg/domain"
"github.com/kamilsk/form-api/pkg/server/grpc/middleware"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
Expand All @@ -25,6 +27,7 @@ import (
const (
schemaKind kind = "Schema"
templateKind kind = "Template"
inputKind kind = "Input"
)

var entities factory
Expand All @@ -38,6 +41,7 @@ func init() {
readCmd: {
schemaKind: func() interface{} { return &pb.ReadSchemaRequest{} },
templateKind: func() interface{} { return &pb.ReadTemplateRequest{} },
inputKind: func() interface{} { return &inputReadProxy{} },
},
updateCmd: {
schemaKind: func() interface{} { return &pb.UpdateSchemaRequest{} },
Expand All @@ -50,6 +54,19 @@ func init() {
}
}

type inputReadProxy struct {
Filter struct {
ID domain.ID `json:"id" mapstructure:"id" yaml:"id"`
Condition struct {
SchemaID domain.ID `json:"schema_id" mapstructure:"schema_id" yaml:"schema_id"`
CreatedAt struct {
Start string `json:"start" mapstructure:"start" yaml:"start"`
End string `json:"end" mapstructure:"end" yaml:"end"`
} `json:"created_at" mapstructure:"created_at" yaml:"created_at"`
} `json:"condition" mapstructure:"condition" yaml:"condition"`
} `json:"filter" mapstructure:"filter" yaml:"filter"`
}

func communicate(cmd *cobra.Command, _ []string) error {
entity, err := entities.new(cmd)
if err != nil {
Expand Down Expand Up @@ -167,6 +184,38 @@ func call(cnf config.GRPCConfig, entity interface{}) (interface{}, error) {
case *pb.ReadTemplateRequest:
return pb.NewTemplateClient(conn).Read(ctx, request)

// TODO issue#180
// - remove hacks with proxies
// - remove deps to github.com/mitchellh/mapstructure
// - use github.com/ghodss/yaml and github.com/grpc-ecosystem/grpc-gateway/runtime instead
case *inputReadProxy:
grpcRequest := &pb.ReadInputRequest{}
if request.Filter.ID != "" {
grpcRequest.Filter = &pb.ReadInputRequest_Id{Id: request.Filter.ID.String()}
} else {
var start, end *time.Time
if request.Filter.Condition.CreatedAt.Start != "" {
t, parseErr := time.Parse(time.RFC3339, request.Filter.Condition.CreatedAt.Start)
if parseErr == nil {
start = &t
}
}
if request.Filter.Condition.CreatedAt.End != "" {
t, parseErr := time.Parse(time.RFC3339, request.Filter.Condition.CreatedAt.End)
if parseErr == nil {
end = &t
}
}
grpcRequest.Filter = &pb.ReadInputRequest_Condition{Condition: &pb.InputFilter{
SchemaId: request.Filter.Condition.SchemaID.String(),
CreatedAt: &pb.TimestampRange{
Start: pb.Timestamp(start),
End: pb.Timestamp(end),
},
}}
}
return pb.NewInputClient(conn).Read(ctx, grpcRequest)

case *pb.UpdateSchemaRequest:
return pb.NewSchemaClient(conn).Update(ctx, request)
case *pb.UpdateTemplateRequest:
Expand Down
82 changes: 78 additions & 4 deletions pkg/server/grpc/server_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,16 @@ package grpc

import (
"context"
"log"
"encoding/json"
"time"

repository "github.com/kamilsk/form-api/pkg/storage/types"

"github.com/kamilsk/form-api/pkg/domain"
"github.com/kamilsk/form-api/pkg/server/grpc/middleware"
"github.com/kamilsk/form-api/pkg/storage/query"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// NewInputServer returns new instance of server API for Input service.
Expand All @@ -15,7 +24,72 @@ type inputServer struct {
}

// Read TODO issue#173
func (*inputServer) Read(context.Context, *ReadInputRequest) (*ReadInputResponse, error) {
log.Println("InputServer.Read was called")
return &ReadInputResponse{}, nil
func (server *inputServer) Read(ctx context.Context, req *ReadInputRequest) (*ReadInputResponse, error) {
tokenID, authErr := middleware.TokenExtractor(ctx)
if authErr != nil {
return nil, authErr
}

var entries []*InputEntry
push := func(input repository.Input) error {
data, encodeErr := json.Marshal(input.Data)
if encodeErr != nil {
return status.Errorf(codes.Internal,
"trying to marshal data `%#v` of the input %q into JSON: %+v",
input.Data, input.ID, encodeErr)
}
entries = append(entries, &InputEntry{
Id: input.ID.String(),
SchemaId: input.SchemaID.String(),
Data: string(data),
})
return nil
}

switch filter := req.Filter.(type) {
case *ReadInputRequest_Condition:
inputs, readErr := server.storage.ReadInputByFilter(ctx, tokenID, query.InputFilter{
SchemaID: func() domain.ID {
if filter.Condition == nil {
return ""
}
return domain.ID(filter.Condition.SchemaId)
}(),
From: func() *time.Time {
if filter.Condition == nil {
return nil
}
if filter.Condition.CreatedAt == nil {
return nil
}
return Time(filter.Condition.CreatedAt.Start)
}(),
To: func() *time.Time {
if filter.Condition == nil {
return nil
}
if filter.Condition.CreatedAt == nil {
return nil
}
return Time(filter.Condition.CreatedAt.End)
}(),
})
if readErr != nil {
return nil, status.Errorf(codes.Internal, "error happened: %+v", readErr)
}
for _, input := range inputs {
if pushErr := push(input); pushErr != nil {
return nil, pushErr
}
}
case *ReadInputRequest_Id:
input, readErr := server.storage.ReadInputByID(ctx, tokenID, domain.ID(filter.Id))
if readErr != nil {
return nil, status.Errorf(codes.Internal, "error happened: %+v", readErr)
}
if pushErr := push(input); pushErr != nil {
return nil, pushErr
}
}
return &ReadInputResponse{Entries: entries}, nil
}
14 changes: 7 additions & 7 deletions pkg/storage/executor/internal/postgres/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,20 @@ func (scope inputScope) ReadByFilter(token *types.Token, filter query.InputFilte
q := `SELECT "i"."id", "i"."data", "i"."created_at"
FROM "input" "i"
INNER JOIN "schema" "s" ON "s"."id" = "i"."schema_id"
WHERE "i"."schema_id" = $1 AND "s"."account_id"`
WHERE "i"."schema_id" = $1 AND "s"."account_id" = $2`
args := append(make([]interface{}, 0, 4), filter.SchemaID, token.User.AccountID)
// TODO go 1.10 builder := strings.Builder{}
builder := bytes.NewBuffer(make([]byte, 0, len(q)+39))
_, _ = builder.WriteString(q)
switch {
case !filter.From.IsZero() && !filter.To.IsZero():
_, _ = builder.WriteString(` AND "i"."created_at" BETWEEN $2 AND $3`)
case filter.From != nil && filter.To != nil:
_, _ = builder.WriteString(` AND "i"."created_at" BETWEEN $3 AND $4`)
args = append(args, filter.From, filter.To)
case !filter.From.IsZero():
_, _ = builder.WriteString(` AND "i"."created_at" >= $2`)
case filter.From != nil:
_, _ = builder.WriteString(` AND "i"."created_at" >= $3`)
args = append(args, filter.From)
case !filter.To.IsZero():
_, _ = builder.WriteString(` AND "i"."created_at" <= $2`)
case filter.To != nil:
_, _ = builder.WriteString(` AND "i"."created_at" <= $3`)
args = append(args, filter.To)
}
entities := make([]types.Input, 0, 8)
Expand Down
20 changes: 11 additions & 9 deletions pkg/storage/executor/internal/postgres/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ func TestInputReader(t *testing.T) {
assert.NoError(t, err)
defer conn.Close()

testCases := []struct {
now := time.Now()

tests := []struct {
name string
mocker func(sqlmock.Sqlmock)
filter query.InputFilter
Expand All @@ -92,7 +94,7 @@ func TestInputReader(t *testing.T) {
WillReturnRows(
sqlmock.
NewRows([]string{"id", "data", "created_at"}).
AddRow(id, `{"input":["test"]}`, time.Now()),
AddRow(id, `{"input":["test"]}`, now),
)
},
filter: query.InputFilter{SchemaID: id},
Expand All @@ -106,10 +108,10 @@ func TestInputReader(t *testing.T) {
WillReturnRows(
sqlmock.
NewRows([]string{"id", "data", "created_at"}).
AddRow(id, `{"input":["test"]}`, time.Now()),
AddRow(id, `{"input":["test"]}`, now),
)
},
filter: query.InputFilter{SchemaID: id, From: time.Now()},
filter: query.InputFilter{SchemaID: id, From: &now},
},
{
name: `by schema ID and "to" date`,
Expand All @@ -120,10 +122,10 @@ func TestInputReader(t *testing.T) {
WillReturnRows(
sqlmock.
NewRows([]string{"id", "data", "created_at"}).
AddRow(id, `{"input":["test"]}`, time.Now()),
AddRow(id, `{"input":["test"]}`, now),
)
},
filter: query.InputFilter{SchemaID: id, To: time.Now()},
filter: query.InputFilter{SchemaID: id, To: &now},
},
{
name: `by schema ID and "from" and "to" dates`,
Expand All @@ -134,15 +136,15 @@ func TestInputReader(t *testing.T) {
WillReturnRows(
sqlmock.
NewRows([]string{"id", "data", "created_at"}).
AddRow(id, `{"input":["test"]}`, time.Now()),
AddRow(id, `{"input":["test"]}`, now),
)
},
filter: query.InputFilter{SchemaID: id, From: time.Now(), To: time.Now()},
filter: query.InputFilter{SchemaID: id, From: &now, To: &now},
},
}

var exec executor.InputReader = NewInputContext(ctx, conn)
for _, test := range testCases {
for _, test := range tests {
test.mocker(mock)
inputs, readErr := exec.ReadByFilter(token, test.filter)
assert.NoError(t, readErr, test.name)
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/query/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
// InputFilter TODO issue#173
type InputFilter struct {
SchemaID domain.ID
From time.Time
To time.Time
From *time.Time
To *time.Time
}

// WriteInput TODO issue#173
Expand Down

0 comments on commit ed50e1e

Please sign in to comment.