Skip to content

Commit

Permalink
Merge pull request rapidpro#598 from nyaruka/sql_cleanup
Browse files Browse the repository at this point in the history
Cleanup some SQL variables
  • Loading branch information
rowanseymour authored Mar 18, 2022
2 parents 8b58b4c + 58c901a commit a4478a9
Show file tree
Hide file tree
Showing 12 changed files with 99 additions and 182 deletions.
40 changes: 13 additions & 27 deletions core/hooks/commit_field_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ func (h *commitFieldChangesHook) Apply(ctx context.Context, rt *runtime.Runtime,
// first apply our deletes
// in pg9.6 we need to do this as one query per field type, in pg10 we can rewrite this to be a single query
for _, fds := range fieldDeletes {
err := models.BulkQuery(ctx, "deleting contact field values", tx, deleteContactFieldsSQL, fds)
err := models.BulkQuery(ctx, "deleting contact field values", tx, sqlDeleteContactFields, fds)
if err != nil {
return errors.Wrapf(err, "error deleting contact fields")
}
}

// then our updates
if len(fieldUpdates) > 0 {
err := models.BulkQuery(ctx, "updating contact field values", tx, updateContactFieldsSQL, fieldUpdates)
err := models.BulkQuery(ctx, "updating contact field values", tx, sqlUpdateContactFields, fieldUpdates)
if err != nil {
return errors.Wrapf(err, "error updating contact fields")
}
Expand All @@ -99,28 +99,14 @@ type FieldValue struct {
Text string `json:"text"`
}

const updateContactFieldsSQL = `
UPDATE
contacts_contact c
SET
fields = COALESCE(fields,'{}'::jsonb) || r.updates::jsonb
FROM (
VALUES(:contact_id, :updates)
) AS
r(contact_id, updates)
WHERE
c.id = r.contact_id::int
`

const deleteContactFieldsSQL = `
UPDATE
contacts_contact c
SET
fields = fields - r.field_uuid
FROM (
VALUES(:contact_id, :field_uuid)
) AS
r(contact_id, field_uuid)
WHERE
c.id = r.contact_id::int
`
const sqlUpdateContactFields = `
UPDATE contacts_contact c
SET fields = COALESCE(fields,'{}'::jsonb) || r.updates::jsonb
FROM (VALUES(:contact_id, :updates)) AS r(contact_id, updates)
WHERE c.id = r.contact_id::int`

const sqlDeleteContactFields = `
UPDATE contacts_contact c
SET fields = fields - r.field_uuid
FROM (VALUES(:contact_id, :field_uuid)) AS r(contact_id, field_uuid)
WHERE c.id = r.contact_id::int`
19 changes: 6 additions & 13 deletions core/hooks/commit_language_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (h *commitLanguageChangesHook) Apply(ctx context.Context, rt *runtime.Runti
}

// do our update
return models.BulkQuery(ctx, "updating contact language", tx, updateContactLanguageSQL, updates)
return models.BulkQuery(ctx, "updating contact language", tx, sqlUpdateContactLanguage, updates)
}

// struct used for our bulk update
Expand All @@ -35,15 +35,8 @@ type languageUpdate struct {
Language null.String `db:"language"`
}

const updateContactLanguageSQL = `
UPDATE
contacts_contact c
SET
language = r.language
FROM (
VALUES(:id, :language)
) AS
r(id, language)
WHERE
c.id = r.id::int
`
const sqlUpdateContactLanguage = `
UPDATE contacts_contact c
SET language = r.language
FROM (VALUES(:id, :language)) AS r(id, language)
WHERE c.id = r.id::int`
19 changes: 6 additions & 13 deletions core/hooks/commit_name_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (h *commitNameChangesHook) Apply(ctx context.Context, rt *runtime.Runtime,
}

// do our update
return models.BulkQuery(ctx, "updating contact name", tx, updateContactNameSQL, updates)
return models.BulkQuery(ctx, "updating contact name", tx, sqlUpdateContactName, updates)
}

// struct used for our bulk insert
Expand All @@ -36,15 +36,8 @@ type nameUpdate struct {
Name null.String `db:"name"`
}

