Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup some SQL variables #598

Merged
merged 3 commits into from
Mar 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 13 additions & 27 deletions core/hooks/commit_field_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ func (h *commitFieldChangesHook) Apply(ctx context.Context, rt *runtime.Runtime,
// first apply our deletes
// in pg9.6 we need to do this as one query per field type, in pg10 we can rewrite this to be a single query
for _, fds := range fieldDeletes {
err := models.BulkQuery(ctx, "deleting contact field values", tx, deleteContactFieldsSQL, fds)
err := models.BulkQuery(ctx, "deleting contact field values", tx, sqlDeleteContactFields, fds)
if err != nil {
return errors.Wrapf(err, "error deleting contact fields")
}
}

// then our updates
if len(fieldUpdates) > 0 {
err := models.BulkQuery(ctx, "updating contact field values", tx, updateContactFieldsSQL, fieldUpdates)
err := models.BulkQuery(ctx, "updating contact field values", tx, sqlUpdateContactFields, fieldUpdates)
if err != nil {
return errors.Wrapf(err, "error updating contact fields")
}
Expand All @@ -99,28 +99,14 @@ type FieldValue struct {
Text string `json:"text"`
}

const updateContactFieldsSQL = `
UPDATE
contacts_contact c
SET
fields = COALESCE(fields,'{}'::jsonb) || r.updates::jsonb
FROM (
VALUES(:contact_id, :updates)
) AS
r(contact_id, updates)
WHERE
c.id = r.contact_id::int
`

const deleteContactFieldsSQL = `
UPDATE
contacts_contact c
SET
fields = fields - r.field_uuid
FROM (
VALUES(:contact_id, :field_uuid)
) AS
r(contact_id, field_uuid)
WHERE
c.id = r.contact_id::int
`
const sqlUpdateContactFields = `
UPDATE contacts_contact c
SET fields = COALESCE(fields,'{}'::jsonb) || r.updates::jsonb
FROM (VALUES(:contact_id, :updates)) AS r(contact_id, updates)
WHERE c.id = r.contact_id::int`

const sqlDeleteContactFields = `
UPDATE contacts_contact c
SET fields = fields - r.field_uuid
FROM (VALUES(:contact_id, :field_uuid)) AS r(contact_id, field_uuid)
WHERE c.id = r.contact_id::int`
19 changes: 6 additions & 13 deletions core/hooks/commit_language_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (h *commitLanguageChangesHook) Apply(ctx context.Context, rt *runtime.Runti
}

// do our update
return models.BulkQuery(ctx, "updating contact language", tx, updateContactLanguageSQL, updates)
return models.BulkQuery(ctx, "updating contact language", tx, sqlUpdateContactLanguage, updates)
}

// struct used for our bulk update
Expand All @@ -35,15 +35,8 @@ type languageUpdate struct {
Language null.String `db:"language"`
}

const updateContactLanguageSQL = `
UPDATE
contacts_contact c
SET
language = r.language
FROM (
VALUES(:id, :language)
) AS
r(id, language)
WHERE
c.id = r.id::int
`
const sqlUpdateContactLanguage = `
UPDATE contacts_contact c
SET language = r.language
FROM (VALUES(:id, :language)) AS r(id, language)
WHERE c.id = r.id::int`
19 changes: 6 additions & 13 deletions core/hooks/commit_name_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (h *commitNameChangesHook) Apply(ctx context.Context, rt *runtime.Runtime,
}

// do our update
return models.BulkQuery(ctx, "updating contact name", tx, updateContactNameSQL, updates)
return models.BulkQuery(ctx, "updating contact name", tx, sqlUpdateContactName, updates)
}

// struct used for our bulk insert
Expand All @@ -36,15 +36,8 @@ type nameUpdate struct {
Name null.String `db:"name"`
}

const updateContactNameSQL = `
UPDATE
contacts_contact c
SET
name = r.name
FROM (
VALUES(:id, :name)
) AS
r(id, name)
WHERE
c.id = r.id::int
`
const sqlUpdateContactName = `
UPDATE contacts_contact c
SET name = r.name
FROM (VALUES(:id, :name)) AS r(id, name)
WHERE c.id = r.id::int`
58 changes: 20 additions & 38 deletions core/models/campaigns.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,55 +626,37 @@ type eligibleContact struct {
RelToValue *time.Time `db:"rel_to_value"`
}

