codecombat/server/queues/scoring.coffee
2014-03-24 10:58:43 -07:00

608 lines
23 KiB
CoffeeScript

config = require '../../server_config'
log = require 'winston'
mongoose = require 'mongoose'
async = require 'async'
errors = require '../commons/errors'
aws = require 'aws-sdk'
db = require './../routes/db'
mongoose = require 'mongoose'
queues = require '../commons/queue'
LevelSession = require '../levels/sessions/LevelSession'
Level = require '../levels/Level'
User = require '../users/User'
TaskLog = require './task/ScoringTask'
bayes = new (require 'bayesian-battle')()
# Handle to the registered "scoring" queue; stays undefined until the
# registration callback in connectToScoringQueue assigns it.
scoringTaskQueue = undefined
# Number of seconds used both as the visibility timeout applied to a received
# queue message and as the cutoff checked by hasTaskTimedOut.
scoringTaskTimeoutInSeconds = 240
# Module entry point: starts the connection to the scoring queue.
# The `app` argument is accepted but not used.
module.exports.setup = (app) ->
  connectToScoringQueue()
# Initializes the queue client, registers the "scoring" queue and stores the
# resulting queue handle in the module-level scoringTaskQueue.
# Throws if the queue cannot be registered.
connectToScoringQueue = ->
  onQueueRegistered = (error, data) ->
    throw new Error "There was an error registering the scoring queue: #{error}" if error?
    scoringTaskQueue = data
    log.info "Connected to scoring task queue!"
  queues.initializeQueueClient ->
    queues.queueClient.registerQueue "scoring", {}, onQueueRegistered
# Express handler: responds with the total number of messages in the scoring
# queue, serialized as a string.
module.exports.messagesInQueueCount = (req, res) ->
  replyWithCount = (err, count) ->
    return errors.serverError res, "There was an issue finding the Mongoose count:#{err}" if err?
    res.send String(count)
    res.end()
  scoringTaskQueue.totalMessagesInQueue replyWithCount
# Express handler: queues a pairwise scoring task for the two session IDs
# given in `req.body.sessions` and reports the outcome as JSON.
module.exports.addPairwiseTaskToQueueFromRequest = (req, res) ->
  # The comma matters: without it CoffeeScript parses this as
  # `addPairwiseTaskToQueue(req.body.sessions(callback))`, i.e. it tries to
  # call the session list as a function.
  addPairwiseTaskToQueue req.body.sessions, (err, success) ->
    if err? then return errors.serverError res, "There was an error adding pairwise tasks: #{err}"
    sendResponseObject req, res, {"message":"All task pairs were succesfully sent to the queue"}
# Loads the two sessions named in `taskPair` ([sessionToScoreID, opponentID]),
# builds the task pairs for them and sends each pair to the scoring queue.
# Calls `cb(err, false)` on failure and `cb(null, true)` on success.
addPairwiseTaskToQueue = (taskPair, cb) ->
  firstQuery = LevelSession.findOne(_id: taskPair[0]).lean()
  firstQuery.exec (err, firstSession) ->
    return cb err, false if err?
    secondQuery = LevelSession.find(_id: taskPair[1])
    secondQuery.exec (err, secondSession) ->
      return cb err, false if err?
      try
        taskPairs = generateTaskPairs(secondSession, firstSession)
      catch e
        return cb e, false if e
      sendEachTaskPairToTheQueue taskPairs, (taskPairError) ->
        return cb taskPairError, false if taskPairError?
        cb null, true
# Admin-only Express handler: picks a sample of 10 submitted sessions for the
# requested level/major version and re-submits each of them for simulation.
module.exports.resimulateAllSessions = (req, res) ->
  return errors.unauthorized res, "Unauthorized. Even if you are authorized, you shouldn't do this" unless isUserAdmin req
  originalLevelID = req.body.originalLevelID
  levelMajorVersion = parseInt(req.body.levelMajorVersion)
  criteria =
    submitted: true
    level:
      original: originalLevelID
      majorVersion: levelMajorVersion
  # Per-session worker handed to async.each.
  resimulateOne = (session, done) ->
    resimulateSession originalLevelID, levelMajorVersion, session, done
  LevelSession.find(criteria).lean().exec (err, result) ->
    return errors.serverError res, err if err?
    sampledSessions = _.sample result, 10
    async.each sampledSessions, resimulateOne, (err) ->
      return errors.serverError res, err if err?
      sendResponseObject req, res, {"message":"All task pairs were succesfully sent to the queue"}
