Skip to content
This repository has been archived by the owner on Dec 14, 2023. It is now read-only.

Cache api sessions for router/tunneler. Fixes #1364 #1365

Merged
merged 1 commit into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 29 additions & 7 deletions controller/handler_edge_ctrl/common_tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/openziti/foundation/v2/concurrenz"
"github.com/openziti/storage/boltz"
"github.com/sirupsen/logrus"
"go.etcd.io/bbolt"
"sync"
"time"
)
Expand Down Expand Up @@ -131,6 +132,18 @@ func (self *baseTunnelRequestContext) ensureApiSessionLocking(configTypes []stri
}
}

identityMgr := self.handler.getAppEnv().Managers.Identity
if cachedApiSessionId, _ := identityMgr.GetAnnotation(self.identity.Id, "apiSessionId"); cachedApiSessionId != nil {
apiSession, _ := self.handler.getAppEnv().Managers.ApiSession.Read(*cachedApiSessionId)
if apiSession != nil && apiSession.IdentityId == self.identity.Id {
self.apiSession = apiSession
if _, err := self.handler.getAppEnv().GetManagers().ApiSession.MarkActivityByTokens(self.apiSession.Token); err != nil {
logger.WithError(err).Error("unexpected error while marking api session activity")
}
return true
}
}

