codecombat/server/queues/scoring.coffee

291 lines
12 KiB
CoffeeScript
Raw Normal View History

2014-02-05 12:39:14 -05:00
config = require '../../server_config'
log = require 'winston'
2014-02-05 12:39:14 -05:00
mongoose = require 'mongoose'
async = require 'async'
errors = require '../commons/errors'
aws = require 'aws-sdk'
2014-02-06 16:25:11 -05:00
db = require './../routes/db'
2014-02-05 12:39:14 -05:00
mongoose = require 'mongoose'
queues = require '../commons/queue'
LevelSession = require '../levels/sessions/LevelSession'
2014-02-07 18:52:24 -05:00
TaskLog = require './task/ScoringTask'
bayes = new (require 'bayesian-battle')()
2014-02-05 12:39:14 -05:00
# Handle to the "scoring" queue. Stays undefined until connectToScoringQueue
# has registered the queue and assigned the result here.
scoringTaskQueue = undefined
# Number of seconds a dispatched task may stay outstanding. Used both as the
# queue message visibility timeout and by hasTaskTimedOut when results come back.
scoringTaskTimeoutInSeconds = 400
# Module entry point: kicks off the connection to the scoring queue.
# `app` is accepted for interface compatibility but is not used here.
module.exports.setup = (app) ->
  connectToScoringQueue()
# Initializes the queue client, registers the "scoring" queue and stores the
# resulting queue handle in `scoringTaskQueue`.
# Throws from inside the registration callback if registration fails.
connectToScoringQueue = ->
  onQueueRegistered = (error, data) ->
    throw new Error "There was an error registering the scoring queue: #{error}" if error?
    scoringTaskQueue = data
    log.info "Connected to scoring task queue!"

  queues.initializeQueueClient ->
    queues.queueClient.registerQueue "scoring", {}, onQueueRegistered
# HTTP handler: submits a session for ranking.
#
# Steps: validate the request, load the session named by `req.body.session`,
# reset its submission/score fields, load the sessions to rank against, build
# task pairs and push each pair onto the scoring queue.
#
# Responds with an error via the `errors` helpers on any failure, otherwise
# with a JSON success message.
module.exports.createNewTask = (req, res) ->
  # `req.body` may be absent if no body was parsed; treat that as a bad session ID.
  requestSessionID = req.body?.session
  if isUserAnonymous req then return errors.forbidden res, "You need to be logged in to be added to the leaderboard"
  return errors.badInput res, "The session ID is invalid" unless typeof requestSessionID is "string"

  fetchSessionToSubmit requestSessionID, (err, sessionToSubmit) ->
    if err? then return errors.serverError res, "There was an error finding the given session."
    # fetchSessionToSubmit yields undefined when no session matches the ID;
    # without this guard updateSessionToSubmit would throw on `.code`.
    return errors.badInput res, "The session ID is invalid" unless sessionToSubmit?

    updateSessionToSubmit sessionToSubmit, (err, data) ->
      if err? then return errors.serverError res, "There was an error updating the session"

      fetchSessionsToRankAgainst (err, sessionsToRankAgainst) ->
        if err? then return errors.serverError res, "There was an error fetching the sessions to rank against"

        taskPairs = generateTaskPairs(sessionsToRankAgainst, sessionToSubmit)
        sendEachTaskPairToTheQueue taskPairs, (taskPairError) ->
          if taskPairError? then return errors.serverError res, "There was an error sending the task pairs to the queue"

          sendResponseObject req, res, {"message":"All task pairs were succesfully sent to the queue"}
