Skip to content
This repository has been archived by the owner on Jan 28, 2021. It is now read-only.

Commit

Permalink
sql: process list now shows client address
Browse files Browse the repository at this point in the history
Previously it was showing server address but should be the client:

https://dev.mysql.com/doc/refman/8.0/en/processlist-table.html

Also merged client user and address as Client struct in session.

Signed-off-by: Javi Fontan <jfontan@gmail.com>
  • Loading branch information
jfontan committed Oct 31, 2018
1 parent 4a751c0 commit 2eaa2da
Show file tree
Hide file tree
Showing 12 changed files with 45 additions and 24 deletions.
2 changes: 1 addition & 1 deletion auth/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func testAuthorization(
t.Run(fmt.Sprintf("%s-%s", c.user, c.query), func(t *testing.T) {
req := require.New(t)

session := sql.NewSession("localhost", c.user, uint32(i))
session := sql.NewSession("localhost", "client", c.user, uint32(i))
ctx := sql.NewContext(context.TODO(),
sql.WithSession(session),
sql.WithPid(uint64(i)))
Expand Down
2 changes: 1 addition & 1 deletion engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1674,7 +1674,7 @@ func insertRows(t *testing.T, table sql.Inserter, rows ...sql.Row) {
var pid uint64

func newCtx() *sql.Context {
session := sql.NewSession("address", "user", 1)
session := sql.NewSession("address", "client", "user", 1)
return sql.NewContext(
context.Background(),
sql.WithPid(atomic.AddUint64(&pid, 1)),
Expand Down
3 changes: 2 additions & 1 deletion server/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ type DoneFunc func()

// DefaultSessionBuilder is a SessionBuilder that returns a base session.
func DefaultSessionBuilder(c *mysql.Conn, addr string) sql.Session {
return sql.NewSession(addr, c.User, c.ConnectionID)
client := c.RemoteAddr().String()
return sql.NewSession(addr, client, c.User, c.ConnectionID)
}

// SessionManager is in charge of creating new sessions for the given
Expand Down
9 changes: 8 additions & 1 deletion server/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,16 @@ func setupMemDB(require *require.Assertions) *sqle.Engine {
}

func TestHandlerOutput(t *testing.T) {
// This session builder is used as dummy mysql Conn is not complete and
// causes panic when accessing remote address.
testSessionBuilder := func(c *mysql.Conn, addr string) sql.Session {
client := "127.0.0.1:34567"
return sql.NewSession(addr, client, c.User, c.ConnectionID)
}

e := setupMemDB(require.New(t))
dummyConn := &mysql.Conn{ConnectionID: 1}
handler := NewHandler(e, NewSessionManager(DefaultSessionBuilder, opentracing.NoopTracer{}, "foo"))
handler := NewHandler(e, NewSessionManager(testSessionBuilder, opentracing.NoopTracer{}, "foo"))
handler.NewConnection(dummyConn)

type exptectedValues struct {
Expand Down
2 changes: 1 addition & 1 deletion sql/analyzer/check_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func CheckAuthorization(au auth.Auth) RuleFunc {
perm = auth.ReadPerm
}

err := au.Allowed(ctx.User(), perm)
err := au.Allowed(ctx.Client().User, perm)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion sql/expression/function/connection_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func TestConnectionID(t *testing.T) {
require := require.New(t)

session := sql.NewSession("", "", 2)
session := sql.NewSession("", "", "", 2)
ctx := sql.NewContext(context.Background(), sql.WithSession(session))

f := NewConnectionID()
Expand Down
2 changes: 1 addition & 1 deletion sql/plan/processlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (p *ShowProcessList) RowIter(ctx *sql.Context) (sql.RowIter, error) {
time: int64(proc.Seconds()),
state: strings.Join(status, ", "),
command: proc.Type.String(),
host: ctx.Session.Address(),
host: ctx.Session.Client().Address,
info: proc.Query,
db: p.Database,
}.toRow()
Expand Down
8 changes: 5 additions & 3 deletions sql/plan/processlist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
func TestShowProcessList(t *testing.T) {
require := require.New(t)

addr := "127.0.0.1:34567"

n := NewShowProcessList()
p := sql.NewProcessList()
sess := sql.NewSession("0.0.0.0:1234", "foo", 1)
sess := sql.NewSession("0.0.0.0:3306", addr, "foo", 1)
ctx := sql.NewContext(context.Background(), sql.WithPid(1), sql.WithSession(sess))

ctx, err := p.AddProcess(ctx, sql.QueryProcess, "SELECT foo")
Expand Down Expand Up @@ -42,8 +44,8 @@ func TestShowProcessList(t *testing.T) {
require.NoError(err)

expected := []sql.Row{
{int64(1), "foo", "0.0.0.0:1234", "foo", "query", int64(0), "a(4/5), b(2/6)", "SELECT foo"},
{int64(2), "foo", "0.0.0.0:1234", "foo", "create_index", int64(0), "foo(1/2)", "SELECT bar"},
{int64(1), "foo", addr, "foo", "query", int64(0), "a(4/5), b(2/6)", "SELECT foo"},
{int64(2), "foo", addr, "foo", "create_index", int64(0), "foo(1/2)", "SELECT bar"},
}

require.ElementsMatch(expected, rows)
Expand Down
4 changes: 2 additions & 2 deletions sql/processlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (pl *ProcessList) AddProcess(
Type: typ,
Query: query,
Progress: make(map[string]Progress),
User: ctx.Session.User(),
User: ctx.Session.Client().User,
StartedAt: time.Now(),
Kill: cancel,
}
Expand Down Expand Up @@ -136,7 +136,7 @@ func (pl *ProcessList) UpdateProgress(pid uint64, name string, delta int64) {
p.Progress[name] = progress
}

// AddProgressItem adds a new item to track progress from to the proces with
// AddProgressItem adds a new item to track progress from to the process with
// the given pid. If the pid does not exist, it will do nothing.
func (pl *ProcessList) AddProgressItem(pid uint64, name string, total int64) {
pl.mu.Lock()
Expand Down
2 changes: 1 addition & 1 deletion sql/processlist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func TestProcessList(t *testing.T) {
require := require.New(t)

p := NewProcessList()
sess := NewSession("0.0.0.0:1234", "foo", 1)
sess := NewSession("0.0.0.0:3306", "127.0.0.1:34567", "foo", 1)
ctx := NewContext(context.Background(), WithPid(1), WithSession(sess))
ctx, err := p.AddProcess(ctx, QueryProcess, "SELECT foo")
require.NoError(err)
Expand Down
29 changes: 20 additions & 9 deletions sql/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,20 @@ const (
QueryKey key = iota
)

// Client holds session user information.
type Client struct {
// User of the session.
User string
// Address of the client.
Address string
}

// Session holds the session data.
type Session interface {
// Address of the server.
Address() string
// User of the session.
User() string
Client() Client
// Set session configuration.
Set(key string, typ Type, value interface{})
// Get session configuration.
Expand All @@ -47,18 +55,18 @@ type Session interface {
type BaseSession struct {
id uint32
addr string
user string
client Client
mu sync.RWMutex
config map[string]TypedValue
warnings []*Warning
}

// User returns the current user of the session.
func (s *BaseSession) User() string { return s.user }

// Address returns the server address.
func (s *BaseSession) Address() string { return s.addr }

// User returns session's client information.
func (s *BaseSession) Client() Client { return s.client }

// Set implements the Session interface.
func (s *BaseSession) Set(key string, typ Type, value interface{}) {
s.mu.Lock()
Expand Down Expand Up @@ -171,11 +179,14 @@ func HasDefaultValue(s Session, key string) (bool, interface{}) {
}

// NewSession creates a new session with data.
func NewSession(address string, user string, id uint32) Session {
func NewSession(server, client, user string, id uint32) Session {
return &BaseSession{
id: id,
addr: address,
user: user,
id: id,
addr: server,
client: Client{
Address: client,
User: user,
},
config: DefaultSessionConfig(),
}
}
Expand Down
4 changes: 2 additions & 2 deletions sql/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func TestSessionConfig(t *testing.T) {
require := require.New(t)

sess := NewSession("foo", "bar", 1)
sess := NewSession("foo", "baz", "bar", 1)
typ, v := sess.Get("foo")
require.Equal(Null, typ)
require.Equal(nil, v)
Expand All @@ -37,7 +37,7 @@ func TestSessionConfig(t *testing.T) {

func TestHasDefaultValue(t *testing.T) {
require := require.New(t)
sess := NewSession("foo", "bar", 1)
sess := NewSession("foo", "baz", "bar", 1)

for key := range DefaultSessionConfig() {
require.True(HasDefaultValue(sess, key))
Expand Down

0 comments on commit 2eaa2da

Please sign in to comment.