const eligibleContactsForCreatedOnSQL = `
SELECT
c.id AS contact_id,
c.created_on AS rel_to_value
FROM
contacts_contact c
INNER JOIN
contacts_contactgroup_contacts gc ON gc.contact_id = c.id
WHERE
gc.contactgroup_id = $1 AND c.is_active = TRUE
`

const eligibleContactsForLastSeenOnSQL = `
SELECT
c.id AS contact_id,
c.last_seen_on AS rel_to_value
FROM
contacts_contact c
INNER JOIN
contacts_contactgroup_contacts gc ON gc.contact_id = c.id
WHERE
gc.contactgroup_id = $1 AND c.is_active = TRUE AND c.last_seen_on IS NOT NULL
`

const eligibleContactsForFieldSQL = `
SELECT
c.id AS contact_id,
(c.fields->$2->>'datetime')::timestamptz AS rel_to_value
FROM
contacts_contact c
INNER JOIN
contacts_contactgroup_contacts gc ON gc.contact_id = c.id
WHERE
gc.contactgroup_id = $1 AND c.is_active = TRUE AND ARRAY[$2]::text[] <@ (extract_jsonb_keys(c.fields)) IS NOT NULL
`
const sqlEligibleContactsForCreatedOn = `
SELECT c.id AS contact_id, c.created_on AS rel_to_value
FROM contacts_contact c
INNER JOIN contacts_contactgroup_contacts gc ON gc.contact_id = c.id
WHERE gc.contactgroup_id = $1 AND c.is_active = TRUE`

const sqlEligibleContactsForLastSeenOn = `
SELECT c.id AS contact_id, c.last_seen_on AS rel_to_value
FROM contacts_contact c
INNER JOIN contacts_contactgroup_contacts gc ON gc.contact_id = c.id
WHERE gc.contactgroup_id = $1 AND c.is_active = TRUE AND c.last_seen_on IS NOT NULL`

const sqlEligibleContactsForField = `
SELECT c.id AS contact_id, (c.fields->$2->>'datetime')::timestamptz AS rel_to_value
FROM contacts_contact c
INNER JOIN contacts_contactgroup_contacts gc ON gc.contact_id = c.id
WHERE gc.contactgroup_id = $1 AND c.is_active = TRUE AND ARRAY[$2]::text[] <@ (extract_jsonb_keys(c.fields)) IS NOT NULL`