const updateContactNameSQL = `
UPDATE
contacts_contact c
SET
name = r.name
FROM (
VALUES(:id, :name)
) AS
r(id, name)
WHERE
c.id = r.id::int
`
const sqlUpdateContactName = `
UPDATE contacts_contact c
SET name = r.name
FROM (VALUES(:id, :name)) AS r(id, name)
WHERE c.id = r.id::int`
58 changes: 20 additions & 38 deletions core/models/campaigns.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,55 +626,37 @@ type eligibleContact struct {
RelToValue *time.Time `db:"rel_to_value"`
}

const eligibleContactsForCreatedOnSQL = `
SELECT
c.id AS contact_id,
c.created_on AS rel_to_value
FROM
contacts_contact c
INNER JOIN
contacts_contactgroup_contacts gc ON gc.contact_id = c.id
WHERE
gc.contactgroup_id = $1 AND c.is_active = TRUE
`

const eligibleContactsForLastSeenOnSQL = `
SELECT
c.id AS contact_id,
c.last_seen_on AS rel_to_value
FROM
contacts_contact c
INNER JOIN
contacts_contactgroup_contacts gc ON gc.contact_id = c.id
WHERE
gc.contactgroup_id = $1 AND c.is_active = TRUE AND c.last_seen_on IS NOT NULL
`

const eligibleContactsForFieldSQL = `
SELECT
c.id AS contact_id,
(c.fields->$2->>'datetime')::timestamptz AS rel_to_value
FROM
contacts_contact c
INNER JOIN
contacts_contactgroup_contacts gc ON gc.contact_id = c.id
WHERE
gc.contactgroup_id = $1 AND c.is_active = TRUE AND ARRAY[$2]::text[] <@ (extract_jsonb_keys(c.fields)) IS NOT NULL
`
const sqlEligibleContactsForCreatedOn = `
SELECT c.id AS contact_id, c.created_on AS rel_to_value
FROM contacts_contact c
INNER JOIN contacts_contactgroup_contacts gc ON gc.contact_id = c.id
WHERE gc.contactgroup_id = $1 AND c.is_active = TRUE`

const sqlEligibleContactsForLastSeenOn = `
SELECT c.id AS contact_id, c.last_seen_on AS rel_to_value
FROM contacts_contact c
INNER JOIN contacts_contactgroup_contacts gc ON gc.contact_id = c.id
WHERE gc.contactgroup_id = $1 AND c.is_active = TRUE AND c.last_seen_on IS NOT NULL`

const sqlEligibleContactsForField = `
SELECT c.id AS contact_id, (c.fields->$2->>'datetime')::timestamptz AS rel_to_value
FROM contacts_contact c
INNER JOIN contacts_contactgroup_contacts gc ON gc.contact_id = c.id
WHERE gc.contactgroup_id = $1 AND c.is_active = TRUE AND ARRAY[$2]::text[] <@ (extract_jsonb_keys(c.fields)) IS NOT NULL`