# Resets the ranking fields of `session`, then queues it against the initial
# opponent(s) returned by fetchInitialSessionsToRankAgainst.
# Calls `cb(err, null)` on failure and `cb(null)` once every pair is queued.
resimulateSession = (originalLevelID, levelMajorVersion, session, cb) =>
  sessionUpdateObject =
    submitted: true
    submitDate: new Date()
    meanStrength: 25
    standardDeviation: 25/3
    totalScore: 10
    numberOfWinsAndTies: 0
    numberOfLosses: 0
    isRanking: true
  LevelSession.update {_id: session._id}, sessionUpdateObject, (err, updatedSession) ->
    if err? then return cb err, null
    # fetchInitialSessionsToRankAgainst takes the submitted session itself and
    # derives the opposing team from its `team` field; handing it a team
    # string makes it read `.team` of a string (undefined) instead.
    fetchInitialSessionsToRankAgainst levelMajorVersion, originalLevelID, session, (err, sessionsToRankAgainst) ->
      if err? then return cb err, null
      taskPairs = generateTaskPairs(sessionsToRankAgainst, session)
      sendEachTaskPairToTheQueue taskPairs, (taskPairError) ->
        if taskPairError? then return cb taskPairError, null
        cb null
# Express handler: submits a session to the ladder. Runs, in order: permission
# check, level type check, session fetch, ranking reset, initial opponent
# lookup, and queueing of the resulting task pairs.
module.exports.createNewTask = (req, res) ->
  {session: requestSessionID, originalLevelID, levelID: currentLevelID} = req.body
  requestLevelMajorVersion = parseInt(req.body.levelMajorVersion)
  steps = [
    (next) -> validatePermissions req, requestSessionID, next
    (next) -> fetchAndVerifyLevelType currentLevelID, next
    (next) -> fetchSessionObjectToSubmit requestSessionID, next
    updateSessionToSubmit
    (submittedSession, next) -> fetchInitialSessionsToRankAgainst requestLevelMajorVersion, originalLevelID, submittedSession, next
    generateAndSendTaskPairsToTheQueue
  ]
  async.waterfall steps, (err, successMessageObject) ->
    return errors.serverError res, "There was an error submitting the game to the queue:#{err}" if err?
    sendResponseObject req, res, successMessageObject
# Decides whether the requesting user may submit session `sessionID`.
# Anonymous users are rejected, admins are always allowed; everyone else must
# be the session's creator and the session's code must differ from the code
# already submitted. Calls `callback(null)` when allowed, otherwise
# `callback(errorOrMessage)`.
validatePermissions = (req,sessionID, callback) ->
  if isUserAnonymous req then return callback "You are unauthorized to submit that game to the simulator"
  if isUserAdmin req then return callback null
  findParameters =
    _id: sessionID
  selectString = 'creator submittedCode code'
  query = LevelSession
    .findOne(findParameters)
    .select(selectString)
    .lean()
  query.exec (err, retrievedSession) ->
    if err? then return callback err
    # An unknown session ID yields a null document; reject it instead of
    # throwing on the property access below.
    unless retrievedSession? then return callback "You are unauthorized to submit that game to the simulator"
    userHasPermissionToSubmitCode = retrievedSession.creator is req.user?.id and
      not _.isEqual(retrievedSession.code, retrievedSession.submittedCode)
    unless userHasPermissionToSubmitCode then return callback "You are unauthorized to submit that game to the simulator"
    callback null
# Verifies that level `levelID` exists and has type "ladder".
# Calls `cb(null)` when it does, otherwise `cb(errorOrMessage)`.
fetchAndVerifyLevelType = (levelID, cb) ->
  findParameters =
    _id: levelID
  selectString = 'type'
  query = Level
    .findOne(findParameters)
    .select(selectString)
    .lean()
  query.exec (err, levelWithType) ->
    if err? then return cb err
    # `?.` covers both a missing level (null document) and a level without a
    # type; either way the level is not a ladder level.
    if levelWithType?.type isnt "ladder" then return cb "Level isn't of type 'ladder'"
    cb null
