Skip to content

Commit

Permalink
feat(producer): add endpoint to flush jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
Matthew Foyle committed Jul 6, 2019
1 parent 9559ffa commit 2d880fe
Showing 1 changed file with 31 additions and 21 deletions.
52 changes: 31 additions & 21 deletions src/products/apps/producer.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const Queue = require('bull')
const express = require('express')

const redis = require("redis")
const app = express()
const port = 4000

Expand All @@ -27,34 +27,44 @@ const createQueue = (name) => {
const jobQueue = createQueue('get-product-events')

app.get('/', async (req, res) => {
// Parse the CSV rows
const products = await getProductsFromCSV(CSVLocation)
console.log(`${products.length} products parsed`)
try {
// Parse the CSV rows
const products = await getProductsFromCSV(CSVLocation)
console.log(`${products.length} products parsed`)

const missingCoreData = await analyseProductCoreData(products[0])
const missingCoreData = await analyseProductCoreData(products[0])

if (missingCoreData.length > 0) {
console.log(`Your CSV row is missing the following required Moltin product fields, any inserts will therefore fail:\n${missingCoreData}`)
}
// Check if there are fields in the row which don't exist
const extraFieldsToCreate = await analyseProducts(products[0])
if (missingCoreData.length > 0) {
console.log(`Your CSV row is missing the following required Moltin product fields, any inserts will therefore fail:\n${missingCoreData}`)
}
// Check if there are fields in the row which don't exist
const extraFieldsToCreate = await analyseProducts(products[0])

if (extraFieldsToCreate.length > 0) {
console.log('your Moltin account is missing some fields on products found in the CSV, creating those now')
let productsFlow = await findMoltinProductFlow()

if (extraFieldsToCreate.length > 0) {
console.log('your Moltin account is missing some fields on products found in the CSV, creating those now')
let productsFlow = await findMoltinProductFlow()
if (productsFlow === undefined) {
productsFlow = await createProductsFlow()
}

if (productsFlow === undefined) {
productsFlow = await createProductsFlow()
await createMissingFlowData(productsFlow.id, extraFieldsToCreate)
}

await createMissingFlowData(productsFlow.id, extraFieldsToCreate)
}
await addProductsToGetQueue(jobQueue, products)
const count = await jobQueue.count()
console.log(`job count is ${count}`)

await addProductsToGetQueue(jobQueue, products)
const count = await jobQueue.count()
console.log(`job count is ${count}`)
res.send('Done!')
} catch(e) {
res.send(JSON.stringify(e))
}
})

res.send('Done!')
app.get('/flushJobs', (req, res) => {
client = await redis.createClient()
await client.flushall()
res.send('all jobs have been flushed from Redis')
})

app.listen(port, () => console.log(`Producer app running on port ${port}`))

0 comments on commit 2d880fe

Please sign in to comment.