Skip to content

Commit

Permalink
Merge pull request #6 from TeaDove/feature/pg-and-markov
Browse files Browse the repository at this point in the history
Feature/pg and markov
  • Loading branch information
TeaDove authored Jul 15, 2024
2 parents 16ec1b8 + 22583cb commit 1708b1f
Show file tree
Hide file tree
Showing 75 changed files with 2,353 additions and 3,075 deletions.
34 changes: 21 additions & 13 deletions core/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package container
import (
"context"

"github.com/teadove/fun_telegram/core/infrastructure/pg"
"github.com/teadove/fun_telegram/core/repository/db_repository"

"github.com/teadove/fun_telegram/core/supplier/ds_supplier"

"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/teadove/fun_telegram/core/presentation/telegram"
"github.com/teadove/fun_telegram/core/repository/ch_repository"
"github.com/teadove/fun_telegram/core/repository/mongo_repository"
"github.com/teadove/fun_telegram/core/repository/redis_repository"
"github.com/teadove/fun_telegram/core/service/analitics"
"github.com/teadove/fun_telegram/core/service/job"
Expand Down Expand Up @@ -43,29 +44,36 @@ func MustNewCombatContainer(ctx context.Context) Container {

locator := ip_locator.Supplier{}

dbRepository, err := mongo_repository.New()
shared.Check(ctx, err)

chRepository, err := ch_repository.New(ctx)
shared.Check(ctx, err)

dsSupplier, err := ds_supplier.New(ctx)
shared.Check(ctx, err)

resourceService, err := resource.New(ctx)
shared.Check(ctx, err)

analiticsService, err := analitics.New(dbRepository, chRepository, dsSupplier, resourceService)
db, err := pg.NewClientFromSettings()
if err != nil {
shared.FancyPanic(ctx, errors.Wrap(err, "failed to init pg client"))
}

dbRepository, err := db_repository.NewRepository(ctx, db)
if err != nil {
shared.FancyPanic(ctx, errors.Wrap(err, "failed to init pg repository"))
}

analiticsService, err := analitics.New(
dsSupplier,
resourceService,
dbRepository,
)
shared.Check(ctx, err)

protoClient, err := telegram.NewProtoClient(ctx)
shared.Check(ctx, err)

jobService, err := job.New(ctx, dbRepository, chRepository, map[string]job.ServiceChecker{
"MongoDB": {Checker: dbRepository.Ping, ForFrequent: true},
jobService, err := job.New(ctx, map[string]job.ServiceChecker{
"Telegram": {Checker: protoClient.Ping, ForFrequent: true},
"Redis": {Checker: persistentStorage.Ping, ForFrequent: true},
"ClickHouse": {Checker: chRepository.Ping, ForFrequent: true},
"Postgres": {Checker: dbRepository.Ping, ForFrequent: true},
"Kandinsky": {Checker: kandinskySupplier.Ping},
"IpLocator": {Checker: locator.Ping},
"DSSupplier": {Checker: dsSupplier.Ping},
Expand All @@ -78,10 +86,10 @@ func MustNewCombatContainer(ctx context.Context) Container {
persistentStorage,
kandinskySupplier,
&locator,
dbRepository,
analiticsService,
jobService,
resourceService,
dbRepository,
)

container := Container{telegramPresentation, jobService}
Expand Down
46 changes: 46 additions & 0 deletions core/infrastructure/pg/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package pg

import (
"time"

"gorm.io/gorm/logger"
"gorm.io/gorm/schema"

"github.com/pkg/errors"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)

func NewClientFromSettings() (*gorm.DB, error) {
pgConfig := postgres.Config{
DSN: "postgresql://main:main@localhost:5432/main",
PreferSimpleProtocol: true,
}

db, err := gorm.Open(postgres.New(pgConfig), &gorm.Config{
SkipDefaultTransaction: true,
TranslateError: true,
NowFunc: func() time.Time {
ti, _ := time.LoadLocation("utc")
return time.Now().In(ti)
},
NamingStrategy: schema.NamingStrategy{
SingularTable: true,
},

Logger: logger.Default.LogMode(logger.Warn),
})
if err != nil {
return nil, errors.Wrap(err, "failed to connect to database")
}

sqlDb, err := db.DB()
if err != nil {
return nil, errors.Wrap(err, "failed to get std db")
}

sqlDb.SetMaxIdleConns(10)
sqlDb.SetMaxOpenConns(30)

return db, nil
}
5 changes: 3 additions & 2 deletions core/presentation/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func captureInterrupt(ctx context.Context) {
Info().
Str("signal", sig.String()).
Msg("captured exit signal, exiting...")

pprof.StopCPUProfile()
os.Exit(0)
}
Expand All @@ -30,7 +31,7 @@ func captureInterrupt(ctx context.Context) {
func Run() {
ctx := shared.GetCtx()
captureInterrupt(ctx)
zerolog.Ctx(ctx).Info().Str("status", "app.starting").Send()
zerolog.Ctx(ctx).Info().Msg("app.starting")

combatContainer := container.MustNewCombatContainer(ctx)
go healthServer(combatContainer.JobService)
Expand All @@ -41,7 +42,7 @@ func Run() {
}
}()

zerolog.Ctx(ctx).Info().Str("status", "app.started").Send()
zerolog.Ctx(ctx).Info().Msg("app.started")

err := combatContainer.Presentation.Run()
shared.Check(ctx, err)
Expand Down
7 changes: 4 additions & 3 deletions core/presentation/telegram/disable_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"context"
"strings"

"github.com/teadove/fun_telegram/core/repository/db_repository"

"github.com/celestix/gotgproto/ext"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/teadove/fun_telegram/core/repository/mongo_repository"
"github.com/teadove/fun_telegram/core/repository/redis_repository"
)

Expand Down Expand Up @@ -57,8 +58,8 @@ func (r *Presentation) checkFromAdmin(ctx *ext.Context, update *ext.Update) (ok
return false, errors.New("user not found in members")
}

return userMember.Status == mongo_repository.Admin ||
userMember.Status == mongo_repository.Creator, nil
return userMember.Status == db_repository.Admin ||
userMember.Status == db_repository.Creator, nil
}

func (r *Presentation) checkFromOwner(ctx *ext.Context, update *ext.Update) (ok bool) {
Expand Down
53 changes: 28 additions & 25 deletions core/presentation/telegram/get_members.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,52 @@ import (
"strings"
"time"

"github.com/teadove/fun_telegram/core/repository/db_repository"

"github.com/celestix/gotgproto/types"
"github.com/gotd/td/telegram/peers/members"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/teadove/fun_telegram/core/repository/mongo_repository"
"go.mongodb.org/mongo-driver/mongo"
)

var ErrNotChatOrChannel = errors.New("is not chat or channel")

func tgStatusToRepositoryStatus(status members.Status) mongo_repository.MemberStatus {
func tgStatusToRepositoryStatus(status members.Status) db_repository.MemberStatus {
switch status {
case members.Left:
return mongo_repository.Left
return db_repository.Left
case members.Plain:
return mongo_repository.Plain
return db_repository.Plain
case members.Creator:
return mongo_repository.Creator
return db_repository.Creator
case members.Admin:
return mongo_repository.Admin
return db_repository.Admin
case members.Banned:
return mongo_repository.Banned
return db_repository.Banned
default:
return mongo_repository.Unknown
return db_repository.Unknown
}
}

func (r *Presentation) updateMembers(
ctx context.Context,
effectiveChat types.EffectiveChat,
) (mongo_repository.UsersInChat, error) {
) (db_repository.UsersInChat, error) {
t0 := time.Now()

zerolog.Ctx(ctx).Info().Str("status", "members.uploading").Send()
zerolog.Ctx(ctx).
Info().
Msg("members.uploading")

usersInChat := make(mongo_repository.UsersInChat, 0, 50)
usersInChat := make(db_repository.UsersInChat, 0, 50)

compileSlice := func(chatMember members.Member) error {
user := chatMember.User()
_, isBot := user.ToBot()
username, _ := user.Username()

userInChat := mongo_repository.UserInChat{
userInChat := db_repository.UserInChat{
TgId: user.ID(),
TgUsername: strings.ToLower(username),
TgName: GetNameFromPeerUser(&user),
Expand All @@ -56,7 +59,7 @@ func (r *Presentation) updateMembers(
}
usersInChat = append(usersInChat, userInChat)

err := r.mongoRepository.UserUpsert(ctx, &mongo_repository.User{
err := r.dbRepository.UserUpsert(ctx, &db_repository.User{
TgId: userInChat.TgId,
TgUsername: userInChat.TgUsername,
TgName: userInChat.TgName,
Expand All @@ -66,7 +69,7 @@ func (r *Presentation) updateMembers(
return errors.Wrap(err, "failed to upsert user")
}

err = r.mongoRepository.MemberUpsert(ctx, &mongo_repository.Member{
err = r.dbRepository.MemberUpsert(ctx, &db_repository.Member{
TgUserId: userInChat.TgId,
TgChatId: effectiveChat.GetID(),
Status: userInChat.Status,
Expand All @@ -77,9 +80,8 @@ func (r *Presentation) updateMembers(

zerolog.Ctx(ctx).
Debug().
Str("status", "member.uploaded").
Interface("user", userInChat).
Send()
Msg("member.uploaded")

return nil
}
Expand Down Expand Up @@ -111,36 +113,37 @@ func (r *Presentation) updateMembers(
return nil, errors.WithStack(ErrNotChatOrChannel)
}

err := r.mongoRepository.ChatUpsert(ctx, &mongo_repository.Chat{
TgId: effectiveChat.GetID(),
Title: chatTitle,
err := r.dbRepository.ChatUpsert(ctx, &db_repository.Chat{
WithCreatedAt: db_repository.WithCreatedAt{CreatedAt: time.Now().UTC()},
WithUpdatedAt: db_repository.WithUpdatedAt{UpdatedAt: time.Now().UTC()},
TgId: effectiveChat.GetID(),
Title: chatTitle,
})
if err != nil {
return nil, errors.Wrap(err, "failed to upsert chat in mongo repository")
}

err = r.mongoRepository.SetAllMembersAsLeft(ctx, effectiveChat.GetID(), t0.Add(-time.Hour))
err = r.dbRepository.MemberSetAsLeftBeforeTime(ctx, effectiveChat.GetID(), t0.Add(-time.Hour))
if err != nil {
return nil, errors.Wrap(err, "failed to set all members as left")
}

zerolog.Ctx(ctx).
Info().
Str("status", "members.uploaded").
Str("elapsed", time.Since(t0).String()).
Int("count", len(usersInChat)).
Send()
Msg("members.uploaded")

return usersInChat, nil
}

func (r *Presentation) getOrUpdateMembers(
ctx context.Context,
effectiveChat types.EffectiveChat,
) (mongo_repository.UsersInChat, error) {
) (db_repository.UsersInChat, error) {
needUpload := false

chat, err := r.mongoRepository.GetChat(ctx, effectiveChat.GetID())
chat, err := r.dbRepository.ChatSelectById(ctx, effectiveChat.GetID())
if err != nil {
if !errors.Is(err, mongo.ErrNoDocuments) {
return nil, errors.Wrap(err, "failed to get chat from repository")
Expand All @@ -158,7 +161,7 @@ func (r *Presentation) getOrUpdateMembers(
return usersInChat, nil
}

usersInChat, err := r.mongoRepository.GetUsersInChat(ctx, effectiveChat.GetID())
usersInChat, err := r.dbRepository.UsersSelectInChat(ctx, effectiveChat.GetID())
if err != nil {
return nil, errors.Wrap(err, "failed to get users by chat id")
}
Expand Down
9 changes: 5 additions & 4 deletions core/presentation/telegram/kandinsky_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"strconv"
"time"

"github.com/teadove/fun_telegram/core/repository/db_repository"

"github.com/gotd/td/telegram/message"
"github.com/gotd/td/telegram/message/styling"
"github.com/gotd/td/tg"
"github.com/teadove/fun_telegram/core/repository/mongo_repository"
"golang.org/x/text/cases"
"golang.org/x/text/language"

Expand Down Expand Up @@ -60,15 +61,15 @@ func (r *Presentation) uploadKandinskyImage(
) bool {
err := r.analiticsService.KandinskyImageInsert(
ctx,
&mongo_repository.KandinskyImageDenormalized{
&db_repository.KandinskyImageDenormalized{
TgInputPhoto: tg.InputPhoto{
ID: tgPhoto.ID,
AccessHash: tgPhoto.AccessHash,
FileReference: tgPhoto.FileReference,
},
KandinskyInput: *kandinskyInput,
ImgContent: img,
Message: mongo_repository.Message{
Message: db_repository.Message{
TgChatID: tgChatId,
TgId: tgMsg.ID,
TgUserId: ctx.Self.ID,
Expand Down Expand Up @@ -301,7 +302,7 @@ func (r *Presentation) kandkinskyPaginateImagesCommandHandler(

images, err := r.analiticsService.KandinskyImagePaginate(
ctx,
&mongo_repository.KandinskyImagePaginateInput{
&db_repository.KandinskyImagePaginateInput{
TgChatId: update.EffectiveChat().GetID(),
Page: page,
PageSize: pageSize,
Expand Down
Loading

0 comments on commit 1708b1f

Please sign in to comment.