Added message class for SQS, basic scoring routes

This commit is contained in:
Michael Schmatz 2014-02-05 15:07:15 -08:00
parent 308f34bc76
commit b174b10642
3 changed files with 122 additions and 24 deletions

View file

@ -38,4 +38,5 @@ module.exports.routes =
'routes/languages'
'routes/mail'
'routes/sprites'
'routes/scoring'
]

View file

@ -7,21 +7,12 @@ aws = require 'aws-sdk'
db = require './../routes/db'
mongoose = require 'mongoose'
events = require 'events'
queueClient = undefined
module.exports.scoringTaskQueue = undefined
module.exports.sendwithusQueue = undefined
###
module.exports.setupRoutes = (app) ->
app.get '/multiplayer/'
queueClient.registerQueue "simulationQueue", {}, (err,data) ->
simulationQueue = data
simulationQueue.subscribe 'message', (err, data) ->
if data.Messages?
winston.info "Receieved message #{data.Messages?[0].Body}"
simulationQueue.deleteMessage data.Messages?[0].ReceiptHandle, ->
winston.info "Deleted message"
###
module.exports.initializeScoringTaskQueue = (cb) ->
queueClient = module.exports.generateQueueClient() unless queueClient?
@ -48,13 +39,34 @@ module.exports.initializeSendwithusQueue = (cb) ->
module.exports.generateQueueClient = ->
if config.isProduction
if config.queue.accessKeyId
queueClient = new SQSQueueClient()
else
queueClient = new MongoQueueClient()
return queueClient
class MessageObject
constructor: () ->
return
getBody: ->
return
getID: ->
return
removeFromQueue: ->
return
requeue: ->
return
changeMessageVisibilityTimeout: (secondsFromFunctionCall) ->
return
class SQSQueueClient
constructor: ->
@configure()
@ -92,7 +104,10 @@ class SQSQueue extends events.EventEmitter
receiveMessage: (callback) ->
@sqs.receiveMessage {QueueUrl: @queueUrl, WaitTimeSeconds: 20}, (err, data) =>
if err? then @emit 'error',err,data else @emit 'message',err,data
originalData = data
data = new SQSMessage originalData, this
if err? then @emit 'error',err,originalData else @emit 'message',err,data
callback? err,data
deleteMessage: (receiptHandle, callback) ->
@ -100,6 +115,11 @@ class SQSQueue extends events.EventEmitter
if err? then @emit 'error',err,data else @emit 'message',err,data
callback? err,data
changeMessageVisibilityTimeout: (secondsFromNow, receiptHandle, callback) ->
@sqs.changeMessageVisibility {QueueUrl: @queueUrl, ReceiptHandle: receiptHandle, VisibilityTimeout: secondsFromNow}, (err, data) =>
if err? then @emit 'error',err,data else @emit 'edited',err,data
callback? err,data
sendMessage: (messageBody, delaySeconds, callback) ->
@sqs.sendMessage {QueueUrl: @queueUrl, MessageBody: messageBody, DelaySeconds: delaySeconds}, (err, data) =>
@ -111,6 +131,24 @@ class SQSQueue extends events.EventEmitter
@receiveMessage (err, data) ->
asyncCallback(null)
class SQSMessage extends MessageObject
constructor: (@originalMessage, @parentQueue) ->
isEmpty: -> not @originalMessage.Messages?[0]?
getBody: -> @originalMessage.Messages[0].Body
getID: -> @originalMessage.Messages[0].MessageId
removeFromQueue: (callback) -> parentQueue.deleteMessage @getReceiptHandle(), callback
requeue: (callback) -> parentQueue.changeMessageVisibilityTimeout 0, @getReceiptHandle(), callback
changeMessageVisibilityTimeout: (secondsFromFunctionCall, callback) ->
parentQueue.changeMessageVisibilityTimeout secondsFromFunctionCall,@getReceiptHandle(), callback
getReceiptHandle: -> @originalMessage.Messages[0].ReceiptHandle
@ -207,3 +245,4 @@ class MongoQueue extends events.EventEmitter

View file

@ -8,21 +8,44 @@ db = require './db'
mongoose = require 'mongoose'
events = require 'events'
queues = require '../commons/queue'
LevelSession = require '../levels/sessions/LevelSession'
module.exports.connectToScoringQueue = -> queues.initializeScoringTaskQueue() unless queues.scoringTaskQueue?
connectToScoringQueue = ->
unless queues.scoringTaskQueue
queues.initializeScoringTaskQueue (err, data) ->
winston.info "Connected to scoring task queue!" unless err?
module.exports.setupRoutes = (app) ->
module.exports.setup = (app) ->
connectToScoringQueue()
app.get '/scoring/queue', (req,res) ->
#must also include the
queues.scoringTaskQueue.receieveMessage (err, data) ->
#once the data is recieved
queues.scoringTaskQueue.receiveMessage (err, message) ->
#check if message is empty!!!!!!!!!
if message.isEmpty()
sendResponseObject req, res, {"error":"No messages were received."}
else
constructTaskObject message.getBody(), (taskConstructionError, taskObject) ->
if taskConstructionError?
sendResponseObject req, res, {"error":taskConstructionError}
else
sendResponseObject req, res, taskObject
levelAndUserCodeMapResponse = {}
sendResponseObject req,res,levelAndUserCodeMapResponse
app.post '/scoring/queue', (req, res) ->
clientResponseObject = req.body
###
sampleClientResponseObject =
"processorUserID": "51eb2714fa058cb20d0006ef" #user ID of the person processing
"processingTime": 2745 #time in milliseconds
"processedSessionID": "52dfeb17c8b5f435c7000025" #the processed session
"processedSessionChangedTime": ISODate("2014-01-22T16:28:12.450Z") #to see if the session processed is the one in the database
"playerResults": [
{"ID":"51eb2714fa058cb20d0006ef", "team":"humans","metrics": {"reachedGoal":false, "rank":2}}
{"ID":"51eb2714fa058cb20d00fedg", "team":"ogres","metrics": {"reachedGoal":true, "rank":1}}
]
###
res.end("You posted an object to score!")
@ -35,7 +58,42 @@ sendResponseObject = (req,res,object) ->
res.end()
constructTaskObject = (data, callback) ->
#task includes session ID,
constructTaskObject = (taskMessageBody, callback) ->
getSessionInformation taskMessageBody.sessionID, (err, sessionInformation) ->
return callback err, data if err?
taskObject =
"messageGenerated": Date.now()
"sessionID": sessionInformation.sessionID
"sessionChangedTime": sessionInformation.changed
"taskGeneratingPlayerID": sessionInformation.creator
"code": sessionInformation.code
"players": sessionInformation.players
callback err, taskObject
###
"players" : [
{"ID":"51eb2714fa058cb20d0006ef", "team":"humans", "userCodeMap": "code goes here"}
{"ID":"51eb2714fa058cb20d00fedg", "team":"ogres","userCodeMap": "code goes here"}
]
###
getSessionInformation = (sessionID, callback) ->
LevelSession.findOne {"_id": mongoose.Types.ObjectId(sessionID)}, (err, session) ->
if err?
callback err, {"error":"There was an error retrieving the session."}
else
sessionInformation =
"sessionID": session._id
"players": session.players
"code": session.code
"changed": session.changed
"creator": session.creator
callback err, sessionInformation
processClientResponse = (clientResponseBody, callback) ->
return