# Fetches the `team` and `code` fields of session `sessionID` and passes the
# session on as a plain object (undefined when no session was found).
fetchSessionObjectToSubmit = (sessionID, callback) ->
  LevelSession.findOne(_id: sessionID).select('team code').exec (err, session) ->
    plainSession = session?.toObject()
    callback err, plainSession
# Marks the session as submitted: copies its current code into
# `submittedCode`, stamps the submit date and resets the ranking statistics.
# Passes `sessionToUpdate` through to the callback unchanged.
updateSessionToSubmit = (sessionToUpdate, callback) ->
  resetFields =
    submitted: true
    submittedCode: sessionToUpdate.code
    submitDate: new Date()
    meanStrength: 25
    standardDeviation: 25/3
    totalScore: 10
    numberOfWinsAndTies: 0
    numberOfLosses: 0
    isRanking: true
  onUpdated = (err, result) -> callback err, sessionToUpdate
  LevelSession.update {_id: sessionToUpdate._id}, resetFields, onUpdated
# Finds the lowest-scored submitted session on the team opposing
# `submittedSession` for the given level and major version (at most one).
# Calls `callback(err, sessions, submittedSession)`.
fetchInitialSessionsToRankAgainst = (levelMajorVersion, levelID, submittedSession, callback) ->
  criteria =
    "level.original": levelID
    "level.majorVersion": levelMajorVersion
    submitted: true
    submittedCode: {$exists: true}
    team: calculateOpposingTeam(submittedSession.team)
  lowestScoreFirst = {totalScore: 1}
  LevelSession.find(criteria).sort(lowestScoreFirst).limit(1).exec (err, sessionToRankAgainst) ->
    callback err, sessionToRankAgainst, submittedSession
# Builds the task pairs for `submittedSession` against `sessionToRankAgainst`
# and queues them; on success hands a confirmation message object to callback.
generateAndSendTaskPairsToTheQueue = (sessionToRankAgainst,submittedSession, callback) ->
  pairsToSend = generateTaskPairs(sessionToRankAgainst, submittedSession)
  sendEachTaskPairToTheQueue pairsToSend, (taskPairError) ->
    return callback taskPairError if taskPairError?
    callback null, {"message": "All task pairs were succesfully sent to the queue"}
# Express handler: hands one scoring task to a simulating client. Pulls a
# message from the queue, extends its visibility timeout, resolves the
# sessions it names, records a task log and responds with the task object.
# Responds 204 when the queue has no games, 500 on any other error.
module.exports.dispatchTaskToConsumer = (req, res) ->
  async.waterfall [
    checkSimulationPermissions.bind(@,req)
    receiveMessageFromSimulationQueue
    changeMessageVisibilityTimeout
    parseTaskQueueMessage
    constructTaskObject
    constructTaskLogObject.bind(@, getUserIDFromRequest(req))
    processTaskObject
  ], (err, taskObjectToSend) ->
    if err?
      # The explicit call parentheses are required: without them the argument
      # to indexOf becomes the boolean `"..." isnt -1`, indexOf returns -1
      # (truthy), and every string error is reported as "no games".
      if typeof err is "string" and err.indexOf("No more games in the queue") isnt -1
        res.send(204, "No games to score.")
        return res.end()
      else
        return errors.serverError res, "There was an error dispatching the task: #{err}"
    sendResponseObject req, res, taskObjectToSend
# Only logged-in (non-anonymous) users may simulate games.
checkSimulationPermissions = (req, cb) ->
  return cb "You need to be logged in to simulate games" if isUserAnonymous req
  cb null
# Receives one message from the scoring queue. Any receive error is reported
# as "No more games in the queue"; a missing or empty message is reported as
# invalid.
receiveMessageFromSimulationQueue = (cb) ->
  onMessage = (err, message) ->
    return cb "No more games in the queue, error:#{err}" if err?
    return cb "Message received from queue is invalid" if messageIsInvalid(message)
    cb null, message
  scoringTaskQueue.receiveMessage onMessage
