diff --git a/server/queues/scoring.coffee b/server/queues/scoring.coffee index 327abf949..a316674c2 100644 --- a/server/queues/scoring.coffee +++ b/server/queues/scoring.coffee @@ -43,17 +43,17 @@ module.exports.addPairwiseTaskToQueueFromRequest = (req, res) -> addPairwiseTaskToQueue = (taskPair, cb) -> LevelSession.findOne(_id:taskPair[0]).lean().exec (err, firstSession) => - if err? then return cb err, false + if err? then return cb err LevelSession.find(_id:taskPair[1]).exec (err, secondSession) => - if err? then return cb err, false + if err? then return cb err try taskPairs = generateTaskPairs(secondSession, firstSession) catch e - if e then return cb e, false + if e then return cb e sendEachTaskPairToTheQueue taskPairs, (taskPairError) -> - if taskPairError? then return cb taskPairError,false - cb null, true + if taskPairError? then return cb taskPairError + cb null module.exports.resimulateAllSessions = (req, res) -> unless isUserAdmin req then return errors.unauthorized res, "Unauthorized. Even if you are authorized, you shouldn't do this" @@ -68,8 +68,8 @@ module.exports.resimulateAllSessions = (req, res) -> majorVersion: levelMajorVersion query = LevelSession - .find(findParameters) - .lean() + .find(findParameters) + .lean() query.exec (err, result) -> if err? then return errors.serverError res, err @@ -107,7 +107,7 @@ module.exports.createNewTask = (req, res) -> originalLevelID = req.body.originalLevelID currentLevelID = req.body.levelID requestLevelMajorVersion = parseInt(req.body.levelMajorVersion) - + async.waterfall [ validatePermissions.bind(@,req,requestSessionID) fetchAndVerifyLevelType.bind(@,currentLevelID) @@ -115,12 +115,12 @@ module.exports.createNewTask = (req, res) -> updateSessionToSubmit fetchInitialSessionsToRankAgainst.bind(@, requestLevelMajorVersion, originalLevelID) generateAndSendTaskPairsToTheQueue - + ], (err, successMessageObject) -> if err? then return errors.serverError res, "There was an error submitting the game to the queue:#{err}" sendResponseObject req, res, successMessageObject - + validatePermissions = (req,sessionID, callback) -> if isUserAnonymous req then return callback "You are unauthorized to submit that game to the simulator" if isUserAdmin req then return callback null @@ -136,7 +136,7 @@ validatePermissions = (req,sessionID, callback) -> query.exec (err, retrievedSession) -> if err? then return callback err userHasPermissionToSubmitCode = retrievedSession.creator is req.user?.id and - not _.isEqual(retrievedSession.code, retrievedSession.submittedCode) + not _.isEqual(retrievedSession.code, retrievedSession.submittedCode) unless userHasPermissionToSubmitCode then return callback "You are unauthorized to submit that game to the simulator" callback null @@ -209,7 +209,7 @@ generateAndSendTaskPairsToTheQueue = (sessionToRankAgainst,submittedSession, cal sendEachTaskPairToTheQueue taskPairs, (taskPairError) -> if taskPairError? then return callback taskPairError callback null, {"message": "All task pairs were succesfully sent to the queue"} - + module.exports.dispatchTaskToConsumer = (req, res) -> async.waterfall [ @@ -221,7 +221,7 @@ module.exports.dispatchTaskToConsumer = (req, res) -> constructTaskLogObject.bind(@, getUserIDFromRequest(req)) processTaskObject ], (err, taskObjectToSend) -> - if err? + if err? if typeof err is "string" and err.indexOf "No more games in the queue" isnt -1 res.send(204, "No games to score.") return res.end() @@ -229,16 +229,16 @@ module.exports.dispatchTaskToConsumer = (req, res) -> return errors.serverError res, "There was an error dispatching the task: #{err}" sendResponseObject req, res, taskObjectToSend - - + + checkSimulationPermissions = (req, cb) -> - if isUserAnonymous req + if isUserAnonymous req cb "You need to be logged in to simulate games" else cb null - + receiveMessageFromSimulationQueue = (cb) -> - scoringTaskQueue.receiveMessage (err, message) -> + scoringTaskQueue.receiveMessage (err, message) -> if err? then return cb "No more games in the queue, error:#{err}" if messageIsInvalid(message) then return cb "Message received from queue is invalid" cb null, message @@ -292,90 +292,190 @@ processTaskObject = (taskObject,taskLogObject, message, cb) -> cb null, taskObject getSessionInformation = (sessionIDString, callback) -> - findParameters = + findParameters = _id: sessionIDString selectString = 'submitDate team submittedCode teamSpells levelID creator creatorName' query = LevelSession - .findOne(findParameters) - .select(selectString) - .lean() - + .findOne(findParameters) + .select(selectString) + .lean() + query.exec (err, session) -> if err? then return callback err, {"error":"There was an error retrieving the session."} callback null, session module.exports.processTaskResult = (req, res) -> - clientResponseObject = verifyClientResponse req.body, res + async.waterfall [ + verifyClientResponse.bind(@,req.body) + fetchTaskLog.bind(@) + checkTaskLog.bind(@) + deleteQueueMessage.bind(@) + fetchLevelSession.bind(@) + checkSubmissionDate.bind(@) + logTaskComputation.bind(@) + updateSessions.bind(@) + indexNewScoreArray.bind(@) + addMatchToSessions.bind(@) + updateUserSimulationCounts.bind(@, req.user._id) + determineIfSessionShouldContinueAndUpdateLog.bind(@) + findNearestBetterSessionID.bind(@) + addNewSessionsToQueue.bind(@) + ], (err, results) -> + if err is "shouldn't continue" + sendResponseObject req, res, {"message":"The scores were updated successfully, person lost so no more games are being inserted!"} + else if err is "no session was found" + sendResponseObject req, res, {"message":"There were no more games to rank (game is at top)!"} + else if err? + errors.serverError res, "There was an error:#{err}" + else + sendResponseObject req, res, {"message":"The scores were updated successfully and more games were sent to the queue!"} - return unless clientResponseObject? - TaskLog.findOne {_id: clientResponseObject.taskID}, (err, taskLog) -> - return errors.serverError res, "There was an error retrieiving the task log object" if err? +verifyClientResponse = (responseObject, callback) -> + #TODO: better verification + unless typeof responseObject is "object" + callback "The response to that query is required to be a JSON object." + else + @clientResponseObject = responseObject + log.info "Verified client response!" + callback null, responseObject - taskLogJSON = taskLog.toObject() +fetchTaskLog = (responseObject, callback) -> + findParameters = + _id: responseObject.taskID + query = TaskLog + .findOne(findParameters) + query.exec (err, taskLog) => + @taskLog = taskLog + log.info "Fetched task log!" + callback err, taskLog.toObject() - return errors.badInput res, "That computational task has already been performed" if taskLogJSON.calculationTimeMS - return handleTimedOutTask req, res, clientResponseObject if hasTaskTimedOut taskLogJSON.sentDate +checkTaskLog = (taskLog, callback) -> + if taskLog.calculationTimeMS then return callback "That computational task has already been performed" + if hasTaskTimedOut taskLog.sentDate then return callback "The task has timed out" + log.info "Checked task log" + callback null - scoringTaskQueue.deleteMessage clientResponseObject.receiptHandle, (err) -> - console.log "Deleted message." - if err? then return errors.badInput res, "The queue message is already back in the queue, rejecting results." +deleteQueueMessage = (callback) -> + scoringTaskQueue.deleteMessage @clientResponseObject.receiptHandle, (err) -> + log.info "Deleted queue message" + callback err - LevelSession.findOne(_id: clientResponseObject.originalSessionID).lean().exec (err, levelSession) -> - if err? then return errors.serverError res, "There was a problem finding the level session:#{err}" - - supposedSubmissionDate = new Date(clientResponseObject.sessions[0].submitDate) - - if Number(supposedSubmissionDate) isnt Number(levelSession.submitDate) - return sendResponseObject req, res, {"message":"The game has been resubmitted. Removing from queue..."} - - logTaskComputation clientResponseObject, taskLog, (logErr) -> - if logErr? then return errors.serverError res, "There as a problem logging the task computation: #{logErr}" - - updateSessions clientResponseObject, (updateError, newScoreArray) -> - if updateError? then return errors.serverError res, "There was an error updating the scores.#{updateError}" - - newScoresObject = _.indexBy newScoreArray, 'id' - - addMatchToSessions clientResponseObject, newScoresObject, (err, data) -> - if err? then return errors.serverError res, "There was an error updating the sessions with the match! #{JSON.stringify err}" - - incrementUserSimulationCount req.user._id, 'simulatedBy' - incrementUserSimulationCount levelSession.creator, 'simulatedFor' - - originalSessionID = clientResponseObject.originalSessionID - originalSessionTeam = clientResponseObject.originalSessionTeam - originalSessionRank = parseInt clientResponseObject.originalSessionRank - - determineIfSessionShouldContinueAndUpdateLog originalSessionID, originalSessionRank, (err, sessionShouldContinue) -> - if err? then return errors.serverError res, "There was an error determining if the session should continue, #{err}" - - if sessionShouldContinue - opposingTeam = calculateOpposingTeam(originalSessionTeam) - opponentID = _.pull(_.keys(newScoresObject), originalSessionID) - sessionNewScore = newScoresObject[originalSessionID].totalScore - opponentNewScore = newScoresObject[opponentID].totalScore - - levelOriginalID = levelSession.level.original - levelOriginalMajorVersion = levelSession.level.majorVersion - findNearestBetterSessionID levelOriginalID, levelOriginalMajorVersion, originalSessionID, sessionNewScore, opponentNewScore, opponentID, opposingTeam, (err, opponentSessionID) -> - if err? then return errors.serverError res, "There was an error finding the nearest sessionID!" - if opponentSessionID - addPairwiseTaskToQueue [originalSessionID, opponentSessionID], (err, success) -> - if err? then return errors.serverError res, "There was an error sending the pairwise tasks to the queue!" - sendResponseObject req, res, {"message":"The scores were updated successfully and more games were sent to the queue!"} - else - LevelSession.update {_id: originalSessionID}, {isRanking: false}, {multi: false}, (err, affected) -> - if err? then return errors.serverError res, "There was an error marking the victorious session as not being ranked." - return sendResponseObject req, res, {"message":"There were no more games to rank (game is at top)!"} - else - console.log "Player lost, achieved rank #{originalSessionRank}" - LevelSession.update {_id: originalSessionID}, {isRanking: false}, {multi: false}, (err, affected) -> - if err? then return errors.serverError res, "There was an error marking the completed session as not being ranked." - sendResponseObject req, res, {"message":"The scores were updated successfully, person lost so no more games are being inserted!"} +fetchLevelSession = (callback) -> + findParameters = + _id: @clientResponseObject.originalSessionID + query = LevelSession + .findOne(findParameters) + .lean() + query.exec (err, session) => + @levelSession = session + log.info "Fetched level session" + callback err -determineIfSessionShouldContinueAndUpdateLog = (sessionID, sessionRank, cb) -> +checkSubmissionDate = (callback) -> + supposedSubmissionDate = new Date(@clientResponseObject.sessions[0].submitDate) + if Number(supposedSubmissionDate) isnt Number(@levelSession.submitDate) + callback "The game has been resubmitted. Removing from queue..." + else + log.info "Checked submission date" + callback null + +logTaskComputation = (callback) -> + @taskLog.set('calculationTimeMS',@clientResponseObject.calculationTimeMS) + @taskLog.set('sessions') + @taskLog.calculationTimeMS = @clientResponseObject.calculationTimeMS + @taskLog.sessions = @clientResponseObject.sessions + @taskLog.save (err, saved) -> + log.info "Logged task computation" + callback err + +updateSessions = (callback) -> + sessionIDs = _.pluck @clientResponseObject.sessions, 'sessionID' + + async.map sessionIDs, retrieveOldSessionData, (err, oldScores) => + if err? then callback err, {"error": "There was an error retrieving the old scores"} + + oldScoreArray = _.toArray putRankingFromMetricsIntoScoreObject @clientResponseObject, oldScores + newScoreArray = bayes.updatePlayerSkills oldScoreArray + saveNewScoresToDatabase newScoreArray, callback + + +saveNewScoresToDatabase = (newScoreArray, callback) -> + async.eachSeries newScoreArray, updateScoreInSession, (err) -> + log.info "Saved new scores to database" + callback err,newScoreArray + + +updateScoreInSession = (scoreObject,callback) -> + LevelSession.findOne {"_id": scoreObject.id}, (err, session) -> + if err? then return callback err, null + + session = session.toObject() + newTotalScore = scoreObject.meanStrength - 1.8 * scoreObject.standardDeviation + scoreHistoryAddition = [Date.now(), newTotalScore] + updateObject = + meanStrength: scoreObject.meanStrength + standardDeviation: scoreObject.standardDeviation + totalScore: newTotalScore + $push: {scoreHistory: {$each: [scoreHistoryAddition], $slice: -1000}} + + LevelSession.update {"_id": scoreObject.id}, updateObject, callback + log.info "New total score for session #{scoreObject.id} is #{updateObject.totalScore}" + +indexNewScoreArray = (newScoreArray, callback) -> + newScoresObject = _.indexBy newScoreArray, 'id' + @newScoresObject = newScoresObject + callback null, newScoresObject + +addMatchToSessions = (newScoreObject, callback) -> + matchObject = {} + matchObject.date = new Date() + matchObject.opponents = {} + for session in @clientResponseObject.sessions + sessionID = session.sessionID + matchObject.opponents[sessionID] = {} + matchObject.opponents[sessionID].sessionID = sessionID + matchObject.opponents[sessionID].userID = session.creator + matchObject.opponents[sessionID].metrics = {} + matchObject.opponents[sessionID].metrics.rank = Number(newScoreObject[sessionID].gameRanking) + + log.info "Match object computed, result: #{matchObject}" + log.info "Writing match object to database..." + #use bind with async to do the writes + sessionIDs = _.pluck @clientResponseObject.sessions, 'sessionID' + async.each sessionIDs, updateMatchesInSession.bind(@,matchObject), (err) -> callback err + +updateMatchesInSession = (matchObject, sessionID, callback) -> + currentMatchObject = {} + currentMatchObject.date = matchObject.date + currentMatchObject.metrics = matchObject.opponents[sessionID].metrics + opponentsClone = _.cloneDeep matchObject.opponents + opponentsClone = _.omit opponentsClone, sessionID + opponentsArray = _.toArray opponentsClone + currentMatchObject.opponents = opponentsArray + + sessionUpdateObject = + $push: {matches: {$each: [currentMatchObject], $slice: -200}} + log.info "Updating session #{sessionID}" + LevelSession.update {"_id":sessionID}, sessionUpdateObject, callback + +updateUserSimulationCounts = (reqUserID,callback) -> + incrementUserSimulationCount reqUserID, 'simulatedBy', (err) => + if err? then return callback err + incrementUserSimulationCount @levelSession.creator, 'simulatedFor', callback + +incrementUserSimulationCount = (userID, type, callback) => + inc = {} + inc[type] = 1 + User.update {_id: userID}, {$inc: inc}, (err, affected) -> + log.error "Error incrementing #{type} for #{userID}: #{err}" if err + callback err + +determineIfSessionShouldContinueAndUpdateLog = (cb) -> + sessionID = @clientResponseObject.originalSessionID + sessionRank = parseInt @clientResponseObject.originalSessionRank + queryParameters = _id: sessionID @@ -394,18 +494,28 @@ determineIfSessionShouldContinueAndUpdateLog = (sessionID, sessionRank, cb) -> totalNumberOfGamesPlayed = updatedSession.numberOfWinsAndTies + updatedSession.numberOfLosses if totalNumberOfGamesPlayed < 10 console.log "Number of games played is less than 10, continuing..." - cb null, true + cb null else ratio = (updatedSession.numberOfLosses) / (totalNumberOfGamesPlayed) if ratio > 0.33 - cb null, false + cb "shouldn't continue" console.log "Ratio(#{ratio}) is bad, ending simulation" else console.log "Ratio(#{ratio}) is good, so continuing simulations" - cb null, true + cb null + + +findNearestBetterSessionID = (cb) -> + levelOriginalID = @levelSession.level.original + levelMajorVersion = @levelSession.level.majorVersion + sessionID = @clientResponseObject.originalSessionID + sessionTotalScore = @newScoresObject[sessionID].totalScore + opponentSessionID = _.pull(_.keys(@newScoresObject), sessionID) + opponentSessionTotalScore = @newScoresObject[opponentSessionID].totalScore + opposingTeam = calculateOpposingTeam(@clientResponseObject.originalSessionTeam) + -findNearestBetterSessionID = (levelOriginalID, levelMajorVersion, sessionID, sessionTotalScore, opponentSessionTotalScore, opponentSessionID, opposingTeam, cb) -> retrieveAllOpponentSessionIDs sessionID, (err, opponentSessionIDs) -> if err? then return cb err, null @@ -434,23 +544,23 @@ findNearestBetterSessionID = (levelOriginalID, levelMajorVersion, sessionID, ses selectString = '_id totalScore' query = LevelSession.findOne(queryParameters) - .sort(sortParameters) - .limit(limitNumber) - .select(selectString) - .lean() + .sort(sortParameters) + .limit(limitNumber) + .select(selectString) + .lean() console.log "Finding session with score near #{opponentSessionTotalScore}" query.exec (err, session) -> if err? then return cb err, session - unless session then return cb err, null + unless session then return cb "no session was found" console.log "Found session with score #{session.totalScore}" cb err, session._id retrieveAllOpponentSessionIDs = (sessionID, cb) -> query = LevelSession.findOne({"_id":sessionID}) - .select('matches.opponents.sessionID matches.date submitDate') - .lean() + .select('matches.opponents.sessionID matches.date submitDate') + .lean() query.exec (err, session) -> if err? then return cb err, null opponentSessionIDs = (match.opponents[0].sessionID for match in session.matches when match.date > session.submitDate) @@ -462,44 +572,11 @@ calculateOpposingTeam = (sessionTeam) -> opposingTeams = _.pull teams, sessionTeam return opposingTeams[0] -incrementUserSimulationCount = (userID, type) -> - inc = {} - inc[type] = 1 - User.update {_id: userID}, {$inc: inc}, (err, affected) -> - log.error "Error incrementing #{type} for #{userID}: #{err}" if err +addNewSessionsToQueue = (sessionID, callback) -> + sessions = [@clientResponseObject.originalSessionID, sessionID] + addPairwiseTaskToQueue sessions, callback -addMatchToSessions = (clientResponseObject, newScoreObject, callback) -> - matchObject = {} - matchObject.date = new Date() - matchObject.opponents = {} - for session in clientResponseObject.sessions - sessionID = session.sessionID - matchObject.opponents[sessionID] = {} - matchObject.opponents[sessionID].sessionID = sessionID - matchObject.opponents[sessionID].userID = session.creator - matchObject.opponents[sessionID].metrics = {} - matchObject.opponents[sessionID].metrics.rank = Number(newScoreObject[sessionID].gameRanking) - - log.info "Match object computed, result: #{matchObject}" - log.info "Writing match object to database..." - #use bind with async to do the writes - sessionIDs = _.pluck clientResponseObject.sessions, 'sessionID' - async.each sessionIDs, updateMatchesInSession.bind(@,matchObject), (err) -> callback err, null - -updateMatchesInSession = (matchObject, sessionID, callback) -> - currentMatchObject = {} - currentMatchObject.date = matchObject.date - currentMatchObject.metrics = matchObject.opponents[sessionID].metrics - opponentsClone = _.cloneDeep matchObject.opponents - opponentsClone = _.omit opponentsClone, sessionID - opponentsArray = _.toArray opponentsClone - currentMatchObject.opponents = opponentsArray - - sessionUpdateObject = - $push: {matches: {$each: [currentMatchObject], $slice: -200}} - log.info "Updating session #{sessionID}" - LevelSession.update {"_id":sessionID}, sessionUpdateObject, callback @@ -545,49 +622,6 @@ hasTaskTimedOut = (taskSentTimestamp) -> taskSentTimestamp + scoringTaskTimeoutI handleTimedOutTask = (req, res, taskBody) -> errors.clientTimeout res, "The results weren't provided within the timeout" -verifyClientResponse = (responseObject, res) -> - unless typeof responseObject is "object" - errors.badInput res, "The response to that query is required to be a JSON object." - null - else - responseObject - -logTaskComputation = (taskObject,taskLogObject, callback) -> - taskLogObject.calculationTimeMS = taskObject.calculationTimeMS - taskLogObject.sessions = taskObject.sessions - taskLogObject.save callback - - -updateSessions = (taskObject,callback) -> - sessionIDs = _.pluck taskObject.sessions, 'sessionID' - - async.map sessionIDs, retrieveOldSessionData, (err, oldScores) -> - if err? then callback err, {"error": "There was an error retrieving the old scores"} - - oldScoreArray = _.toArray putRankingFromMetricsIntoScoreObject taskObject, oldScores - newScoreArray = bayes.updatePlayerSkills oldScoreArray - saveNewScoresToDatabase newScoreArray, callback - - -saveNewScoresToDatabase = (newScoreArray, callback) -> - async.eachSeries newScoreArray, updateScoreInSession, (err) -> callback err,newScoreArray - - -updateScoreInSession = (scoreObject,callback) -> - LevelSession.findOne {"_id": scoreObject.id}, (err, session) -> - if err? then return callback err, null - - session = session.toObject() - newTotalScore = scoreObject.meanStrength - 1.8 * scoreObject.standardDeviation - scoreHistoryAddition = [Date.now(), newTotalScore] - updateObject = - meanStrength: scoreObject.meanStrength - standardDeviation: scoreObject.standardDeviation - totalScore: newTotalScore - $push: {scoreHistory: {$each: [scoreHistoryAddition], $slice: -1000}} - - LevelSession.update {"_id": scoreObject.id}, updateObject, callback - log.info "New total score for session #{scoreObject.id} is #{updateObject.totalScore}" putRankingFromMetricsIntoScoreObject = (taskObject,scoreObject) ->