codecombat/server/queues/scoring.coffee

448 lines
18 KiB
CoffeeScript
Raw Normal View History

2014-02-05 12:39:14 -05:00
config = require '../../server_config'
log = require 'winston'
2014-02-05 12:39:14 -05:00
mongoose = require 'mongoose'
async = require 'async'
errors = require '../commons/errors'
aws = require 'aws-sdk'
2014-02-06 16:25:11 -05:00
db = require './../routes/db'
2014-02-05 12:39:14 -05:00
mongoose = require 'mongoose'
queues = require '../commons/queue'
LevelSession = require '../levels/sessions/LevelSession'
2014-02-07 18:52:24 -05:00
TaskLog = require './task/ScoringTask'
bayes = new (require 'bayesian-battle')()
2014-02-05 12:39:14 -05:00
scoringTaskQueue = undefined
2014-02-26 20:30:56 -05:00
scoringTaskTimeoutInSeconds = 180
module.exports.setup = (app) -> connectToScoringQueue()
connectToScoringQueue = ->
queues.initializeQueueClient ->
2014-02-17 17:39:21 -05:00
queues.queueClient.registerQueue "scoring", {}, (error,data) ->
if error? then throw new Error "There was an error registering the scoring queue: #{error}"
scoringTaskQueue = data
log.info "Connected to scoring task queue!"
2014-02-24 10:50:43 -05:00
2014-02-26 15:14:02 -05:00
module.exports.addPairwiseTaskToQueueFromRequest = (req, res) ->
2014-02-24 10:50:43 -05:00
taskPair = req.body.sessions
2014-02-26 15:14:02 -05:00
addPairwiseTaskToQueue req.body.sessions (err, success) ->
if err? then return errors.serverError res, "There was an error adding pairwise tasks: #{err}"
sendResponseObject req, res, {"message":"All task pairs were succesfully sent to the queue"}
addPairwiseTaskToQueue = (taskPair, cb) ->
2014-02-24 10:50:43 -05:00
LevelSession.findOne(_id:taskPair[0]).lean().exec (err, firstSession) =>
2014-02-26 15:14:02 -05:00
if err? then return cb err, false
2014-02-24 10:50:43 -05:00
LevelSession.find(_id:taskPair[1]).exec (err, secondSession) =>
2014-02-26 15:14:02 -05:00
if err? then return cb err, false
2014-02-24 10:50:43 -05:00
try
taskPairs = generateTaskPairs(secondSession, firstSession)
catch e
2014-02-26 15:14:02 -05:00
if e then return cb e, false
2014-02-24 10:50:43 -05:00
2014-02-26 15:14:02 -05:00
sendEachTaskPairToTheQueue taskPairs, (taskPairError) ->
if taskPairError? then return cb taskPairError,false
cb null, true
module.exports.createNewTask = (req, res) ->
2014-02-18 14:46:14 -05:00
requestSessionID = req.body.session
validatePermissions req, requestSessionID, (error, permissionsAreValid) ->
if err? then return errors.serverError res, "There was an error validating permissions"
unless permissionsAreValid then return errors.forbidden res, "You do not have the permissions to submit that game to the leaderboard"
return errors.badInput res, "The session ID is invalid" unless typeof requestSessionID is "string"
fetchSessionToSubmit requestSessionID, (err, sessionToSubmit) ->
if err? then return errors.serverError res, "There was an error finding the given session."
updateSessionToSubmit sessionToSubmit, (err, data) ->
if err? then return errors.serverError res, "There was an error updating the session"
2014-02-26 15:14:02 -05:00
opposingTeam = calculateOpposingTeam(sessionToSubmit.team)
fetchInitialSessionsToRankAgainst opposingTeam, (err, sessionsToRankAgainst) ->
if err? then return errors.serverError res, "There was an error fetching the sessions to rank against"
2014-02-07 18:52:24 -05:00
taskPairs = generateTaskPairs(sessionsToRankAgainst, sessionToSubmit)
sendEachTaskPairToTheQueue taskPairs, (taskPairError) ->
if taskPairError? then return errors.serverError res, "There was an error sending the task pairs to the queue"
sendResponseObject req, res, {"message":"All task pairs were succesfully sent to the queue"}
2014-02-17 17:39:21 -05:00
module.exports.dispatchTaskToConsumer = (req, res) ->
if isUserAnonymous(req) then return errors.forbidden res, "You need to be logged in to simulate games"
2014-02-06 16:25:11 -05:00
2014-02-17 17:39:21 -05:00
scoringTaskQueue.receiveMessage (err, message) ->
2014-02-18 14:46:14 -05:00
if err? or messageIsInvalid(message) then return errors.gatewayTimeoutError res, "Queue Receive Error:#{err}"
console.log "Received Message"
messageBody = parseTaskQueueMessage req, res, message
2014-02-17 17:39:21 -05:00
return unless messageBody?
2014-02-06 16:25:11 -05:00
constructTaskObject messageBody, (taskConstructionError, taskObject) ->
2014-02-17 17:39:21 -05:00
if taskConstructionError? then return errors.serverError res, "There was an error constructing the scoring task"
2014-02-18 14:46:14 -05:00
console.log "Constructed task body"
2014-02-17 17:39:21 -05:00
message.changeMessageVisibilityTimeout scoringTaskTimeoutInSeconds, (err) ->
if err? then return errors.serverError res, "There was an error changing the message visibility timeout."
2014-02-18 14:46:14 -05:00
console.log "Changed visibility timeout"
2014-02-17 17:39:21 -05:00
constructTaskLogObject getUserIDFromRequest(req),message.getReceiptHandle(), (taskLogError, taskLogObject) ->
if taskLogError? then return errors.serverError res, "There was an error creating the task log object."
2014-02-07 18:52:24 -05:00
2014-02-17 17:39:21 -05:00
taskObject.taskID = taskLogObject._id
taskObject.receiptHandle = message.getReceiptHandle()
2014-02-07 18:52:24 -05:00
2014-02-17 17:39:21 -05:00
sendResponseObject req, res, taskObject
2014-02-13 15:59:21 -05:00
2014-02-17 17:39:21 -05:00
module.exports.processTaskResult = (req, res) ->
clientResponseObject = verifyClientResponse req.body, res
2014-02-05 12:39:14 -05:00
2014-02-18 14:46:14 -05:00
return unless clientResponseObject?
TaskLog.findOne {_id: clientResponseObject.taskID}, (err, taskLog) ->
2014-02-17 17:39:21 -05:00
return errors.serverError res, "There was an error retrieiving the task log object" if err?
2014-02-17 17:39:21 -05:00
taskLogJSON = taskLog.toObject()
2014-02-17 17:39:21 -05:00
return errors.badInput res, "That computational task has already been performed" if taskLogJSON.calculationTimeMS
return handleTimedOutTask req, res, clientResponseObject if hasTaskTimedOut taskLogJSON.sentDate
2014-02-18 14:46:14 -05:00
scoringTaskQueue.deleteMessage clientResponseObject.receiptHandle, (err) ->
if err? then return errors.badInput res, "The queue message is already back in the queue, rejecting results."
logTaskComputation clientResponseObject, taskLog, (logErr) ->
if logErr? then return errors.serverError res, "There as a problem logging the task computation: #{logErr}"
updateSessions clientResponseObject, (updateError, newScoreArray) ->
if updateError? then return errors.serverError res, "There was an error updating the scores.#{updateError}"
newScoresObject = _.indexBy newScoreArray, 'id'
addMatchToSessions clientResponseObject, newScoresObject, (err, data) ->
if err? then return errors.serverError res, "There was an error updating the sessions with the match! #{JSON.stringify err}"
2014-02-26 20:30:56 -05:00
2014-02-26 15:14:02 -05:00
originalSessionID = clientResponseObject.originalSessionID
originalSessionTeam = clientResponseObject.originalSessionTeam
originalSessionRank = parseInt clientResponseObject.originalSessionRank
2014-02-26 20:30:56 -05:00
determineIfSessionShouldContinueAndUpdateLog originalSessionID, originalSessionRank, (err, sessionShouldContinue) ->
if err? then return errors.serverError res, "There was an error determining if the session should continue, #{err}"
if sessionShouldContinue
opposingTeam = calculateOpposingTeam(originalSessionTeam)
opponentID = _.pull(_.keys(newScoresObject), originalSessionID)
sessionNewScore = newScoresObject[originalSessionID].totalScore
opponentNewScore = newScoresObject[opponentID].totalScore
findNearestBetterSessionID originalSessionID, sessionNewScore, opponentNewScore, opponentID ,opposingTeam, (err, opponentSessionID) ->
2014-02-26 20:30:56 -05:00
if err? then return errors.serverError res, "There was an error finding the nearest sessionID!"
unless opponentSessionID then return sendResponseObject req, res, {"message":"There were no more games to rank(game is at top!"}
addPairwiseTaskToQueue [originalSessionID, opponentSessionID], (err, success) ->
if err? then return errors.serverError res, "There was an error sending the pairwise tasks to the queue!"
sendResponseObject req, res, {"message":"The scores were updated successfully and more games were sent to the queue!"}
else
console.log "Player lost, achieved rank #{originalSessionRank}"
sendResponseObject req, res, {"message":"The scores were updated successfully, person lost so no more games are being inserted!"}
determineIfSessionShouldContinueAndUpdateLog = (sessionID, sessionRank, cb) ->
queryParameters =
_id: sessionID
updateParameters =
"$inc": {}
if sessionRank is 0
updateParameters["$inc"] = {numberOfWinsAndTies: 1}
else
updateParameters["$inc"] = {numberOfLosses: 1}
LevelSession.findOneAndUpdate queryParameters, updateParameters,{select: 'numberOfWinsAndTies numberOfLosses'}, (err, updatedSession) ->
if err? then return cb err, updatedSession
updatedSession = updatedSession.toObject()
totalNumberOfGamesPlayed = updatedSession.numberOfWinsAndTies + updatedSession.numberOfLosses
if totalNumberOfGamesPlayed < 5
console.log "Number of games played is less than 5, continuing..."
cb null, true
else
2014-03-01 19:56:04 -05:00
ratio = (updatedSession.numberOfLosses) / (totalNumberOfGamesPlayed)
2014-02-26 20:30:56 -05:00
if ratio > 0.66
cb null, false
console.log "Ratio(#{ratio}) is bad, ending simulation"
else
console.log "Ratio(#{ratio}) is good, so continuing simulations"
cb null, true
findNearestBetterSessionID = (sessionID, sessionTotalScore, opponentSessionTotalScore, opponentSessionID, opposingTeam, cb) ->
retrieveAllOpponentSessionIDs sessionID, (err, opponentSessionIDs) ->
if err? then return cb err, null
2014-02-26 15:14:02 -05:00
queryParameters =
totalScore:
$gt:opponentSessionTotalScore
_id:
$nin: opponentSessionIDs
"level.original": "52d97ecd32362bc86e004e87"
"level.majorVersion": 0
submitted: true
submittedCode:
$exists: true
team: opposingTeam
limitNumber = 1
sortParameters =
totalScore: 1
selectString = '_id totalScore'
query = LevelSession.findOne(queryParameters)
.sort(sortParameters)
.limit(limitNumber)
.select(selectString)
.lean()
2014-02-26 15:14:02 -05:00
console.log "Finding session with score near #{opponentSessionTotalScore}"
query.exec (err, session) ->
if err? then return cb err, session
unless session then return cb err, null
console.log "Found session with score #{session.totalScore}"
cb err, session._id
2014-02-26 15:14:02 -05:00
retrieveAllOpponentSessionIDs = (sessionID, cb) ->
query = LevelSession.findOne({"_id":sessionID})
.select('matches.opponents.sessionID')
2014-02-26 15:14:02 -05:00
.lean()
query.exec (err, session) ->
if err? then return cb err, null
opponentSessionIDs = (match.opponents[0].sessionID for match in session.matches)
cb err, opponentSessionIDs
2014-02-26 15:14:02 -05:00
calculateOpposingTeam = (sessionTeam) ->
teams = ['ogres','humans']
opposingTeams = _.pull teams, sessionTeam
return opposingTeams[0]
validatePermissions = (req, sessionID, callback) ->
if isUserAnonymous req then return callback null, false
if isUserAdmin req then return callback null, true
LevelSession.findOne(_id:sessionID).select('creator submittedCode code').lean().exec (err, retrievedSession) ->
if err? then return callback err, retrievedSession
code = retrievedSession.code
submittedCode = retrievedSession.submittedCode
callback null, (retrievedSession.creator is req.user?.id and not _.isEqual(code, submittedCode))
2014-02-18 14:46:14 -05:00
addMatchToSessions = (clientResponseObject, newScoreObject, callback) ->
matchObject = {}
matchObject.date = new Date()
matchObject.opponents = {}
for session in clientResponseObject.sessions
sessionID = session.sessionID
matchObject.opponents[sessionID] = {}
matchObject.opponents[sessionID].sessionID = sessionID
matchObject.opponents[sessionID].userID = session.creator
matchObject.opponents[sessionID].metrics = {}
2014-02-19 17:26:49 -05:00
matchObject.opponents[sessionID].metrics.rank = Number(newScoreObject[sessionID].gameRanking)
2014-02-18 14:46:14 -05:00
log.info "Match object computed, result: #{matchObject}"
log.info "Writing match object to database..."
#use bind with async to do the writes
sessionIDs = _.pluck clientResponseObject.sessions, 'sessionID'
async.each sessionIDs, updateMatchesInSession.bind(@,matchObject), (err) -> callback err, null
updateMatchesInSession = (matchObject, sessionID, callback) ->
currentMatchObject = {}
currentMatchObject.date = matchObject.date
currentMatchObject.metrics = matchObject.opponents[sessionID].metrics
opponentsClone = _.cloneDeep matchObject.opponents
opponentsClone = _.omit opponentsClone, sessionID
opponentsArray = _.toArray opponentsClone
currentMatchObject.opponents = opponentsArray
2014-02-17 17:39:21 -05:00
2014-02-18 14:46:14 -05:00
sessionUpdateObject =
$push: {matches: currentMatchObject}
log.info "Updating session #{sessionID}"
LevelSession.update {"_id":sessionID}, sessionUpdateObject, callback
2014-02-17 17:39:21 -05:00
messageIsInvalid = (message) -> (not message?) or message.isEmpty()
sendEachTaskPairToTheQueue = (taskPairs, callback) -> async.each taskPairs, sendTaskPairToQueue, callback
2014-02-18 14:46:14 -05:00
fetchSessionToSubmit = (submittedSessionID, callback) ->
LevelSession.findOne {_id: submittedSessionID}, (err, session) -> callback err, session?.toObject()
2014-02-17 17:39:21 -05:00
2014-02-18 14:46:14 -05:00
updateSessionToSubmit = (sessionToUpdate, callback) ->
2014-02-17 17:39:21 -05:00
sessionUpdateObject =
submitted: true
submittedCode: sessionToUpdate.code
submitDate: new Date()
2014-02-18 14:46:14 -05:00
matches: []
meanStrength: 25
standardDeviation: 25/3
totalScore: 10
2014-02-26 20:30:56 -05:00
numberOfWinsAndTies: 0
numberOfLosses: 0
2014-02-18 14:46:14 -05:00
LevelSession.update {_id: sessionToUpdate._id}, sessionUpdateObject, callback
2014-02-17 17:39:21 -05:00
2014-02-26 15:14:02 -05:00
fetchInitialSessionsToRankAgainst = (opposingTeam, callback) ->
console.log "Fetching sessions to rank against for opposing team #{opposingTeam}"
findParameters =
"level.original": "52d97ecd32362bc86e004e87"
"level.majorVersion": 0
2014-02-17 17:39:21 -05:00
submitted: true
submittedCode:
$exists: true
2014-02-26 15:14:02 -05:00
team: opposingTeam
sortParameters =
totalScore: 1
limitNumber = 1
query = LevelSession.find(findParameters)
.sort(sortParameters)
.limit(limitNumber)
query.exec callback
2014-02-17 17:39:21 -05:00
generateTaskPairs = (submittedSessions, sessionToScore) ->
taskPairs = []
for session in submittedSessions
session = session.toObject()
teams = ['ogres','humans']
opposingTeams = _.pull teams, sessionToScore.team
if String(session._id) isnt String(sessionToScore._id) and session.team in opposingTeams
console.log "Adding game to taskPairs!"
2014-02-18 14:46:14 -05:00
taskPairs.push [sessionToScore._id,String session._id]
return taskPairs
2014-02-17 17:39:21 -05:00
sendTaskPairToQueue = (taskPair, callback) ->
2014-02-18 14:46:14 -05:00
scoringTaskQueue.sendMessage {sessions: taskPair}, 0, (err,data) -> callback? err,data
2014-02-17 17:39:21 -05:00
getUserIDFromRequest = (req) -> if req.user? then return req.user._id else return null
isUserAnonymous = (req) -> if req.user? then return req.user.get('anonymous') else return true
isUserAdmin = (req) -> return Boolean(req.user?.isAdmin())
parseTaskQueueMessage = (req, res, message) ->
try
if typeof message.getBody() is "object" then return message.getBody()
return messageBody = JSON.parse message.getBody()
catch e
sendResponseObject req, res, {"error":"There was an error parsing the task.Error: #{e}" }
2014-02-07 18:52:24 -05:00
return null
constructTaskObject = (taskMessageBody, callback) ->
async.map taskMessageBody.sessions, getSessionInformation, (err, sessions) ->
return callback err, data if err?
taskObject =
"messageGenerated": Date.now()
2014-02-07 18:52:24 -05:00
"sessions": []
for session in sessions
sessionInformation =
2014-02-17 17:39:21 -05:00
"sessionID": session._id
"submitDate": session.submitDate
"team": session.team ? "No team"
2014-02-17 17:39:21 -05:00
"code": session.submittedCode
2014-02-10 20:09:51 -05:00
"teamSpells": session.teamSpells ? {}
"levelID": session.levelID
2014-02-18 14:46:14 -05:00
"creator": session.creator
"creatorName":session.creatorName
2014-02-10 20:09:51 -05:00
2014-02-07 18:52:24 -05:00
taskObject.sessions.push sessionInformation
callback err, taskObject
getSessionInformation = (sessionIDString, callback) ->
2014-02-18 14:46:14 -05:00
LevelSession.findOne {_id:sessionIDString}, (err, session) ->
if err? then return callback err, {"error":"There was an error retrieving the session."}
2014-02-17 17:39:21 -05:00
sessionInformation = session.toObject()
callback err, sessionInformation
constructTaskLogObject = (calculatorUserID, messageIdentifierString, callback) ->
taskLogObject = new TaskLog
"createdAt": new Date()
"calculator":calculatorUserID
"sentDate": Date.now()
"messageIdentifierString":messageIdentifierString
taskLogObject.save callback
sendResponseObject = (req,res,object) ->
res.setHeader('Content-Type', 'application/json')
res.send(object)
res.end()
hasTaskTimedOut = (taskSentTimestamp) -> taskSentTimestamp + scoringTaskTimeoutInSeconds * 1000 < Date.now()
2014-02-07 18:52:24 -05:00
handleTimedOutTask = (req, res, taskBody) -> errors.clientTimeout res, "The results weren't provided within the timeout"
2014-02-07 18:52:24 -05:00
verifyClientResponse = (responseObject, res) ->
unless typeof responseObject is "object"
errors.badInput res, "The response to that query is required to be a JSON object."
null
else
responseObject
logTaskComputation = (taskObject,taskLogObject, callback) ->
taskLogObject.calculationTimeMS = taskObject.calculationTimeMS
taskLogObject.sessions = taskObject.sessions
taskLogObject.save callback
2014-02-17 17:39:21 -05:00
updateSessions = (taskObject,callback) ->
sessionIDs = _.pluck taskObject.sessions, 'sessionID'
2014-02-17 17:39:21 -05:00
async.map sessionIDs, retrieveOldSessionData, (err, oldScores) ->
2014-02-18 14:46:14 -05:00
if err? then callback err, {"error": "There was an error retrieving the old scores"}
oldScoreArray = _.toArray putRankingFromMetricsIntoScoreObject taskObject, oldScores
newScoreArray = bayes.updatePlayerSkills oldScoreArray
saveNewScoresToDatabase newScoreArray, callback
saveNewScoresToDatabase = (newScoreArray, callback) ->
2014-02-18 14:46:14 -05:00
async.eachSeries newScoreArray, updateScoreInSession, (err) -> callback err,newScoreArray
updateScoreInSession = (scoreObject,callback) ->
2014-02-18 14:46:14 -05:00
LevelSession.findOne {"_id": scoreObject.id}, (err, session) ->
if err? then return callback err, null
session = session.toObject()
2014-02-13 15:59:21 -05:00
updateObject =
meanStrength: scoreObject.meanStrength
standardDeviation: scoreObject.standardDeviation
totalScore: scoreObject.meanStrength - 1.8 * scoreObject.standardDeviation
2014-02-18 14:46:14 -05:00
LevelSession.update {"_id": scoreObject.id}, updateObject, callback
2014-02-13 15:59:21 -05:00
log.info "New total score for session #{scoreObject.id} is #{updateObject.totalScore}"
putRankingFromMetricsIntoScoreObject = (taskObject,scoreObject) ->
scoreObject = _.indexBy scoreObject, 'id'
2014-02-18 14:46:14 -05:00
scoreObject[session.sessionID].gameRanking = session.metrics.rank for session in taskObject.sessions
return scoreObject
2014-02-17 17:39:21 -05:00
retrieveOldSessionData = (sessionID, callback) ->
2014-02-18 14:46:14 -05:00
LevelSession.findOne {"_id":sessionID}, (err, session) ->
return callback err, {"error":"There was an error retrieving the session."} if err?
session = session.toObject()
oldScoreObject =
2014-02-18 14:46:14 -05:00
"standardDeviation":session.standardDeviation ? 25/3
"meanStrength":session.meanStrength ? 25
2014-02-18 14:46:14 -05:00
"totalScore":session.totalScore ? (25 - 1.8*(25/3))
2014-02-08 13:18:41 -05:00
"id": sessionID
callback err, oldScoreObject