func campaignEventEligibleContacts(ctx context.Context, db Queryer, groupID GroupID, field *Field) ([]*eligibleContact, error) {
var query string
var params []interface{}

switch field.Key() {
case CreatedOnKey:
query = eligibleContactsForCreatedOnSQL
query = sqlEligibleContactsForCreatedOn
params = []interface{}{groupID}
case LastSeenOnKey:
query = eligibleContactsForLastSeenOnSQL
query = sqlEligibleContactsForLastSeenOn
params = []interface{}{groupID}
default:
query = eligibleContactsForFieldSQL
query = sqlEligibleContactsForField
params = []interface{}{groupID, field.UUID()}
}

Expand Down
13 changes: 4 additions & 9 deletions core/models/channel_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,8 @@ func (c *ChannelConnection) ErrorReason() ConnectionError { return ConnectionErr
func (c *ChannelConnection) ErrorCount() int { return c.c.ErrorCount }
func (c *ChannelConnection) NextAttempt() *time.Time { return c.c.NextAttempt }

const insertConnectionSQL = `
INSERT INTO
channels_channelconnection
const sqlInsertConnection = `
INSERT INTO channels_channelconnection
(
created_on,
modified_on,
Expand All @@ -122,7 +121,6 @@ INSERT INTO
contact_urn_id,
error_count
)

VALUES(
NOW(),
NOW(),
Expand All @@ -137,10 +135,7 @@ VALUES(
:contact_urn_id,
0
)
RETURNING
id,
NOW();
`
RETURNING id, NOW();`

// InsertIVRConnection creates a new IVR session for the passed in org, channel and contact, inserting it
func InsertIVRConnection(ctx context.Context, db *sqlx.DB, orgID OrgID, channelID ChannelID, startID StartID, contactID ContactID, urnID URNID,
Expand All @@ -159,7 +154,7 @@ func InsertIVRConnection(ctx context.Context, db *sqlx.DB, orgID OrgID, channelI
c.ExternalID = externalID
c.StartID = startID

rows, err := db.NamedQueryContext(ctx, insertConnectionSQL, c)
rows, err := db.NamedQueryContext(ctx, sqlInsertConnection, c)
if err != nil {
return nil, errors.Wrapf(err, "error inserting new channel connection")
}
Expand Down
13 changes: 5 additions & 8 deletions core/models/channel_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,15 @@ func (e *ChannelEvent) UnmarshalJSON(b []byte) error {
return json.Unmarshal(b, &e.e)
}

const insertChannelEventSQL = `
INSERT INTO
channels_channelevent(event_type, extra, occurred_on, created_on, channel_id, contact_id, contact_urn_id, org_id)
VALUES(:event_type, :extra, :occurred_on, :created_on, :channel_id, :contact_id, :contact_urn_id, :org_id)
RETURNING
id
`
const sqlInsertChannelEvent = `
INSERT INTO channels_channelevent(event_type, extra, occurred_on, created_on, channel_id, contact_id, contact_urn_id, org_id)
VALUES(:event_type, :extra, :occurred_on, :created_on, :channel_id, :contact_id, :contact_urn_id, :org_id)
RETURNING id`

// Insert inserts this channel event to our DB. The ID of the channel event will be
// set if no error is returned
func (e *ChannelEvent) Insert(ctx context.Context, db Queryer) error {
return BulkQuery(ctx, "insert channel event", db, insertChannelEventSQL, []interface{}{&e.e})
return BulkQuery(ctx, "insert channel event", db, sqlInsertChannelEvent, []interface{}{&e.e})
}

// NewChannelEvent creates a new channel event for the passed in parameters, returning it
Expand Down
15 changes: 6 additions & 9 deletions core/models/channel_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,10 @@ type ChannelLog struct {
// ID returns the id of this channel log
func (l *ChannelLog) ID() ChannelLogID { return l.l.ID }

const insertChannelLogSQL = `
INSERT INTO
channels_channellog( description, is_error, url, method, request, response, response_status, created_on, request_time, channel_id, connection_id)
VALUES(:description, :is_error, :url, :method, :request, :response, :response_status, :created_on, :request_time, :channel_id, :connection_id)
RETURNING
id as id
`
const sqlInsertChannelLog = `
INSERT INTO channels_channellog( description, is_error, url, method, request, response, response_status, created_on, request_time, channel_id, connection_id)
VALUES(:description, :is_error, :url, :method, :request, :response, :response_status, :created_on, :request_time, :channel_id, :connection_id)
RETURNING id as id`

// NewChannelLog creates a new channel log
func NewChannelLog(trace *httpx.Trace, isError bool, desc string, channel *Channel, conn *ChannelConnection) *ChannelLog {
Expand Down Expand Up @@ -87,7 +84,7 @@ func InsertChannelLogs(ctx context.Context, db Queryer, logs []*ChannelLog) erro
ls[i] = &logs[i].l
}

err := BulkQuery(ctx, "insert channel log", db, insertChannelLogSQL, ls)
err := BulkQuery(ctx, "insert channel log", db, sqlInsertChannelLog, ls)
if err != nil {
return errors.Wrapf(err, "error inserting channel log")
}
Expand Down Expand Up @@ -117,7 +114,7 @@ func InsertChannelLog(ctx context.Context, db Queryer,
l.ConnectionID = conn.ID()
}

err := BulkQuery(ctx, "insert channel log", db, insertChannelLogSQL, []interface{}{l})
err := BulkQuery(ctx, "insert channel log", db, sqlInsertChannelLog, []interface{}{l})
if err != nil {
return nil, errors.Wrapf(err, "error inserting channel log")
}
Expand Down
39 changes: 12 additions & 27 deletions core/models/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,49 +53,34 @@ type ContactImport struct {
BatchStatuses string `db:"batch_statuses"`
}

var loadContactImportSQL = `
SELECT
i.id AS "id",
i.org_id AS "org_id",
i.status AS "status",
i.created_by_id AS "created_by_id",
i.finished_on AS "finished_on",
array_to_string(array_agg(DISTINCT b.status), '') AS "batch_statuses"
FROM
contacts_contactimport i
LEFT OUTER JOIN
contacts_contactimportbatch b ON b.contact_import_id = i.id
WHERE
i.id = $1
GROUP BY
i.id`
var sqlLoadContactImport = `
SELECT i.id, i.org_id, i.status, i.created_by_id, i.finished_on, array_to_string(array_agg(DISTINCT b.status), '') AS "batch_statuses"
FROM contacts_contactimport i
LEFT OUTER JOIN contacts_contactimportbatch b ON b.contact_import_id = i.id
WHERE i.id = $1
GROUP BY i.id`

// LoadContactImport loads a contact import by ID
func LoadContactImport(ctx context.Context, db Queryer, id ContactImportID) (*ContactImport, error) {
i := &ContactImport{}
err := db.GetContext(ctx, i, loadContactImportSQL, id)
err := db.GetContext(ctx, i, sqlLoadContactImport, id)
if err != nil {
return nil, errors.Wrapf(err, "error loading contact import id=%d", id)
}
return i, nil
}

var markContactImportFinishedSQL = `
UPDATE
contacts_contactimport
SET
status = $2,
finished_on = $3
WHERE
id = $1
`
var sqlMarkContactImportFinished = `
UPDATE contacts_contactimport
SET status = $2, finished_on = $3
WHERE id = $1`

func (i *ContactImport) MarkFinished(ctx context.Context, db Queryer, status ContactImportStatus) error {
now := dates.Now()
i.Status = status
i.FinishedOn = &now

_, err := db.ExecContext(ctx, markContactImportFinishedSQL, i.ID, i.Status, i.FinishedOn)
_, err := db.ExecContext(ctx, sqlMarkContactImportFinished, i.ID, i.Status, i.FinishedOn)
return errors.Wrap(err, "error marking import as finished")
}

Expand Down
18 changes: 10 additions & 8 deletions core/models/incident.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,15 @@ func IncidentWebhooksUnhealthy(ctx context.Context, db Queryer, rp *redis.Pool,
return id, nil
}

const insertIncidentSQL = `
INSERT INTO notifications_incident(org_id, incident_type, scope, started_on, channel_id) VALUES($1, $2, $3, $4, $5)
ON CONFLICT DO NOTHING RETURNING id`
const sqlInsertIncident = `
INSERT INTO notifications_incident(org_id, incident_type, scope, started_on, channel_id)
VALUES($1, $2, $3, $4, $5)
ON CONFLICT DO NOTHING
RETURNING id`

func getOrCreateIncident(ctx context.Context, db Queryer, oa *OrgAssets, incident *Incident) (IncidentID, error) {
var incidentID IncidentID
err := db.GetContext(ctx, &incidentID, insertIncidentSQL, incident.OrgID, incident.Type, incident.Scope, incident.StartedOn, incident.ChannelID)
err := db.GetContext(ctx, &incidentID, sqlInsertIncident, incident.OrgID, incident.Type, incident.Scope, incident.StartedOn, incident.ChannelID)
if err != nil && err != sql.ErrNoRows {
return NilIncidentID, errors.Wrap(err, "error inserting incident")
}
Expand All @@ -125,13 +127,13 @@ func getOrCreateIncident(ctx context.Context, db Queryer, oa *OrgAssets, inciden
return incidentID, nil
}

const selectOpenIncidentsSQL = `
const sqlSelectOpenIncidents = `
SELECT id, org_id, incident_type, scope, started_on, ended_on, channel_id
FROM notifications_incident
WHERE ended_on IS NULL AND incident_type = ANY($1)`
FROM notifications_incident
WHERE ended_on IS NULL AND incident_type = ANY($1)`

func GetOpenIncidents(ctx context.Context, db Queryer, types []IncidentType) ([]*Incident, error) {
rows, err := db.QueryxContext(ctx, selectOpenIncidentsSQL, pq.Array(types))
rows, err := db.QueryxContext(ctx, sqlSelectOpenIncidents, pq.Array(types))
if err != nil {
return nil, errors.Wrap(err, "error querying open incidents")
}
Expand Down
Loading