Skip to content

Commit

Permalink
Merge pull request #59 from basenana/fix/meilisearch
Browse files Browse the repository at this point in the history
fix: split doc and attr into different index in meilisearch
  • Loading branch information
zwwhdls authored Dec 17, 2024
2 parents bd105fc + e397d12 commit 782a1cc
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 110 deletions.
22 changes: 22 additions & 0 deletions pkg/models/doc/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package doc
import (
"fmt"
"time"

"github.com/basenana/friday/pkg/models"
"github.com/basenana/friday/pkg/utils"
)

type Document struct {
Expand All @@ -38,6 +41,25 @@ type Document struct {
ChangedAt time.Time `json:"changed_at"`
}

func (d *Document) NewTest() *Document {
return &Document{
EntryId: 1,
Name: "test",
Namespace: models.DefaultNamespaceValue,
ParentEntryID: utils.ToPtr(int64(1)),
Source: "test",
Content: "test",
Summary: "test",
WebUrl: "test",
HeaderImage: "test",
SubContent: "test",
Marked: utils.ToPtr(true),
Unread: utils.ToPtr(true),
CreatedAt: time.Now(),
ChangedAt: time.Now(),
}
}

type DocumentFilter struct {
Namespace string
Search string
Expand Down
129 changes: 68 additions & 61 deletions pkg/store/meili/meili.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/meilisearch/meilisearch-go"
Expand Down Expand Up @@ -66,95 +67,88 @@ func NewMeiliClient(conf config.Config) (store.DocStoreInterface, error) {
}

func (c *Client) init() error {
attrs, err := c.docIndex.GetFilterableAttributes()
if err != nil {
testDoc := (&doc.Document{}).NewTest()
// doc index
if err := c.initIndex(c.docIndex, DocFilterableAttrs, DocSortAttrs, func() error {
return c.CreateDocument(context.TODO(), testDoc)
}); err != nil {
return err
}
if !utils.Equal(DocFilterableAttrs, attrs) {
t, err := c.docIndex.UpdateFilterableAttributes(&DocFilterableAttrs)
if err != nil {
return err
}
if err = c.wait(context.TODO(), "document", t.TaskUID); err != nil {
return err
}
}

sortAttrs := DocSortAttrs
crtSortAttrs, err := c.docIndex.GetSortableAttributes()
if err != nil {
// attr index
if err := c.initIndex(c.attrIndex, DocAttrFilterableAttrs, DocAttrSortAttrs, func() error {
return c.CreateDocument(context.TODO(), testDoc)
}); err != nil {
return err
}
if !utils.Equal(sortAttrs, crtSortAttrs) {
t, err := c.docIndex.UpdateSortableAttributes(&sortAttrs)
if err != nil {

d, err := c.GetDocument(context.TODO(), testDoc.EntryId)
if (err != nil && strings.Contains(err.Error(), "not found")) || (err == nil && d == nil) {
return nil
} else if d != nil {
return c.DeleteDocument(context.TODO(), testDoc.EntryId)
}
return err
}

func (c *Client) initIndex(index meilisearch.IndexManager, filterableAttrs, sortableAttrs []string, noIndexFn func() error) error {
attrs, err := index.GetFilterableAttributes()
if err != nil {
if !strings.Contains(err.Error(), "not found") {
return err
}
if err = c.wait(context.TODO(), "document", t.TaskUID); err != nil {
if err := noIndexFn(); err != nil {
return err
}
}

// attr index
attrAttrs, err := c.attrIndex.GetFilterableAttributes()
if err != nil {
return err
}
if !utils.Equal(DocAttrFilterableAttrs, attrAttrs) {
t, err := c.docIndex.UpdateFilterableAttributes(&DocAttrFilterableAttrs)
if !utils.Equal(filterableAttrs, attrs) {
t, err := index.UpdateFilterableAttributes(&filterableAttrs)
if err != nil {
return err
}
if err = c.wait(context.TODO(), "attr", t.TaskUID); err != nil {
if err = c.wait(context.TODO(), index, t.TaskUID); err != nil {
return err
}
}
attrSortAttrs := DocAttrSortAttrs
crtAttrSortAttrs, err := c.docIndex.GetSortableAttributes()

crtSortAttrs, err := index.GetSortableAttributes()
if err != nil {
return err
}
if !utils.Equal(attrSortAttrs, crtAttrSortAttrs) {
t, err := c.docIndex.UpdateSortableAttributes(&attrSortAttrs)
if !utils.Equal(sortableAttrs, crtSortAttrs) {
t, err := index.UpdateSortableAttributes(&sortableAttrs)
if err != nil {
return err
}
if err = c.wait(context.TODO(), "attr", t.TaskUID); err != nil {
if err = c.wait(context.TODO(), index, t.TaskUID); err != nil {
return err
}
}
return nil
}

func (c *Client) index(kind string) meilisearch.IndexManager {
if kind == "attr" {
return c.attrIndex
}
return c.docIndex
}

func (c *Client) CreateDocument(ctx context.Context, doc *doc.Document) error {
newDoc := (&Document{}).FromModel(doc)
c.log.Debugf("store entryId %s", newDoc.EntryId)
task, err := c.index(newDoc.Kind).AddDocuments(newDoc, "id")
task, err := c.docIndex.AddDocuments(newDoc, "id")
if err != nil {
c.log.Error(err)
return err
}
if err := c.wait(ctx, newDoc.Kind, task.TaskUID); err != nil {
if err := c.wait(ctx, c.docIndex, task.TaskUID); err != nil {
c.log.Errorf("store document with entryId %s error: %s", newDoc.EntryId, err)
return err
}

// store document attr
newAttrs := (&DocumentAttrList{}).FromModel(doc)
c.log.Debugf("store doc of entryId %d attrs: %s", doc.EntryId, newAttrs.String())
t, err := c.index("attr").AddDocuments(newAttrs, "id")
t, err := c.attrIndex.AddDocuments(newAttrs, "id")
if err != nil {
c.log.Error(err)
return err
}
if err := c.wait(ctx, "attr", t.TaskUID); err != nil {
if err := c.wait(ctx, c.attrIndex, t.TaskUID); err != nil {
c.log.Errorf("store document attr of entryId %d error: %s", doc.EntryId, err)
return err
}
Expand All @@ -170,24 +164,24 @@ func (c *Client) UpdateDocument(ctx context.Context, doc *doc.Document) error {
for _, aq := range newAttrsQuery.AttrQueries {
filter = append(filter, aq.ToFilter())
}
t, err := c.index("attr").DeleteDocumentsByFilter(filter)
t, err := c.attrIndex.DeleteDocumentsByFilter(filter)
if err != nil {
c.log.Error(err)
return err
}
if err = c.wait(ctx, "attr", t.TaskUID); err != nil {
if err = c.wait(ctx, c.attrIndex, t.TaskUID); err != nil {
c.log.Errorf("delete document by filter error: %s", err)
return err
}
// store document attr
newAttrs := (&DocumentAttrList{}).FromModel(doc)
c.log.Debugf("store doc of entryId %d attrs: %s", doc.EntryId, newAttrs.String())
t, err = c.index("attr").AddDocuments(newAttrs, "id")
t, err = c.attrIndex.AddDocuments(newAttrs, "id")
if err != nil {
c.log.Error(err)
return err
}
if err := c.wait(ctx, "attr", t.TaskUID); err != nil {
if err := c.wait(ctx, c.attrIndex, t.TaskUID); err != nil {
c.log.Errorf("store document attr of entryId %d error: %s", doc.EntryId, err)
return err
}
Expand All @@ -198,7 +192,7 @@ func (c *Client) GetDocument(ctx context.Context, entryId int64) (*doc.Document,
namespace := models.GetNamespace(ctx)
query := (&DocumentQuery{}).OfEntryId(namespace.String(), entryId)
c.log.Debugf("get document by entryId: %d", entryId)
rep, err := c.index("document").Search("", query.ToRequest())
rep, err := c.docIndex.Search("", query.ToRequest())
if err != nil {
return nil, err
}
Expand All @@ -215,7 +209,7 @@ func (c *Client) GetDocument(ctx context.Context, entryId int64) (*doc.Document,
// get attrs
attrQuery := (&DocumentAttrQuery{}).OfEntryId(document.Namespace, document.EntryId)
c.log.Debugf("filter document attr: %s", attrQuery.String())
attrRep, err := c.index("attr").Search("", attrQuery.ToRequest())
attrRep, err := c.attrIndex.Search("", attrQuery.ToRequest())
if err != nil {
return nil, err
}
Expand All @@ -241,7 +235,7 @@ func (c *Client) FilterDocuments(ctx context.Context, filter *doc.DocumentFilter
attrQuery := (&DocumentAttrQueries{}).FromFilter(filter)
for _, aq := range *attrQuery {
c.log.Debugf("filter document attr: %s", aq.String())
attrRep, err := c.index("attr").Search("", aq.ToRequest())
attrRep, err := c.attrIndex.Search("", aq.ToRequest())
if err != nil {
return nil, err
}
Expand All @@ -263,11 +257,14 @@ func (c *Client) FilterDocuments(ctx context.Context, filter *doc.DocumentFilter
Option: "IN",
Value: entryIds,
})
} else {
// no result
return nil, nil
}
}

c.log.Debugf("search document: [%s] query: [%s]", query.Search, query.String())
rep, err := c.index("document").Search(query.Search, query.ToRequest())
rep, err := c.docIndex.Search(query.Search, query.ToRequest())
if err != nil {
return nil, err
}
Expand All @@ -282,16 +279,17 @@ func (c *Client) FilterDocuments(ctx context.Context, filter *doc.DocumentFilter
c.log.Errorf("unmarshal document error: %s", err)
continue
}
c.log.Debugf("get document: %s", document.String())

// get attrs
attrQuery := (&DocumentAttrQuery{}).OfEntryId(document.Namespace, document.EntryId)
c.log.Debugf("filter document attr: %s", attrQuery.String())
attrRep, err := c.index("attr").Search("", attrQuery.ToRequest())
attrRep, err := c.attrIndex.Search("", attrQuery.ToRequest())
if err != nil {
return nil, err
}

attrs := make([]*DocumentAttr, 0)
attrs := DocumentAttrList{}
for _, hit := range attrRep.Hits {
b, _ := json.Marshal(hit)
attr := &DocumentAttr{}
Expand All @@ -302,38 +300,47 @@ func (c *Client) FilterDocuments(ctx context.Context, filter *doc.DocumentFilter
}
attrs = append(attrs, attr)
}
c.log.Debugf("filter [%d] document attr: %s", len(attrs), attrs.String())
documents = append(documents, document.ToModel(attrs))
}
return documents, nil
}

func (c *Client) DeleteDocument(ctx context.Context, entryId int64) error {
c.log.Debugf("delete document by entryId: %d", entryId)
aq := &AttrQuery{
Attr: "entryId",
Option: "=",
Value: fmt.Sprintf("%d", entryId),
ns := models.GetNamespace(ctx)
dq := (&DocumentQuery{}).OfEntryId(ns.String(), entryId)
t, err := c.docIndex.DeleteDocumentsByFilter(dq.ToFilter())
if err != nil {
c.log.Error(err)
return err
}
t, err := c.index("attr").DeleteDocumentsByFilter(aq.ToFilter())
if err := c.wait(ctx, c.docIndex, t.TaskUID); err != nil {
c.log.Errorf("delete document by filter error: %s", err)
}

c.log.Debugf("delete document attr by entryId: %d", entryId)
aq := (&DocumentAttrQuery{}).OfEntryId(ns.String(), fmt.Sprintf("%d", entryId))
t, err = c.attrIndex.DeleteDocumentsByFilter(aq.ToFilter())
if err != nil {
c.log.Error(err)
return err
}
if err := c.wait(ctx, "attr", t.TaskUID); err != nil {
if err := c.wait(ctx, c.attrIndex, t.TaskUID); err != nil {
c.log.Errorf("delete document by filter error: %s", err)
}
return nil
}

func (c *Client) wait(ctx context.Context, kind string, taskUID int64) error {
func (c *Client) wait(ctx context.Context, index meilisearch.IndexManager, taskUID int64) error {
t := time.NewTicker(100 * time.Millisecond)
defer t.Stop()
for {
select {
case <-ctx.Done():
return fmt.Errorf("context timeout")
case <-t.C:
t, err := c.index(kind).GetTask(taskUID)
t, err := index.GetTask(taskUID)
if err != nil {
c.log.Error(err)
return err
Expand Down
Loading

0 comments on commit 782a1cc

Please sign in to comment.