Refactored queue files and moved scoring queue out of commons queue

This commit is contained in:
Michael Schmatz 2014-02-06 14:32:35 -08:00
parent 609547a37c
commit 24043eaee9
4 changed files with 37 additions and 41 deletions

View file

@ -7,22 +7,16 @@ aws = require 'aws-sdk'
db = require './../routes/db'
mongoose = require 'mongoose'
events = require 'events'
queueClient = undefined
module.exports.scoringTaskQueue = undefined
module.exports.queueClient = undefined
module.exports.sendwithusQueue = undefined
module.exports.initializeScoringTaskQueue = (cb) ->
queueClient = module.exports.generateQueueClient() unless queueClient?
queueClient.registerQueue "scoring", {}, (err,data) ->
if err?
winston.error "There was an error registering the scoring queue."
throw new Error "There was an error registering the scoring queue."
module.exports.scoringTaskQueue = data
cb? err, module.exports.scoringQueue
module.exports.initializeQueueClient = (cb) ->
module.exports.queueClient = module.exports.generateQueueClient() unless queueClient?
cb?()
module.exports.initializeSendwithusQueue = (cb) ->
queueClient = module.exports.generateQueueClient() unless queueClient?
@ -131,23 +125,26 @@ class SQSQueue extends events.EventEmitter
@receiveMessage (err, data) ->
asyncCallback(null)
class SQSMessage extends MessageObject
constructor: (@originalMessage, @parentQueue) ->
class SQSMessage extends MessageObject
constructor: (@originalMessage, @parentQueue) ->
isEmpty: -> not @originalMessage.Messages?[0]?
isEmpty: -> not @originalMessage.Messages?[0]?
getBody: -> @originalMessage.Messages[0].Body
getBody: -> @originalMessage.Messages[0].Body
getID: -> @originalMessage.Messages[0].MessageId
getID: -> @originalMessage.Messages[0].MessageId
removeFromQueue: (callback) -> parentQueue.deleteMessage @getReceiptHandle(), callback
removeFromQueue: (callback) -> parentQueue.deleteMessage @getReceiptHandle(), callback
requeue: (callback) -> parentQueue.changeMessageVisibilityTimeout 0, @getReceiptHandle(), callback
requeue: (callback) -> parentQueue.changeMessageVisibilityTimeout 0, @getReceiptHandle(), callback
changeMessageVisibilityTimeout: (secondsFromFunctionCall, callback) ->
parentQueue.changeMessageVisibilityTimeout secondsFromFunctionCall,@getReceiptHandle(), callback
changeMessageVisibilityTimeout: (secondsFromFunctionCall, callback) ->
parentQueue.changeMessageVisibilityTimeout secondsFromFunctionCall,@getReceiptHandle(), callback
getReceiptHandle: -> @originalMessage.Messages[0].ReceiptHandle
getReceiptHandle: -> @originalMessage.Messages[0].ReceiptHandle

View file

@ -10,22 +10,33 @@ events = require 'events'
queues = require '../commons/queue'
LevelSession = require '../levels/sessions/LevelSession'
scoringTaskQueue = undefined
connectToScoringQueue = ->
unless queues.scoringTaskQueue
queues.initializeScoringTaskQueue (err, data) ->
queues.initializeQueueClient ->
queues.queueClient.registerQueue "scoring", {}, (err,data) ->
if err?
winston.error "There was an error registering the scoring queue."
throw new Error "There was an error registering the scoring queue."
scoringTaskQueue = data
winston.info "Connected to scoring task queue!" unless err?
module.exports.setup = (app) ->
connectToScoringQueue()
module.exports.dispatchTaskToConsumer = (req, res) ->
queues.scoringTaskQueue.receiveMessage (err, message) ->
scoringTaskQueue.receiveMessage (err, message) ->
###Using Test Message
{ "sessionID": "52dfeb17c8b5f435c7000025" }
###
if message.isEmpty()
#TODO: Set message code as 504 Gateway Timeout
sendResponseObject req, res, {"error":"No messages were received."}
else
messageBody = JSON.parse message.getBody()
try
messageBody = JSON.parse message.getBody()
catch
return sendResponseObject req, res, {"error":"There was an error parsing the task."}
constructTaskObject messageBody, (taskConstructionError, taskObject) ->
if taskConstructionError?
sendResponseObject req, res, {"error":taskConstructionError}
@ -34,21 +45,8 @@ module.exports.dispatchTaskToConsumer = (req, res) ->
module.exports.processTaskResult = (req, res) ->
clientResponseObject = JSON.parse req.body
#clientResponseObject = JSON.parse req.body
res.end("You posted an object to score!")
###
sampleClientResponseObject =
"processorUserID": "51eb2714fa058cb20d0006ef" #user ID of the person processing
"processingTime": 2745 #time in milliseconds
"processedSessionID": "52dfeb17c8b5f435c7000025" #the processed session
"processedSessionChangedTime": ISODate("2014-01-22T16:28:12.450Z") #to see if the session processed is the one in the database
"playerResults": [
{"ID":"51eb2714fa058cb20d0006ef", "team":"humans","metrics": {"reachedGoal":false, "rank":2}}
{"ID":"51eb2714fa058cb20d00fedg", "team":"ogres","metrics": {"reachedGoal":true, "rank":1}}
]
###
sendResponseObject = (req,res,object) ->

View file

View file

@ -19,6 +19,7 @@ module.exports.setup = (app) ->
else
sendMethodNotSupportedError req, res
catch error
log.error error
sendQueueNotFoundError req, res
setResponseHeaderToJSONContentType = (res) -> res.setHeader('Content-Type', 'application/json')