2014-02-17 17:39:21 -05:00
# HTTP handler: hands one scoring task from the queue to a logged-in client.
#
# Receives a queue message, parses it, expands it into a task object, extends
# the message's visibility timeout, records a task log entry and finally sends
# the task (tagged with the log ID and the receipt handle) to the client.
module.exports.dispatchTaskToConsumer = (req, res) ->
  return errors.forbidden res, "You need to be logged in to simulate games" if isUserAnonymous(req)

  scoringTaskQueue.receiveMessage (err, message) ->
    return errors.gatewayTimeoutError res, "Queue Receive Error:#{err}" if err? or messageIsInvalid(message)
    console.log "Received Message"

    # parseTaskQueueMessage sends its own response when parsing fails.
    messageBody = parseTaskQueueMessage req, res, message
    return unless messageBody?

    constructTaskObject messageBody, (taskConstructionError, taskObject) ->
      return errors.serverError res, "There was an error constructing the scoring task" if taskConstructionError?
      console.log "Constructed task body"

      message.changeMessageVisibilityTimeout scoringTaskTimeoutInSeconds, (err) ->
        return errors.serverError res, "There was an error changing the message visibility timeout." if err?
        console.log "Changed visibility timeout"

        constructTaskLogObject getUserIDFromRequest(req), message.getReceiptHandle(), (taskLogError, taskLogObject) ->
          return errors.serverError res, "There was an error creating the task log object." if taskLogError?

          taskObject.taskID = taskLogObject._id
          taskObject.receiptHandle = message.getReceiptHandle()

          sendResponseObject req, res, taskObject
2014-02-13 15:59:21 -05:00
2014-02-17 17:39:21 -05:00
# HTTP handler: accepts the result of a scoring task computed by a client.
#
# Looks up the task log named by `taskID`, rejects results that were already
# recorded or that arrived after the timeout, deletes the queue message,
# records the computation, updates the session scores and appends the match to
# each participating session.
module.exports.processTaskResult = (req, res) ->
  clientResponseObject = verifyClientResponse req.body, res

  return unless clientResponseObject?
  TaskLog.findOne {_id: clientResponseObject.taskID}, (err, taskLog) ->
    return errors.serverError res, "There was an error retrieiving the task log object" if err?
    # The task ID comes from the client; an unknown ID yields no document and
    # would otherwise crash on `taskLog.toObject()`.
    return errors.badInput res, "No task log object was found for that task ID" unless taskLog?

    taskLogJSON = taskLog.toObject()

    return errors.badInput res, "That computational task has already been performed" if taskLogJSON.calculationTimeMS
    return handleTimedOutTask req, res, clientResponseObject if hasTaskTimedOut taskLogJSON.sentDate

    scoringTaskQueue.deleteMessage clientResponseObject.receiptHandle, (err) ->
      if err? then return errors.badInput res, "The queue message is already back in the queue, rejecting results."
      logTaskComputation clientResponseObject, taskLog, (logErr) ->
        if logErr? then return errors.serverError res, "There as a problem logging the task computation: #{logErr}"
        updateSessions clientResponseObject, (updateError, newScoreArray) ->
          if updateError? then return errors.serverError res, "There was an error updating the scores.#{updateError}"
          newScoresObject = _.indexBy newScoreArray, 'id'
          addMatchToSessions clientResponseObject, newScoresObject, (err, data) ->
            if err? then return errors.serverError res, "There was an error updating the sessions with the match! #{JSON.stringify err}"
            console.log "Sending response object"
            sendResponseObject req, res, {"message":"The scores were updated successfully!"}
# Builds a match record from the client's result and appends it to every
# participating session.
#
# clientResponseObject - client result; `sessions` entries supply `sessionID`
#                        and `creator`.
# newScoreObject       - score objects keyed by session ID; `gameRanking` is
#                        stored as the opponent's rank.
# callback             - called as (err, null) once all sessions were updated.
addMatchToSessions = (clientResponseObject, newScoreObject, callback) ->
  matchObject = {}
  matchObject.date = new Date()
  matchObject.opponents = {}
  for session in clientResponseObject.sessions
    sessionID = session.sessionID
    matchObject.opponents[sessionID] = {}
    matchObject.opponents[sessionID].sessionID = sessionID
    matchObject.opponents[sessionID].userID = session.creator
    matchObject.opponents[sessionID].metrics = {}
    matchObject.opponents[sessionID].metrics.rank = Number(newScoreObject[sessionID].gameRanking)

  # Serialize explicitly: interpolating the object directly logs "[object Object]".
  log.info "Match object computed, result: #{JSON.stringify matchObject}"
  log.info "Writing match object to database..."
  # Bind the shared match object so async.each only has to supply the session ID.
  sessionIDs = _.pluck clientResponseObject.sessions, 'sessionID'
  async.each sessionIDs, updateMatchesInSession.bind(@,matchObject), (err) -> callback err, null