# Sets the message's visibility timeout to scoringTaskTimeoutInSeconds and
# passes the message on.
changeMessageVisibilityTimeout = (message, cb) ->
  onTimeoutChanged = (err) -> cb err, message
  message.changeMessageVisibilityTimeout scoringTaskTimeoutInSeconds, onTimeoutChanged
# Extracts the task body from a queue message. An object body is used as is;
# anything else is parsed as JSON.
# Calls `cb(null, messageBody, message)` on success or `cb(errorString)` when
# the body cannot be read or parsed.
parseTaskQueueMessage = (message,cb) ->
  try
    rawBody = message.getBody()
    messageBody = if typeof rawBody is "object" then rawBody else JSON.parse rawBody
  catch e
    return cb "There was an error parsing the task.Error: #{e}"
  # Invoked outside the try block so that an exception thrown by downstream
  # code is not mistaken for a parse error and the callback is not run twice.
  cb null, messageBody, message
# Resolves every session ID listed in the task message and assembles the task
# object that is sent to the simulating client.
# Calls `callback(null, taskObject, message)`.
constructTaskObject = (taskMessageBody, message, callback) ->
  # Shapes one fetched session into the form the client receives.
  describeSession = (session) ->
    "sessionID": session._id
    "submitDate": session.submitDate
    "team": session.team ? "No team"
    "code": session.submittedCode
    "teamSpells": session.teamSpells ? {}
    "levelID": session.levelID
    "creator": session.creator
    "creatorName":session.creatorName
  async.map taskMessageBody.sessions, getSessionInformation, (err, sessions) ->
    return callback err if err?
    taskObject =
      "messageGenerated": Date.now()
      "sessions": (describeSession session for session in sessions)
    callback null, taskObject, message
# Persists a TaskLog entry recording who received the task, when it was sent
# and the queue receipt handle of the message.
# Calls `callback(err, taskObject, taskLogObject, message)`.
constructTaskLogObject = (calculatorUserID, taskObject, message, callback) ->
  logFields =
    "createdAt": new Date()
    "calculator":calculatorUserID
    "sentDate": Date.now()
    "messageIdentifierString":message.getReceiptHandle()
  taskLogObject = new TaskLog(logFields)
  taskLogObject.save (err) ->
    callback err, taskObject, taskLogObject, message
# Stamps the task object with the task log's ID and the message's receipt
# handle, then passes it on.
processTaskObject = (taskObject,taskLogObject, message, cb) ->
  taskObject.taskID = taskLogObject._id
  handle = message.getReceiptHandle()
  taskObject.receiptHandle = handle
  cb null, taskObject
# Loads the fields of one session that are needed to build a task object.
getSessionInformation = (sessionIDString, callback) ->
  neededFields = 'submitDate team submittedCode teamSpells levelID creator creatorName'
  lookup = LevelSession.findOne(_id: sessionIDString).select(neededFields).lean()
  lookup.exec (err, session) ->
    return callback err, {"error":"There was an error retrieving the session."} if err?
    callback null, session
