Skip to content

Commit

Permalink
add start/stop client feature
Browse files Browse the repository at this point in the history
  • Loading branch information
celestix committed May 21, 2023
1 parent 8489c17 commit 2efc808
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 65 deletions.
8 changes: 6 additions & 2 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
- `ext.Update.EffectiveMessage` is of type `*types.Message`
- Added a new optional field in ClientOpts, named `AutoFetchReply` (setting this field to true will automatically cast ReplyToMessage field)
- Save peers of logged in user in session, while logging in.
- Add `client.ExportSessionString()` and `client.CreateContext()` methods to `gotgproto.Client`.
- Added `client.ExportSessionString()`, `client.RefreshContext(ctx)` and `client.CreateContext()` methods to `gotgproto.Client`.
- Remove an unintentional display of session data in `Stdout`.
- Added `SystemLangCode` and `ClientLangCode` optional fields to `gotgproto.Client`.
- Moved helper methods errors to `errors` package (gotgproto/errors)
- Moved helper methods errors to `errors` package (gotgproto/errors)
- Added `gotgproto.Client.Stop()` to cancel the running context and stop the client.
- Added `dispatcher.StopClient` handler error, which if returned through a handler callback will result in stopping the client.
- Added `gotgproto.Client.Start()` to login and connect to telegram (It's already called by gotgproto.NewClient so no need to call it again. however, it should be used to re-establish a connection once it's closed via `gotgproto.Client.Stop()`)
- Fixed session database initialisation happening twice per login.
171 changes: 121 additions & 50 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"context"
"fmt"
"runtime"
"sync"

"github.com/anonyindian/gotgproto/dispatcher"
intErrors "github.com/anonyindian/gotgproto/errors"
"github.com/anonyindian/gotgproto/ext"
"github.com/anonyindian/gotgproto/functions"
"github.com/anonyindian/gotgproto/sessionMaker"
Expand Down Expand Up @@ -41,19 +43,29 @@ type Client struct {
Resolver dcs.Resolver
// Whether to show the copyright line in console or no.
DisableCopyright bool
// Logger is instance of zap.Logger. No logs by default.
Logger *zap.Logger

// Session info of the authenticated user, use sessionMaker.NewSession function to fill this field.
Session *sessionMaker.SessionName
sessionStorage session.Storage

// Self contains details of logged in user in the form of *tg.User.
Self *tg.User

// Code for the language used on the device's OS, ISO 639-1 standard.
SystemLangCode string
// Code for the language used on the client, ISO 639-1 standard.
ClientLangCode string

clientType ClientType
ctx context.Context
err error
started chan int
autoFetchReply bool

cancel context.CancelFunc
running bool
*telegram.Client
appId int
apiHash string
}

// Type of client to login to, can be of 2 types:
Expand Down Expand Up @@ -95,6 +107,7 @@ type ClientOpts struct {
ClientLangCode string
}

