Skip to content

Commit

Permalink
sql/pgwire: send down parameter status updates
Browse files Browse the repository at this point in the history
Server parameters such as `application_name` require updates over
pgwire when they are charged. This was previously not done. This applies
to time zone changes as well.

In this PR, we introduce a listener-esque object on `sessionDataMutator`
that pgwire will add itself onto to send updates on these changes.

Coincidentally, this means a few logic tests changes because lib/pq will
encode this into the string.

Release note (sql change): This PR introduces a new pgwire update that
sends ParameterStatus messages when certain server parameters are
changed for the given session over pgwire.
  • Loading branch information
otan committed Nov 12, 2019
1 parent 7641335 commit 08c89c4
Show file tree
Hide file tree
Showing 11 changed files with 146 additions and 34 deletions.
3 changes: 0 additions & 3 deletions pkg/cmd/roachtest/pgjdbc_blacklist.go
Original file line number Diff line number Diff line change
Expand Up @@ -1270,9 +1270,6 @@ var pgjdbcBlackList20_1 = blacklist{
"org.postgresql.test.jdbc4.BlobTest.testGetBinaryStreamWithBoundaries": "26725",
"org.postgresql.test.jdbc4.BlobTest.testSetBlobWithStream": "26725",
"org.postgresql.test.jdbc4.BlobTest.testSetBlobWithStreamAndLength": "26725",
"org.postgresql.test.jdbc4.ClientInfoTest.testExplicitSetAppNameNotificationIsParsed": "40854",
"org.postgresql.test.jdbc4.ClientInfoTest.testSetAppName": "40854",
"org.postgresql.test.jdbc4.ClientInfoTest.testSetAppNameProps": "40854",
"org.postgresql.test.jdbc4.DatabaseMetaDataTest.testGetColumnsForAutoIncrement": "41870",
"org.postgresql.test.jdbc4.DatabaseMetaDataTest.testGetFunctionsWithBlankPatterns": "41872",
"org.postgresql.test.jdbc4.DatabaseMetaDataTest.testGetFunctionsWithSpecificTypes": "17511",
Expand Down
10 changes: 8 additions & 2 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,12 @@ func (h ConnectionHandler) GetStatusParam(ctx context.Context, varName string) s
return defVal
}

// RegisterOnSessionDataChange adds a listener to execute when a change on the
// given key is made using the mutator object.
func (h ConnectionHandler) RegisterOnSessionDataChange(key string, f func(val string)) {
h.ex.dataMutator.RegisterOnSessionDataChange(key, f)
}