apiSession := &model.ApiSession{
Token: uuid.NewString(),
IdentityId: self.identity.Id,
Expand All @@ -139,14 +152,23 @@ func (self *baseTunnelRequestContext) ensureApiSessionLocking(configTypes []stri
IPAddress: self.handler.getChannel().Underlay().GetRemoteAddr().String(),
}

var err error
apiSession.Id, err = self.handler.getAppEnv().GetManagers().ApiSession.Create(apiSession, nil)
if err != nil {
self.err = internalError(err)
return false
}
err := self.handler.getAppEnv().GetDbProvider().GetDb().Update(func(tx *bbolt.Tx) error {
ctx := boltz.NewMutateContext(tx)

var err error
apiSession.Id, err = self.handler.getAppEnv().GetManagers().ApiSession.Create(ctx, apiSession, nil)
if err != nil {
return err
}

if err = identityMgr.Annotate(ctx, self.identity.Id, "apiSessionId", apiSession.Id); err != nil {
logger.WithError(err).Error("failed to cache new api session on router identity")
}

apiSession, err = self.handler.getAppEnv().GetManagers().ApiSession.ReadInTx(ctx.Tx(), apiSession.Id)
return err
})

apiSession, err = self.handler.getAppEnv().GetManagers().ApiSession.Read(apiSession.Id)
if err != nil {
self.err = internalError(err)
return false
Expand Down
2 changes: 1 addition & 1 deletion controller/internal/routes/authenticate_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (ro *AuthRouter) authHandler(ae *env.AppEnv, rc *response.RequestContext, h
sessionCerts = append(sessionCerts, sessionCert)
}

sessionId, err := ae.Managers.ApiSession.Create(newApiSession, sessionCerts)
sessionId, err := ae.Managers.ApiSession.Create(nil, newApiSession, sessionCerts)

if err != nil {
rc.RespondWithError(err)
Expand Down
55 changes: 31 additions & 24 deletions controller/model/api_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,34 +50,41 @@ func (self *ApiSessionManager) newModelEntity() edgeEntity {
return &ApiSession{}
}

func (self *ApiSessionManager) Create(entity *ApiSession, sessionCerts []*ApiSessionCertificate) (string, error) {
entity.Id = cuid.New() //use cuids which are longer than shortids but are monotonic

var apiSessionId string
err := self.env.GetDbProvider().GetDb().Update(func(tx *bbolt.Tx) error {
var err error
ctx := boltz.NewMutateContext(tx)
apiSessionId, err = self.createEntityInTx(ctx, entity)

if err != nil {
func (self *ApiSessionManager) Create(ctx boltz.MutateContext, entity *ApiSession, sessionCerts []*ApiSessionCertificate) (string, error) {
if ctx == nil {
var apiSessionId string
err := self.env.GetDbProvider().GetDb().Update(func(tx *bbolt.Tx) error {
ctx = boltz.NewMutateContext(tx)
var err error
apiSessionId, err = self.CreateInCtx(ctx, entity, sessionCerts)
return err
})
if err != nil {
return "", err
}
return apiSessionId, nil
}

for _, sessionCert := range sessionCerts {
sessionCert.ApiSessionId = apiSessionId
_, err := self.env.GetManagers().ApiSessionCertificate.createEntityInTx(ctx, sessionCert)
return self.CreateInCtx(ctx, entity, sessionCerts)
}

if err != nil {
return err
}
}
return nil
})
func (self *ApiSessionManager) CreateInCtx(ctx boltz.MutateContext, entity *ApiSession, sessionCerts []*ApiSessionCertificate) (string, error) {
entity.Id = cuid.New() //use cuids which are longer than shortids but are monotonic
apiSessionId, err := self.createEntityInTx(ctx, entity)

if err != nil {
self.MarkActivityById(apiSessionId)
return "", err
}

for _, sessionCert := range sessionCerts {
sessionCert.ApiSessionId = apiSessionId
if _, err = self.env.GetManagers().ApiSessionCertificate.createEntityInTx(ctx, sessionCert); err != nil {
return "", err
}
}

self.MarkActivityById(apiSessionId)

return apiSessionId, err
}

Expand All @@ -98,7 +105,7 @@ func (self *ApiSessionManager) ReadByToken(token string) (*ApiSession, error) {
return modelApiSession, nil
}

func (self *ApiSessionManager) readInTx(tx *bbolt.Tx, id string) (*ApiSession, error) {
func (self *ApiSessionManager) ReadInTx(tx *bbolt.Tx, id string) (*ApiSession, error) {
modelApiSession := &ApiSession{}
if err := self.readEntityInTx(tx, id, modelApiSession); err != nil {
return nil, err
Expand Down Expand Up @@ -202,7 +209,7 @@ func (self *ApiSessionManager) Stream(query string, collect func(*ApiSession, er
for cursor := self.Store.IterateIds(tx, filter); cursor.IsValid(); cursor.Next() {
current := cursor.Current()

apiSession, err := self.readInTx(tx, string(current))
apiSession, err := self.ReadInTx(tx, string(current))
if err := collect(apiSession, err); err != nil {
return err
}
Expand Down Expand Up @@ -240,7 +247,7 @@ func (self *ApiSessionManager) Query(query string) (*ApiSessionListResult, error

func (self *ApiSessionManager) VisitFingerprintsForApiSessionId(apiSessionId string, visitor func(fingerprint string) bool) error {
return self.GetDb().View(func(tx *bbolt.Tx) error {
apiSession, err := self.readInTx(tx, apiSessionId)
apiSession, err := self.ReadInTx(tx, apiSessionId)
if err != nil {
return errors.Wrapf(err, "could not query fingerprints by api session id [%s]", apiSessionId)
}
Expand Down Expand Up @@ -288,7 +295,7 @@ type ApiSessionListResult struct {
func (result *ApiSessionListResult) collect(tx *bbolt.Tx, ids []string, queryMetaData *models.QueryMetaData) error {
result.QueryMetaData = *queryMetaData
for _, key := range ids {
ApiSession, err := result.manager.readInTx(tx, key)
ApiSession, err := result.manager.ReadInTx(tx, key)
if err != nil {
return err
}
Expand Down
27 changes: 27 additions & 0 deletions controller/model/base_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"reflect"
)

const annotationsBucketName = "annotations"

type EntityManager interface {
models.EntityRetriever[models.Entity]
command.EntityDeleter
Expand Down Expand Up @@ -381,6 +383,31 @@ func (self *baseEntityManager) iterateRelatedEntitiesInTx(tx *bbolt.Tx, id, fiel
return nil
}

func (self *baseEntityManager) Annotate(ctx boltz.MutateContext, entityId string, key, value string) error {
entityBucket := self.GetStore().GetEntityBucket(ctx.Tx(), []byte(entityId))
if entityBucket == nil {
return boltz.NewNotFoundError(self.GetStore().GetEntityType(), "id", entityId)
}
annotationsBucket := entityBucket.GetOrCreatePath(annotationsBucketName)
Copy link
Member

@andrewpmartinez andrewpmartinez Mar 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So there is a reserved bucket named annotations on every entity that stores a string to string kvs?

And this is currently only used to store API Session IDs for router/tunnelers to store their own, hopefully singular, API Session ID?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I started off making it more general, allowing arbitrary sets and gets, but that complicated the API. I was thinking if we ever need support for other types, we can add it then. Also when the great session refactor happens we may want to drop the API again.

I could also put the API just on the session store, but I don't think someone is going to accidentally use it, so I wasn't too worried about it.

annotationsBucket.SetString(key, value, nil)
return annotationsBucket.GetError()
}

func (self *baseEntityManager) GetAnnotation(entityId string, key string) (*string, error) {
var result *string
err := self.GetDb().View(func(tx *bbolt.Tx) error {
entityBucket := self.GetStore().GetEntityBucket(tx, []byte(entityId))
if entityBucket == nil {
return nil
}
if annotationsBucket := entityBucket.GetPath(annotationsBucketName); annotationsBucket != nil {
result = annotationsBucket.GetString(key)
}
return nil
})
return result, err
}

type AndFieldChecker struct {
first boltz.FieldChecker
second boltz.FieldChecker
Expand Down
2 changes: 1 addition & 1 deletion controller/model/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (ctx *TestContext) requireNewApiSession(identity *Identity) *ApiSession {
Identity: identity,
LastActivityAt: time.Now(),
}
_, err := ctx.managers.ApiSession.Create(entity, nil)
_, err := ctx.managers.ApiSession.Create(nil, entity, nil)
ctx.NoError(err)
return entity
}
Expand Down