func campaignEventEligibleContacts(ctx context.Context, db Queryer, groupID GroupID, field *Field) ([]*eligibleContact, error) {
var query string
var params []interface{}

switch field.Key() {
case CreatedOnKey:
query = eligibleContactsForCreatedOnSQL
query = sqlEligibleContactsForCreatedOn
params = []interface{}{groupID}
case LastSeenOnKey:
query = eligibleContactsForLastSeenOnSQL
query = sqlEligibleContactsForLastSeenOn
params = []interface{}{groupID}
default:
query = eligibleContactsForFieldSQL
query = sqlEligibleContactsForField
params = []interface{}{groupID, field.UUID()}
}

Expand Down
13 changes: 4 additions & 9 deletions core/models/channel_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,8 @@ func (c *ChannelConnection) ErrorReason() ConnectionError { return ConnectionErr
func (c *ChannelConnection) ErrorCount() int { return c.c.ErrorCount }
func (c *ChannelConnection) NextAttempt() *time.Time { return c.c.NextAttempt }

const insertConnectionSQL = `
INSERT INTO
channels_channelconnection
const sqlInsertConnection = `
INSERT INTO channels_channelconnection
(
created_on,
modified_on,
Expand All @@ -122,7 +121,6 @@ INSERT INTO
contact_urn_id,
error_count
)
VALUES(
NOW(),
NOW(),
Expand All @@ -137,10 +135,7 @@ VALUES(
:contact_urn_id,
0
)
RETURNING
id,
NOW();
`
RETURNING id, NOW();`

// InsertIVRConnection creates a new IVR session for the passed in org, channel and contact, inserting it
func InsertIVRConnection(ctx context.Context, db *sqlx.DB, orgID OrgID, channelID ChannelID, startID StartID, contactID ContactID, urnID URNID,
Expand All @@ -159,7 +154,7 @@ func InsertIVRConnection(ctx context.Context, db *sqlx.DB, orgID OrgID, channelI
c.ExternalID = externalID
c.StartID = startID

rows, err := db.NamedQueryContext(ctx, insertConnectionSQL, c)
rows, err := db.NamedQueryContext(ctx, sqlInsertConnection, c)
if err != nil {
return nil, errors.Wrapf(err, "error inserting new channel connection")
}
Expand Down
13 changes: 5 additions & 8 deletions core/models/channel_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,15 @@ func (e *ChannelEvent) UnmarshalJSON(b []byte) error {
return json.Unmarshal(b, &e.e)
}

const insertChannelEventSQL = `
INSERT INTO
channels_channelevent(event_type, extra, occurred_on, created_on, channel_id, contact_id, contact_urn_id, org_id)
VALUES(:event_type, :extra, :occurred_on, :created_on, :channel_id, :contact_id, :contact_urn_id, :org_id)
RETURNING
id
`
const sqlInsertChannelEvent = `
INSERT INTO channels_channelevent(event_type, extra, occurred_on, created_on, channel_id, contact_id, contact_urn_id, org_id)
VALUES(:event_type, :extra, :occurred_on, :created_on, :channel_id, :contact_id, :contact_urn_id, :org_id)
RETURNING id`

// Insert inserts this channel event to our DB. The ID of the channel event will be
// set if no error is returned
func (e *ChannelEvent) Insert(ctx context.Context, db Queryer) error {
return BulkQuery(ctx, "insert channel event", db, insertChannelEventSQL, []interface{}{&e.e})
return BulkQuery(ctx, "insert channel event", db, sqlInsertChannelEvent, []interface{}{&e.e})
}

// NewChannelEvent creates a new channel event for the passed in parameters, returning it
Expand Down
15 changes: 6 additions & 9 deletions core/models/channel_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,10 @@ type ChannelLog struct {
// ID returns the id of this channel log
func (l *ChannelLog) ID() ChannelLogID { return l.l.ID }

const insertChannelLogSQL = `
INSERT INTO
channels_channellog( description, is_error, url, method, request, response, response_status, created_on, request_time, channel_id, connection_id)
VALUES(:description, :is_error, :url, :method, :request, :response, :response_status, :created_on, :request_time, :channel_id, :connection_id)
RETURNING
id as id
`
const sqlInsertChannelLog = `
INSERT INTO channels_channellog( description, is_error, url, method, request, response, response_status, created_on, request_time, channel_id, connection_id)
VALUES(:description, :is_error, :url, :method, :request, :response, :response_status, :created_on, :request_time, :channel_id, :connection_id)
RETURNING id as id`

// NewChannelLog creates a new channel log
func NewChannelLog(trace *httpx.Trace, isError bool, desc string, channel *Channel, conn *ChannelConnection) *ChannelLog {
Expand Down Expand Up @@ -87,7 +84,7 @@ func InsertChannelLogs(ctx context.Context, db Queryer, logs []*ChannelLog) erro
ls[i] = &logs[i].l
}

err := BulkQuery(ctx, "insert channel log", db, insertChannelLogSQL, ls)
err := BulkQuery(ctx, "insert channel log", db, sqlInsertChannelLog, ls)
if err != nil {
return errors.Wrapf(err, "error inserting channel log")
}
Expand Down Expand Up @@ -117,7 +114,7 @@ func InsertChannelLog(ctx context.Context, db Queryer,
l.ConnectionID = conn.ID()
}

err := BulkQuery(ctx, "insert channel log", db, insertChannelLogSQL, []interface{}{l})
err := BulkQuery(ctx, "insert channel log", db, sqlInsertChannelLog, []interface{}{l})
if err != nil {
return nil, errors.Wrapf(err, "error inserting channel log")
}
Expand Down
39 changes: 12 additions & 27 deletions core/models/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,49 +53,34 @@ type ContactImport struct {
BatchStatuses string `db:"batch_statuses"`
}

var loadContactImportSQL = `
SELECT
i.id AS "id",
i.org_id AS "org_id",
i.status AS "status",
i.created_by_id AS "created_by_id",
i.finished_on AS "finished_on",
array_to_string(array_agg(DISTINCT b.status), '') AS "batch_statuses"
FROM
contacts_contactimport i
LEFT OUTER JOIN
contacts_contactimportbatch b ON b.contact_import_id = i.id
WHERE
i.id = $1
GROUP BY
i.id`
var sqlLoadContactImport = `
SELECT i.id, i.org_id, i.status, i.created_by_id, i.finished_on, array_to_string(array_agg(DISTINCT b.status), '') AS "batch_statuses"
FROM contacts_contactimport i
LEFT OUTER JOIN contacts_contactimportbatch b ON b.contact_import_id = i.id
WHERE i.id = $1
GROUP BY i.id`

// LoadContactImport loads a contact import by ID
func LoadContactImport(ctx context.Context, db Queryer, id ContactImportID) (*ContactImport, error) {
i := &ContactImport{}
err := db.GetContext(ctx, i, loadContactImportSQL, id)
err := db.GetContext(ctx, i, sqlLoadContactImport, id)
if err != nil {
return nil, errors.Wrapf(err, "error loading contact import id=%d", id)
}
return i, nil
}

var markContactImportFinishedSQL = `
UPDATE
contacts_contactimport
SET
status = $2,
finished_on = $3
WHERE
id = $1
`
var sqlMarkContactImportFinished = `
UPDATE contacts_contactimport
SET status = $2, finished_on = $3
WHERE id = $1`

func (i *ContactImport) MarkFinished(ctx context.Context, db Queryer, status ContactImportStatus) error {
now := dates.Now()
i.Status = status
i.FinishedOn = &now

_, err := db.ExecContext(ctx, markContactImportFinishedSQL, i.ID, i.Status, i.FinishedOn)
_, err := db.ExecContext(ctx, sqlMarkContactImportFinished, i.ID, i.Status, i.FinishedOn)
return errors.Wrap(err, "error marking import as finished")
}

Expand Down
18 changes: 10 additions & 8 deletions core/models/incident.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,15 @@ func IncidentWebhooksUnhealthy(ctx context.Context, db Queryer, rp *redis.Pool,
return id, nil
}

const insertIncidentSQL = `
INSERT INTO notifications_incident(org_id, incident_type, scope, started_on, channel_id) VALUES($1, $2, $3, $4, $5)
ON CONFLICT DO NOTHING RETURNING id`
const sqlInsertIncident = `
INSERT INTO notifications_incident(org_id, incident_type, scope, started_on, channel_id)
VALUES($1, $2, $3, $4, $5)
ON CONFLICT DO NOTHING
RETURNING id`

func getOrCreateIncident(ctx context.Context, db Queryer, oa *OrgAssets, incident *Incident) (IncidentID, error) {
var incidentID IncidentID
err := db.GetContext(ctx, &incidentID, insertIncidentSQL, incident.OrgID, incident.Type, incident.Scope, incident.StartedOn, incident.ChannelID)
err := db.GetContext(ctx, &incidentID, sqlInsertIncident, incident.OrgID, incident.Type, incident.Scope, incident.StartedOn, incident.ChannelID)
if err != nil && err != sql.ErrNoRows {
return NilIncidentID, errors.Wrap(err, "error inserting incident")
}
Expand All @@ -125,13 +127,13 @@ func getOrCreateIncident(ctx context.Context, db Queryer, oa *OrgAssets, inciden
return incidentID, nil
}

const selectOpenIncidentsSQL = `
const sqlSelectOpenIncidents = `
SELECT id, org_id, incident_type, scope, started_on, ended_on, channel_id
FROM notifications_incident
WHERE ended_on IS NULL AND incident_type = ANY($1)`
FROM notifications_incident
WHERE ended_on IS NULL AND incident_type = ANY($1)`

func GetOpenIncidents(ctx context.Context, db Queryer, types []IncidentType) ([]*Incident, error) {
rows, err := db.QueryxContext(ctx, selectOpenIncidentsSQL, pq.Array(types))
rows, err := db.QueryxContext(ctx, sqlSelectOpenIncidents, pq.Array(types))
if err != nil {
return nil, errors.Wrap(err, "error querying open incidents")
}
Expand Down
Loading

0 comments on commit a4478a9

Please sign in to comment.