diff --git a/package.json b/package.json index 90dc6f16d..20c2e90da 100644 --- a/package.json +++ b/package.json @@ -61,7 +61,9 @@ "stream-buffers": "0.2.x", "sendwithus": "2.0.x", "aws-sdk":"~2.0.0", - "bayesian-battle":"0.0.x" + "bayesian-battle":"0.0.x", + "hiredis":"", + "redis": "" }, "devDependencies": { "jade": "0.33.x", diff --git a/server/queues/scoring.coffee b/server/queues/scoring.coffee index 5607d0ae8..24633b87f 100644 --- a/server/queues/scoring.coffee +++ b/server/queues/scoring.coffee @@ -19,78 +19,138 @@ module.exports.setup = (app) -> connectToScoringQueue() connectToScoringQueue = -> queues.initializeQueueClient -> - queues.queueClient.registerQueue "scoring", {}, (err,data) -> - throwScoringQueueRegistrationError(err) if err? + queues.queueClient.registerQueue "scoring", {}, (error,data) -> + if error? then throw new Error "There was an error registering the scoring queue: #{error}" scoringTaskQueue = data log.info "Connected to scoring task queue!" -throwScoringQueueRegistrationError = (error) -> - log.error "There was an error registering the scoring queue: #{error}" - throw new Error "There was an error registering the scoring queue." - module.exports.createNewTask = (req, res) -> - return errors.forbidden res, "You need to be logged in to be added to the leaderboard" if isUserAnonymous req - return errors.badInput res, "The session ID is invalid" unless typeof req.body.session is "string" - LevelSession.findOne { "_id": req.body.session}, (err, sessionToScore) -> - return errors.serverError res, "There was an error finding the given session." if err? - sessionToScore = sessionToScore.toJSON() - console.log "Ranking session of team #{sessionToScore.team}" + requestSessionID = getSessionIDFromRequest req + if isUserAnonymous req then return errors.forbidden res, "You need to be logged in to be added to the leaderboard" + return errors.badInput res, "The session ID is invalid" unless requestSessionID is "string" - LevelSession.update { "_id": req.body.session}, {"submitted":true}, (err, data) -> - return errors.serverError res, "There was an error saving the submitted bool of the session." if err? - LevelSession.find { "levelID": "project-dota", "submitted": true}, (err, submittedSessions) -> - taskPairs = [] - for session in submittedSessions - session = session.toObject() - console.log "Attemping to add session of team #{session.team} to taskPairs..." - if String(session._id) isnt req.body.session and session.team isnt sessionToScore.team and session.team in ["ogres","humans"] - console.log "Adding game to taskPairs!" - taskPairs.push [req.body.session,String session._id] - async.each taskPairs, sendTaskPairToQueue, (taskPairError) -> - return errors.serverError res, "There was an error sending the task pairs to the queue" if taskPairError? - sendResponseObject req, res, {"message":"All task pairs were succesfully sent to the queue"} + fetchSubmittedSession requestSessionID, (err, sessionToScore) -> + if err? then return errors.serverError res, "There was an error finding the given session." + updateSubmittedSession sessionToScore, (err, data) -> + if err? then return errors.serverError res, "There was an error updating the session" + + fetchSessionsToRankAgainst (err, submittedSessions) -> + if err? then return errors.serverError res, "There was an error fetching the sessions to rank against" + + taskPairs = generateTaskPairs(submittedSessions, sessionToScore) + sendEachTaskPairToTheQueue taskPairs, (taskPairError) -> + if taskPairError? then return errors.serverError res, "There was an error sending the task pairs to the queue" + + sendResponseObject req, res, {"message":"All task pairs were succesfully sent to the queue"} + +module.exports.dispatchTaskToConsumer = (req, res) -> + if isUserAnonymous(req) then return errors.forbidden res, "You need to be logged in to simulate games" + + scoringTaskQueue.receiveMessage (err, message) -> + if err? or not messageIsInvalid(message) then return errors.gatewayTimeoutError res, "Queue Receive Error:#{err}" + + messageBody = parseTaskQueueMessage req, res, message + return unless messageBody? + + constructTaskObject messageBody, (taskConstructionError, taskObject) -> + if taskConstructionError? then return errors.serverError res, "There was an error constructing the scoring task" + + message.changeMessageVisibilityTimeout scoringTaskTimeoutInSeconds, (err) -> + if err? then return errors.serverError res, "There was an error changing the message visibility timeout." + + constructTaskLogObject getUserIDFromRequest(req),message.getReceiptHandle(), (taskLogError, taskLogObject) -> + if taskLogError? then return errors.serverError res, "There was an error creating the task log object." + + taskObject.taskID = taskLogObject._id + taskObject.receiptHandle = message.getReceiptHandle() + + sendResponseObject req, res, taskObject + +module.exports.processTaskResult = (req, res) -> + clientResponseObject = verifyClientResponse req.body, res + return unless clientResponseObject? + + taskLogQuery = _id: clientResponseObject.taskID + + TaskLog.findOne taskLogQuery, (err, taskLog) -> + return errors.serverError res, "There was an error retrieiving the task log object" if err? + + taskLogJSON = taskLog.toObject() + + return errors.badInput res, "That computational task has already been performed" if taskLogJSON.calculationTimeMS + return handleTimedOutTask req, res, clientResponseObject if hasTaskTimedOut taskLogJSON.sentDate + + destroyQueueMessage clientResponseObject.receiptHandle, (err) -> + return errors.badInput res, "The queue message is already back in the queue, rejecting results." if err? + + logTaskComputation clientResponseObject, taskLog, (loggingErr) -> + return errors.serverError res, "There as a problem logging the task computation: #{loggingErr}" if loggingErr? + + updateSessions clientResponseObject, (updateError, newScores) -> + return errors.serverError res, "There was an error updating the scores.#{updateError}" if updateError? + + sendResponseObject req, res, {"message":"The scores were updated successfully!"} + + +messageIsInvalid = (message) -> (not message?) or message.isEmpty() + +getSessionIDFromRequest = (req) -> req.body.session + +sendEachTaskPairToTheQueue = (taskPairs, callback) -> async.each taskPairs, sendTaskPairToQueue, callback + +fetchSubmittedSession = (submittedSessionID, callback) -> + sessionQuery = + _id: submittedSessionID + LevelSession.findOne sessionQuery, (err, session) -> callback err, session?.toObject() + + +updateSubmittedSession = (sessionToUpdate, callback) -> + sessionQuery = + _id: sessionToUpdate._id + + sessionUpdateObject = + submitted: true + submittedCode: sessionToUpdate.code + submitDate: new Date() + + LevelSession.update sessionQuery, sessionUpdateObject, callback + +fetchSessionsToRankAgainst = (callback) -> + submittedSessionsQuery = + levelID: "project-dota" + submitted: true + submittedCode: + $exists: true + + LevelSession.find submittedSessionsQuery, callback + + +generateTaskPairs = (submittedSessions, sessionToScore) -> + taskPairs = [] + for session in submittedSessions + session = session.toObject() + + teams = ['ogres','humans'] + opposingTeams = _.pull teams, sessionToScore.team + + if String(session._id) isnt String(sessionToScore._id) and session.team in opposingTeams + console.log "Adding game to taskPairs!" + taskPairs.push [req.body.session,String session._id] + taskPairs sendTaskPairToQueue = (taskPair, callback) -> taskObject = sessions: taskPair scoringTaskQueue.sendMessage taskObject, 0, (err,data) -> - callback err,data - -module.exports.dispatchTaskToConsumer = (req, res) -> - userID = getUserIDFromRequest req,res - return errors.forbidden res, "You need to be logged in to simulate games" if isUserAnonymous req - - scoringTaskQueue.receiveMessage (taskQueueReceiveError, message) -> - if (not message?) or message.isEmpty() or taskQueueReceiveError? - return errors.gatewayTimeoutError res, "No messages were receieved from the queue. Msg:#{taskQueueReceiveError}" - - - messageBody = parseTaskQueueMessage req, res, message - return errors.serverError res, "There was an error parsing the queue message" unless messageBody? - - constructTaskObject messageBody, (taskConstructionError, taskObject) -> - return errors.serverError res, "There was an error constructing the scoring task" if taskConstructionError? - - message.changeMessageVisibilityTimeout scoringTaskTimeoutInSeconds - - constructTaskLogObject userID,message.getReceiptHandle(), (taskLogError, taskLogObject) -> - return errors.serverError res, "There was an error creating the task log object." if taskLogError? - - setTaskObjectTaskLogID taskObject, taskLogObject._id - - taskObject.receiptHandle = message.getReceiptHandle() - - sendResponseObject req, res, taskObject + callback? err,data getUserIDFromRequest = (req) -> if req.user? then return req.user._id else return null - isUserAnonymous = (req) -> if req.user? then return req.user.get('anonymous') else return true - parseTaskQueueMessage = (req, res, message) -> try if typeof message.getBody() is "object" then return message.getBody() @@ -110,10 +170,10 @@ constructTaskObject = (taskMessageBody, callback) -> for session in sessions sessionInformation = - "sessionID": session.sessionID - "sessionChangedTime": session.changed + "sessionID": session._id + "submitDate": session.submitDate "team": session.team ? "No team" - "code": session.code + "code": session.submittedCode "teamSpells": session.teamSpells ? {} "levelID": session.levelID @@ -122,19 +182,15 @@ constructTaskObject = (taskMessageBody, callback) -> getSessionInformation = (sessionIDString, callback) -> - LevelSession.findOne {"_id": sessionIDString }, (err, session) -> - return callback err, {"error":"There was an error retrieving the session."} if err? + sessionQuery + _id:sessionIDString - session = session.toObject() - sessionInformation = - "sessionID": session._id - "code": _.cloneDeep session.code - "changed": session.changed - "creator": session.creator - "team": session.team - "teamSpells": session.teamSpells - "levelID": session.levelID + LevelSession.findOne sessionQuery, (err, session) -> + levelSessionFindOneError = + "error":"There was an error retrieving the session." + return callback err, levelSessionFindOneError if err? + sessionInformation = session.toObject() callback err, sessionInformation @@ -147,45 +203,14 @@ constructTaskLogObject = (calculatorUserID, messageIdentifierString, callback) - taskLogObject.save callback - -setTaskObjectTaskLogID = (taskObject, taskLogObjectID) -> taskObject.taskID = taskLogObjectID - - sendResponseObject = (req,res,object) -> res.setHeader('Content-Type', 'application/json') res.send(object) res.end() -module.exports.processTaskResult = (req, res) -> - clientResponseObject = verifyClientResponse req.body, res - - if clientResponseObject? - TaskLog.findOne {"_id": clientResponseObject.taskID}, (err, taskLog) -> - return errors.serverError res, "There was an error retrieiving the task log object" if err? - - taskLogJSON = taskLog.toObject() - - return errors.badInput res, "That computational task has already been performed" if taskLogJSON.calculationTimeMS - return handleTimedOutTask req, res, clientResponseObject if hasTaskTimedOut taskLogJSON.sentDate - destroyQueueMessage clientResponseObject.receiptHandle, (err) -> - return errors.badInput res, "The queue message is already back in the queue, rejecting results." if err? - - logTaskComputation clientResponseObject, taskLog, (loggingError) -> - if loggingError? - return errors.serverError res, "There as a problem logging the task computation: #{loggingError}" - - updateScores clientResponseObject, (updatingScoresError, newScores) -> - if updatingScoresError? - return errors.serverError res, "There was an error updating the scores.#{updatingScoresError}" - - sendResponseObject req, res, {"message":"The scores were updated successfully!"} - - - hasTaskTimedOut = (taskSentTimestamp) -> taskSentTimestamp + scoringTaskTimeoutInSeconds * 1000 < Date.now() - handleTimedOutTask = (req, res, taskBody) -> errors.clientTimeout res, "The results weren't provided within the timeout" destroyQueueMessage = (receiptHandle, callback) -> scoringTaskQueue.deleteMessage receiptHandle, callback @@ -197,17 +222,15 @@ verifyClientResponse = (responseObject, res) -> else responseObject - logTaskComputation = (taskObject,taskLogObject, callback) -> taskLogObject.calculationTimeMS = taskObject.calculationTimeMS taskLogObject.sessions = taskObject.sessions taskLogObject.save callback - -updateScores = (taskObject,callback) -> +updateSessions = (taskObject,callback) -> sessionIDs = _.pluck taskObject.sessions, 'sessionID' - async.map sessionIDs, retrieveOldScoreMetrics, (err, oldScores) -> + async.map sessionIDs, retrieveOldSessionData, (err, oldScores) -> callback err, {"error": "There was an error retrieving the old scores"} if err? oldScoreArray = _.toArray putRankingFromMetricsIntoScoreObject taskObject, oldScores @@ -245,7 +268,7 @@ putRankingFromMetricsIntoScoreObject = (taskObject,scoreObject) -> scoreObject -retrieveOldScoreMetrics = (sessionID, callback) -> +retrieveOldSessionData = (sessionID, callback) -> sessionQuery = "_id":sessionID @@ -267,68 +290,3 @@ retrieveOldScoreMetrics = (sessionID, callback) -> -###Sample Messages -sampleQueueMessage = - { - "sessions": ["52dea9b77e486eeb97000001","52d981a73cf02dcf260003cb"] - } - -sampleUndoneTaskObject = - "taskID": "507f191e810c19729de860ea" - "sessions" : [ - { - "ID":"52dfeb17c8b5f435c7000025" - "sessionChangedTime": "2014-01-22T16:28:12.450Z" - "team":"humans" - "code": "code goes here" - }, - { - "ID":"51eb2714fa058cb20d00fedg" - "sessionChangedTime": "2014-01-22T16:28:12.450Z" - "team":"ogres" - "code": "code goes here" - } - ] -sampleResponseObject = - "taskID": "507f191e810c19729de860ea" - "calculationTime":3201 - "sessions": [ - { - "ID":"52dfeb17c8b5f435c7000025" - "sessionChangedTime": "2014-01-22T16:28:12.450Z" - "metrics": { - "rank":2 - } - }, - { - "ID":"51eb2714fa058cb20d00fedg" - "sessionChangedTime": "2014-01-22T16:28:12.450Z" - "metrics": { - "rank":1 - } - } - ] - -sampleTaskLogObject= -{ - "_id":ObjectId("507f191e810c19729de860ea") #datestamp is built into objectId - "calculatedBy":ObjectId("51eb2714fa058cb20d0006ef") - "calculationTime":3201 - timedOut: false - "sessions":[ - { - "ID":ObjectId("52dfeb17c8b5f435c7000025") - "metrics": { - "rank":2 - } - }, - { - "ID":ObjectId("51eb2714fa058cb20d00feda") - "metrics": { - "rank":1 - } - } - ] -} - -### \ No newline at end of file