# Express handler that ingests a simulation result posted by a client.
#
# Steps: validate the body, look up the task log, reject duplicate or
# timed-out results, delete the queue message, ignore results for sessions
# that were resubmitted in the meantime, log the computation, update the
# scores and match history of the sessions, bump the users' simulation
# counters and finally either queue the next match for the original session
# or mark it as no longer ranking.
module.exports.processTaskResult = (req, res) ->
  clientResponseObject = verifyClientResponse req.body, res
  # Non-object bodies are rejected by verifyClientResponse, which returns null.
  return unless clientResponseObject?
  TaskLog.findOne {_id: clientResponseObject.taskID}, (err, taskLog) ->
    return errors.serverError res, "There was an error retrieiving the task log object" if err?
    # The task ID comes from the client; an unknown ID yields a null document.
    return errors.badInput res, "There is no task log for that task ID" unless taskLog?
    taskLogJSON = taskLog.toObject()
    return errors.badInput res, "That computational task has already been performed" if taskLogJSON.calculationTimeMS
    return handleTimedOutTask req, res, clientResponseObject if hasTaskTimedOut taskLogJSON.sentDate
    scoringTaskQueue.deleteMessage clientResponseObject.receiptHandle, (err) ->
      if err? then return errors.badInput res, "The queue message is already back in the queue, rejecting results."
      console.log "Deleted message."
      LevelSession.findOne(_id: clientResponseObject.originalSessionID).lean().exec (err, levelSession) ->
        if err? then return errors.serverError res, "There was a problem finding the level session:#{err}"
        # The session ID also comes from the client and may match nothing.
        unless levelSession? then return errors.badInput res, "There is no level session for that session ID"
        supposedSubmissionDate = new Date(clientResponseObject.sessions[0].submitDate)
        # A differing submit date means the player resubmitted after this task
        # was dispatched, so the result is stale.
        if Number(supposedSubmissionDate) isnt Number(levelSession.submitDate)
          return sendResponseObject req, res, {"message":"The game has been resubmitted. Removing from queue..."}
        logTaskComputation clientResponseObject, taskLog, (logErr) ->
          if logErr? then return errors.serverError res, "There as a problem logging the task computation: #{logErr}"
          updateSessions clientResponseObject, (updateError, newScoreArray) ->
            if updateError? then return errors.serverError res, "There was an error updating the scores.#{updateError}"
            newScoresObject = _.indexBy newScoreArray, 'id'
            addMatchToSessions clientResponseObject, newScoresObject, (err, data) ->
              if err? then return errors.serverError res, "There was an error updating the sessions with the match! #{JSON.stringify err}"
              incrementUserSimulationCount req.user._id, 'simulatedBy'
              incrementUserSimulationCount levelSession.creator, 'simulatedFor'
              originalSessionID = clientResponseObject.originalSessionID
              originalSessionTeam = clientResponseObject.originalSessionTeam
              originalSessionRank = parseInt clientResponseObject.originalSessionRank
              determineIfSessionShouldContinueAndUpdateLog originalSessionID, originalSessionRank, (err, sessionShouldContinue) ->
                if err? then return errors.serverError res, "There was an error determining if the session should continue, #{err}"
                if sessionShouldContinue
                  opposingTeam = calculateOpposingTeam(originalSessionTeam)
                  # _.pull returns the remaining keys as an array; take the
                  # opponent's ID itself rather than relying on array-to-string
                  # coercion when it is used as a key below.
                  opponentID = _.pull(_.keys(newScoresObject), originalSessionID)[0]
                  sessionNewScore = newScoresObject[originalSessionID].totalScore
                  opponentNewScore = newScoresObject[opponentID].totalScore
                  levelOriginalID = levelSession.level.original
                  levelOriginalMajorVersion = levelSession.level.majorVersion
                  findNearestBetterSessionID levelOriginalID, levelOriginalMajorVersion, originalSessionID, sessionNewScore, opponentNewScore, opponentID, opposingTeam, (err, opponentSessionID) ->
                    if err? then return errors.serverError res, "There was an error finding the nearest sessionID!"
                    if opponentSessionID
                      addPairwiseTaskToQueue [originalSessionID, opponentSessionID], (err, success) ->
                        if err? then return errors.serverError res, "There was an error sending the pairwise tasks to the queue!"
                        sendResponseObject req, res, {"message":"The scores were updated successfully and more games were sent to the queue!"}
                    else
                      # No higher-scored opponent is left: ranking is finished.
                      LevelSession.update {_id: originalSessionID}, {isRanking: false}, {multi: false}, (err, affected) ->
                        if err? then return errors.serverError res, "There was an error marking the victorious session as not being ranked."
                        return sendResponseObject req, res, {"message":"There were no more games to rank (game is at top)!"}
                else
                  console.log "Player lost, achieved rank #{originalSessionRank}"
                  LevelSession.update {_id: originalSessionID}, {isRanking: false}, {multi: false}, (err, affected) ->
                    if err? then return errors.serverError res, "There was an error marking the completed session as not being ranked."
                    sendResponseObject req, res, {"message":"The scores were updated successfully, person lost so no more games are being inserted!"}
