Skip to content

Commit

Permalink
Merge pull request #10 from OGKevin/reconnect-on-ping-broken-pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
OGKevin-bot authored Apr 22, 2019
2 parents 65811b2 + e38c39a commit 2d9f6dd
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 28 deletions.
40 changes: 38 additions & 2 deletions sonic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ type Client struct {
s net.Conn
i net.Conn

ctx context.Context

address string
password string

IngestService *IngestService
SearchService *SearchService
}
Expand Down Expand Up @@ -119,19 +124,50 @@ func NewClientWithPassword(address, password string, ctx context.Context) (*Clie
}

client := Client{i: i, s: s}
client.IngestService, err = newIngestService(&client, password)

client.password = password
client.address = address
client.ctx = ctx

client.IngestService, err = newIngestService(&client)
if err != nil {
return nil, errors.Wrap(err, "could not create ingest service")
}

client.SearchService, err = newSearchService(&client, password, ctx)
client.SearchService, err = newSearchService(&client)
if err != nil {
return nil, errors.Wrap(err, "could not create search service")
}

return &client, nil
}

func (c *Client) reconnect() error {
var err error

c.i, err = net.Dial("tcp", c.address)
if err != nil {
return errors.Wrapf(err, "could not open connection to %q", c.address)
}

c.s, err = net.Dial("tcp", c.address)
if err != nil {
return errors.Wrapf(err, "could not open connection to %q", c.address)
}

err = c.IngestService.connect()
if err != nil {
return errors.Wrap(err, "could not create ingest service")
}

err = c.SearchService.connect()
if err != nil {
return errors.Wrap(err, "could not create search service")
}

return nil
}

// Data builder pattern code
type DataBuilder struct {
data *Data
Expand Down
17 changes: 17 additions & 0 deletions sonic/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,23 @@ func TestClient_Close(t *testing.T) {
}
}

func TestReconnect(t *testing.T) {
c, err := NewClientWithPassword("localhost:1491", "SecretPassword", context.Background())
if !assert.NoError(t, err ) {
return
}

err = c.reconnect()
if !assert.NoError(t, err) {
return
}

err = c.SearchService.Ping()
if !assert.NoError(t, err) {
return
}
}

func ExampleNewClientWithPassword() {
c, err := NewClientWithPassword("localhost:1491", "SecretPassword", context.Background())
if err != nil {
Expand Down
30 changes: 18 additions & 12 deletions sonic/ingest_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,39 @@ type IngestService struct {
s *bufio.Scanner
}

func newIngestService(c *Client, password string) (*IngestService, error) {
s := bufio.NewScanner(c.i)
func newIngestService(c *Client) (*IngestService, error) {

_, err := io.WriteString(c.i, fmt.Sprintf("START ingest %s\n", password))
i := &IngestService{c: c}

return i, errors.Wrap(i.connect(), "could not connect to ingest service")
}

func (i *IngestService) connect() error {
s := bufio.NewScanner(i.c.i)
i.s =s

_, err := io.WriteString(i.c.i, fmt.Sprintf("START ingest %s\n", i.c.password))
if err != nil {
return nil, errors.Wrap(err, "could not start ingest connection")
return errors.Wrap(err, "could not start ingest connection")
}

s.Scan()

parse:
w := bufio.NewScanner(bytes.NewBuffer(s.Bytes()))
i.s.Scan()
w := bufio.NewScanner(bytes.NewBuffer(i.s.Bytes()))
w.Split(bufio.ScanWords)
w.Scan()

switch w.Text() {
case "STARTED":
case "CONNECTED":
s.Scan()
case "CONNECTED", "":
goto parse
case "ENDED":
return nil, errors.Errorf("failed to start ingest session: %q", s.Text())
return errors.Errorf("failed to start ingest session: %q", i.s.Text())
default:
return nil, errors.Errorf("could not determine how to interpret %q response", s.Text())
return errors.Errorf("could not determine how to interpret %q response", i.s.Text())
}

return &IngestService{c: c, s: s}, nil
return nil
}

// Push search data in the index
Expand Down
56 changes: 42 additions & 14 deletions sonic/search_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"log"
"sync"
"syscall"
"time"
)

Expand All @@ -27,36 +28,50 @@ type SearchService struct {
ctx context.Context
}

func newSearchService(c *Client, password string, ctx context.Context) (*SearchService, error) {
s := bufio.NewScanner(c.s)
func newSearchService(c *Client) (*SearchService, error) {
ss := &SearchService{c: c, pending: make(map[string]chan string), ctx: c.ctx}

_, err := io.WriteString(c.s, fmt.Sprintf("START search %s\n", password))
err := ss.connect()
if err != nil {
return nil, errors.Wrap(err, "could not start search connection")
return nil, errors.Wrap(err, "could not connect to search service")
}

s.Scan()
go ss.keepAlive()

return ss, nil
}

func (s *SearchService) connect() error {
s.sl.Lock()
defer s.sl.Unlock()

scanner := bufio.NewScanner(s.c.s)
s.s = scanner

_, err := io.WriteString(s.c.s, fmt.Sprintf("START search %s\n", s.c.password))
if err != nil {
return errors.Wrap(err, "could not start search connection")
}

s.s.Scan()

parse:
w := bufio.NewScanner(bytes.NewBuffer(s.Bytes()))
w := bufio.NewScanner(bytes.NewBuffer(s.s.Bytes()))
w.Split(bufio.ScanWords)
w.Scan()

switch w.Text() {
case "STARTED":
case "CONNECTED":
s.Scan()
s.s.Scan()
goto parse
case "ENDED":
return nil, errors.Errorf("failed to start search session: %q", s.Text())
return errors.Errorf("failed to start search session: %q", s.s.Text())
default:
return nil, errors.Errorf("could not determine how to interpret %q response", s.Text())
return errors.Errorf("could not determine how to interpret %q response", s.s.Text())
}

ss := &SearchService{c: c, s: s, pending: make(map[string]chan string), ctx: ctx}
go ss.keepAlive()

return ss, nil
return nil
}

func (s *SearchService) keepAlive() {
Expand Down Expand Up @@ -139,12 +154,25 @@ func (s *SearchService) Suggest(data *Data, limit int) (chan string, error) {
}

func (s *SearchService) Ping() error {
reconnect := false

s.sl.Lock()
defer s.sl.Unlock()
ping:

_, err := io.WriteString(s.c.s, fmt.Sprintf("%s\n", "PING"))
if err != nil {
s.sl.Unlock()
if err == syscall.EPIPE && !reconnect {
reconnect = true
err := s.c.reconnect()
if err != nil {
return errors.Wrap(err, "could not reconnect to sonic")
}
goto ping
}
return errors.Wrap(err, "pinging sonic failed")
}
s.sl.Unlock()

return nil
}
Expand Down
8 changes: 8 additions & 0 deletions sonic/search_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func TestSearchService_Query(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if !assert.NoError(t, c.reconnect()) {
return
}

if !assert.NoError(t, c.SearchService.Ping()) {
return
}
Expand Down Expand Up @@ -124,6 +128,10 @@ func TestSearchService_Suggest(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if !assert.NoError(t, c.reconnect()) {
return
}

if !assert.NoError(t, c.SearchService.Ping()) {
return
}
Expand Down

0 comments on commit 2d9f6dd

Please sign in to comment.