Skip to content

Commit

Permalink
core: cold GET vs upgrading rlock to wlock
Browse files Browse the repository at this point in the history
* remove all `sync.Cond` related state and logic
  * reduce low-level `lock-info` to just rc and bool (excl)
* poll for up to 'host-busy' timeout
* return 'err-busy' if unsuccessful

Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
  • Loading branch information
alex-aizman committed Oct 4, 2024
1 parent e469684 commit 9857e78
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 115 deletions.
15 changes: 15 additions & 0 deletions ais/test/scripts/random-del.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/bin/bash

#
# NOTE: internal use (local playground only)
#
# Lists regular files under /tmp/ais whose path contains "%ob" and then,
# once per second, removes one randomly selected file from that list.
# Runs until interrupted.
#

aaa=($(find /tmp/ais -type f | grep %ob))
echo "Listed: ${#aaa[@]} objects"

# nothing listed: `$RANDOM % 0` below would fail with "division by 0" on every iteration
if [ ${#aaa[@]} -eq 0 ]; then
  exit 1
fi

while true
do
  sleep 1
  bbb=${aaa[$RANDOM % ${#aaa[@]}]}
  # quoted, so that the selected path is never re-split or glob-expanded;
  # a failing `rm` (e.g., the file was already removed) is deliberately silent
  rm "$bbb" 2>/dev/null && echo "Deleted: $bbb"
done
88 changes: 50 additions & 38 deletions ais/tgtobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"io"
"net/http"
"os"
"runtime"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -89,6 +90,12 @@ type (
latestVer bool // QparamLatestVer || 'versioning.*_warm_get'
isIOErr bool // to count GET error as a "IO error"; see `Trunner._softErrs()`
}
// polling state for a cold GET that failed to upgrade rlock => wlock (see `do` method)
_uplock struct {
config *cmn.Config // config snapshot (provides the control-plane operation timeout that caps `sleep` growth)
timeout time.Duration // give up and return a "busy" error once `elapsed` exceeds this value
elapsed time.Duration // total time slept so far, summed across all `do` calls
sleep time.Duration // duration of the next sleep
}

// textbook append: (packed) handle and control structure (see also `putA2I` arch below)
aoHdl struct {
Expand Down Expand Up @@ -546,6 +553,7 @@ func (goi *getOI) getObject() (ecode int, err error) {
// is under rlock
func (goi *getOI) get() (ecode int, err error) {
var (
uplock *_uplock
cs fs.CapStatus
doubleCheck bool
retried bool
Expand Down Expand Up @@ -624,30 +632,36 @@ do:
}
}

// cold-GET: upgrade rlock => wlock, call t.Backend.GetObjReader
// cold-GET: upgrade rlock => wlock and call t.Backend.GetObjReader
if cold {
var (
res core.GetReaderResult
ckconf = goi.lom.CksumConf()
backend = goi.t.Backend(goi.lom.Bck())
loaded bool
)
if cs.IsNil() {
cs = fs.Cap()
}
if cs.IsOOS() {
return http.StatusInsufficientStorage, cs.Err()
}
goi.lom.SetAtimeUnix(goi.atime)

// upgrade rlock => wlock
if loaded, err = goi._coldLock(); err != nil {
return 0, err
}
if loaded {
goto fin
// try upgrading rlock => wlock; poll for a while
if !goi.lom.UpgradeLock() {
if uplock == nil {
c := cmn.GCO.Get()
uplock = &_uplock{config: c, sleep: time.Second}
uplock.timeout = max(c.Timeout.MaxHostBusy.D()-c.Timeout.CplaneOperation.D(), 4*time.Second)

nlog.Warningln(uplockWarn, goi.lom.String())
}
if err := uplock.do(goi.lom); err != nil {
return http.StatusConflict, err
}
goto do
}

goi.lom.SetAtimeUnix(goi.atime)
// zero-out prev. version custom metadata, if any
goi.lom.SetCustomMD(nil)

Expand Down Expand Up @@ -702,34 +716,6 @@ fin:
return ecode, err
}

// upgrade rlock => wlock
// done early to prevent multiple cold-readers duplicating network/disk operation and overwriting each other
//
// Returns:
//   - loaded == true when the object could be loaded after `UpgradeLock` reported `true`
//     (meaning, the lock was upgraded by another goroutine that had also performed PUT on our behalf)
//   - "busy" error when loading keeps failing for longer than max(control-plane operation timeout, 2s)
//   - (false, nil) as soon as `UpgradeLock` returns `false`
func (goi *getOI) _coldLock() (loaded bool, err error) {
	var (
		lom = goi.lom
		now int64
	)
outer:
	for lom.UpgradeLock() {
		// the load error is kept in function-loop scope so that it can be logged below
		erl := lom.Load(true /*cache it*/, true /*locked*/)
		if erl == nil {
			// nothing to do
			// (lock was upgraded by another goroutine that had also performed PUT on our behalf)
			return true, nil
		}
		switch {
		case now == 0:
			// first failure: start the clock
			now = mono.NanoTime()
			fallthrough
		case mono.Since(now) < max(cmn.Rom.CplaneOperation(), 2*time.Second):
			// report the actual load failure (the named return `err` is still nil here)
			nlog.Errorln("failed to load", lom.String(), "err:", erl, "- retrying...")
		default:
			err = cmn.NewErrBusy("object", lom.Cname())
			break outer
		}
	}
	return
}

func (goi *getOI) _coldPut(res *core.GetReaderResult) (int, error) {
var (
t, lom = goi.t, goi.lom
Expand Down Expand Up @@ -1178,7 +1164,8 @@ func (goi *getOI) transmit(r io.Reader, buf []byte, fqn string) error {
goi.t.reb.FilterAdd(*bname)
} else if !goi.cold { // GFN & cold-GET: must be already loaded w/ atime set
if err := goi.lom.Load(false /*cache it*/, true /*locked*/); err != nil {
nlog.Errorf("%s: GET post-transmission failure: %v", goi.t, err)
fs.CleanPathErr(err)
nlog.Errorln(goi.t.String(), "GET post-transmission failure:", err)
return errSendingResp
}
goi.lom.SetAtimeUnix(goi.atime)
Expand Down Expand Up @@ -1882,6 +1869,31 @@ func (t *target) putMirror(lom *core.LOM) {
xputlrep.Repl(lom)
}

//
// uplock
//

// message prefix used both in the warning logged by cold GET when rlock => wlock upgrade fails
// and in the "busy" error returned by `_uplock.do` upon timeout
const uplockWarn = "conflict getting remote"

// do executes a single polling step on behalf of a cold GET that could not upgrade
// its rlock to wlock:
//   - if the accumulated sleep time already exceeds the timeout, logs and returns a "busy" error
//     (the lock is left as is);
//   - otherwise, drops the non-exclusive lock, yields, sleeps, and takes the non-exclusive lock
//     again, so that the caller can start over.
//
// The sleep interval is doubled when the accumulated time equals three current intervals,
// provided the interval is still below the control-plane operation timeout.
func (u *_uplock) do(lom *core.LOM) error {
	// out of time
	if u.timeout < u.elapsed {
		node := core.T.String()
		ebusy := cmn.NewErrBusy("node", node, uplockWarn+" '"+lom.Cname()+"'")
		nlog.ErrorDepth(1, ebusy)
		return ebusy
	}

	// give other goroutines a chance
	lom.Unlock(false)
	runtime.Gosched()
	time.Sleep(u.sleep)

	// all over again: try load and check all respective conditions
	lom.Lock(false)

	u.elapsed += u.sleep
	if u.elapsed != 3*u.sleep {
		return nil
	}
	if u.sleep < u.config.Timeout.CplaneOperation.D() {
		u.sleep <<= 1
	}
	return nil
}

// TODO:
// - CopyBuffer
// - currently, only tar - add message pack (what else?)
Expand Down
2 changes: 1 addition & 1 deletion cmn/ver_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const GitHubHome = "https://github.com/NVIDIA/aistore"
// `jsp` formats its *signature* and other implementation details.

const (
VersionAIStore = "3.24.rc4"
VersionAIStore = "3.24"
VersionCLI = "1.13"
VersionLoader = "1.12"
VersionAuthN = "1.1"
Expand Down
89 changes: 13 additions & 76 deletions core/namelocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@ type (
mu sync.Mutex
}
lockInfo struct {
wcond *sync.Cond // to synchronize "waiting room" upgrade logic
rc int32 // read-lock refcount
waiting int32 // waiting room count
exclusive bool // write-lock
upgraded bool // indication for the waiters that upgrade's done
rc int32 // read-lock refcount
exclusive bool // write-lock
}
)

Expand Down Expand Up @@ -73,31 +70,6 @@ func newNameLocker() (nl nameLocker) {
return
}

//////////////
// lockInfo //
//////////////

// notify wakes up goroutine(s) blocked on the upgrade condition variable.
// No-op when there's no condition variable or the "waiting room" is empty.
func (li *lockInfo) notify() {
	if li.wcond != nil && li.waiting != 0 {
		debug.Assert(li.rc >= li.waiting)

		// by default, wake up only the owner
		wake := li.wcond.Signal
		if li.upgraded {
			// has been upgraded - wake up all waiters
			wake = li.wcond.Broadcast
		}
		wake()
	}
}

// decWaiting takes one waiter out of the "waiting room";
// when the count drops to zero the condition variable is released.
func (li *lockInfo) decWaiting() {
	if li.waiting--; li.waiting != 0 {
		return
	}
	li.wcond = nil
}

/////////
// nlc //
/////////
Expand Down Expand Up @@ -142,10 +114,6 @@ func (nlc *nlc) try(uname string, exclusive bool) bool {
if li.exclusive {
return false
}
// can't rlock if there's someone trying to upgrade
if li.waiting > 0 {
return false
}
li.rc++
return true
}
Expand All @@ -167,78 +135,47 @@ func (nlc *nlc) Lock(uname string, exclusive bool) {
}
}

// upgrade rlock -> wlock
// e.g. usage: simultaneous cold GET
// returns true if exclusively locked by _another_ thread
func (nlc *nlc) UpgradeLock(uname string) bool {
// lone reader: upgrade rlock -> wlock
// otherwise: fail
func (nlc *nlc) UpgradeLock(uname string) (wlocked bool) {
nlc.mu.Lock()
li, found := nlc.m[uname]
debug.Assert(found && !li.exclusive && li.rc > 0)
debug.Assertf(found && !li.exclusive && li.rc > 0, "found %t li.exclusive %t li.rc %d", found, li.exclusive, li.rc)

if li.rc == 1 {
li.rc = 0
li.exclusive = true
nlc.mu.Unlock()
return false
}

//
// TODO -- FIXME: consider removing this part, simplifying `wcond` out, returning EBUSY instead..
//

if li.wcond == nil {
li.wcond = sync.NewCond(&nlc.mu)
}
li.waiting++
// Wait here until all readers get in line
for li.rc != li.waiting {
li.wcond.Wait()

// Has been upgraded by smbd. else
if li.upgraded {
li.decWaiting()
nlc.mu.Unlock()
return true
}
wlocked = true
}
// Upgrading
li.upgraded = true
li.rc--
li.decWaiting()
li.exclusive = true
nlc.mu.Unlock()
return false
return wlocked
}

func (nlc *nlc) DowngradeLock(uname string) {
nlc.mu.Lock()
li, found := nlc.m[uname]
debug.Assert(found && li.exclusive)
debug.Assertf(found && li.exclusive, "found %t li.exclusive %t", found, li.exclusive)
li.rc++
li.exclusive = false
li.notify()
nlc.mu.Unlock()
}

func (nlc *nlc) Unlock(uname string, exclusive bool) {
nlc.mu.Lock()
li, found := nlc.m[uname]
debug.Assert(found)

if exclusive {
debug.Assert(li.exclusive)
if li.waiting > 0 {
li.exclusive = false
li.notify()
} else {
delete(nlc.m, uname)
}
delete(nlc.m, uname)
nlc.mu.Unlock()
return
}

li.rc--
if li.rc == 0 {
delete(nlc.m, uname)
}
li.notify()
nlc.mu.Unlock()
}

Expand Down

0 comments on commit 9857e78

Please sign in to comment.