# Records the outcome of the latest game on the session (rank 0 counts as a
# win/tie, anything else as a loss) and decides whether the session should
# keep being matched: always during its first 10 games, afterwards only
# while the loss ratio is at most 0.33.
# Calls `cb(err, shouldContinue)`.
determineIfSessionShouldContinueAndUpdateLog = (sessionID, sessionRank, cb) ->
  counterToBump = if sessionRank is 0 then 'numberOfWinsAndTies' else 'numberOfLosses'
  increments = {}
  increments[counterToBump] = 1
  options = {select: 'numberOfWinsAndTies numberOfLosses'}
  LevelSession.findOneAndUpdate {_id: sessionID}, {"$inc": increments}, options, (err, updatedSession) ->
    return cb err, updatedSession if err?
    tallies = updatedSession.toObject()
    gamesPlayed = tallies.numberOfWinsAndTies + tallies.numberOfLosses
    if gamesPlayed < 10
      console.log "Number of games played is less than 10, continuing..."
      return cb null, true
    ratio = (tallies.numberOfLosses) / (gamesPlayed)
    if ratio > 0.33
      cb null, false
      console.log "Ratio(#{ratio}) is bad, ending simulation"
    else
      console.log "Ratio(#{ratio}) is good, so continuing simulations"
      cb null, true
# Finds the lowest-scored submitted session on `opposingTeam` whose total
# score exceeds the last opponent's score and which session `sessionID` has
# not already played since its submission.
# Calls `cb(err, opponentSessionID)`; the ID is null when there is none.
findNearestBetterSessionID = (levelOriginalID, levelMajorVersion, sessionID, sessionTotalScore, opponentSessionTotalScore, opponentSessionID, opposingTeam, cb) ->
  retrieveAllOpponentSessionIDs sessionID, (err, opponentSessionIDs) ->
    return cb err, null if err?
    scoreFloor = opponentSessionTotalScore
    if opponentSessionTotalScore < 30
      # Don't play a ton of matches at low scores--skip some in proportion to how close to 30 we are.
      # TODO: this could be made a lot more flexible.
      scoreFloor = opponentSessionTotalScore + 2 * (30 - opponentSessionTotalScore) / 20
    queryParameters =
      totalScore: {$gt: scoreFloor}
      _id: {$nin: opponentSessionIDs}
      "level.original": levelOriginalID
      "level.majorVersion": levelMajorVersion
      submitted: true
      submittedCode: {$exists: true}
      team: opposingTeam
    query = LevelSession.findOne(queryParameters)
      .sort({totalScore: 1})
      .limit(1)
      .select('_id totalScore')
      .lean()
    console.log "Finding session with score near #{opponentSessionTotalScore}"
    query.exec (err, session) ->
      return cb err, session if err?
      return cb err, null unless session
      console.log "Found session with score #{session.totalScore}"
      cb err, session._id
# Collects the session IDs of every opponent that session `sessionID` has
# played since its most recent submission.
# Calls `cb(err, opponentSessionIDs)`.
retrieveAllOpponentSessionIDs = (sessionID, cb) ->
  query = LevelSession.findOne({"_id":sessionID})
    .select('matches.opponents.sessionID matches.date submitDate')
    .lean()
  query.exec (err, session) ->
    if err? then return cb err, null
    # An unknown ID yields a null document.
    unless session? then return cb "Could not find session #{sessionID}", null
    # The query is lean, so a session without recorded matches has no
    # `matches` field at all; treat that as an empty history.
    matches = session.matches ? []
    opponentSessionIDs = (match.opponents[0].sessionID for match in matches when match.date > session.submitDate)
    cb err, opponentSessionIDs
# Returns the team that plays against `sessionTeam`: the first of
# 'ogres'/'humans' that is not `sessionTeam` (so 'ogres' for any other value).
calculateOpposingTeam = (sessionTeam) ->
  remainingTeams = (team for team in ['ogres', 'humans'] when team isnt sessionTeam)
  remainingTeams[0]
# Increments the counter field named by `type` on user `userID` by one.
# Fire-and-forget: a failure is only logged.
incrementUserSimulationCount = (userID, type) ->
  counterIncrement = {}
  counterIncrement[type] = 1
  User.update {_id: userID}, {$inc: counterIncrement}, (err, affected) ->
    if err
      log.error "Error incrementing #{type} for #{userID}: #{err}"