// NewClient creates a new gotgproto client and logs in to telegram.
func NewClient(appId int, apiHash string, cType ClientType, opts *ClientOpts) (*Client, error) {
if opts == nil {
opts = &ClientOpts{
Expand All @@ -115,45 +128,60 @@ func NewClient(appId int, apiHash string, cType ClientType, opts *ClientOpts) (*

d := dispatcher.NewNativeDispatcher(opts.AutoFetchReply)

client := telegram.NewClient(appId, apiHash, telegram.Options{
DCList: opts.DCList,
UpdateHandler: d,
SessionStorage: sessionStorage,
Logger: opts.Logger,
Device: telegram.DeviceConfig{
DeviceModel: "GoTGProto",
SystemVersion: runtime.GOOS,
AppVersion: VERSION,
SystemLangCode: opts.SystemLangCode,
LangCode: opts.ClientLangCode,
},
})

ctx := context.Background()

c := &Client{
// client := telegram.NewClient(appId, apiHash, telegram.Options{
// DCList: opts.DCList,
// UpdateHandler: d,
// SessionStorage: sessionStorage,
// Logger: opts.Logger,
// Device: telegram.DeviceConfig{
// DeviceModel: "GoTGProto",
// SystemVersion: runtime.GOOS,
// AppVersion: VERSION,
// SystemLangCode: opts.SystemLangCode,
// LangCode: opts.ClientLangCode,
// },
// })

ctx, cancel := context.WithCancel(context.Background())

c := Client{
Resolver: opts.Resolver,
PublicKeys: opts.PublicKeys,
DC: opts.DC,
DCList: opts.DCList,
DisableCopyright: opts.DisableCopyright,
Session: opts.Session,
Logger: opts.Logger,
SystemLangCode: opts.SystemLangCode,
ClientLangCode: opts.ClientLangCode,
Dispatcher: d,
Client: client,
sessionStorage: sessionStorage,
clientType: cType,
ctx: ctx,
started: make(chan int),
autoFetchReply: opts.AutoFetchReply,
cancel: cancel,
appId: appId,
apiHash: apiHash,
}

c.printCredit()

go func(c *Client) {
c.err = client.Run(ctx, c.initialize)
}(c)
// wait till client starts
<-c.started
return c, nil
return &c, c.Start()
}

func (c *Client) initTelegramClient() {
c.Client = telegram.NewClient(c.appId, c.apiHash, telegram.Options{
DCList: c.DCList,
UpdateHandler: c.Dispatcher,
SessionStorage: c.sessionStorage,
Logger: c.Logger,
Device: telegram.DeviceConfig{
DeviceModel: "GoTGProto",
SystemVersion: runtime.GOOS,
AppVersion: VERSION,
SystemLangCode: c.SystemLangCode,
LangCode: c.ClientLangCode,
},
})
}

func (c *Client) login() error {
Expand Down Expand Up @@ -192,37 +220,39 @@ Licensed under the terms of GNU General Public License v3
}
}

func (c *Client) initialize(ctx context.Context) error {
err := c.login()
if err != nil {
return err
}

self, err := c.Client.Self(ctx)
if err != nil {
return err
}
c.Self = self

c.Dispatcher.Initialize(ctx, c.Client, self)
func (c *Client) initialize(wg *sync.WaitGroup) func(ctx context.Context) error {
return func(ctx context.Context) error {
err := c.login()
if err != nil {
return err
}
self, err := c.Client.Self(ctx)
if err != nil {
return err
}

if c.Session.GetName() == "" {
storage.Load("new.session", false)
}
c.Self = self

storage.AddPeer(self.ID, self.AccessHash, storage.TypeUser, self.Username)
c.Dispatcher.Initialize(ctx, c.Stop, c.Client, self)

// notify channel that client is up
close(c.started)
storage.AddPeer(self.ID, self.AccessHash, storage.TypeUser, self.Username)

<-c.ctx.Done()
return c.ctx.Err()
// notify channel that client is up
wg.Done()
c.running = true
<-c.ctx.Done()
return c.ctx.Err()
}
}

// EncodeSessionToString encodes the client session to a string in base64.
//
// Note: You must not share this string with anyone, it contains auth details for your logged in account.
func (c *Client) ExportStringSession() (string, error) {
return functions.EncodeSessionToString(storage.GetSession())
}

// Idle keeps the current goroutined blocked until the client is stopped.
func (c *Client) Idle() error {
<-c.ctx.Done()
return c.err
Expand All @@ -244,3 +274,44 @@ func (c *Client) CreateContext() *ext.Context {
c.autoFetchReply,
)
}

// Stop cancels the context.Context being used for the client
// and stops it.
//
// Notes:
//
// 1.) Client.Idle() will exit if this method is called.
//
// 2.) You can call Client.Start() to start the client again
// if it was stopped using this method.
func (c *Client) Stop() {
c.cancel()
c.running = false
}

// Start connects the client to telegram servers and logins.
// It will return error if the client is already running.
func (c *Client) Start() error {
if c.running {
return intErrors.ErrClientAlreadyRunning
}
if c.ctx.Err() == context.Canceled {
c.ctx, c.cancel = context.WithCancel(context.Background())
}
c.initTelegramClient()
wg := sync.WaitGroup{}
wg.Add(1)
go func(c *Client) {
c.err = c.Run(c.ctx, c.initialize(&wg))
}(c)
// wait till client starts
wg.Wait()
return c.err
}

// RefreshContext casts the new context.Context and telegram session
// to ext.Context (It may be used after doing Stop and Start calls respectively.)
func (c *Client) RefreshContext(ctx *ext.Context) {
(*ctx).Context = c.ctx
(*ctx).Raw = c.API()
}
18 changes: 13 additions & 5 deletions dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,26 @@ import (
)

var (
// EndGroups stops iterating over handlers groups if return through handler callback function.
// StopClient cancels the context and stops the client if returned through handler callback function.
StopClient = errors.New("disconnect")

// EndGroups stops iterating over handlers groups if returned through handler callback function.
EndGroups = errors.New("stopped")
// ContinueGroups continues iterating over handlers groups if return through handler callback function.
// ContinueGroups continues iterating over handlers groups if returned through handler callback function.
ContinueGroups = errors.New("continued")
// SkipCurrentGroup skips current group and continues iterating over handlers groups if return through handler callback function.
// SkipCurrentGroup skips current group and continues iterating over handlers groups if returned through handler callback function.
SkipCurrentGroup = errors.New("skipped")
)

type Dispatcher interface {
Initialize(context.Context, *telegram.Client, *tg.User)
Initialize(context.Context, context.CancelFunc, *telegram.Client, *tg.User)
Handle(context.Context, tg.UpdatesClass) error
AddHandler(Handler)
AddHandlerToGroup(Handler, int)
}

type NativeDispatcher struct {
cancel context.CancelFunc
client *tg.Client
self *tg.User
sender *message.Sender
Expand Down Expand Up @@ -66,10 +70,11 @@ func (u *entities) short() {
u.Channels = make(map[int64]*tg.Channel, 0)
}

func (dp *NativeDispatcher) Initialize(ctx context.Context, client *telegram.Client, self *tg.User) {
func (dp *NativeDispatcher) Initialize(ctx context.Context, cancel context.CancelFunc, client *telegram.Client, self *tg.User) {
dp.client = client.API()
dp.sender = message.NewSender(dp.client)
dp.self = self
dp.cancel = cancel
}

// Handle function handles all the incoming updates, map entities and dispatches updates for further handling.
Expand Down Expand Up @@ -148,6 +153,9 @@ func (dp *NativeDispatcher) handleUpdate(ctx context.Context, e tg.Entities, upd
return err
} else if errors.Is(err, SkipCurrentGroup) {
break
} else if errors.Is(err, StopClient) {
dp.cancel()
return nil
} else {
err = dp.Error(c, u, err.Error())
switch err {
Expand Down
4 changes: 4 additions & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package errors

import "errors"

var (
ErrClientAlreadyRunning = errors.New("client is already running")
)

var (
ErrPeerNotFound = errors.New("peer not found")
ErrNotChat = errors.New("not chat")
Expand Down
27 changes: 19 additions & 8 deletions sessionMaker/sessionName.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
type SessionName struct {
name string
sessionType SessionType
data []byte
err error
}

// SessionType is the type of session you want to log in through.
Expand All @@ -32,19 +34,15 @@ const (

// NewSession creates a new session with provided name string and SessionType.
func NewSession(sessionName string, sessionType SessionType) *SessionName {
return &SessionName{
s := SessionName{
name: sessionName,
sessionType: sessionType,
}
s.data, s.err = s.load()
return &s
}

// GetName is used for retrieving name of the session.
func (s *SessionName) GetName() string {
return s.name
}

// GetData is used for retrieving session data through provided SessionName type.
func (s *SessionName) GetData() ([]byte, error) {
func (s *SessionName) load() ([]byte, error) {
switch s.sessionType {
case PyrogramSession:
storage.Load("pyrogram.session", false)
Expand Down Expand Up @@ -81,8 +79,21 @@ func (s *SessionName) GetData() ([]byte, error) {
// })
return sd.Data, err
default:
if s.name == "" {
s.name = "new"
}
storage.Load(fmt.Sprintf("%s.session", s.name), false)
sFD := storage.GetSession()
return sFD.Data, nil
}
}

// GetName is used for retrieving name of the session.
func (s *SessionName) GetName() string {
return s.name
}

// GetData is used for retrieving session data through provided SessionName type.
func (s *SessionName) GetData() ([]byte, error) {
return s.data, s.err
}

0 comments on commit 2efc808

Please sign in to comment.