Refactored simulator, removed duplication bug

This merge is going to be terrible
This commit is contained in:
Michael Schmatz 2014-04-02 15:00:43 -07:00
parent 6a51bd6837
commit 33e14ac406

View file

@ -43,17 +43,17 @@ module.exports.addPairwiseTaskToQueueFromRequest = (req, res) ->
addPairwiseTaskToQueue = (taskPair, cb) ->
LevelSession.findOne(_id:taskPair[0]).lean().exec (err, firstSession) =>
if err? then return cb err, false
if err? then return cb err
LevelSession.find(_id:taskPair[1]).exec (err, secondSession) =>
if err? then return cb err, false
if err? then return cb err
try
taskPairs = generateTaskPairs(secondSession, firstSession)
catch e
if e then return cb e, false
if e then return cb e
sendEachTaskPairToTheQueue taskPairs, (taskPairError) ->
if taskPairError? then return cb taskPairError,false
cb null, true
if taskPairError? then return cb taskPairError
cb null
module.exports.resimulateAllSessions = (req, res) ->
unless isUserAdmin req then return errors.unauthorized res, "Unauthorized. Even if you are authorized, you shouldn't do this"
@ -68,8 +68,8 @@ module.exports.resimulateAllSessions = (req, res) ->
majorVersion: levelMajorVersion
query = LevelSession
.find(findParameters)
.lean()
.find(findParameters)
.lean()
query.exec (err, result) ->
if err? then return errors.serverError res, err
@ -107,7 +107,7 @@ module.exports.createNewTask = (req, res) ->
originalLevelID = req.body.originalLevelID
currentLevelID = req.body.levelID
requestLevelMajorVersion = parseInt(req.body.levelMajorVersion)
async.waterfall [
validatePermissions.bind(@,req,requestSessionID)
fetchAndVerifyLevelType.bind(@,currentLevelID)
@ -115,12 +115,12 @@ module.exports.createNewTask = (req, res) ->
updateSessionToSubmit
fetchInitialSessionsToRankAgainst.bind(@, requestLevelMajorVersion, originalLevelID)
generateAndSendTaskPairsToTheQueue
], (err, successMessageObject) ->
if err? then return errors.serverError res, "There was an error submitting the game to the queue:#{err}"
sendResponseObject req, res, successMessageObject
validatePermissions = (req,sessionID, callback) ->
if isUserAnonymous req then return callback "You are unauthorized to submit that game to the simulator"
if isUserAdmin req then return callback null
@ -136,7 +136,7 @@ validatePermissions = (req,sessionID, callback) ->
query.exec (err, retrievedSession) ->
if err? then return callback err
userHasPermissionToSubmitCode = retrievedSession.creator is req.user?.id and
not _.isEqual(retrievedSession.code, retrievedSession.submittedCode)
not _.isEqual(retrievedSession.code, retrievedSession.submittedCode)
unless userHasPermissionToSubmitCode then return callback "You are unauthorized to submit that game to the simulator"
callback null
@ -209,7 +209,7 @@ generateAndSendTaskPairsToTheQueue = (sessionToRankAgainst,submittedSession, cal
sendEachTaskPairToTheQueue taskPairs, (taskPairError) ->
if taskPairError? then return callback taskPairError
callback null, {"message": "All task pairs were succesfully sent to the queue"}
module.exports.dispatchTaskToConsumer = (req, res) ->
async.waterfall [
@ -221,7 +221,7 @@ module.exports.dispatchTaskToConsumer = (req, res) ->
constructTaskLogObject.bind(@, getUserIDFromRequest(req))
processTaskObject
], (err, taskObjectToSend) ->
if err?
if err?
if typeof err is "string" and err.indexOf "No more games in the queue" isnt -1
res.send(204, "No games to score.")
return res.end()
@ -229,16 +229,16 @@ module.exports.dispatchTaskToConsumer = (req, res) ->
return errors.serverError res, "There was an error dispatching the task: #{err}"
sendResponseObject req, res, taskObjectToSend
checkSimulationPermissions = (req, cb) ->
if isUserAnonymous req
if isUserAnonymous req
cb "You need to be logged in to simulate games"
else
cb null
receiveMessageFromSimulationQueue = (cb) ->
scoringTaskQueue.receiveMessage (err, message) ->
scoringTaskQueue.receiveMessage (err, message) ->
if err? then return cb "No more games in the queue, error:#{err}"
if messageIsInvalid(message) then return cb "Message received from queue is invalid"
cb null, message
@ -292,19 +292,19 @@ processTaskObject = (taskObject,taskLogObject, message, cb) ->
cb null, taskObject
getSessionInformation = (sessionIDString, callback) ->
findParameters =
findParameters =
_id: sessionIDString
selectString = 'submitDate team submittedCode teamSpells levelID creator creatorName'
query = LevelSession
.findOne(findParameters)
.select(selectString)
.lean()
.findOne(findParameters)
.select(selectString)
.lean()
query.exec (err, session) ->
if err? then return callback err, {"error":"There was an error retrieving the session."}
callback null, session
###
module.exports.processTaskResult = (req, res) ->
clientResponseObject = verifyClientResponse req.body, res
@ -373,9 +373,178 @@ module.exports.processTaskResult = (req, res) ->
LevelSession.update {_id: originalSessionID}, {isRanking: false}, {multi: false}, (err, affected) ->
if err? then return errors.serverError res, "There was an error marking the completed session as not being ranked."
sendResponseObject req, res, {"message":"The scores were updated successfully, person lost so no more games are being inserted!"}
###
module.exports.processTaskResult = (req, res) ->
async.waterfall [
verifyClientResponse.bind(@,req.body)
fetchTaskLog.bind(@)
checkTaskLog.bind(@)
deleteQueueMessage.bind(@)
fetchLevelSession.bind(@)
checkSubmissionDate.bind(@)
logTaskComputation.bind(@)
updateSessions.bind(@)
indexNewScoreArray.bind(@)
addMatchToSessions.bind(@)
updateUserSimulationCounts.bind(@, req.user._id)
determineIfSessionShouldContinueAndUpdateLog.bind(@)
findNearestBetterSessionID.bind(@)
addNewSessionsToQueue.bind(@)
], (err, results) ->
if err is "shouldn't continue"
sendResponseObject req, res, {"message":"The scores were updated successfully, person lost so no more games are being inserted!"}
else if err is "no session was found"
sendResponseObject req, res, {"message":"There were no more games to rank (game is at top)!"}
else if err?
errors.serverError res, "There was an error:#{err}"
else
sendResponseObject req, res, {"message":"The scores were updated successfully and more games were sent to the queue!"}
verifyClientResponse = (responseObject, callback) ->
#TODO: better verification
unless typeof responseObject is "object"
callback "The response to that query is required to be a JSON object."
else
@clientResponseObject = responseObject
log.info "Verified client response!"
callback null, responseObject
fetchTaskLog = (responseObject, callback) ->
findParameters =
_id: responseObject.taskID
query = TaskLog
.findOne(findParameters)
query.exec (err, taskLog) =>
@taskLog = taskLog
log.info "Fetched task log!"
callback err, taskLog.toObject()
checkTaskLog = (taskLog, callback) ->
if taskLog.calculationTimeMS then return callback "That computational task has already been performed"
if hasTaskTimedOut taskLog.sentDate then return callback "The task has timed out"
log.info "Checked task log"
callback null
deleteQueueMessage = (callback) ->
scoringTaskQueue.deleteMessage @clientResponseObject.receiptHandle, (err) ->
log.info "Deleted queue message"
callback err
fetchLevelSession = (callback) ->
findParameters =
_id: @clientResponseObject.originalSessionID
query = LevelSession
.findOne(findParameters)
.lean()
query.exec (err, session) =>
@levelSession = session
log.info "Fetched level session"
callback err
determineIfSessionShouldContinueAndUpdateLog = (sessionID, sessionRank, cb) ->
checkSubmissionDate = (callback) ->
supposedSubmissionDate = new Date(@clientResponseObject.sessions[0].submitDate)
if Number(supposedSubmissionDate) isnt Number(@levelSession.submitDate)
callback "The game has been resubmitted. Removing from queue..."
else
log.info "Checked submission date"
callback null
logTaskComputation = (callback) ->
@taskLog.set('calculationTimeMS',@clientResponseObject.calculationTimeMS)
@taskLog.set('sessions')
@taskLog.calculationTimeMS = @clientResponseObject.calculationTimeMS
@taskLog.sessions = @clientResponseObject.sessions
@taskLog.save (err, saved) ->
log.info "Logged task computation"
callback err
updateSessions = (callback) ->
sessionIDs = _.pluck @clientResponseObject.sessions, 'sessionID'
async.map sessionIDs, retrieveOldSessionData, (err, oldScores) =>
if err? then callback err, {"error": "There was an error retrieving the old scores"}
oldScoreArray = _.toArray putRankingFromMetricsIntoScoreObject @clientResponseObject, oldScores
newScoreArray = bayes.updatePlayerSkills oldScoreArray
saveNewScoresToDatabase newScoreArray, callback
saveNewScoresToDatabase = (newScoreArray, callback) ->
async.eachSeries newScoreArray, updateScoreInSession, (err) ->
log.info "Saved new scores to database"
callback err,newScoreArray
updateScoreInSession = (scoreObject,callback) ->
LevelSession.findOne {"_id": scoreObject.id}, (err, session) ->
if err? then return callback err, null
session = session.toObject()
newTotalScore = scoreObject.meanStrength - 1.8 * scoreObject.standardDeviation
scoreHistoryAddition = [Date.now(), newTotalScore]
updateObject =
meanStrength: scoreObject.meanStrength
standardDeviation: scoreObject.standardDeviation
totalScore: newTotalScore
$push: {scoreHistory: {$each: [scoreHistoryAddition], $slice: -1000}}
LevelSession.update {"_id": scoreObject.id}, updateObject, callback
log.info "New total score for session #{scoreObject.id} is #{updateObject.totalScore}"
indexNewScoreArray = (newScoreArray, callback) ->
newScoresObject = _.indexBy newScoreArray, 'id'
@newScoresObject = newScoresObject
callback null, newScoresObject
addMatchToSessions = (newScoreObject, callback) ->
matchObject = {}
matchObject.date = new Date()
matchObject.opponents = {}
for session in @clientResponseObject.sessions
sessionID = session.sessionID
matchObject.opponents[sessionID] = {}
matchObject.opponents[sessionID].sessionID = sessionID
matchObject.opponents[sessionID].userID = session.creator
matchObject.opponents[sessionID].metrics = {}
matchObject.opponents[sessionID].metrics.rank = Number(newScoreObject[sessionID].gameRanking)
log.info "Match object computed, result: #{matchObject}"
log.info "Writing match object to database..."
#use bind with async to do the writes
sessionIDs = _.pluck @clientResponseObject.sessions, 'sessionID'
async.each sessionIDs, updateMatchesInSession.bind(@,matchObject), (err) -> callback err
updateMatchesInSession = (matchObject, sessionID, callback) ->
currentMatchObject = {}
currentMatchObject.date = matchObject.date
currentMatchObject.metrics = matchObject.opponents[sessionID].metrics
opponentsClone = _.cloneDeep matchObject.opponents
opponentsClone = _.omit opponentsClone, sessionID
opponentsArray = _.toArray opponentsClone
currentMatchObject.opponents = opponentsArray
sessionUpdateObject =
$push: {matches: {$each: [currentMatchObject], $slice: -200}}
log.info "Updating session #{sessionID}"
LevelSession.update {"_id":sessionID}, sessionUpdateObject, callback
updateUserSimulationCounts = (reqUserID,callback) ->
incrementUserSimulationCount reqUserID, 'simulatedBy', (err) =>
if err? then return callback err
incrementUserSimulationCount @levelSession.creator, 'simulatedFor', callback
incrementUserSimulationCount = (userID, type, callback) =>
inc = {}
inc[type] = 1
User.update {_id: userID}, {$inc: inc}, (err, affected) ->
log.error "Error incrementing #{type} for #{userID}: #{err}" if err
callback err
determineIfSessionShouldContinueAndUpdateLog = (cb) ->
sessionID = @clientResponseObject.originalSessionID
sessionRank = parseInt @clientResponseObject.originalSessionRank
queryParameters =
_id: sessionID
@ -394,18 +563,28 @@ determineIfSessionShouldContinueAndUpdateLog = (sessionID, sessionRank, cb) ->
totalNumberOfGamesPlayed = updatedSession.numberOfWinsAndTies + updatedSession.numberOfLosses
if totalNumberOfGamesPlayed < 10
console.log "Number of games played is less than 10, continuing..."
cb null, true
cb null
else
ratio = (updatedSession.numberOfLosses) / (totalNumberOfGamesPlayed)
if ratio > 0.33
cb null, false
cb "shouldn't continue"
console.log "Ratio(#{ratio}) is bad, ending simulation"
else
console.log "Ratio(#{ratio}) is good, so continuing simulations"
cb null, true
cb null
findNearestBetterSessionID = (cb) ->
levelOriginalID = @levelSession.level.original
levelMajorVersion = @levelSession.level.majorVersion
sessionID = @clientResponseObject.originalSessionID
sessionTotalScore = @newScoresObject[sessionID].totalScore
opponentSessionID = _.pull(_.keys(@newScoresObject), sessionID)
opponentSessionTotalScore = @newScoresObject[opponentSessionID].totalScore
opposingTeam = calculateOpposingTeam(@clientResponseObject.originalSessionTeam)
findNearestBetterSessionID = (levelOriginalID, levelMajorVersion, sessionID, sessionTotalScore, opponentSessionTotalScore, opponentSessionID, opposingTeam, cb) ->
retrieveAllOpponentSessionIDs sessionID, (err, opponentSessionIDs) ->
if err? then return cb err, null
@ -434,23 +613,23 @@ findNearestBetterSessionID = (levelOriginalID, levelMajorVersion, sessionID, ses
selectString = '_id totalScore'
query = LevelSession.findOne(queryParameters)
.sort(sortParameters)
.limit(limitNumber)
.select(selectString)
.lean()
.sort(sortParameters)
.limit(limitNumber)
.select(selectString)
.lean()
console.log "Finding session with score near #{opponentSessionTotalScore}"
query.exec (err, session) ->
if err? then return cb err, session
unless session then return cb err, null
unless session then return cb "no session was found"
console.log "Found session with score #{session.totalScore}"
cb err, session._id
retrieveAllOpponentSessionIDs = (sessionID, cb) ->
query = LevelSession.findOne({"_id":sessionID})
.select('matches.opponents.sessionID matches.date submitDate')
.lean()
.select('matches.opponents.sessionID matches.date submitDate')
.lean()
query.exec (err, session) ->
if err? then return cb err, null
opponentSessionIDs = (match.opponents[0].sessionID for match in session.matches when match.date > session.submitDate)
@ -462,44 +641,11 @@ calculateOpposingTeam = (sessionTeam) ->
opposingTeams = _.pull teams, sessionTeam
return opposingTeams[0]
incrementUserSimulationCount = (userID, type) ->
inc = {}
inc[type] = 1
User.update {_id: userID}, {$inc: inc}, (err, affected) ->
log.error "Error incrementing #{type} for #{userID}: #{err}" if err
addNewSessionsToQueue = (sessionID, callback) ->
sessions = [@clientResponseObject.originalSessionID, sessionID]
addPairwiseTaskToQueue sessions, callback
addMatchToSessions = (clientResponseObject, newScoreObject, callback) ->
matchObject = {}
matchObject.date = new Date()
matchObject.opponents = {}
for session in clientResponseObject.sessions
sessionID = session.sessionID
matchObject.opponents[sessionID] = {}
matchObject.opponents[sessionID].sessionID = sessionID
matchObject.opponents[sessionID].userID = session.creator
matchObject.opponents[sessionID].metrics = {}
matchObject.opponents[sessionID].metrics.rank = Number(newScoreObject[sessionID].gameRanking)
log.info "Match object computed, result: #{matchObject}"
log.info "Writing match object to database..."
#use bind with async to do the writes
sessionIDs = _.pluck clientResponseObject.sessions, 'sessionID'
async.each sessionIDs, updateMatchesInSession.bind(@,matchObject), (err) -> callback err, null
updateMatchesInSession = (matchObject, sessionID, callback) ->
currentMatchObject = {}
currentMatchObject.date = matchObject.date
currentMatchObject.metrics = matchObject.opponents[sessionID].metrics
opponentsClone = _.cloneDeep matchObject.opponents
opponentsClone = _.omit opponentsClone, sessionID
opponentsArray = _.toArray opponentsClone
currentMatchObject.opponents = opponentsArray
sessionUpdateObject =
$push: {matches: {$each: [currentMatchObject], $slice: -200}}
log.info "Updating session #{sessionID}"
LevelSession.update {"_id":sessionID}, sessionUpdateObject, callback
@ -545,49 +691,6 @@ hasTaskTimedOut = (taskSentTimestamp) -> taskSentTimestamp + scoringTaskTimeoutI
handleTimedOutTask = (req, res, taskBody) -> errors.clientTimeout res, "The results weren't provided within the timeout"
verifyClientResponse = (responseObject, res) ->
unless typeof responseObject is "object"
errors.badInput res, "The response to that query is required to be a JSON object."
null
else
responseObject
logTaskComputation = (taskObject,taskLogObject, callback) ->
taskLogObject.calculationTimeMS = taskObject.calculationTimeMS
taskLogObject.sessions = taskObject.sessions
taskLogObject.save callback
updateSessions = (taskObject,callback) ->
sessionIDs = _.pluck taskObject.sessions, 'sessionID'
async.map sessionIDs, retrieveOldSessionData, (err, oldScores) ->
if err? then callback err, {"error": "There was an error retrieving the old scores"}
oldScoreArray = _.toArray putRankingFromMetricsIntoScoreObject taskObject, oldScores
newScoreArray = bayes.updatePlayerSkills oldScoreArray
saveNewScoresToDatabase newScoreArray, callback
saveNewScoresToDatabase = (newScoreArray, callback) ->
async.eachSeries newScoreArray, updateScoreInSession, (err) -> callback err,newScoreArray
updateScoreInSession = (scoreObject,callback) ->
LevelSession.findOne {"_id": scoreObject.id}, (err, session) ->
if err? then return callback err, null
session = session.toObject()
newTotalScore = scoreObject.meanStrength - 1.8 * scoreObject.standardDeviation
scoreHistoryAddition = [Date.now(), newTotalScore]
updateObject =
meanStrength: scoreObject.meanStrength
standardDeviation: scoreObject.standardDeviation
totalScore: newTotalScore
$push: {scoreHistory: {$each: [scoreHistoryAddition], $slice: -1000}}
LevelSession.update {"_id": scoreObject.id}, updateObject, callback
log.info "New total score for session #{scoreObject.id} is #{updateObject.totalScore}"
putRankingFromMetricsIntoScoreObject = (taskObject,scoreObject) ->