Skip to content

Commit

Permalink
feat: add crawler implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
devanbenz committed Nov 29, 2023
1 parent fe34377 commit 36d4f29
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 27 deletions.
15 changes: 13 additions & 2 deletions workers/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package main

import (
"errors"
"fmt"
poolparty "github.com/we-are-discussing-rest/pool-party"
"github.com/we-are-discussing-rest/web-crawler/workers/internal"
"github.com/we-are-discussing-rest/web-crawler/workers/repository"
"log/slog"
"sync/atomic"
"time"
)

type Crawler struct {
Expand All @@ -16,6 +17,8 @@ type Crawler struct {
// CrawlerOpts bundles the inputs and collaborators a Crawler is built from.
type CrawlerOpts struct {
// urls lists the pages Crawl iterates over; one task is sent to the worker pool per entry.
urls []string
// store is a repository handle. NOTE(review): its use is not visible in this hunk —
// presumably it persists crawled page data; confirm against the full Crawl body.
store repository.Repository
// queue receives each link found during a crawl via Insert.
queue repository.Repository
// currDepth is a shared counter that Crawl increments by one after its URL loop.
// Crawl dereferences it unconditionally, so it must be non-nil.
currDepth *atomic.Uint64
// workerPool executes the per-URL crawl closures handed to it through Send.
workerPool *poolparty.Pool
// logger receives structured progress and error messages from the crawl.
logger *slog.Logger
}
Expand All @@ -25,14 +28,17 @@ func NewCrawler(opts CrawlerOpts) *Crawler {
opts: CrawlerOpts{
urls: opts.urls,
store: opts.store,
queue: opts.queue,
workerPool: opts.workerPool,
logger: opts.logger,
currDepth: opts.currDepth,
},
}
}

func (c *Crawler) Crawl() {
for _, url := range c.opts.urls {
time.Sleep(3 * time.Second)
c.opts.workerPool.Send(func() {
c.opts.logger.Info("starting a crawl", "url", url)

Expand Down Expand Up @@ -70,8 +76,13 @@ func (c *Crawler) Crawl() {
}

for _, link := range links {
fmt.Println(link)
queueErr := c.opts.queue.Insert(link)
if queueErr != nil {
c.opts.logger.Error("error writing to queue", "error", queueErr)
return
}
}
})
}
c.opts.currDepth.Add(1)
}
5 changes: 2 additions & 3 deletions workers/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ go 1.21.3

require (
github.com/go-redis/redis/v8 v8.11.5
github.com/we-are-discussing-rest/web-crawler v0.0.0-20231119205029-2f3772a5f371
golang.org/x/net v0.18.0
modernc.org/sqlite v1.27.0
)

require (
Expand All @@ -25,13 +25,12 @@ require (
modernc.org/mathutil v1.6.0 // indirect
modernc.org/memory v1.7.2 // indirect
modernc.org/opt v0.1.3 // indirect
modernc.org/sqlite v1.27.0 // indirect
modernc.org/strutil v1.1.3 // indirect
modernc.org/token v1.0.1 // indirect
)

require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/we-are-discussing-rest/pool-party v0.1.2
github.com/we-are-discussing-rest/pool-party v0.1.3
)
22 changes: 18 additions & 4 deletions workers/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,30 @@ github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWo
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ=
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y=
github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/we-are-discussing-rest/pool-party v0.1.2 h1:T4XGHJg27hgp/xB+WYCNbhOsKqWsjfAPjMCdzZd69Fs=
github.com/we-are-discussing-rest/pool-party v0.1.2/go.mod h1:YO0lYVZBDn0AgyO873RfJylRtm6vl/amlL0TpnKkbVM=
github.com/we-are-discussing-rest/web-crawler v0.0.0-20231119205029-2f3772a5f371 h1:ox8kDW3yjYhe99yfyYnlcje7yCiKowy6me6Aa92kGWk=
github.com/we-are-discussing-rest/web-crawler v0.0.0-20231119205029-2f3772a5f371/go.mod h1:RJGcJ5Q+SfCxS/cxjyGzNTjy+uVNQdYpy5VTFA9ewoo=
github.com/we-are-discussing-rest/pool-party v0.1.3 h1:F0sSgG+ObWTWTZHzHQmJua67EyUcY1Rd8VBS/u+S89g=
github.com/we-are-discussing-rest/pool-party v0.1.3/go.mod h1:YO0lYVZBDn0AgyO873RfJylRtm6vl/amlL0TpnKkbVM=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand Down Expand Up @@ -67,6 +73,10 @@ modernc.org/cc/v3 v3.40.0 h1:P3g79IUS/93SYhtoeaHW+kRCIrYaxJ27MFPv+7kaTOw=
modernc.org/cc/v3 v3.40.0/go.mod h1:/bTg4dnWkSXowUO6ssQKnOV0yMVxDYNIsIrzqTFDGH0=
modernc.org/ccgo/v3 v3.16.13 h1:Mkgdzl46i5F/CNR/Kj80Ri59hC8TKAhZrYSaqvkwzUw=
modernc.org/ccgo/v3 v3.16.13/go.mod h1:2Quk+5YgpImhPjv2Qsob1DnZ/4som1lJTodubIcoUkY=
modernc.org/ccorpus v1.11.6 h1:J16RXiiqiCgua6+ZvQot4yUuUy8zxgqbqEEUuGPlISk=
modernc.org/ccorpus v1.11.6/go.mod h1:2gEUTrWqdpH2pXsmTM1ZkjeSrUWDpjMu2T6m29L/ErQ=
modernc.org/httpfs v1.0.6 h1:AAgIpFZRXuYnkjftxTAZwMIiwEqAfk8aVB2/oA6nAeM=
modernc.org/httpfs v1.0.6/go.mod h1:7dosgurJGp0sPaRanU53W4xZYKh14wfzX420oZADeHM=
modernc.org/libc v1.29.0 h1:tTFRFq69YKCF2QyGNuRUQxKBm1uZZLubf6Cjh/pVHXs=
modernc.org/libc v1.29.0/go.mod h1:DaG/4Q3LRRdqpiLyP0C2m1B8ZMGkQ+cCgOIjEtQlYhQ=
modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4=
Expand All @@ -79,5 +89,9 @@ modernc.org/sqlite v1.27.0 h1:MpKAHoyYB7xqcwnUwkuD+npwEa0fojF0B5QRbN+auJ8=
modernc.org/sqlite v1.27.0/go.mod h1:Qxpazz0zH8Z1xCFyi5GSL3FzbtZ3fvbjmywNogldEW0=
modernc.org/strutil v1.1.3 h1:fNMm+oJklMGYfU9Ylcywl0CO5O6nTfaowNsh2wpPjzY=
modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw=
modernc.org/tcl v1.15.2 h1:C4ybAYCGJw968e+Me18oW55kD/FexcHbqH2xak1ROSY=
modernc.org/tcl v1.15.2/go.mod h1:3+k/ZaEbKrC8ePv8zJWPtBSW0V7Gg9g8rkmhI1Kfs3c=
modernc.org/token v1.0.1 h1:A3qvTqOwexpfZZeyI0FeGPDlSWX5pjZu9hF4lU+EKWg=
modernc.org/token v1.0.1/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
modernc.org/z v1.7.3 h1:zDJf6iHjrnB+WRD88stbXokugjyc0/pB91ri1gO6LZY=
modernc.org/z v1.7.3/go.mod h1:Ipv4tsdxZRbQyLq9Q1M6gdbkxYzdlrciF2Hi/lS7nWE=
44 changes: 27 additions & 17 deletions workers/main.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,45 @@
package main

import (
"context"
"github.com/go-redis/redis/v8"
poolparty "github.com/we-are-discussing-rest/pool-party"
"github.com/we-are-discussing-rest/web-crawler/workers/repository"
"github.com/we-are-discussing-rest/web-crawler/workers/utils"
"log/slog"
"os"
"sync/atomic"
)

func main() {
ctx := context.Background()
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
store := repository.NewSqliteRepo(utils.Lookup("SQLITE_URL", "data.db"), logger)

p := poolparty.NewPool(5)
c := NewCrawler(CrawlerOpts{
urls: []string{"https://example.com"},
store: store,
workerPool: p,
logger: logger,
queue := repository.NewRedisRepo(logger, &redis.Options{
Addr: utils.Lookup("REDIS_HOST", "localhost:6379"),
Username: utils.Lookup("REDIS_USER", ""),
Password: utils.Lookup("REDIS_PW", ""),
})
p := poolparty.NewPool(20)
p.Start()

ct := NewCrawler(CrawlerOpts{
urls: []string{"https://en.wikipedia.org/wiki/B-tree"},
store: store,
workerPool: p,
logger: logger,
})
m := MapQueues(queue, logger, ctx)

p.Start()
c.Crawl()
ct.Crawl()
for _, v := range m {
am, err := queue.GetAllMessages(v, ctx)
if err != nil {
return
}

NewCrawler(CrawlerOpts{
currDepth: &atomic.Uint64{},
urls: am,
store: store,
queue: queue,
workerPool: p,
logger: logger,
}).Crawl()
}

p.Stop()
defer p.Stop()
}
12 changes: 12 additions & 0 deletions workers/queue.go
Original file line number Diff line number Diff line change
@@ -1 +1,13 @@
package main

import (
"context"
"github.com/we-are-discussing-rest/web-crawler/workers/repository"
"log/slog"
)

// MapQueues returns the names of the queues known to the Redis-backed
// repository. It delegates to RedisRepo.GetAllKeys, so every key reported by
// that call is treated as a queue name.
//
// The informational log line is emitted unconditionally after the lookup:
// GetAllKeys has no error return, so an empty result and a failed lookup look
// the same from here.
func MapQueues(queue *repository.RedisRepo, logger *slog.Logger, ctx context.Context) []string {
	queueNames := queue.GetAllKeys(ctx)

	logger.Info("got mapping for all queues")

	return queueNames
}
30 changes: 29 additions & 1 deletion workers/repository/redis_queue_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ package repository

import (
"context"
"errors"
"fmt"
"github.com/go-redis/redis/v8"
"github.com/we-are-discussing-rest/web-crawler/workers/utils"
"log/slog"
)

var (
// NoMessageError is the sentinel returned by the queue read methods (Get,
// GetAllMessages) when popping from the requested key yields nothing.
NoMessageError = errors.New("no messages in the queue")
)

type RedisRepo struct {
*redis.Client
logger *slog.Logger
Expand Down Expand Up @@ -53,5 +58,28 @@ func (r *RedisRepo) Remove(data string) error {
}

func (r *RedisRepo) Get(data string) (string, error) {
return "", nil
r.logger.Info("dequeue", "key", data)
val := r.Client.RPop(r.Context(), data)
if val.Val() == "" {
return "", NoMessageError
}

return val.Val(), nil
}

// GetAllMessages drains the Redis list stored at key data and returns the
// popped elements.
//
// It first reads the list length (LLEN) and then pops that many elements from
// the tail of the list (RPOP with a count). These are two separate commands,
// so elements pushed in between are left in the list for a later call.
//
// It returns NoMessageError when the list is empty or missing. Any other
// failure (connection problems, the key holding a non-list value, ...) is
// returned wrapped with context instead of being reported as an empty queue.
func (r *RedisRepo) GetAllMessages(data string, ctx context.Context) ([]string, error) {
	r.logger.Info("getting queue len", "key", data)

	queueLen, err := r.Client.LLen(ctx, data).Result()
	if err != nil {
		return nil, fmt.Errorf("reading length of queue %q: %w", data, err)
	}
	// Nothing to pop: skip the second round trip (and a zero-count RPOP).
	if queueLen == 0 {
		return nil, NoMessageError
	}

	msgs, err := r.Client.RPopCount(ctx, data, int(queueLen)).Result()
	// redis.Nil means the list vanished between LLEN and RPOP; treat it as empty.
	if errors.Is(err, redis.Nil) {
		return nil, NoMessageError
	}
	if err != nil {
		return nil, fmt.Errorf("draining queue %q: %w", data, err)
	}
	if len(msgs) == 0 {
		return nil, NoMessageError
	}

	return msgs, nil
}

// GetAllKeys returns every key in the connected Redis database, using the
// KEYS command with the "*" pattern.
//
// The signature has no error return, so a failed lookup is logged and the
// (empty) result is returned; callers cannot tell it apart from a database
// with no keys.
//
// NOTE(review): KEYS walks the whole keyspace in a single blocking command;
// the Redis documentation recommends SCAN for production workloads.
func (r *RedisRepo) GetAllKeys(ctx context.Context) []string {
	ret := r.Client.Keys(ctx, "*")
	if err := ret.Err(); err != nil {
		// Surface the failure instead of silently returning no keys.
		r.logger.Error("error listing keys", "error", err)
	}

	return ret.Val()
}

0 comments on commit 36d4f29

Please sign in to comment.