codecombat/server/queues/scoring.coffee

334 lines
11 KiB
CoffeeScript
Raw Normal View History

2014-02-05 12:39:14 -05:00
config = require '../../server_config'
log = require 'winston'
2014-02-05 12:39:14 -05:00
mongoose = require 'mongoose'
async = require 'async'
errors = require '../commons/errors'
aws = require 'aws-sdk'
2014-02-06 16:25:11 -05:00
db = require './../routes/db'
2014-02-05 12:39:14 -05:00
mongoose = require 'mongoose'
queues = require '../commons/queue'
LevelSession = require '../levels/sessions/LevelSession'
2014-02-07 18:52:24 -05:00
TaskLog = require './task/ScoringTask'
bayes = new (require 'bayesian-battle')()
2014-02-05 12:39:14 -05:00
scoringTaskQueue = undefined
scoringTaskTimeoutInSeconds = 400
module.exports.setup = (app) -> connectToScoringQueue()
connectToScoringQueue = ->
queues.initializeQueueClient ->
queues.queueClient.registerQueue "scoring", {}, (err,data) ->
throwScoringQueueRegistrationError(err) if err?
scoringTaskQueue = data
log.info "Connected to scoring task queue!"
throwScoringQueueRegistrationError = (error) ->
log.error "There was an error registering the scoring queue: #{error}"
throw new Error "There was an error registering the scoring queue."
module.exports.createNewTask = (req, res) ->
return errors.forbidden res, "You need to be logged in to be added to the leaderboard" if isUserAnonymous req
return errors.badInput res, "The session ID is invalid" unless typeof req.body.session is "string"
LevelSession.findOne { "_id": req.body.session}, (err, sessionToScore) ->
return errors.serverError res, "There was an error finding the given session." if err?
2014-02-13 18:42:00 -05:00
sessionToScore = sessionToScore.toJSON()
console.log "Ranking session of team #{sessionToScore.team}"
LevelSession.update { "_id": req.body.session}, {"submitted":true}, (err, data) ->
return errors.serverError res, "There was an error saving the submitted bool of the session." if err?
LevelSession.find { "levelID": "project-dota", "submitted": true}, (err, submittedSessions) ->
taskPairs = []
for session in submittedSessions
2014-02-13 19:28:57 -05:00
session = session.toObject()
console.log "Attemping to add session of team #{session.team} to taskPairs..."
if String(session._id) isnt req.body.session and session.team isnt sessionToScore.team and session.team in ["ogres","humans"]
2014-02-13 19:28:57 -05:00
console.log "Adding game to taskPairs!"
taskPairs.push [req.body.session,String session._id]
async.each taskPairs, sendTaskPairToQueue, (taskPairError) ->
return errors.serverError res, "There was an error sending the task pairs to the queue" if taskPairError?
sendResponseObject req, res, {"message":"All task pairs were succesfully sent to the queue"}
sendTaskPairToQueue = (taskPair, callback) ->
taskObject =
sessions: taskPair
scoringTaskQueue.sendMessage taskObject, 0, (err,data) ->
callback err,data
2014-02-05 12:39:14 -05:00
2014-02-06 16:25:11 -05:00
module.exports.dispatchTaskToConsumer = (req, res) ->
userID = getUserIDFromRequest req,res
return errors.forbidden res, "You need to be logged in to simulate games" if isUserAnonymous req
2014-02-07 18:52:24 -05:00
scoringTaskQueue.receiveMessage (taskQueueReceiveError, message) ->
if (not message?) or message.isEmpty() or taskQueueReceiveError?
return errors.gatewayTimeoutError res, "No messages were receieved from the queue. Msg:#{taskQueueReceiveError}"
2014-02-06 16:25:11 -05:00
2014-02-13 15:59:21 -05:00
messageBody = parseTaskQueueMessage req, res, message
return errors.serverError res, "There was an error parsing the queue message" unless messageBody?
2014-02-06 16:25:11 -05:00
constructTaskObject messageBody, (taskConstructionError, taskObject) ->
return errors.serverError res, "There was an error constructing the scoring task" if taskConstructionError?
2014-02-07 18:52:24 -05:00
message.changeMessageVisibilityTimeout scoringTaskTimeoutInSeconds
2014-02-07 18:52:24 -05:00
constructTaskLogObject userID,message.getReceiptHandle(), (taskLogError, taskLogObject) ->
return errors.serverError res, "There was an error creating the task log object." if taskLogError?
setTaskObjectTaskLogID taskObject, taskLogObject._id
2014-02-07 18:52:24 -05:00
2014-02-13 15:59:21 -05:00
taskObject.receiptHandle = message.getReceiptHandle()
2014-02-07 18:52:24 -05:00
sendResponseObject req, res, taskObject
2014-02-05 12:39:14 -05:00
getUserIDFromRequest = (req) -> if req.user? then return req.user._id else return null
isUserAnonymous = (req) -> if req.user? then return req.user.get('anonymous') else return true
parseTaskQueueMessage = (req, res, message) ->
try
if typeof message.getBody() is "object" then return message.getBody()
return messageBody = JSON.parse message.getBody()
catch e
sendResponseObject req, res, {"error":"There was an error parsing the task.Error: #{e}" }
2014-02-07 18:52:24 -05:00
return null
constructTaskObject = (taskMessageBody, callback) ->
async.map taskMessageBody.sessions, getSessionInformation, (err, sessions) ->
return callback err, data if err?
taskObject =
"messageGenerated": Date.now()
2014-02-07 18:52:24 -05:00
"sessions": []
for session in sessions
sessionInformation =
"sessionID": session.sessionID
"sessionChangedTime": session.changed
"team": session.team ? "No team"
"code": session.code
2014-02-10 20:09:51 -05:00
"teamSpells": session.teamSpells ? {}
"levelID": session.levelID
2014-02-07 18:52:24 -05:00
taskObject.sessions.push sessionInformation
callback err, taskObject
getSessionInformation = (sessionIDString, callback) ->
LevelSession.findOne {"_id": sessionIDString }, (err, session) ->
return callback err, {"error":"There was an error retrieving the session."} if err?
session = session.toObject()
sessionInformation =
"sessionID": session._id
"code": _.cloneDeep session.code
"changed": session.changed
"creator": session.creator
"team": session.team
2014-02-10 20:09:51 -05:00
"teamSpells": session.teamSpells
"levelID": session.levelID
callback err, sessionInformation
constructTaskLogObject = (calculatorUserID, messageIdentifierString, callback) ->
taskLogObject = new TaskLog
"createdAt": new Date()
"calculator":calculatorUserID
"sentDate": Date.now()
"messageIdentifierString":messageIdentifierString
taskLogObject.save callback
setTaskObjectTaskLogID = (taskObject, taskLogObjectID) -> taskObject.taskID = taskLogObjectID
sendResponseObject = (req,res,object) ->
res.setHeader('Content-Type', 'application/json')
res.send(object)
res.end()
module.exports.processTaskResult = (req, res) ->
clientResponseObject = verifyClientResponse req.body, res
if clientResponseObject?
TaskLog.findOne {"_id": clientResponseObject.taskID}, (err, taskLog) ->
return errors.serverError res, "There was an error retrieiving the task log object" if err?
2014-02-07 18:52:24 -05:00
taskLogJSON = taskLog.toObject()
return errors.badInput res, "That computational task has already been performed" if taskLogJSON.calculationTimeMS
return handleTimedOutTask req, res, clientResponseObject if hasTaskTimedOut taskLogJSON.sentDate
2014-02-13 15:59:21 -05:00
destroyQueueMessage clientResponseObject.receiptHandle, (err) ->
return errors.badInput res, "The queue message is already back in the queue, rejecting results." if err?
2014-02-13 15:59:21 -05:00
logTaskComputation clientResponseObject, taskLog, (loggingError) ->
if loggingError?
return errors.serverError res, "There as a problem logging the task computation: #{loggingError}"
2014-02-13 15:59:21 -05:00
updateScores clientResponseObject, (updatingScoresError, newScores) ->
if updatingScoresError?
return errors.serverError res, "There was an error updating the scores.#{updatingScoresError}"
2014-02-13 15:59:21 -05:00
sendResponseObject req, res, {"message":"The scores were updated successfully!"}
2014-02-07 18:52:24 -05:00
hasTaskTimedOut = (taskSentTimestamp) -> taskSentTimestamp + scoringTaskTimeoutInSeconds * 1000 < Date.now()
2014-02-07 18:52:24 -05:00
handleTimedOutTask = (req, res, taskBody) -> errors.clientTimeout res, "The results weren't provided within the timeout"
2014-02-07 18:52:24 -05:00
2014-02-13 15:59:21 -05:00
destroyQueueMessage = (receiptHandle, callback) -> scoringTaskQueue.deleteMessage receiptHandle, callback
verifyClientResponse = (responseObject, res) ->
unless typeof responseObject is "object"
errors.badInput res, "The response to that query is required to be a JSON object."
null
else
responseObject
logTaskComputation = (taskObject,taskLogObject, callback) ->
taskLogObject.calculationTimeMS = taskObject.calculationTimeMS
taskLogObject.sessions = taskObject.sessions
taskLogObject.save callback
updateScores = (taskObject,callback) ->
sessionIDs = _.pluck taskObject.sessions, 'sessionID'
async.map sessionIDs, retrieveOldScoreMetrics, (err, oldScores) ->
callback err, {"error": "There was an error retrieving the old scores"} if err?
oldScoreArray = _.toArray putRankingFromMetricsIntoScoreObject taskObject, oldScores
newScoreArray = bayes.updatePlayerSkills oldScoreArray
saveNewScoresToDatabase newScoreArray, callback
saveNewScoresToDatabase = (newScoreArray, callback) ->
async.eachSeries newScoreArray, updateScoreInSession, (err) ->
if err? then callback err, null else callback err, {"message":"All scores were saved successfully."}
updateScoreInSession = (scoreObject,callback) ->
sessionObjectQuery =
"_id": scoreObject.id
LevelSession.findOne sessionObjectQuery, (err, session) ->
return callback err, null if err?
session = session.toObject()
2014-02-13 15:59:21 -05:00
updateObject =
meanStrength: scoreObject.meanStrength
standardDeviation: scoreObject.standardDeviation
totalScore: scoreObject.meanStrength - 1.8 * scoreObject.standardDeviation
log.info "New total score for session #{scoreObject.id} is #{updateObject.totalScore}"
LevelSession.update sessionObjectQuery, updateObject, callback
putRankingFromMetricsIntoScoreObject = (taskObject,scoreObject) ->
scoreObject = _.indexBy scoreObject, 'id'
for session in taskObject.sessions
scoreObject[session.sessionID].gameRanking = session.metrics.rank
scoreObject
retrieveOldScoreMetrics = (sessionID, callback) ->
sessionQuery =
"_id":sessionID
LevelSession.findOne sessionQuery, (err, session) ->
return callback err, {"error":"There was an error retrieving the session."} if err?
session = session.toObject()
2014-02-08 13:18:41 -05:00
defaultScore = (25 - 1.8*(25/3))
defaultStandardDeviation = 25/3
oldScoreObject =
2014-02-08 13:18:41 -05:00
"standardDeviation":session.standardDeviation ? defaultStandardDeviation
"meanStrength":session.meanStrength ? 25
2014-02-08 13:18:41 -05:00
"totalScore":session.totalScore ? defaultScore
"id": sessionID
callback err, oldScoreObject
2014-02-05 12:39:14 -05:00
###Sample Messages
sampleQueueMessage =
{
"sessions": ["52dea9b77e486eeb97000001","52d981a73cf02dcf260003cb"]
}
sampleUndoneTaskObject =
"taskID": "507f191e810c19729de860ea"
"sessions" : [
{
"ID":"52dfeb17c8b5f435c7000025"
"sessionChangedTime": "2014-01-22T16:28:12.450Z"
"team":"humans"
"code": "code goes here"
},
{
"ID":"51eb2714fa058cb20d00fedg"
"sessionChangedTime": "2014-01-22T16:28:12.450Z"
"team":"ogres"
"code": "code goes here"
}
]
sampleResponseObject =
"taskID": "507f191e810c19729de860ea"
"calculationTime":3201
"sessions": [
{
"ID":"52dfeb17c8b5f435c7000025"
"sessionChangedTime": "2014-01-22T16:28:12.450Z"
"metrics": {
"rank":2
}
},
{
"ID":"51eb2714fa058cb20d00fedg"
"sessionChangedTime": "2014-01-22T16:28:12.450Z"
"metrics": {
"rank":1
}
}
]
sampleTaskLogObject=
{
"_id":ObjectId("507f191e810c19729de860ea") #datestamp is built into objectId
"calculatedBy":ObjectId("51eb2714fa058cb20d0006ef")
"calculationTime":3201
timedOut: false
"sessions":[
{
"ID":ObjectId("52dfeb17c8b5f435c7000025")
"metrics": {
"rank":2
}
},
{
"ID":ObjectId("51eb2714fa058cb20d00feda")
"metrics": {
"rank":1
}
}
]
}
###