From b174b10642b5d66d86fa7086c1a81867c114c9b7 Mon Sep 17 00:00:00 2001 From: Michael Schmatz Date: Wed, 5 Feb 2014 15:07:15 -0800 Subject: [PATCH] Added message class for SQS, basic scoring routes --- server/commons/mapping.coffee | 1 + server/commons/queue.coffee | 67 +++++++++++++++++++++++------- server/routes/scoring.coffee | 78 ++++++++++++++++++++++++++++++----- 3 files changed, 122 insertions(+), 24 deletions(-) diff --git a/server/commons/mapping.coffee b/server/commons/mapping.coffee index a26717b58..cdf709b5e 100644 --- a/server/commons/mapping.coffee +++ b/server/commons/mapping.coffee @@ -38,4 +38,5 @@ module.exports.routes = 'routes/languages' 'routes/mail' 'routes/sprites' + 'routes/scoring' ] \ No newline at end of file diff --git a/server/commons/queue.coffee b/server/commons/queue.coffee index 1e53675eb..76a03a805 100644 --- a/server/commons/queue.coffee +++ b/server/commons/queue.coffee @@ -7,21 +7,12 @@ aws = require 'aws-sdk' db = require './../routes/db' mongoose = require 'mongoose' events = require 'events' - queueClient = undefined module.exports.scoringTaskQueue = undefined module.exports.sendwithusQueue = undefined -### -module.exports.setupRoutes = (app) -> - app.get '/multiplayer/' - queueClient.registerQueue "simulationQueue", {}, (err,data) -> - simulationQueue = data - simulationQueue.subscribe 'message', (err, data) -> - if data.Messages? - winston.info "Receieved message #{data.Messages?[0].Body}" - simulationQueue.deleteMessage data.Messages?[0].ReceiptHandle, -> - winston.info "Deleted message" -### + + + module.exports.initializeScoringTaskQueue = (cb) -> queueClient = module.exports.generateQueueClient() unless queueClient? @@ -48,13 +39,34 @@ module.exports.initializeSendwithusQueue = (cb) -> module.exports.generateQueueClient = -> - if config.isProduction + if config.queue.accessKeyId queueClient = new SQSQueueClient() else queueClient = new MongoQueueClient() return queueClient +class MessageObject + constructor: () -> + return + + getBody: -> + return + + getID: -> + return + removeFromQueue: -> + return + + requeue: -> + return + + changeMessageVisibilityTimeout: (secondsFromFunctionCall) -> + return + + + + class SQSQueueClient constructor: -> @configure() @@ -92,7 +104,10 @@ class SQSQueue extends events.EventEmitter receiveMessage: (callback) -> @sqs.receiveMessage {QueueUrl: @queueUrl, WaitTimeSeconds: 20}, (err, data) => - if err? then @emit 'error',err,data else @emit 'message',err,data + originalData = data + data = new SQSMessage originalData, this + + if err? then @emit 'error',err,originalData else @emit 'message',err,data callback? err,data deleteMessage: (receiptHandle, callback) -> @@ -100,6 +115,11 @@ class SQSQueue extends events.EventEmitter if err? then @emit 'error',err,data else @emit 'message',err,data callback? err,data + changeMessageVisibilityTimeout: (secondsFromNow, receiptHandle, callback) -> + @sqs.changeMessageVisibility {QueueUrl: @queueUrl, ReceiptHandle: receiptHandle, VisibilityTimeout: secondsFromNow}, (err, data) => + if err? then @emit 'error',err,data else @emit 'edited',err,data + callback? err,data + sendMessage: (messageBody, delaySeconds, callback) -> @sqs.sendMessage {QueueUrl: @queueUrl, MessageBody: messageBody, DelaySeconds: delaySeconds}, (err, data) => @@ -111,6 +131,24 @@ class SQSQueue extends events.EventEmitter @receiveMessage (err, data) -> asyncCallback(null) + class SQSMessage extends MessageObject + constructor: (@originalMessage, @parentQueue) -> + + isEmpty: -> not @originalMessage.Messages?[0]? + + getBody: -> @originalMessage.Messages[0].Body + + getID: -> @originalMessage.Messages[0].MessageId + + removeFromQueue: (callback) -> parentQueue.deleteMessage @getReceiptHandle(), callback + + requeue: (callback) -> parentQueue.changeMessageVisibilityTimeout 0, @getReceiptHandle(), callback + + changeMessageVisibilityTimeout: (secondsFromFunctionCall, callback) -> + parentQueue.changeMessageVisibilityTimeout secondsFromFunctionCall,@getReceiptHandle(), callback + + getReceiptHandle: -> @originalMessage.Messages[0].ReceiptHandle + @@ -207,3 +245,4 @@ class MongoQueue extends events.EventEmitter + diff --git a/server/routes/scoring.coffee b/server/routes/scoring.coffee index da5279eec..8243a9afb 100644 --- a/server/routes/scoring.coffee +++ b/server/routes/scoring.coffee @@ -8,21 +8,44 @@ db = require './db' mongoose = require 'mongoose' events = require 'events' queues = require '../commons/queue' +LevelSession = require '../levels/sessions/LevelSession' -module.exports.connectToScoringQueue = -> queues.initializeScoringTaskQueue() unless queues.scoringTaskQueue? +connectToScoringQueue = -> + unless queues.scoringTaskQueue + queues.initializeScoringTaskQueue (err, data) -> + winston.info "Connected to scoring task queue!" unless err? -module.exports.setupRoutes = (app) -> +module.exports.setup = (app) -> + connectToScoringQueue() app.get '/scoring/queue', (req,res) -> #must also include the - queues.scoringTaskQueue.receieveMessage (err, data) -> - #once the data is recieved - - - levelAndUserCodeMapResponse = {} - sendResponseObject req,res,levelAndUserCodeMapResponse + queues.scoringTaskQueue.receiveMessage (err, message) -> + #check if message is empty!!!!!!!!! + if message.isEmpty() + sendResponseObject req, res, {"error":"No messages were received."} + else + constructTaskObject message.getBody(), (taskConstructionError, taskObject) -> + if taskConstructionError? + sendResponseObject req, res, {"error":taskConstructionError} + else + sendResponseObject req, res, taskObject + app.post '/scoring/queue', (req, res) -> clientResponseObject = req.body + ### + sampleClientResponseObject = + "processorUserID": "51eb2714fa058cb20d0006ef" #user ID of the person processing + "processingTime": 2745 #time in milliseconds + "processedSessionID": "52dfeb17c8b5f435c7000025" #the processed session + "processedSessionChangedTime": ISODate("2014-01-22T16:28:12.450Z") #to see if the session processed is the one in the database + "playerResults": [ + {"ID":"51eb2714fa058cb20d0006ef", "team":"humans","metrics": {"reachedGoal":false, "rank":2}} + {"ID":"51eb2714fa058cb20d00fedg", "team":"ogres","metrics": {"reachedGoal":true, "rank":1}} + ] + ### + + res.end("You posted an object to score!") @@ -35,7 +58,42 @@ sendResponseObject = (req,res,object) -> res.end() -constructTaskObject = (data, callback) -> - #task includes session ID, +constructTaskObject = (taskMessageBody, callback) -> + getSessionInformation taskMessageBody.sessionID, (err, sessionInformation) -> + return callback err, data if err? + taskObject = + "messageGenerated": Date.now() + "sessionID": sessionInformation.sessionID + "sessionChangedTime": sessionInformation.changed + "taskGeneratingPlayerID": sessionInformation.creator + "code": sessionInformation.code + "players": sessionInformation.players + callback err, taskObject + + ### + "players" : [ + {"ID":"51eb2714fa058cb20d0006ef", "team":"humans", "userCodeMap": "code goes here"} + {"ID":"51eb2714fa058cb20d00fedg", "team":"ogres","userCodeMap": "code goes here"} + ] + ### + + + +getSessionInformation = (sessionID, callback) -> + LevelSession.findOne {"_id": mongoose.Types.ObjectId(sessionID)}, (err, session) -> + if err? + callback err, {"error":"There was an error retrieving the session."} + else + sessionInformation = + "sessionID": session._id + "players": session.players + "code": session.code + "changed": session.changed + "creator": session.creator + callback err, sessionInformation + +processClientResponse = (clientResponseBody, callback) -> + return +