Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
nikandfor committed Dec 2, 2023
1 parent dcd4389 commit f31953a
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 32 deletions.
171 changes: 149 additions & 22 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import (
"sync"
"time"

hlow "github.com/nikandfor/hacked/low"
"tlog.app/go/eazy"

Check failure on line 15 in agent/agent.go

View workflow job for this annotation

GitHub Actions / lint

`tlog.app/go/eazy` is not in the allowlist (depguard)
"tlog.app/go/errors"

Check failure on line 16 in agent/agent.go

View workflow job for this annotation

GitHub Actions / lint

`tlog.app/go/errors` is not in the allowlist (depguard)
"tlog.app/go/loc"

Check failure on line 17 in agent/agent.go

View workflow job for this annotation

GitHub Actions / lint

`tlog.app/go/loc` is not in the allowlist (depguard)

"tlog.app/go/tlog"

Check failure on line 19 in agent/agent.go

View workflow job for this annotation

GitHub Actions / lint

`tlog.app/go/tlog` is not in the allowlist (depguard)
"tlog.app/go/tlog/tlwire"
Expand All @@ -34,6 +37,7 @@ type (
KeyTimestamp string

Partition time.Duration
FileSize int64
BlockSize int64

Stderr io.Writer
Expand All @@ -46,6 +50,10 @@ type (
sum uint32

file *file

z *eazy.Writer
zbuf hlow.Buf
boff int64
}

file struct {
Expand Down Expand Up @@ -80,8 +88,10 @@ func New(path string) (*Agent, error) {
a := &Agent{
path: path,

Partition: 3 * time.Hour,
KeyTimestamp: tlog.KeyTimestamp,
Partition: 3 * time.Hour,
FileSize: eazy.GiB,
BlockSize: 16 * eazy.MiB,

Stderr: os.Stderr,
}
Expand Down Expand Up @@ -163,7 +173,6 @@ func (a *Agent) parseEventHeader(p []byte) (ts int64, labels []byte, err error)

func (a *Agent) file(ts int64, labels []byte, size int) (*file, *stream, error) {
sum := crc32.ChecksumIEEE(labels)
part := time.Unix(0, ts).Truncate(a.Partition).UnixNano()

var s *stream

Expand All @@ -179,25 +188,96 @@ func (a *Agent) file(ts int64, labels []byte, size int) (*file, *stream, error)
labels: labels,
sum: sum,
}

s.z = eazy.NewWriter(&s.zbuf, eazy.MiB, 1024)
s.z.AppendMagic = true

a.streams = append(a.streams, s)
}

if s.file == nil {
f, err := a.newFile(s, part, ts)
return s.file, s, nil
}

// writeFile compresses the message p with the stream's eazy writer and
// appends the compressed bytes to the stream's current file.
//
// A new file is started when f is nil, when ts falls into a different
// partition than f, or when the compressed message would push a non-empty
// file past a.FileSize. Inside a file, when the compressed message does not
// fit into the current non-empty block (a.BlockSize), the file is padded to
// the next block boundary and the compressor is reset.
//
// An index entry (file offset, ts) is recorded for the first write to a file
// and for the first write after padding to a new block.
//
// On success it returns len(p). On failure it returns a wrapped error; for a
// failed file write n is the number of compressed bytes the underlying
// writer reported.
func (a *Agent) writeFile(s *stream, f *file, p []byte, ts int64) (n int, err error) {
	tlog.Printw("write message", "i", geti(p))

	// Compress up front: the rotation checks below need the compressed size.
	s.zbuf = s.zbuf[:0]

	_, err = s.z.Write(p)
	if err != nil {
		return 0, errors.Wrap(err, "eazy")
	}

	part := time.Unix(0, ts).Truncate(a.Partition).UnixNano()

	// && binds tighter than ||: the size limit only applies to non-empty files.
	if f == nil || f.part != part || f.off+int64(len(s.zbuf)) > a.FileSize && f.off != 0 {
		f, err = a.newFile(s, part, ts)
		if err != nil {
			return 0, errors.Wrap(err, "new file")
		}

		s.file = f

		s.z.Reset(&s.zbuf)
		s.boff = 0
	}

	defer f.mu.Unlock()
	f.mu.Lock()

	// Declared once in function scope so the index check below observes it.
	// Declaring it again in the if statement's init clause would shadow it
	// and leave the outer value permanently false.
	nextBlock := s.boff+int64(len(s.zbuf)) > a.BlockSize && s.boff != 0
	if nextBlock {
		err = a.padFile(s, f)
		tlog.Printw("pad file", "off", tlog.NextAsHex, f.off, "err", err)
		if err != nil {
			return 0, errors.Wrap(err, "pad file")
		}

		s.z.Reset(&s.zbuf)
		s.boff = 0
	}

	if len(f.index) == 0 || nextBlock {
		tlog.Printw("append index", "off", tlog.NextAsHex, f.off, "ts", ts/1e9)
		f.index = append(f.index, ientry{
			off: f.off,
			ts:  ts,
		})
	}

	// The compressor was reset (new file or new block), so the bytes produced
	// by the first Write above belong to the previous state: compress again.
	if s.boff == 0 {
		s.zbuf = s.zbuf[:0]

		_, err = s.z.Write(p)
		if err != nil {
			return 0, errors.Wrap(err, "eazy")
		}
	}

	n, err = f.w.Write(s.zbuf)
	if err != nil {
		return n, err
	}

	f.off += int64(n)
	s.boff += int64(n)

	return len(p), nil
}

func (a *Agent) newFile(s *stream, part, ts int64) (*file, error) {
base := fmt.Sprintf("%08x/%08x_%08x.tlz", part/1e9, s.sum, ts/1e9)
// base := fmt.Sprintf("%08x/%08x_%08x.tlz", part/1e9, s.sum, ts/1e9)
base := fmt.Sprintf("%v/%08x_%08x.tlz",
time.Unix(0, part).UTC().Format("2006-01-02T15:04"),
s.sum,
ts/1e9,
)
fname := filepath.Join(a.path, base)
dir := filepath.Dir(fname)

tlog.Printw("new file", "file", base, "from", loc.Callers(1, 2))

err := os.MkdirAll(dir, 0o755)
if err != nil {
return nil, errors.Wrap(err, "mkdir")
Expand All @@ -209,7 +289,8 @@ func (a *Agent) newFile(s *stream, part, ts int64) (*file, error) {
}

f := &file{
w: w,
w: w,
name: fname,

prev: s.file,

Expand All @@ -225,25 +306,71 @@ func (a *Agent) newFile(s *stream, part, ts int64) (*file, error) {
return f, nil
}

func (a *Agent) writeFile(s *stream, f *file, p []byte, ts int64) (n int, err error) {
defer f.mu.Unlock()
f.mu.Lock()
func (a *Agent) padFile(s *stream, f *file) error {
if f.off%int64(a.BlockSize) == 0 {
s.boff = 0

st := f.off
return nil
}

n, err = f.w.Write(p)
if err != nil {
return
off := f.off + int64(a.BlockSize) - f.off%int64(a.BlockSize)

if s, ok := f.w.(interface {
Truncate(int64) error
io.Seeker
}); ok {
err := s.Truncate(off)
if err != nil {
return errors.Wrap(err, "truncate")
}

off, err = s.Seek(off, io.SeekStart)
if err != nil {
return errors.Wrap(err, "seek")
}

f.off = off
} else {
n, err := f.w.Write(make([]byte, off-f.off))
if err != nil {
return errors.Wrap(err, "write padding")
}

f.off += int64(n)
}

f.off += int64(n)
return nil
}

if len(f.index) == 0 || f.off >= f.index[len(f.index)-1].off+a.BlockSize {
f.index = append(f.index, ientry{
off: st,
ts: ts,
})
// geti scans the top-level map encoded in p and returns the value stored
// under the key "i" when that value is an Int.
//
// It returns -1 when p is empty, when p does not start with a Map tag, when
// an empty key is met, or when no Int value with key "i" is found.
func geti(p []byte) (x int64) {
	if len(p) == 0 {
		return -1
	}

	var d tlwire.LowDecoder

	tag, els, i := d.Tag(p, 0)
	if tag != tlwire.Map {
		return -1
	}

	var k []byte
	var sub int64
	var end int

	// els == -1 means the map has no declared length; it is then scanned
	// until d.Break reports the end.
	for el := 0; els == -1 || el < int(els); el++ {
		if els == -1 && d.Break(p, &i) {
			break
		}

		k, i = d.Bytes(p, i)
		if len(k) == 0 {
			return -1
		}

		tag, sub, end = d.SkipTag(p, i)
		if tag == tlwire.Int && string(k) == "i" {
			return sub
		}

		i = end
	}

	return -1
}
30 changes: 23 additions & 7 deletions cmd/tlog/tlogcmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func App() *cli.Command {

tlzCmd := &cli.Command{
Name: "tlz,eazy",
Description: "logs compressor/decompressor",
Description: "compressor/decompressor",
Flags: []*cli.Flag{
cli.NewFlag("output,o", "-", "output file (or stdout)"),
},
Expand All @@ -76,8 +76,8 @@ func App() *cli.Command {
Action: tlzRun,
Args: cli.Args{},
Flags: []*cli.Flag{
cli.NewFlag("block,b", 1*eazy.MiB, "compression block size (window)"),
cli.NewFlag("hash-table,ht,h", 1*1024, "hash table size"),
cli.NewFlag("block-size,block,bs,b", 1*eazy.MiB, "compression block size (window)"),
cli.NewFlag("hash-table,ht", 1*1024, "hash table size"),
},
}, {
Name: "decompress,d",
Expand All @@ -100,6 +100,10 @@ func App() *cli.Command {
Action: agentRun,
Flags: []*cli.Flag{
cli.NewFlag("db", "", "path to logs db"),
cli.NewFlag("db-partition", 3*time.Hour, "db partition size"),
cli.NewFlag("db-file-size", int64(eazy.GiB), "db file size"),
cli.NewFlag("db-block-size", int64(16*eazy.MiB), "db file block size"),

cli.NewFlag("clickdb", "", "clickhouse dsn"),

cli.NewFlag("listen,l", []string(nil), "listen url"),
Expand Down Expand Up @@ -219,10 +223,16 @@ func agentRun(c *cli.Command) (err error) {
}

if q := c.String("db"); q != "" {
a, err = agent.New(q)
x, err := agent.New(q)
if err != nil {
return errors.Wrap(err, "new agent")
}

x.Partition = c.Duration("db-partition")
x.FileSize = c.Int64("db-file-size")
x.BlockSize = c.Int64("db-block-size")

a = x
} else if q := c.String("clickdb"); q != "" {
opts, err := clickhouse.ParseDSN(q)
if err != nil {
Expand Down Expand Up @@ -338,8 +348,9 @@ func agentRun(c *cli.Command) (err error) {

defer closeWrap(c, &err, "close conn")

f := a.(tlio.Flusher)
defer doWrap(f.Flush, &err, "flush db")
if f, ok := a.(tlio.Flusher); ok {
defer doWrap(f.Flush, &err, "flush db")
}

rr := tlwire.NewStreamDecoder(c)

Expand Down Expand Up @@ -619,7 +630,12 @@ func tlzRun(c *cli.Command) (err error) {

_, err = d.Write(data)
if err != nil {
return errors.Wrap(err, "copy")
return errors.Wrap(err, "dumper")
}

err = d.Close()
if err != nil {
return errors.Wrap(err, "close dumper")
}
default:
return errors.New("unexpected command: %v", c.MainName())
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ require (
google.golang.org/protobuf v1.31.0 // indirect
nikand.dev/go/cli v0.0.0-20231112170903-c354aca481d7
nikand.dev/go/graceful v0.0.0-20231112170209-83e600cad2a7
tlog.app/go/eazy v0.1.1
nikand.dev/go/throttle v0.0.0-20231201183633-5cdc67b2c618
tlog.app/go/eazy v0.3.0-rc0.0.20231128195915-400a9a9e8d14
tlog.app/go/errors v0.9.0
tlog.app/go/loc v0.6.1
)
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1636,6 +1636,8 @@ nikand.dev/go/cli v0.0.0-20231112170903-c354aca481d7 h1:1I40D6BtUdwFVxWb2iWmAyt4
nikand.dev/go/cli v0.0.0-20231112170903-c354aca481d7/go.mod h1:j1PHh2w3QhXwwB81piujER/+4F9fydpUl56LjYbmw3w=
nikand.dev/go/graceful v0.0.0-20231112170209-83e600cad2a7 h1:fLiMDnIUq34ldRFwUK6FdU2gkodswcN+CynBL7jQRqc=
nikand.dev/go/graceful v0.0.0-20231112170209-83e600cad2a7/go.mod h1:MjJf65Xs0DhePtE05fcxfSU33wAC3QmSzlgXcs2j9Fo=
nikand.dev/go/throttle v0.0.0-20231201183633-5cdc67b2c618 h1:LIruT8q+HmR3VkbPcKq5IQ+jGgHYXP0N4xF86qdI9RQ=
nikand.dev/go/throttle v0.0.0-20231201183633-5cdc67b2c618/go.mod h1:pMSnSyAs8auAorT3rQU2lHSB6W2eXJqEfkiX1dh40WA=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
Expand All @@ -1649,10 +1651,10 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.3/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK
sigs.k8s.io/structured-merge-diff/v4 v4.1.2/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4=
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
tlog.app/go/eazy v0.1.1-0.20231112172055-8af7cd0b3803 h1:hdl5PtFtPWjIKWTa9DrEdTSdrgJ4488A/7NsvylV5xo=
tlog.app/go/eazy v0.1.1-0.20231112172055-8af7cd0b3803/go.mod h1:uKkUrF45gSJ7Rxz6R81d+AB9QqgWLOXChFYvEXjfz9Y=
tlog.app/go/eazy v0.1.1 h1:jWSuanI4gOUjo3MOZ2kRP3gXzI1RJGcCWwXh+92AF0U=
tlog.app/go/eazy v0.1.1/go.mod h1:uKkUrF45gSJ7Rxz6R81d+AB9QqgWLOXChFYvEXjfz9Y=
tlog.app/go/eazy v0.3.0-rc0.0.20231128195915-400a9a9e8d14 h1:SBm9Y5I2sFG9ZaZzYVQ4Vk16xUuSjWqTre5wSWEu6xg=
tlog.app/go/eazy v0.3.0-rc0.0.20231128195915-400a9a9e8d14/go.mod h1:2rvsqcrnBxZ6t8R3nYKYqVGxYqgsLaLil+6MFQ4rQDw=
tlog.app/go/errors v0.9.0 h1:Cho3l9BTqfQC+1vhD3JxNYoZLbZH2FTrpOl8hktxlpM=
tlog.app/go/errors v0.9.0/go.mod h1:QVvUyVpI/askfIqtxWlLWpZP+n8u/yocyU0XRizn+dM=
tlog.app/go/loc v0.6.1 h1:Eae0owcUBxpQm5KRuZHJFQKXlA7D8hGMw53tKdp0zMY=
Expand Down
10 changes: 10 additions & 0 deletions tlwire/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ const (
SemanticTlogBase = 10
)

// Meta.
//
// MetaMagic and MetaVer are consecutive iota values (0 and 1).
// MetaTlogBase is fixed at 8.
// NOTE(review): presumably MetaTlogBase is the first value reserved for
// tlog-specific meta entries, by analogy with SemanticTlogBase — confirm.
const (
MetaMagic = iota
MetaVer

MetaTlogBase = 8
)

const Magic = "\xc0\x64tlog"

func init() {
if Break != TagDetMask {
panic(Break)
Expand Down
1 change: 1 addition & 0 deletions wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var (
NextAsType = FormatNext("%T")
)

// Wire semantic tags.
const (
WireLabel = tlwire.SemanticTlogBase + iota
WireID
Expand Down

0 comments on commit f31953a

Please sign in to comment.