// ServeConn serves a client connection by reading commands from the stmtBuf
// embedded in the ConnHandler.
//
Expand Down Expand Up @@ -543,10 +549,10 @@ func (s *Server) newConnExecutor(
sdMutator.setCurTxnReadOnly = func(val bool) {
ex.state.readOnly = val
}
sdMutator.applicationNameChanged = func(newName string) {
sdMutator.RegisterOnSessionDataChange("application_name", func(newName string) {
ex.appStats = ex.server.sqlStats.getStatsForApplication(newName)
ex.applicationName.Store(newName)
}
})
// Initialize the session data from provided defaults. We need to do this early
// because other initializations below use the configured values.
if err := resetSessionVars(ctx, sdMutator); err != nil {
Expand Down
26 changes: 20 additions & 6 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1737,17 +1737,30 @@ type sessionDataMutator struct {
settings *cluster.Settings
// setCurTxnReadOnly is called when we execute SET transaction_read_only = ...
setCurTxnReadOnly func(val bool)
// applicationNamedChanged, if set, is called when the "application name"
// variable is updated.
applicationNameChanged func(newName string)
// onSessionDataChangeListeners stores all the observers to execute when
// session data is modified, keyed by the value to change on.
onSessionDataChangeListeners map[string][]func(val string)
}

// RegisterOnSessionDataChange adds a listener to execute when a change on the
// given key is made using the mutator object.
func (m *sessionDataMutator) RegisterOnSessionDataChange(key string, f func(val string)) {
if m.onSessionDataChangeListeners == nil {
m.onSessionDataChangeListeners = make(map[string][]func(val string))
}
m.onSessionDataChangeListeners[key] = append(m.onSessionDataChangeListeners[key], f)
}

func (m *sessionDataMutator) notifyOnDataChangeListeners(key string, val string) {
for _, f := range m.onSessionDataChangeListeners[key] {
f(val)
}
}

// SetApplicationName sets the application name.
func (m *sessionDataMutator) SetApplicationName(appName string) {
m.data.ApplicationName = appName
if m.applicationNameChanged != nil {
m.applicationNameChanged(appName)
}
m.notifyOnDataChangeListeners("application_name", appName)
}

func (m *sessionDataMutator) SetBytesEncodeFormat(val sessiondata.BytesEncodeFormat) {
Expand Down Expand Up @@ -1816,6 +1829,7 @@ func (m *sessionDataMutator) SetSearchPath(val sessiondata.SearchPath) {

func (m *sessionDataMutator) SetLocation(loc *time.Location) {
m.data.DataConversion.Location = loc
m.notifyOnDataChangeListeners("TimeZone", sessionDataTimeZoneFormat(loc))
}

func (m *sessionDataMutator) SetReadOnly(val bool) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/alter_column_type
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ rowid INT8 false unique_rowid() · {
query TTTT
SELECT * FROM t
----
some string short 0000-01-01 20:16:27 +0000 UTC 2018-05-23 22:16:27.658082 +0200 +0200
some string short 0000-01-01 20:16:27 +0000 UTC 2018-05-23 22:16:27.658082 +0200 CEST

statement ok
DROP TABLE t
Expand Down
20 changes: 10 additions & 10 deletions pkg/sql/logictest/testdata/logic_test/datetime
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,7 @@ SET TIME ZONE 'America/New_York'
query T
SELECT '2015-08-25 05:45:45-01:00'::timestamp::timestamptz
----
2015-08-25 05:45:45 -0400 -0400
2015-08-25 05:45:45 -0400 EDT

query T
SELECT '2015-08-25 05:45:45-01:00'::timestamptz::timestamp
Expand Down Expand Up @@ -1196,17 +1196,17 @@ SHOW TIME ZONE
query T
SELECT DATE '1999-01-01' + INTERVAL '4 minutes'
----
1999-01-01 00:04:00 +0000 UTC
1999-01-01 00:04:00 +0000 +0000

query T
SELECT INTERVAL '4 minutes' + DATE '1999-01-01'
----
1999-01-01 00:04:00 +0000 UTC
1999-01-01 00:04:00 +0000 +0000

query T
SELECT DATE '1999-01-01' - INTERVAL '4 minutes'
----
1998-12-31 23:56:00 +0000 UTC
1998-12-31 23:56:00 +0000 +0000

query B
SELECT DATE '1999-01-02' < TIMESTAMPTZ '1999-01-01'
Expand Down Expand Up @@ -1356,33 +1356,33 @@ SELECT date_trunc('month', ts) AS date_trunc_month_created_at FROM "topics";
query T
SELECT date_trunc('month', tstz) AS date_trunc_month_created_at FROM "topics";
----
2017-12-01 00:00:00 +0000 UTC
2017-12-01 00:00:00 +0000 +0000

query T
SELECT date_trunc('month', "date") AS date_trunc_month_created_at FROM "topics";
----
2017-12-01 00:00:00 +0000 UTC
2017-12-01 00:00:00 +0000 +0000

# Test date_trunc works when timestamp zone changes.
subtest regression_41663

query T
select date_trunc('day', '2011-01-01 22:30:00'::date);
----
2011-01-01 00:00:00 +0000 UTC
2011-01-01 00:00:00 +0000 +0000

query T
select date_trunc('day', '2011-01-01 22:30:00+01:00'::timestamptz);
----
2011-01-01 00:00:00 +0000 UTC
2011-01-01 00:00:00 +0000 +0000

statement ok
SET TIME ZONE 'Africa/Nairobi'

query T
select date_trunc('day', '2011-01-01 22:30:00'::date)
----
2011-01-01 00:00:00 +0300 +0300
2011-01-01 00:00:00 +0300 EAT

query T
select date_trunc('day', '2011-01-02 01:30:00'::timestamp)
Expand All @@ -1392,7 +1392,7 @@ select date_trunc('day', '2011-01-02 01:30:00'::timestamp)
query T
select date_trunc('day', '2011-01-01 22:30:00+01:00'::timestamptz)
----
2011-01-02 00:00:00 +0300 +0300
2011-01-02 00:00:00 +0300 EAT

statement ok
SET TIME ZONE -5
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/prepare
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ SET TIME ZONE 'EST';
query T
EXECUTE change_loc
----
2000-01-01 18:05:10.123 -0500 -0500
2000-01-01 18:05:10.123 -0500 EST

statement ok
SET TIME ZONE 'UTC';
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/timestamp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ SET TIME ZONE 'PST8PDT'
query TT
SELECT TIMESTAMP '2001-02-16 20:38:40' AT TIME ZONE 'MST', timezone(TIMESTAMP '2001-02-16 20:38:40', 'MST')
----
2001-02-16 19:38:40 -0800 -0800 2001-02-16 19:38:40 -0800 -0800
2001-02-16 19:38:40 -0800 PST 2001-02-16 19:38:40 -0800 PST

query TT
SELECT TIMESTAMP WITH TIME ZONE '2001-02-16 20:38:40-05' AT TIME ZONE 'MST', timezone(TIMESTAMP WITH TIME ZONE '2001-02-16 20:38:40-05', 'MST')
Expand Down
14 changes: 14 additions & 0 deletions pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,13 @@ func (c *conn) sendStatusParam(param, value string) error {
return c.msgBuilder.finishMsg(c.conn)
}

func (c *conn) bufferStatusParam(param, value string) error {
c.msgBuilder.initMsg(pgwirebase.ServerMsgParameterStatus)
c.msgBuilder.writeTerminatedString(param)
c.msgBuilder.writeTerminatedString(value)
return c.msgBuilder.finishMsg(&c.writerState.buf)
}

func (c *conn) sendInitialConnData(
ctx context.Context, sqlServer *sql.Server,
) (sql.ConnectionHandler, error) {
Expand All @@ -610,10 +617,17 @@ func (c *conn) sendInitialConnData(
// defaults with client-provided values.
// For details see: https://www.postgresql.org/docs/10/static/libpq-status.html
for _, param := range statusReportParams {
param := param
value := connHandler.GetStatusParam(ctx, param)
if err := c.sendStatusParam(param, value); err != nil {
return sql.ConnectionHandler{}, err
}
// `pgwire` also expects updates when these parameters change.
connHandler.RegisterOnSessionDataChange(param, func(val string) {
if err := c.bufferStatusParam(param, val); err != nil {
panic(fmt.Sprintf("unexpected error when trying to send status param update: %s", err.Error()))
}
})
}
// The two following status parameters have no equivalent session
// variable.
Expand Down
73 changes: 73 additions & 0 deletions pkg/sql/pgwire/testdata/pgtest/connection_params
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Change the application name.

send
Parse {"Query": "SET application_name = 'pgtest'"}
Bind
Execute
Sync
----

until
ReadyForQuery
----
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"ParameterStatus","Name":"application_name","Value":"pgtest"}
{"Type":"CommandComplete","CommandTag":"SET"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Change the time zone using an offset.
# TODO(#42404): postgres has a different output. This is what we have right now
# as code, but we need to dig into what/why we use this format in param status.
# postgres: {"Type":"ParameterStatus","Name":"TimeZone","Value":"\u003c+06\u003e-06"}

send
Parse {"Query": "SET TIME ZONE +6"}
Bind
Execute
Sync
----

until
ReadyForQuery
----
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"ParameterStatus","Name":"TimeZone","Value":"6"}
{"Type":"CommandComplete","CommandTag":"SET"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Parse {"Query": "SET TIME ZONE -11.5"}
Bind
Execute
Sync
----

# postgres: {"Type":"ParameterStatus","Name":"TimeZone","Value":"\u003c-11:30\u003e+11:30"}
until
ReadyForQuery
----
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"ParameterStatus","Name":"TimeZone","Value":"-11.5"}
{"Type":"CommandComplete","CommandTag":"SET"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Change the time zone using a real string.

send
Parse {"Query": "SET TIME ZONE 'America/New_York'"}
Bind
Execute
Sync
----

until
ReadyForQuery
----
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"ParameterStatus","Name":"TimeZone","Value":"America/New_York"}
{"Type":"CommandComplete","CommandTag":"SET"}
{"Type":"ReadyForQuery","TxStatus":"I"}
26 changes: 16 additions & 10 deletions pkg/sql/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,16 +702,7 @@ var varGen = map[string]sessionVar{
// See https://www.postgresql.org/docs/10/static/runtime-config-client.html#GUC-TIMEZONE
`timezone`: {
Get: func(evalCtx *extendedEvalContext) string {
// If the time zone is a "fixed offset" one, initialized from an offset
// and not a standard name, then we use a magic format in the Location's
// name. We attempt to parse that here and retrieve the original offset
// specified by the user.
locStr := evalCtx.SessionData.DataConversion.Location.String()
_, origRepr, parsed := timeutil.ParseFixedOffsetTimeZone(locStr)
if parsed {
return origRepr
}
return locStr
return sessionDataTimeZoneFormat(evalCtx.SessionData.DataConversion.Location)
},
GetStringVal: timeZoneVarGetStringVal,
Set: timeZoneVarSet,
Expand Down Expand Up @@ -868,6 +859,21 @@ func displayPgBool(val bool) func(_ *settings.Values) string {

var globalFalse = displayPgBool(false)

// sessionDataTimeZoneFormat returns the appropriate timezone format
// to output when the `timezone` is required output.
// If the time zone is a "fixed offset" one, initialized from an offset
// and not a standard name, then we use a magic format in the Location's
// name. We attempt to parse that here and retrieve the original offset
// specified by the user.
func sessionDataTimeZoneFormat(loc *time.Location) string {
locStr := loc.String()
_, origRepr, parsed := timeutil.ParseFixedOffsetTimeZone(locStr)
if parsed {
return origRepr
}
return locStr
}

func makeCompatBoolVar(varName string, displayValue, anyValAllowed bool) sessionVar {
displayValStr := formatBoolAsPostgresSetting(displayValue)
return sessionVar{
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/timeutil/time_zone_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ func TimeZoneStringToLocation(location string) (*time.Location, error) {
//
// The strings produced by FixedOffsetTimeZoneToLocation look like
// "<fixedOffsetPrefix><offset> (<origRepr>)".
// TODO(#42404): this is not the format given by the results in
// pgwire/testdata/connection_params.
func ParseFixedOffsetTimeZone(location string) (offset int, origRepr string, success bool) {
if !strings.HasPrefix(location, fixedOffsetPrefix) {
return 0, "", false
Expand Down

0 comments on commit 08c89c4

Please sign in to comment.