diff --git a/eyeshade/consumer.js b/eyeshade/consumer.js
index 661e5f319..527caa058 100644
--- a/eyeshade/consumer.js
+++ b/eyeshade/consumer.js
@@ -3,7 +3,6 @@ import { Runtime } from 'bat-utils/boot-runtime.js'
 import config from '../config.js'
 import suggestionsConsumer from './workers/suggestions.js'
 import voteConsumer from './workers/acvote.js'
-import referralsConsumer from './workers/referrals.js'
 import settlementsConsumer from './workers/settlements.js'
 import { getCurrent } from './migrations/current.js'
 
@@ -29,7 +28,6 @@ const runtime = new Runtime(config)
 extras.utils.setupKafkaCert()
 suggestionsConsumer(runtime)
 voteConsumer(runtime)
-referralsConsumer(runtime)
 settlementsConsumer(runtime)
 runtime.kafka.consume().catch(console.error)
 export default runtime
diff --git a/eyeshade/workers/referrals.js b/eyeshade/workers/referrals.js
deleted file mode 100644
index 01ee4df02..000000000
--- a/eyeshade/workers/referrals.js
+++ /dev/null
@@ -1,102 +0,0 @@
-import { normalizeChannel } from 'bat-utils/lib/extras-utils.js'
-import transaction from '../lib/transaction.js'
-import referrals from '../lib/referrals.js'
-import { getActiveCountryGroups } from '../lib/queries.js'
-import countries from '../lib/countries.js'
-
-const getTransactionsById = `
-SELECT id
-FROM transactions
-WHERE id = any($1::UUID[])`
-
-export default function consumer (runtime) {
-  const { kafka, postgres, config } = runtime
-  kafka.on(referrals.topic, async (messages, client) => {
-    const inserting = {}
-    const {
-      rows: referralGroups
-    } = await postgres.query(getActiveCountryGroups(), [], client)
-
-    const docs = await kafka.mapMessages(referrals, messages, async (ref) => {
-      const {
-        ownerId: owner,
-        channelId: publisher,
-        transactionId,
-        downloadId,
-        countryGroupId,
-        finalizedTimestamp
-      } = ref
-      const txId = transactionId || downloadId
-
-      const {
-        probi
-      } = await countries.computeValue(runtime, countryGroupId, referralGroups)
-
-      const _id = {
-        owner,
-        // take care of empty string case
-        publisher: publisher || null,
-        altcurrency: config.altcurrency || 'BAT'
-      }
-      const referral = {
-        _id,
-        probi,
-        firstId: new Date(finalizedTimestamp),
-        transactionId: txId
-      }
-
-      const normalizedChannel = normalizeChannel(referral._id.publisher)
-      const id = transaction.id.referral(txId, normalizedChannel)
-      return {
-        id,
-        referral
-      }
-    })
-
-    // drop documents that do not start with 'removed' id
-    const ownerPrefix = 'publishers#uuid:'
-    const wasMissingPrefix = {}
-    const filteredDocs = docs.map((doc) => {
-      if (doc.referral._id.owner.slice(0, 16) !== ownerPrefix) {
-        doc.referral._id.owner = ownerPrefix + doc.referral._id.owner
-        wasMissingPrefix[doc.id] = true
-      }
-      return doc
-    }).filter((doc) => {
-      const { referral: { _id: { owner } } } = doc
-      const throwAway = owner && owner.slice(16) === 'removed'
-      if (throwAway) {
-        runtime.captureException(new Error('referral owner removed'), {
-          extra: doc
-        })
-      }
-      return !throwAway
-    })
-    const ids = filteredDocs.map(({ id }) => id)
-    const {
-      rows: previouslyInserted
-    } = await postgres.query(getTransactionsById, [ids])
-    return Promise.all(filteredDocs.map(async ({ id: targetId, referral }) => {
-      // this part will still be checked serially and first,
-      // even if one of the referrals hits the await at the bottom
-      // AsyncFunction(s) run syncronously as long as possible.
-      // they do not start out async. which can be a little confusing
-      if (inserting[targetId]) {
-        return
-      }
-      inserting[targetId] = true
-      if (previouslyInserted.find(({
-        id
-      }) => id === targetId)) {
-        if (wasMissingPrefix[targetId]) {
-          await postgres.query(`
-          update transactions
-          set to_account = $2
-          where id = $1`, [targetId, referral._id.owner], client)
-        }
-        return
-      }
-      return transaction.insertFromReferrals(runtime, client, referral)
-    }))
-  })
-}
diff --git a/eyeshade/workers/referrals.test.js b/eyeshade/workers/referrals.test.js
deleted file mode 100644
index 8f9375c5c..000000000
--- a/eyeshade/workers/referrals.test.js
+++ /dev/null
@@ -1,153 +0,0 @@
-'use strict'
-import test from 'ava'
-import _ from 'underscore'
-import { normalizeChannel, timeout } from 'bat-utils/lib/extras-utils.js'
-import { Runtime } from 'bat-utils'
-import config from '../../config.js'
-import transaction from '../lib/transaction.js'
-import referrals from '../lib/referrals.js'
-import utils from '../../test/utils.js'
-import referralsConsumer from './referrals.js'
-
-const {
-  ok,
-  cleanEyeshadePgDb,
-  agents,
-  readJSONFile
-} = utils
-
-test.before(async (t) => {
-  Object.assign(t.context, {
-    runtime: new Runtime(config)
-  })
-  referralsConsumer(t.context.runtime)
-  await t.context.runtime.kafka.consume().catch(console.error)
-})
-test.beforeEach((t) => cleanEyeshadePgDb(t.context.runtime.postgres))
-
-test('referral groups are returned correctly', async (t) => {
-  let body, fields
-  const requiredKeys = ['id']
-  const json = normalizeGroups(readJSONFile('data', 'referral-groups', '0010.json'))
-  body = await getGroups()
-  t.deepEqual(json.map(j => _.pick(j, ['id'])), body, 'no fields results in only ids')
-  // one field
-  fields = ['codes']
-  body = await getGroups({ fields })
-  const codesSubset = json.map((j) => _.pick(j, requiredKeys.concat(fields)))
-  t.deepEqual(codesSubset, body, 'referral groups should be present')
-
-  fields = ['codes', 'name', 'currency', 'activeAt']
-  body = await getGroups({ fields })
-  const codesNameSubset = json.map((j) => _.pick(j, requiredKeys.concat(fields)))
-  t.deepEqual(codesNameSubset, body, 'referral fields should be present')
-  const stringQuery = await getGroups({ fields: 'codes,name,currency,activeAt' })
-  t.deepEqual(codesNameSubset, stringQuery, 'a string or array can be sent for query')
-  const whitespacedQuery = await getGroups({ fields: 'codes,name, currency, activeAt' })
-  t.deepEqual(codesNameSubset, whitespacedQuery, 'works with whitespace')
-  const groupId1 = 'e48f310b-0e81-4b39-a836-4dda32d7df74'
-  const groupId2 = '6491bbe5-4d50-4c05-af5c-a2ac4a04d14e'
-  const australiaInGroup1 = `
-  INSERT INTO geo_referral_countries
-  (group_id,name,country_code)
-  VALUES
-  ('${groupId1}','Australia','AU')
-  `
-  await t.context.runtime.postgres.query(australiaInGroup1)
-  const unresolvedGroups = await getGroups({ fields })
-  const howUnresolvedGroupsShouldLookBase = normalizeGroups(json)
-  howUnresolvedGroupsShouldLookBase.find(({ id }) => id === groupId1).codes.push('AU')
-  const howUnresolvedGroupsShouldLook = normalizeGroups(howUnresolvedGroupsShouldLookBase)
-  t.deepEqual(normalizeGroups(unresolvedGroups), howUnresolvedGroupsShouldLook, 'should add au to the group')
-
-  const howResolvedGroupsShouldLook = normalizeGroups(howUnresolvedGroupsShouldLookBase)
-  const resolvedGroup = howResolvedGroupsShouldLook.find(({ id }) => id === groupId2)
-  const auIndex = resolvedGroup.codes.indexOf('AU')
-  resolvedGroup.codes.splice(auIndex, auIndex + 1) // throw away
-  const resolvedGroups = await getGroups({ fields, resolve: true })
-  t.deepEqual(normalizeGroups(resolvedGroups), howResolvedGroupsShouldLook, 'should remove au from group 2')
-})
-
-async function getGroups (query = {}) {
-  const {
-    body
-  } = await agents.eyeshade.referrals.get('/v1/referrals/groups')
-    .query(query)
-    .expect(ok)
-  return normalizeGroups(body)
-}
-
-function normalizeGroups (_body) {
-  const body = _body.slice(0).sort((a, b) => a.id > b.id ? 1 : -1)
-  for (let i = 0; i < body.length; i += 1) {
-    const group = body[i]
-    const { codes } = group
-    if (codes) {
-      group.codes = codes.slice(0).sort()
-    }
-  }
-  return body
-}
-
-test('unable to insert a row with the same country code and created_at twice', async (t) => {
-  const { rows } = await t.context.runtime.postgres.query(`
-    select *
-    from geo_referral_countries
-    where country_code = 'US'`)
-  const us = rows[0]
-  await t.throwsAsync(async () => {
-    return t.context.runtime.postgres.query(`
-      insert into
-      geo_referral_countries(country_code, created_at, name, group_id)
-      values($1, $2, 'anyname', $3)`, ['US', +us.created_at, us.group_id])
-  }, { instanceOf: Error })
-})
-
-test('referrals should be insertable from the kafka queue', async (t) => {
-  const msgs = 10
-  for (let i = 0; i < msgs; i += 1) {
-    const referral = utils.referral.create()
-    const buf = referrals.encode(referral)
-    await t.context.runtime.kafka.send(referrals.topic, buf)
-  }
-  await t.notThrowsAsync(
-    utils.transaction.ensureCount(t, msgs)
-  )
-})
-
-test('messages are deduplicated', async t => {
-  const referralBase = JSON.stringify(utils.referral.create())
-  const referral1 = JSON.parse(referralBase)
-
-  const messages = []
-  for (let i = 0; i < 5; i += 1) {
-    messages.push([])
-    for (let j = 0; j < 10; j += 1) {
-      messages[i].push(referral1)
-    }
-  }
-  // a signal that messages have been processed
-  const endingReferral = utils.referral.create()
-  messages.push([endingReferral])
-
-  for (let i = 0; i < messages.length; i += 1) {
-    // send in blocks
-    await Promise.all(messages[i].map((msg) => (
-      t.context.runtime.kafka.send(
-        referrals.topic,
-        referrals.encode(msg)
-      )
-    )))
-    await timeout(0)
-  }
-  const normalizedChannel = normalizeChannel(endingReferral.channelId)
-  const id = transaction.id.referral(endingReferral.transactionId, normalizedChannel)
-  await t.notThrowsAsync(
-    utils.transaction.ensureArrived(t, id)
-  )
-  // 1 for the first transaction seen
-  // 1 for the ending transaction
-  await t.notThrowsAsync(
-    utils.transaction.ensureCount(t, 2)
-  )
-})