# Builds one match record from the client's result (one entry per
# participating session, with its rank taken from `newScoreObject`) and
# appends it to the match history of every participating session.
# Calls `callback(err, null)` when all sessions have been updated.
addMatchToSessions = (clientResponseObject, newScoreObject, callback) ->
  matchObject =
    date: new Date()
    opponents: {}
  for session in clientResponseObject.sessions
    sessionID = session.sessionID
    matchObject.opponents[sessionID] =
      sessionID: sessionID
      userID: session.creator
      metrics:
        rank: Number(newScoreObject[sessionID].gameRanking)
  # Serialize explicitly; interpolating the object directly only logs
  # "[object Object]".
  log.info "Match object computed, result: #{JSON.stringify matchObject}"
  log.info "Writing match object to database..."
  #use bind with async to do the writes
  sessionIDs = _.pluck clientResponseObject.sessions, 'sessionID'
  async.each sessionIDs, updateMatchesInSession.bind(@,matchObject), (err) -> callback err, null
# Appends the match to session `sessionID`'s `matches` array from that
# session's point of view: its own metrics plus every other participant as an
# opponent. The `$slice: -200` is part of the same push.
updateMatchesInSession = (matchObject, sessionID, callback) ->
  currentMatchObject =
    date: matchObject.date
    metrics: matchObject.opponents[sessionID].metrics
    opponents: _.toArray _.omit(_.cloneDeep(matchObject.opponents), sessionID)
  pushMatch =
    $push:
      matches:
        $each: [currentMatchObject]
        $slice: -200
  log.info "Updating session #{sessionID}"
  LevelSession.update {"_id":sessionID}, pushMatch, callback
# A queue message is invalid when it is missing or reports itself as empty.
messageIsInvalid = (message) ->
  return true unless message?
  message.isEmpty()
# Sends every task pair to the scoring queue; `callback` receives the first
# error, if any.
sendEachTaskPairToTheQueue = (taskPairs, callback) ->
  async.each taskPairs, sendTaskPairToQueue, callback
# Pairs `sessionToScore` with every session in `submittedSessions` that is a
# different session and plays on a team other than `sessionToScore.team`.
# Each pair is [sessionToScore._id, String(opponent._id)].
generateTaskPairs = (submittedSessions, sessionToScore) ->
  pairs = []
  for candidate in submittedSessions
    candidate = candidate.toObject()
    eligibleTeams = _.pull ['ogres','humans'], sessionToScore.team
    continue if String(candidate._id) is String(sessionToScore._id)
    continue unless candidate.team in eligibleTeams
    console.log "Adding game to taskPairs!"
    pairs.push [sessionToScore._id, String candidate._id]
  pairs
# Sends one task pair to the scoring queue as `{sessions: taskPair}`.
# `callback` is optional and is only invoked when it is a function.
sendTaskPairToQueue = (taskPair, callback) ->
  notifyCaller = (err, data) ->
    callback?(err, data)
  scoringTaskQueue.sendMessage {sessions: taskPair}, 5, notifyCaller
# Returns the requesting user's `_id`, or null when the request has no user.
getUserIDFromRequest = (req) ->
  return null unless req.user?
  req.user._id
# A request without a user counts as anonymous; otherwise the user's
# 'anonymous' attribute decides.
isUserAnonymous = (req) ->
  return true unless req.user?
  req.user.get('anonymous')
# True only when the request carries a user whose isAdmin() is truthy.
isUserAdmin = (req) ->
  adminFlag = req.user?.isAdmin()
  if adminFlag then true else false
# Sends `object` as the response with a JSON content type and ends the
# response. `req` is not used.
sendResponseObject = (req,res,object) ->
  res.setHeader 'Content-Type', 'application/json'
  res.send object
  res.end()
# True when more than scoringTaskTimeoutInSeconds have elapsed since
# `taskSentTimestamp` (added to a millisecond offset and compared to Date.now()).
hasTaskTimedOut = (taskSentTimestamp) ->
  deadline = taskSentTimestamp + scoringTaskTimeoutInSeconds * 1000
  deadline < Date.now()
