Skip to content

Commit

Permalink
Merge pull request #9 from moltin/refactor/release
Browse files Browse the repository at this point in the history
fix/release
  • Loading branch information
Matt Foyle authored Jul 8, 2019
2 parents bf61944 + 7420318 commit b07107c
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 60 deletions.
File renamed without changes.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
"pretest": "./node_modules/.bin/eslint --ignore-path .gitignore . --fix"
},
"bin": {
"@moltin/import": "bin/import",
"import": "bin/import"
"@moltin/importer": "bin/import",
"importer": "bin/import"
},
"publishConfig": {
"access": "public"
Expand Down
2 changes: 1 addition & 1 deletion src/cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ async function collectAndWriteEnvVars(options) {
try {
let collectedOptions = await promptForMissingOptions(options)
const relativeCsvPath = resolvePath(collectedOptions.csvPath)
collectedOptions = addToOptions({ csvPath: relativeCsvPath }, collectedOptions)
collectedOptions = await addToOptions({ csvPath: relativeCsvPath }, collectedOptions)
const isRedisDefault = await promptForRedisDefault()

if (isRedisDefault === 'no') {
Expand Down
108 changes: 53 additions & 55 deletions src/products/apps/consumer.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/* eslint no-console: ["error", { allow: ["warn", "log"] }] */

import {
addProductToUpdateQueue,
addProductToInsertQueue,
Expand All @@ -18,15 +17,12 @@ import {

import arenaConfig from './utils/arenaConfig'

require('dotenv').config()

const Queue = require('bull')
const Arena = require('bull-arena')
const express = require('express')

const redisUrl = `redis://${process.env.redisHost}:${process.env.redisPort}`
const jobQueue = new Queue('get-product-events', redisUrl)
const updateJobQueue = new Queue('update-product-events', redisUrl)
const insertJobQueue = new Queue('insert-product-events', redisUrl)

// jobQueue.on('global:completed', (jobId, result) => {
// console.log(`Job ${jobId} completed! Result: ${result}`)
// jobQueue.getJob(jobId).then((job) => {
Expand Down Expand Up @@ -62,57 +58,59 @@ const insertJobQueue = new Queue('insert-product-events', redisUrl)
// })
// })


const getJobProcessor = job => new Promise(async (resolve, reject) => {
try {
const {
data: { product },
} = job

const products = await getProduct(product.sku)

if (products.length > 0) {
const existingProduct = products[0]
product.id = existingProduct.id
await addProductToUpdateQueue(updateJobQueue, product)
resolve(`${product.id} was added to update queue`)
} else {
await addProductToInsertQueue(insertJobQueue, product)
resolve(`${product.sku} was added to insert queue`)
export default function consumer() {
const redisUrl = `redis://${process.env.redisHost}:${process.env.redisPort}`
const jobQueue = new Queue('get-product-events', redisUrl)
const updateJobQueue = new Queue('update-product-events', redisUrl)
const insertJobQueue = new Queue('insert-product-events', redisUrl)

const getJobProcessor = job => new Promise(async (resolve, reject) => {
try {
const {
data: { product },
} = job

const products = await getProduct(product.sku)
if (products.length > 0) {
const existingProduct = products[0]
product.id = existingProduct.id
await addProductToUpdateQueue(updateJobQueue, product)
resolve(`${product.id} was added to update queue`)
} else {
await addProductToInsertQueue(insertJobQueue, product)
resolve(`${product.sku} was added to insert queue`)
}
} catch (e) {
reject(JSON.stringify(e))
}
})

const updateJobProcessor = job => new Promise(async (resolve, reject) => {
try {
const id = await findProductId(job.data.updatedProduct)
const formattedProduct = await formatProductForUpdate(id, job.data.updatedProduct)
await updateProduct(formattedProduct)
resolve(`${formattedProduct.id} was updated`)
resolve()
} catch (errorMessage) {
console.log(errorMessage)
await handleFailedUpdateJob(updateJobQueue, job, errorMessage)
reject(new Error(JSON.stringify(errorMessage)))
}
} catch (e) {
reject(JSON.stringify(e))
}
})

const updateJobProcessor = job => new Promise(async (resolve, reject) => {
try {
const id = await findProductId(job.data.updatedProduct)
const formattedProduct = await formatProductForUpdate(id, job.data.updatedProduct)
await updateProduct(formattedProduct)
resolve(`${formattedProduct.id} was updated`)
resolve()
} catch (errorMessage) {
console.log(errorMessage)
await handleFailedUpdateJob(updateJobQueue, job, errorMessage)
reject(new Error(JSON.stringify(errorMessage)))
}
})

const insertProductProcessor = job => new Promise(async (resolve, reject) => {
try {
const formattedProduct = await formatProductForInsert(job.data.product)
await insertProduct(formattedProduct)
resolve(`${formattedProduct.sku} was inserted`)
} catch (errorMessage) {
console.log(JSON.stringify(errorMessage))
await handleFailedInsertJob(insertJobQueue, job, errorMessage)
reject(new Error(JSON.stringify(errorMessage)))
}
})
})

const insertProductProcessor = job => new Promise(async (resolve, reject) => {
try {
const formattedProduct = await formatProductForInsert(job.data.product)
await insertProduct(formattedProduct)
resolve(`${formattedProduct.sku} was inserted`)
} catch (errorMessage) {
await handleFailedInsertJob(insertJobQueue, job, errorMessage)
reject(new Error(JSON.stringify(errorMessage)))
}
})

export default function consumer() {
const arena = Arena(arenaConfig.config)
const arena = Arena(arenaConfig)

const app = express()
const port = 3000
Expand Down
4 changes: 2 additions & 2 deletions src/products/apps/utils/moltinUtils.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ export async function insertProduct(product) {
}

export async function getProduct(product) {
Moltin.Products.Filter({ eq: { sku: product } })
const { data } = await Moltin.Products.Filter({ eq: { sku: product } })
.All()
.then(data => data.data)
return data
}

export async function createProductsFlow() {
Expand Down

0 comments on commit b07107c

Please sign in to comment.