From 2f6e0b0a396c689360e12db6f5aaa1b0df4f0884 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Thu, 10 Mar 2022 16:15:56 -0500 Subject: [PATCH 1/2] Cleanup some SQL variables --- core/hooks/commit_field_changes.go | 40 ++++++------------ core/hooks/commit_language_changes.go | 19 +++------ core/hooks/commit_name_changes.go | 19 +++------ core/models/campaigns.go | 58 +++++++++------------------ core/models/resthooks.go | 32 +++++---------- core/models/webhook_event.go | 11 +++-- testsuite/testsuite.go | 4 +- 7 files changed, 62 insertions(+), 121 deletions(-) diff --git a/core/hooks/commit_field_changes.go b/core/hooks/commit_field_changes.go index 8bb345fef..22cbea049 100644 --- a/core/hooks/commit_field_changes.go +++ b/core/hooks/commit_field_changes.go @@ -68,7 +68,7 @@ func (h *commitFieldChangesHook) Apply(ctx context.Context, rt *runtime.Runtime, // first apply our deletes // in pg9.6 we need to do this as one query per field type, in pg10 we can rewrite this to be a single query for _, fds := range fieldDeletes { - err := models.BulkQuery(ctx, "deleting contact field values", tx, deleteContactFieldsSQL, fds) + err := models.BulkQuery(ctx, "deleting contact field values", tx, sqlDeleteContactFields, fds) if err != nil { return errors.Wrapf(err, "error deleting contact fields") } @@ -76,7 +76,7 @@ func (h *commitFieldChangesHook) Apply(ctx context.Context, rt *runtime.Runtime, // then our updates if len(fieldUpdates) > 0 { - err := models.BulkQuery(ctx, "updating contact field values", tx, updateContactFieldsSQL, fieldUpdates) + err := models.BulkQuery(ctx, "updating contact field values", tx, sqpUpdateContactFields, fieldUpdates) if err != nil { return errors.Wrapf(err, "error updating contact fields") } @@ -99,28 +99,14 @@ type FieldValue struct { Text string `json:"text"` } -const updateContactFieldsSQL = ` -UPDATE - contacts_contact c -SET - fields = COALESCE(fields,'{}'::jsonb) || r.updates::jsonb -FROM ( - VALUES(:contact_id, :updates) -) AS - r(contact_id, updates) -WHERE - c.id = r.contact_id::int -` - -const deleteContactFieldsSQL = ` -UPDATE - contacts_contact c -SET - fields = fields - r.field_uuid -FROM ( - VALUES(:contact_id, :field_uuid) -) AS - r(contact_id, field_uuid) -WHERE - c.id = r.contact_id::int -` +const sqpUpdateContactFields = ` +UPDATE contacts_contact c + SET fields = COALESCE(fields,'{}'::jsonb) || r.updates::jsonb + FROM (VALUES(:contact_id, :updates)) AS r(contact_id, updates) + WHERE c.id = r.contact_id::int` + +const sqlDeleteContactFields = ` +UPDATE contacts_contact c + SET fields = fields - r.field_uuid + FROM (VALUES(:contact_id, :field_uuid)) AS r(contact_id, field_uuid) + WHERE c.id = r.contact_id::int` diff --git a/core/hooks/commit_language_changes.go b/core/hooks/commit_language_changes.go index bfd43b263..d73aead30 100644 --- a/core/hooks/commit_language_changes.go +++ b/core/hooks/commit_language_changes.go @@ -26,7 +26,7 @@ func (h *commitLanguageChangesHook) Apply(ctx context.Context, rt *runtime.Runti } // do our update - return models.BulkQuery(ctx, "updating contact language", tx, updateContactLanguageSQL, updates) + return models.BulkQuery(ctx, "updating contact language", tx, sqlUpdateContactLanguage, updates) } // struct used for our bulk update @@ -35,15 +35,8 @@ type languageUpdate struct { Language null.String `db:"language"` } -const updateContactLanguageSQL = ` - UPDATE - contacts_contact c - SET - language = r.language - FROM ( - VALUES(:id, :language) - ) AS - r(id, language) - WHERE - c.id = r.id::int -` +const sqlUpdateContactLanguage = ` +UPDATE contacts_contact c + SET language = r.language + FROM (VALUES(:id, :language)) AS r(id, language) + WHERE c.id = r.id::int` diff --git a/core/hooks/commit_name_changes.go b/core/hooks/commit_name_changes.go index d627f8c92..2804bf6b2 100644 --- a/core/hooks/commit_name_changes.go +++ b/core/hooks/commit_name_changes.go @@ -27,7 +27,7 @@ func (h *commitNameChangesHook) Apply(ctx context.Context, rt *runtime.Runtime, } // do our update - return models.BulkQuery(ctx, "updating contact name", tx, updateContactNameSQL, updates) + return models.BulkQuery(ctx, "updating contact name", tx, sqlUpdateContactName, updates) } // struct used for our bulk insert @@ -36,15 +36,8 @@ type nameUpdate struct { Name null.String `db:"name"` } -const updateContactNameSQL = ` - UPDATE - contacts_contact c - SET - name = r.name - FROM ( - VALUES(:id, :name) - ) AS - r(id, name) - WHERE - c.id = r.id::int -` +const sqlUpdateContactName = ` +UPDATE contacts_contact c + SET name = r.name + FROM (VALUES(:id, :name)) AS r(id, name) + WHERE c.id = r.id::int` diff --git a/core/models/campaigns.go b/core/models/campaigns.go index 117bf912c..0e6e9a4a3 100644 --- a/core/models/campaigns.go +++ b/core/models/campaigns.go @@ -626,41 +626,23 @@ type eligibleContact struct { RelToValue *time.Time `db:"rel_to_value"` } -const eligibleContactsForCreatedOnSQL = ` -SELECT - c.id AS contact_id, - c.created_on AS rel_to_value -FROM - contacts_contact c -INNER JOIN - contacts_contactgroup_contacts gc ON gc.contact_id = c.id -WHERE - gc.contactgroup_id = $1 AND c.is_active = TRUE -` - -const eligibleContactsForLastSeenOnSQL = ` -SELECT - c.id AS contact_id, - c.last_seen_on AS rel_to_value -FROM - contacts_contact c -INNER JOIN - contacts_contactgroup_contacts gc ON gc.contact_id = c.id -WHERE - gc.contactgroup_id = $1 AND c.is_active = TRUE AND c.last_seen_on IS NOT NULL -` - -const eligibleContactsForFieldSQL = ` -SELECT - c.id AS contact_id, - (c.fields->$2->>'datetime')::timestamptz AS rel_to_value -FROM - contacts_contact c -INNER JOIN - contacts_contactgroup_contacts gc ON gc.contact_id = c.id -WHERE - gc.contactgroup_id = $1 AND c.is_active = TRUE AND ARRAY[$2]::text[] <@ (extract_jsonb_keys(c.fields)) IS NOT NULL -` +const sqlEligibleContactsForCreatedOn = ` + SELECT c.id AS contact_id, c.created_on AS rel_to_value + FROM contacts_contact c +INNER JOIN contacts_contactgroup_contacts gc ON gc.contact_id = c.id + WHERE gc.contactgroup_id = $1 AND c.is_active = TRUE` + +const sqlEligibleContactsForLastSeenOn = ` + SELECT c.id AS contact_id, c.last_seen_on AS rel_to_value + FROM contacts_contact c +INNER JOIN contacts_contactgroup_contacts gc ON gc.contact_id = c.id + WHERE gc.contactgroup_id = $1 AND c.is_active = TRUE AND c.last_seen_on IS NOT NULL` + +const sqlEligibleContactsForField = ` + SELECT c.id AS contact_id, (c.fields->$2->>'datetime')::timestamptz AS rel_to_value + FROM contacts_contact c +INNER JOIN contacts_contactgroup_contacts gc ON gc.contact_id = c.id + WHERE gc.contactgroup_id = $1 AND c.is_active = TRUE AND ARRAY[$2]::text[] <@ (extract_jsonb_keys(c.fields)) IS NOT NULL` func campaignEventEligibleContacts(ctx context.Context, db Queryer, groupID GroupID, field *Field) ([]*eligibleContact, error) { var query string @@ -668,13 +650,13 @@ func campaignEventEligibleContacts(ctx context.Context, db Queryer, groupID Grou switch field.Key() { case CreatedOnKey: - query = eligibleContactsForCreatedOnSQL + query = sqlEligibleContactsForCreatedOn params = []interface{}{groupID} case LastSeenOnKey: - query = eligibleContactsForLastSeenOnSQL + query = sqlEligibleContactsForLastSeenOn params = []interface{}{groupID} default: - query = eligibleContactsForFieldSQL + query = sqlEligibleContactsForField params = []interface{}{groupID, field.UUID()} } diff --git a/core/models/resthooks.go b/core/models/resthooks.go index 970c05a76..377b756e6 100644 --- a/core/models/resthooks.go +++ b/core/models/resthooks.go @@ -91,7 +91,7 @@ func UnsubscribeResthooks(ctx context.Context, tx *sqlx.Tx, unsubs []*ResthookUn is[i] = unsubs[i] } - err := BulkQuery(ctx, "unsubscribing resthooks", tx, unsubscribeResthooksSQL, is) + err := BulkQuery(ctx, "unsubscribing resthooks", tx, sqlUnsubscribeResthooks, is) if err != nil { return errors.Wrapf(err, "error unsubscribing from resthooks") } @@ -105,24 +105,12 @@ type ResthookUnsubscribe struct { URL string `db:"url"` } -const unsubscribeResthooksSQL = ` -UPDATE - api_resthooksubscriber -SET - is_active = FALSE, - modified_on = NOW() -WHERE - id = ANY( - SELECT - s.id - FROM - api_resthooksubscriber s - JOIN api_resthook r ON s.resthook_id = r.id, - (VALUES(:org_id, :slug, :url)) AS u(org_id, slug, url) - WHERE - s.is_active = TRUE AND - r.org_id = u.org_id::int AND - r.slug = u.slug AND - s.target_url = u.url - ) -` +const sqlUnsubscribeResthooks = ` +UPDATE api_resthooksubscriber + SET is_active = FALSE, modified_on = NOW() + WHERE id = ANY( + SELECT s.id + FROM api_resthooksubscriber s + JOIN api_resthook r ON s.resthook_id = r.id, (VALUES(:org_id, :slug, :url)) AS u(org_id, slug, url) + WHERE s.is_active = TRUE AND r.org_id = u.org_id::int AND r.slug = u.slug AND s.target_url = u.url +)` diff --git a/core/models/webhook_event.go b/core/models/webhook_event.go index e11ded7ad..7fa776c56 100644 --- a/core/models/webhook_event.go +++ b/core/models/webhook_event.go @@ -33,11 +33,10 @@ func NewWebhookEvent(orgID OrgID, resthookID ResthookID, data string, createdOn return event } -const insertWebhookEventsSQL = ` -INSERT INTO api_webhookevent(data, resthook_id, org_id, created_on, action) - VALUES(:data, :resthook_id, :org_id, :created_on, 'POST') -RETURNING id -` +const sqlInsertWebhookEvents = ` +INSERT INTO api_webhookevent(data, resthook_id, org_id, created_on, action) + VALUES (:data, :resthook_id, :org_id, :created_on, 'POST') + RETURNING id` // InsertWebhookEvents inserts the passed in webhook events, assigning them ids func InsertWebhookEvents(ctx context.Context, db Queryer, events []*WebhookEvent) error { @@ -50,5 +49,5 @@ func InsertWebhookEvents(ctx context.Context, db Queryer, events []*WebhookEvent is[i] = &events[i].e } - return BulkQuery(ctx, "inserted webhook events", db, insertWebhookEventsSQL, is) + return BulkQuery(ctx, "inserted webhook events", db, sqlInsertWebhookEvents, is) } diff --git a/testsuite/testsuite.go b/testsuite/testsuite.go index 18d7f9053..b68055c84 100644 --- a/testsuite/testsuite.go +++ b/testsuite/testsuite.go @@ -178,7 +178,7 @@ func resetStorage() { must(os.RemoveAll(SessionStorageDir)) } -var resetDataSQL = ` +var sqlResetTestData = ` UPDATE contacts_contact SET current_flow_id = NULL; DELETE FROM notifications_notification; @@ -226,7 +226,7 @@ ALTER SEQUENCE campaigns_campaignevent_id_seq RESTART WITH 30000;` // undo changes made to the contact data in the test database dump. func resetData() { db := getDB() - db.MustExec(resetDataSQL) + db.MustExec(sqlResetTestData) // because groups have changed models.FlushCache() From 58c901a638afb850a1042f096f501da4aaaac74f Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Thu, 10 Mar 2022 16:46:56 -0500 Subject: [PATCH 2/2] More cleanup --- core/hooks/commit_field_changes.go | 4 +-- core/models/channel_connection.go | 13 +++------- core/models/channel_event.go | 13 ++++------ core/models/channel_logs.go | 15 +++++------- core/models/imports.go | 39 +++++++++--------------------- core/models/incident.go | 18 ++++++++------ core/models/webhook_event.go | 4 +-- 7 files changed, 41 insertions(+), 65 deletions(-) diff --git a/core/hooks/commit_field_changes.go b/core/hooks/commit_field_changes.go index 22cbea049..732345c14 100644 --- a/core/hooks/commit_field_changes.go +++ b/core/hooks/commit_field_changes.go @@ -76,7 +76,7 @@ func (h *commitFieldChangesHook) Apply(ctx context.Context, rt *runtime.Runtime, // then our updates if len(fieldUpdates) > 0 { - err := models.BulkQuery(ctx, "updating contact field values", tx, sqpUpdateContactFields, fieldUpdates) + err := models.BulkQuery(ctx, "updating contact field values", tx, sqlUpdateContactFields, fieldUpdates) if err != nil { return errors.Wrapf(err, "error updating contact fields") } @@ -99,7 +99,7 @@ type FieldValue struct { Text string `json:"text"` } -const sqpUpdateContactFields = ` +const sqlUpdateContactFields = ` UPDATE contacts_contact c SET fields = COALESCE(fields,'{}'::jsonb) || r.updates::jsonb FROM (VALUES(:contact_id, :updates)) AS r(contact_id, updates) diff --git a/core/models/channel_connection.go b/core/models/channel_connection.go index d02beb1f2..2d6d97037 100644 --- a/core/models/channel_connection.go +++ b/core/models/channel_connection.go @@ -105,9 +105,8 @@ func (c *ChannelConnection) ErrorReason() ConnectionError { return ConnectionErr func (c *ChannelConnection) ErrorCount() int { return c.c.ErrorCount } func (c *ChannelConnection) NextAttempt() *time.Time { return c.c.NextAttempt } -const insertConnectionSQL = ` -INSERT INTO - channels_channelconnection +const sqlInsertConnection = ` +INSERT INTO channels_channelconnection ( created_on, modified_on, @@ -122,7 +121,6 @@ INSERT INTO contact_urn_id, error_count ) - VALUES( NOW(), NOW(), @@ -137,10 +135,7 @@ VALUES( :contact_urn_id, 0 ) -RETURNING - id, - NOW(); -` +RETURNING id, NOW();` // InsertIVRConnection creates a new IVR session for the passed in org, channel and contact, inserting it func InsertIVRConnection(ctx context.Context, db *sqlx.DB, orgID OrgID, channelID ChannelID, startID StartID, contactID ContactID, urnID URNID, @@ -159,7 +154,7 @@ func InsertIVRConnection(ctx context.Context, db *sqlx.DB, orgID OrgID, channelI c.ExternalID = externalID c.StartID = startID - rows, err := db.NamedQueryContext(ctx, insertConnectionSQL, c) + rows, err := db.NamedQueryContext(ctx, sqlInsertConnection, c) if err != nil { return nil, errors.Wrapf(err, "error inserting new channel connection") } diff --git a/core/models/channel_event.go b/core/models/channel_event.go index 12164dd9f..2c1281266 100644 --- a/core/models/channel_event.go +++ b/core/models/channel_event.go @@ -78,18 +78,15 @@ func (e *ChannelEvent) UnmarshalJSON(b []byte) error { return json.Unmarshal(b, &e.e) } -const insertChannelEventSQL = ` -INSERT INTO - channels_channelevent(event_type, extra, occurred_on, created_on, channel_id, contact_id, contact_urn_id, org_id) - VALUES(:event_type, :extra, :occurred_on, :created_on, :channel_id, :contact_id, :contact_urn_id, :org_id) -RETURNING - id -` +const sqlInsertChannelEvent = ` +INSERT INTO channels_channelevent(event_type, extra, occurred_on, created_on, channel_id, contact_id, contact_urn_id, org_id) + VALUES(:event_type, :extra, :occurred_on, :created_on, :channel_id, :contact_id, :contact_urn_id, :org_id) + RETURNING id` // Insert inserts this channel event to our DB. The ID of the channel event will be // set if no error is returned func (e *ChannelEvent) Insert(ctx context.Context, db Queryer) error { - return BulkQuery(ctx, "insert channel event", db, insertChannelEventSQL, []interface{}{&e.e}) + return BulkQuery(ctx, "insert channel event", db, sqlInsertChannelEvent, []interface{}{&e.e}) } // NewChannelEvent creates a new channel event for the passed in parameters, returning it diff --git a/core/models/channel_logs.go b/core/models/channel_logs.go index a0ff14b9c..385992d6c 100644 --- a/core/models/channel_logs.go +++ b/core/models/channel_logs.go @@ -35,13 +35,10 @@ type ChannelLog struct { // ID returns the id of this channel log func (l *ChannelLog) ID() ChannelLogID { return l.l.ID } -const insertChannelLogSQL = ` -INSERT INTO - channels_channellog( description, is_error, url, method, request, response, response_status, created_on, request_time, channel_id, connection_id) - VALUES(:description, :is_error, :url, :method, :request, :response, :response_status, :created_on, :request_time, :channel_id, :connection_id) -RETURNING - id as id -` +const sqlInsertChannelLog = ` +INSERT INTO channels_channellog( description, is_error, url, method, request, response, response_status, created_on, request_time, channel_id, connection_id) + VALUES(:description, :is_error, :url, :method, :request, :response, :response_status, :created_on, :request_time, :channel_id, :connection_id) + RETURNING id as id` // NewChannelLog creates a new channel log func NewChannelLog(trace *httpx.Trace, isError bool, desc string, channel *Channel, conn *ChannelConnection) *ChannelLog { @@ -87,7 +84,7 @@ func InsertChannelLogs(ctx context.Context, db Queryer, logs []*ChannelLog) erro ls[i] = &logs[i].l } - err := BulkQuery(ctx, "insert channel log", db, insertChannelLogSQL, ls) + err := BulkQuery(ctx, "insert channel log", db, sqlInsertChannelLog, ls) if err != nil { return errors.Wrapf(err, "error inserting channel log") } @@ -117,7 +114,7 @@ func InsertChannelLog(ctx context.Context, db Queryer, l.ConnectionID = conn.ID() } - err := BulkQuery(ctx, "insert channel log", db, insertChannelLogSQL, []interface{}{l}) + err := BulkQuery(ctx, "insert channel log", db, sqlInsertChannelLog, []interface{}{l}) if err != nil { return nil, errors.Wrapf(err, "error inserting channel log") } diff --git a/core/models/imports.go b/core/models/imports.go index 51785416c..bd2ada923 100644 --- a/core/models/imports.go +++ b/core/models/imports.go @@ -53,49 +53,34 @@ type ContactImport struct { BatchStatuses string `db:"batch_statuses"` } -var loadContactImportSQL = ` -SELECT - i.id AS "id", - i.org_id AS "org_id", - i.status AS "status", - i.created_by_id AS "created_by_id", - i.finished_on AS "finished_on", - array_to_string(array_agg(DISTINCT b.status), '') AS "batch_statuses" -FROM - contacts_contactimport i -LEFT OUTER JOIN - contacts_contactimportbatch b ON b.contact_import_id = i.id -WHERE - i.id = $1 -GROUP BY - i.id` +var sqlLoadContactImport = ` + SELECT i.id, i.org_id, i.status, i.created_by_id, i.finished_on, array_to_string(array_agg(DISTINCT b.status), '') AS "batch_statuses" + FROM contacts_contactimport i +LEFT OUTER JOIN contacts_contactimportbatch b ON b.contact_import_id = i.id + WHERE i.id = $1 + GROUP BY i.id` // LoadContactImport loads a contact import by ID func LoadContactImport(ctx context.Context, db Queryer, id ContactImportID) (*ContactImport, error) { i := &ContactImport{} - err := db.GetContext(ctx, i, loadContactImportSQL, id) + err := db.GetContext(ctx, i, sqlLoadContactImport, id) if err != nil { return nil, errors.Wrapf(err, "error loading contact import id=%d", id) } return i, nil } -var markContactImportFinishedSQL = ` -UPDATE - contacts_contactimport -SET - status = $2, - finished_on = $3 -WHERE - id = $1 -` +var sqlMarkContactImportFinished = ` +UPDATE contacts_contactimport + SET status = $2, finished_on = $3 + WHERE id = $1` func (i *ContactImport) MarkFinished(ctx context.Context, db Queryer, status ContactImportStatus) error { now := dates.Now() i.Status = status i.FinishedOn = &now - _, err := db.ExecContext(ctx, markContactImportFinishedSQL, i.ID, i.Status, i.FinishedOn) + _, err := db.ExecContext(ctx, sqlMarkContactImportFinished, i.ID, i.Status, i.FinishedOn) return errors.Wrap(err, "error marking import as finished") } diff --git a/core/models/incident.go b/core/models/incident.go index fcd6fe644..e9b5299ab 100644 --- a/core/models/incident.go +++ b/core/models/incident.go @@ -97,13 +97,15 @@ func IncidentWebhooksUnhealthy(ctx context.Context, db Queryer, rp *redis.Pool, return id, nil } -const insertIncidentSQL = ` -INSERT INTO notifications_incident(org_id, incident_type, scope, started_on, channel_id) VALUES($1, $2, $3, $4, $5) -ON CONFLICT DO NOTHING RETURNING id` +const sqlInsertIncident = ` +INSERT INTO notifications_incident(org_id, incident_type, scope, started_on, channel_id) + VALUES($1, $2, $3, $4, $5) +ON CONFLICT DO NOTHING + RETURNING id` func getOrCreateIncident(ctx context.Context, db Queryer, oa *OrgAssets, incident *Incident) (IncidentID, error) { var incidentID IncidentID - err := db.GetContext(ctx, &incidentID, insertIncidentSQL, incident.OrgID, incident.Type, incident.Scope, incident.StartedOn, incident.ChannelID) + err := db.GetContext(ctx, &incidentID, sqlInsertIncident, incident.OrgID, incident.Type, incident.Scope, incident.StartedOn, incident.ChannelID) if err != nil && err != sql.ErrNoRows { return NilIncidentID, errors.Wrap(err, "error inserting incident") } @@ -125,13 +127,13 @@ func getOrCreateIncident(ctx context.Context, db Queryer, oa *OrgAssets, inciden return incidentID, nil } -const selectOpenIncidentsSQL = ` +const sqlSelectOpenIncidents = ` SELECT id, org_id, incident_type, scope, started_on, ended_on, channel_id -FROM notifications_incident -WHERE ended_on IS NULL AND incident_type = ANY($1)` + FROM notifications_incident + WHERE ended_on IS NULL AND incident_type = ANY($1)` func GetOpenIncidents(ctx context.Context, db Queryer, types []IncidentType) ([]*Incident, error) { - rows, err := db.QueryxContext(ctx, selectOpenIncidentsSQL, pq.Array(types)) + rows, err := db.QueryxContext(ctx, sqlSelectOpenIncidents, pq.Array(types)) if err != nil { return nil, errors.Wrap(err, "error querying open incidents") } diff --git a/core/models/webhook_event.go b/core/models/webhook_event.go index 7fa776c56..5991e8a6e 100644 --- a/core/models/webhook_event.go +++ b/core/models/webhook_event.go @@ -34,8 +34,8 @@ func NewWebhookEvent(orgID OrgID, resthookID ResthookID, data string, createdOn } const sqlInsertWebhookEvents = ` -INSERT INTO api_webhookevent(data, resthook_id, org_id, created_on, action) - VALUES (:data, :resthook_id, :org_id, :created_on, 'POST') +INSERT INTO api_webhookevent(data, resthook_id, org_id, created_on, action) + VALUES(:data, :resthook_id, :org_id, :created_on, 'POST') RETURNING id` // InsertWebhookEvents inserts the passed in webhook events, assigning them ids