# Answers a result that arrived after the task's timeout with a
# client-timeout error. `req` and `taskBody` are not used.
handleTimedOutTask = (req, res, taskBody) ->
  errors.clientTimeout res, "The results weren't provided within the timeout"
# Validates that the client's payload is a non-null object.
# Returns the payload when valid; otherwise answers the request with a
# bad-input error and returns null.
verifyClientResponse = (responseObject, res) ->
  # `typeof null` is "object", so null has to be excluded explicitly;
  # otherwise it would be returned without any response having been sent.
  unless responseObject? and typeof responseObject is "object"
    errors.badInput res, "The response to that query is required to be a JSON object."
    return null
  responseObject
# Copies the computation time and the session results from the client's task
# object onto the task log and saves it.
logTaskComputation = (taskObject,taskLogObject, callback) ->
  {calculationTimeMS, sessions} = taskObject
  taskLogObject.calculationTimeMS = calculationTimeMS
  taskLogObject.sessions = sessions
  taskLogObject.save callback
# Recomputes the skill ratings of the sessions in `taskObject`: loads their
# previous scores, attaches the game rankings reported by the client, runs
# the Bayesian update and saves the new scores.
# Calls `callback(err, newScoreArray)`.
updateSessions = (taskObject,callback) ->
  sessionIDs = _.pluck taskObject.sessions, 'sessionID'
  async.map sessionIDs, retrieveOldSessionData, (err, oldScores) ->
    # `return` is essential: without it execution falls through, works on
    # incomplete scores and invokes the callback a second time.
    if err? then return callback err, {"error": "There was an error retrieving the old scores"}
    oldScoreArray = _.toArray putRankingFromMetricsIntoScoreObject taskObject, oldScores
    newScoreArray = bayes.updatePlayerSkills oldScoreArray
    saveNewScoresToDatabase newScoreArray, callback
# Writes each new score to its session, one after another, then hands the
# score array back to the caller.
saveNewScoresToDatabase = (newScoreArray, callback) ->
  onAllSaved = (err) -> callback err, newScoreArray
  async.eachSeries newScoreArray, updateScoreInSession, onAllSaved
# Stores a session's new rating: mean strength, standard deviation, the
# derived total score (mean - 1.8 * standard deviation) and a new entry in
# the score history (`$slice: -1000` is part of the same push).
updateScoreInSession = (scoreObject,callback) ->
  LevelSession.findOne {"_id": scoreObject.id}, (err, session) ->
    if err? then return callback err, null
    # A session that no longer exists yields a null document; report it
    # instead of throwing inside the query callback.
    unless session? then return callback "Could not find session #{scoreObject.id} to update its score", null
    newTotalScore = scoreObject.meanStrength - 1.8 * scoreObject.standardDeviation
    scoreHistoryAddition = [Date.now(), newTotalScore]
    updateObject =
      meanStrength: scoreObject.meanStrength
      standardDeviation: scoreObject.standardDeviation
      totalScore: newTotalScore
      $push: {scoreHistory: {$each: [scoreHistoryAddition], $slice: -1000}}
    LevelSession.update {"_id": scoreObject.id}, updateObject, callback
    log.info "New total score for session #{scoreObject.id} is #{updateObject.totalScore}"
# Indexes the old scores by session ID and attaches to each one the game rank
# reported for that session in the client's task object.
putRankingFromMetricsIntoScoreObject = (taskObject,scoreObject) ->
  scoresByID = _.indexBy scoreObject, 'id'
  for session in taskObject.sessions
    scoresByID[session.sessionID].gameRanking = session.metrics.rank
  return scoresByID
# Loads a session's current rating, falling back to the starting values
# (mean 25, standard deviation 25/3) for any field that is not set.
retrieveOldSessionData = (sessionID, callback) ->
  LevelSession.findOne {"_id":sessionID}, (err, session) ->
    if err? then return callback err, {"error":"There was an error retrieving the session."}
    stored = session.toObject()
    oldScoreObject =
      "standardDeviation": stored.standardDeviation ? 25/3
      "meanStrength": stored.meanStrength ? 25
      "totalScore": stored.totalScore ? (25 - 1.8*(25/3))
      "id": sessionID
    callback err, oldScoreObject