# Pushes a per-session view of `matchObject` onto the `matches` array of the
# session with ID `sessionID`: the session's own metrics plus an array of all
# the other opponents.
updateMatchesInSession = (matchObject, sessionID, callback) ->
  matchForSession =
    date: matchObject.date
    metrics: matchObject.opponents[sessionID].metrics
  # Deep-copy before dropping this session so the shared match object is untouched.
  otherOpponents = _.omit _.cloneDeep(matchObject.opponents), sessionID
  matchForSession.opponents = _.toArray otherOpponents

  pushMatch = $push: {matches: matchForSession}
  log.info "Updating session #{sessionID}"
  LevelSession.update {"_id":sessionID}, pushMatch, callback
2014-02-17 17:39:21 -05:00
# A queue message is invalid when it is missing or reports itself as empty.
messageIsInvalid = (message) ->
  return true unless message?
  message.isEmpty()
# Sends every task pair to the scoring queue via async.each; `callback`
# receives the completion (or first error) from async.
sendEachTaskPairToTheQueue = (taskPairs, callback) ->
  async.each taskPairs, sendTaskPairToQueue, callback
2014-02-18 14:46:14 -05:00
# Looks up one level session by ID and passes its plain-object form to
# `callback`; the second argument is undefined when no session was returned.
fetchSessionToSubmit = (submittedSessionID, callback) ->
  query = _id: submittedSessionID
  LevelSession.findOne query, (err, session) ->
    plainSession = if session? then session.toObject() else undefined
    callback err, plainSession
2014-02-17 17:39:21 -05:00
2014-02-18 14:46:14 -05:00
# Marks a session as submitted: snapshots its current code as `submittedCode`,
# stamps the submit date, clears its matches and resets its score fields.
updateSessionToSubmit = (sessionToUpdate, callback) ->
  selector = _id: sessionToUpdate._id
  resetFields =
    submitted: true
    submittedCode: sessionToUpdate.code
    submitDate: new Date()
    matches: []
    meanStrength: 25
    standardDeviation: 25/3
    totalScore: 10
  LevelSession.update selector, resetFields, callback
2014-02-17 17:39:21 -05:00
# Finds all submitted sessions of the "project-dota" level that have
# submitted code, and hands them to `callback`.
fetchSessionsToRankAgainst = (callback) ->
  LevelSession.find {
    levelID: "project-dota"
    submitted: true
    submittedCode: {$exists: true}
  }, callback
# Pairs the session being scored with every other submitted session that
# plays on an opposing team.
#
# submittedSessions - array of session documents (each must provide `toObject()`);
#                     null/undefined is treated as an empty list.
# sessionToScore    - plain session object with `_id` and `team`.
#
# Returns an array of [sessionToScore._id, opponentIDString] pairs.
generateTaskPairs = (submittedSessions, sessionToScore) ->
  # Loop-invariant: the teams the submitted session can be matched against.
  opposingTeams = (team for team in ['ogres','humans'] when team isnt sessionToScore.team)
  ownID = String sessionToScore._id

  taskPairs = []
  for sessionDocument in (submittedSessions ? [])
    opponent = sessionDocument.toObject()
    if String(opponent._id) isnt ownID and opponent.team in opposingTeams
      console.log "Adding game to taskPairs!"
      taskPairs.push [sessionToScore._id,String opponent._id]
  return taskPairs
2014-02-17 17:39:21 -05:00
# Enqueues one task pair as `{sessions: taskPair}` with a delay argument of 0.
# `callback` is optional and receives (err, data) from the queue.
sendTaskPairToQueue = (taskPair, callback) ->
  messageBody = sessions: taskPair
  scoringTaskQueue.sendMessage messageBody, 0, (err, data) ->
    callback? err, data
2014-02-17 17:39:21 -05:00
# Returns the `_id` of the request's user, or null when the request has no user.
getUserIDFromRequest = (req) ->
  return null unless req.user?
  req.user._id
# A request without a user counts as anonymous; otherwise the user's
# 'anonymous' attribute decides.
isUserAnonymous = (req) ->
  return true unless req.user?
  req.user.get('anonymous')
# Extracts the body of a queue message as an object.
# A body that is already an object is returned as-is; anything else is parsed
# as JSON. On failure an error object is sent to the client and null is returned.
parseTaskQueueMessage = (req, res, message) ->
  try
    return message.getBody() if typeof message.getBody() is "object"
    return JSON.parse message.getBody()
  catch parseError
    sendResponseObject req, res, {"error":"There was an error parsing the task.Error: #{parseError}" }
    return null
# Expands a queue message body into the task object sent to the client.
#
# taskMessageBody - parsed message body; `sessions` lists the session IDs.
# callback        - called as (err) on failure, or (err, taskObject) where
#                   taskObject has `messageGenerated` and a `sessions` array of
#                   per-session information.
constructTaskObject = (taskMessageBody, callback) ->
  async.map taskMessageBody.sessions, getSessionInformation, (err, sessions) ->
    # The original passed an undefined identifier `data` here, which raised a
    # ReferenceError instead of reporting the lookup error.
    return callback err if err?

    taskObject =
      "messageGenerated": Date.now()
      "sessions": []
    for session in sessions
      sessionInformation =
        "sessionID": session._id
        "submitDate": session.submitDate
        "team": session.team ? "No team"
        "code": session.submittedCode
        "teamSpells": session.teamSpells ? {}
        "levelID": session.levelID
        "creator": session.creator

      taskObject.sessions.push sessionInformation
    callback err, taskObject
# Loads one level session by ID and passes its plain-object form to `callback`.
# Reports an error (instead of throwing) when the lookup fails or when no
# session matches the ID.
getSessionInformation = (sessionIDString, callback) ->
  LevelSession.findOne {_id:sessionIDString}, (err, session) ->
    if err? then return callback err, {"error":"There was an error retrieving the session."}
    # No matching document: calling `.toObject()` on it would throw.
    unless session?
      return callback new Error("Session #{sessionIDString} was not found"), null

    sessionInformation = session.toObject()
    callback err, sessionInformation
# Creates and saves a TaskLog entry recording who received the task, when it
# was sent and which queue message it belongs to. `callback` is handed
# straight to `save`.
constructTaskLogObject = (calculatorUserID, messageIdentifierString, callback) ->
  logFields =
    "createdAt": new Date()
    "calculator":calculatorUserID
    "sentDate": Date.now()
    "messageIdentifierString":messageIdentifierString
  logEntry = new TaskLog logFields
  logEntry.save callback
# Sends `object` to the client with a JSON content type and ends the response.
sendResponseObject = (req,res,object) ->
  res.setHeader 'Content-Type', 'application/json'
  res.send object
  res.end()
# True when more than `scoringTaskTimeoutInSeconds` have elapsed since the
# task was sent.
# NOTE(review): assumes `taskSentTimestamp` is a millisecond number (it is
# written with Date.now()); confirm the TaskLog schema does not store a Date.
hasTaskTimedOut = (taskSentTimestamp) ->
  deadline = taskSentTimestamp + scoringTaskTimeoutInSeconds * 1000
  deadline < Date.now()
2014-02-07 18:52:24 -05:00
# Rejects a result that arrived after the task's timeout.
# `req` and `taskBody` are currently unused.
handleTimedOutTask = (req, res, taskBody) ->
  errors.clientTimeout res, "The results weren't provided within the timeout"
2014-02-07 18:52:24 -05:00
# Validates that the client's response body is a JSON object.
#
# Returns the object when valid. Otherwise sends a bad-input error on `res`
# and returns null, so a null return always means a response was already sent.
verifyClientResponse = (responseObject, res) ->
  # `typeof null` is "object", so null must be excluded explicitly; otherwise
  # the caller would see null and return without any response being sent.
  if responseObject? and typeof responseObject is "object"
    responseObject
  else
    errors.badInput res, "The response to that query is required to be a JSON object."
    null
# Copies the computation time and sessions from the client's result onto the
# task log document and saves it; `callback` is handed straight to `save`.
logTaskComputation = (taskObject,taskLogObject, callback) ->
  {calculationTimeMS, sessions} = taskObject
  taskLogObject.calculationTimeMS = calculationTimeMS
  taskLogObject.sessions = sessions
  taskLogObject.save callback
2014-02-17 17:39:21 -05:00
# Recomputes the skill scores of every session in a finished task.
#
# Loads each session's previous score, attaches the game ranking reported by
# the client, runs the Bayesian skill update and saves the new scores.
# `callback` receives (err, newScoreArray).
updateSessions = (taskObject,callback) ->
  sessionIDs = _.pluck taskObject.sessions, 'sessionID'
  async.map sessionIDs, retrieveOldSessionData, (err, oldScores) ->
    # Must return here: without it the code below ran on bad data and the
    # callback was invoked a second time.
    if err? then return callback err, {"error": "There was an error retrieving the old scores"}
    oldScoreArray = _.toArray putRankingFromMetricsIntoScoreObject taskObject, oldScores
    newScoreArray = bayes.updatePlayerSkills oldScoreArray
    saveNewScoresToDatabase newScoreArray, callback
# Persists each new score one after another; `callback` receives
# (err, newScoreArray) when the series finishes.
saveNewScoresToDatabase = (newScoreArray, callback) ->
  onAllSaved = (err) -> callback err,newScoreArray
  async.eachSeries newScoreArray, updateScoreInSession, onAllSaved
# Writes one recomputed score to its session.
#
# scoreObject - has `id`, `meanStrength` and `standardDeviation`; the stored
#               totalScore is meanStrength - 1.8 * standardDeviation.
# callback    - handed to the update call; receives an error if the session
#               lookup fails or the session does not exist.
updateScoreInSession = (scoreObject,callback) ->
  LevelSession.findOne {"_id": scoreObject.id}, (err, session) ->
    if err? then return callback err, null
    # The lookup only confirms the session exists. The original called
    # `.toObject()` on the result (unused), which threw for a missing session.
    unless session?
      return callback new Error("Session #{scoreObject.id} was not found"), null

    updateObject =
      meanStrength: scoreObject.meanStrength
      standardDeviation: scoreObject.standardDeviation
      totalScore: scoreObject.meanStrength - 1.8 * scoreObject.standardDeviation

    LevelSession.update {"_id": scoreObject.id}, updateObject, callback
    log.info "New total score for session #{scoreObject.id} is #{updateObject.totalScore}"
# Indexes the score objects by `id` and copies each session's reported rank
# (`metrics.rank`) onto the matching score object as `gameRanking`.
# Returns the indexed object.
putRankingFromMetricsIntoScoreObject = (taskObject,scoreObject) ->
  scoresByID = _.indexBy scoreObject, 'id'
  for session in taskObject.sessions
    scoresByID[session.sessionID].gameRanking = session.metrics.rank
  return scoresByID
2014-02-17 17:39:21 -05:00
# Loads the current score fields of one session, falling back to the initial
# values (mean 25, standard deviation 25/3) for fields not set on the session.
#
# callback - called as (err, oldScoreObject); receives an error when the
#            lookup fails or no session matches `sessionID`.
retrieveOldSessionData = (sessionID, callback) ->
  LevelSession.findOne {"_id":sessionID}, (err, session) ->
    return callback err, {"error":"There was an error retrieving the session."} if err?
    # No matching document: calling `.toObject()` on it would throw.
    unless session?
      return callback new Error("Session #{sessionID} was not found"), null

    session = session.toObject()
    oldScoreObject =
      "standardDeviation":session.standardDeviation ? 25/3
      "meanStrength":session.meanStrength ? 25
      "totalScore":session.totalScore ? (25 - 1.8*(25/3))
      "id": sessionID
    callback err, oldScoreObject