Refactored scoring

This commit is contained in:
Michael Schmatz 2014-02-17 14:39:21 -08:00
parent 0fb2018f2a
commit 75d8888c92
2 changed files with 130 additions and 170 deletions

View file

@ -61,7 +61,9 @@
"stream-buffers": "0.2.x",
"sendwithus": "2.0.x",
"aws-sdk":"~2.0.0",
"bayesian-battle":"0.0.x"
"bayesian-battle":"0.0.x",
"hiredis":"",
"redis": ""
},
"devDependencies": {
"jade": "0.33.x",

View file

@ -19,78 +19,138 @@ module.exports.setup = (app) -> connectToScoringQueue()
connectToScoringQueue = ->
queues.initializeQueueClient ->
queues.queueClient.registerQueue "scoring", {}, (err,data) ->
throwScoringQueueRegistrationError(err) if err?
queues.queueClient.registerQueue "scoring", {}, (error,data) ->
if error? then throw new Error "There was an error registering the scoring queue: #{error}"
scoringTaskQueue = data
log.info "Connected to scoring task queue!"
throwScoringQueueRegistrationError = (error) ->
log.error "There was an error registering the scoring queue: #{error}"
throw new Error "There was an error registering the scoring queue."
module.exports.createNewTask = (req, res) ->
return errors.forbidden res, "You need to be logged in to be added to the leaderboard" if isUserAnonymous req
return errors.badInput res, "The session ID is invalid" unless typeof req.body.session is "string"
LevelSession.findOne { "_id": req.body.session}, (err, sessionToScore) ->
return errors.serverError res, "There was an error finding the given session." if err?
sessionToScore = sessionToScore.toJSON()
console.log "Ranking session of team #{sessionToScore.team}"
requestSessionID = getSessionIDFromRequest req
if isUserAnonymous req then return errors.forbidden res, "You need to be logged in to be added to the leaderboard"
return errors.badInput res, "The session ID is invalid" unless requestSessionID is "string"
LevelSession.update { "_id": req.body.session}, {"submitted":true}, (err, data) ->
return errors.serverError res, "There was an error saving the submitted bool of the session." if err?
LevelSession.find { "levelID": "project-dota", "submitted": true}, (err, submittedSessions) ->
taskPairs = []
for session in submittedSessions
session = session.toObject()
console.log "Attemping to add session of team #{session.team} to taskPairs..."
if String(session._id) isnt req.body.session and session.team isnt sessionToScore.team and session.team in ["ogres","humans"]
console.log "Adding game to taskPairs!"
taskPairs.push [req.body.session,String session._id]
async.each taskPairs, sendTaskPairToQueue, (taskPairError) ->
return errors.serverError res, "There was an error sending the task pairs to the queue" if taskPairError?
sendResponseObject req, res, {"message":"All task pairs were succesfully sent to the queue"}
fetchSubmittedSession requestSessionID, (err, sessionToScore) ->
if err? then return errors.serverError res, "There was an error finding the given session."
updateSubmittedSession sessionToScore, (err, data) ->
if err? then return errors.serverError res, "There was an error updating the session"
fetchSessionsToRankAgainst (err, submittedSessions) ->
if err? then return errors.serverError res, "There was an error fetching the sessions to rank against"
taskPairs = generateTaskPairs(submittedSessions, sessionToScore)
sendEachTaskPairToTheQueue taskPairs, (taskPairError) ->
if taskPairError? then return errors.serverError res, "There was an error sending the task pairs to the queue"
sendResponseObject req, res, {"message":"All task pairs were succesfully sent to the queue"}
module.exports.dispatchTaskToConsumer = (req, res) ->
if isUserAnonymous(req) then return errors.forbidden res, "You need to be logged in to simulate games"
scoringTaskQueue.receiveMessage (err, message) ->
if err? or not messageIsInvalid(message) then return errors.gatewayTimeoutError res, "Queue Receive Error:#{err}"
messageBody = parseTaskQueueMessage req, res, message
return unless messageBody?
constructTaskObject messageBody, (taskConstructionError, taskObject) ->
if taskConstructionError? then return errors.serverError res, "There was an error constructing the scoring task"
message.changeMessageVisibilityTimeout scoringTaskTimeoutInSeconds, (err) ->
if err? then return errors.serverError res, "There was an error changing the message visibility timeout."
constructTaskLogObject getUserIDFromRequest(req),message.getReceiptHandle(), (taskLogError, taskLogObject) ->
if taskLogError? then return errors.serverError res, "There was an error creating the task log object."
taskObject.taskID = taskLogObject._id
taskObject.receiptHandle = message.getReceiptHandle()
sendResponseObject req, res, taskObject
module.exports.processTaskResult = (req, res) ->
clientResponseObject = verifyClientResponse req.body, res
return unless clientResponseObject?
taskLogQuery = _id: clientResponseObject.taskID
TaskLog.findOne taskLogQuery, (err, taskLog) ->
return errors.serverError res, "There was an error retrieiving the task log object" if err?
taskLogJSON = taskLog.toObject()
return errors.badInput res, "That computational task has already been performed" if taskLogJSON.calculationTimeMS
return handleTimedOutTask req, res, clientResponseObject if hasTaskTimedOut taskLogJSON.sentDate
destroyQueueMessage clientResponseObject.receiptHandle, (err) ->
return errors.badInput res, "The queue message is already back in the queue, rejecting results." if err?
logTaskComputation clientResponseObject, taskLog, (loggingErr) ->
return errors.serverError res, "There as a problem logging the task computation: #{loggingErr}" if loggingErr?
updateSessions clientResponseObject, (updateError, newScores) ->
return errors.serverError res, "There was an error updating the scores.#{updateError}" if updateError?
sendResponseObject req, res, {"message":"The scores were updated successfully!"}
messageIsInvalid = (message) -> (not message?) or message.isEmpty()
getSessionIDFromRequest = (req) -> req.body.session
sendEachTaskPairToTheQueue = (taskPairs, callback) -> async.each taskPairs, sendTaskPairToQueue, callback
fetchSubmittedSession = (submittedSessionID, callback) ->
sessionQuery =
_id: submittedSessionID
LevelSession.findOne sessionQuery, (err, session) -> callback err, session?.toObject()
updateSubmittedSession = (sessionToUpdate, callback) ->
sessionQuery =
_id: sessionToUpdate._id
sessionUpdateObject =
submitted: true
submittedCode: sessionToUpdate.code
submitDate: new Date()
LevelSession.update sessionQuery, sessionUpdateObject, callback
fetchSessionsToRankAgainst = (callback) ->
submittedSessionsQuery =
levelID: "project-dota"
submitted: true
submittedCode:
$exists: true
LevelSession.find submittedSessionsQuery, callback
generateTaskPairs = (submittedSessions, sessionToScore) ->
taskPairs = []
for session in submittedSessions
session = session.toObject()
teams = ['ogres','humans']
opposingTeams = _.pull teams, sessionToScore.team
if String(session._id) isnt String(sessionToScore._id) and session.team in opposingTeams
console.log "Adding game to taskPairs!"
taskPairs.push [req.body.session,String session._id]
taskPairs
sendTaskPairToQueue = (taskPair, callback) ->
taskObject =
sessions: taskPair
scoringTaskQueue.sendMessage taskObject, 0, (err,data) ->
callback err,data
module.exports.dispatchTaskToConsumer = (req, res) ->
userID = getUserIDFromRequest req,res
return errors.forbidden res, "You need to be logged in to simulate games" if isUserAnonymous req
scoringTaskQueue.receiveMessage (taskQueueReceiveError, message) ->
if (not message?) or message.isEmpty() or taskQueueReceiveError?
return errors.gatewayTimeoutError res, "No messages were receieved from the queue. Msg:#{taskQueueReceiveError}"
messageBody = parseTaskQueueMessage req, res, message
return errors.serverError res, "There was an error parsing the queue message" unless messageBody?
constructTaskObject messageBody, (taskConstructionError, taskObject) ->
return errors.serverError res, "There was an error constructing the scoring task" if taskConstructionError?
message.changeMessageVisibilityTimeout scoringTaskTimeoutInSeconds
constructTaskLogObject userID,message.getReceiptHandle(), (taskLogError, taskLogObject) ->
return errors.serverError res, "There was an error creating the task log object." if taskLogError?
setTaskObjectTaskLogID taskObject, taskLogObject._id
taskObject.receiptHandle = message.getReceiptHandle()
sendResponseObject req, res, taskObject
callback? err,data
getUserIDFromRequest = (req) -> if req.user? then return req.user._id else return null
isUserAnonymous = (req) -> if req.user? then return req.user.get('anonymous') else return true
parseTaskQueueMessage = (req, res, message) ->
try
if typeof message.getBody() is "object" then return message.getBody()
@ -110,10 +170,10 @@ constructTaskObject = (taskMessageBody, callback) ->
for session in sessions
sessionInformation =
"sessionID": session.sessionID
"sessionChangedTime": session.changed
"sessionID": session._id
"submitDate": session.submitDate
"team": session.team ? "No team"
"code": session.code
"code": session.submittedCode
"teamSpells": session.teamSpells ? {}
"levelID": session.levelID
@ -122,19 +182,15 @@ constructTaskObject = (taskMessageBody, callback) ->
getSessionInformation = (sessionIDString, callback) ->
LevelSession.findOne {"_id": sessionIDString }, (err, session) ->
return callback err, {"error":"There was an error retrieving the session."} if err?
sessionQuery
_id:sessionIDString
session = session.toObject()
sessionInformation =
"sessionID": session._id
"code": _.cloneDeep session.code
"changed": session.changed
"creator": session.creator
"team": session.team
"teamSpells": session.teamSpells
"levelID": session.levelID
LevelSession.findOne sessionQuery, (err, session) ->
levelSessionFindOneError =
"error":"There was an error retrieving the session."
return callback err, levelSessionFindOneError if err?
sessionInformation = session.toObject()
callback err, sessionInformation
@ -147,45 +203,14 @@ constructTaskLogObject = (calculatorUserID, messageIdentifierString, callback) -
taskLogObject.save callback
setTaskObjectTaskLogID = (taskObject, taskLogObjectID) -> taskObject.taskID = taskLogObjectID
sendResponseObject = (req,res,object) ->
res.setHeader('Content-Type', 'application/json')
res.send(object)
res.end()
module.exports.processTaskResult = (req, res) ->
clientResponseObject = verifyClientResponse req.body, res
if clientResponseObject?
TaskLog.findOne {"_id": clientResponseObject.taskID}, (err, taskLog) ->
return errors.serverError res, "There was an error retrieiving the task log object" if err?
taskLogJSON = taskLog.toObject()
return errors.badInput res, "That computational task has already been performed" if taskLogJSON.calculationTimeMS
return handleTimedOutTask req, res, clientResponseObject if hasTaskTimedOut taskLogJSON.sentDate
destroyQueueMessage clientResponseObject.receiptHandle, (err) ->
return errors.badInput res, "The queue message is already back in the queue, rejecting results." if err?
logTaskComputation clientResponseObject, taskLog, (loggingError) ->
if loggingError?
return errors.serverError res, "There as a problem logging the task computation: #{loggingError}"
updateScores clientResponseObject, (updatingScoresError, newScores) ->
if updatingScoresError?
return errors.serverError res, "There was an error updating the scores.#{updatingScoresError}"
sendResponseObject req, res, {"message":"The scores were updated successfully!"}
hasTaskTimedOut = (taskSentTimestamp) -> taskSentTimestamp + scoringTaskTimeoutInSeconds * 1000 < Date.now()
handleTimedOutTask = (req, res, taskBody) -> errors.clientTimeout res, "The results weren't provided within the timeout"
destroyQueueMessage = (receiptHandle, callback) -> scoringTaskQueue.deleteMessage receiptHandle, callback
@ -197,17 +222,15 @@ verifyClientResponse = (responseObject, res) ->
else
responseObject
logTaskComputation = (taskObject,taskLogObject, callback) ->
taskLogObject.calculationTimeMS = taskObject.calculationTimeMS
taskLogObject.sessions = taskObject.sessions
taskLogObject.save callback
updateScores = (taskObject,callback) ->
updateSessions = (taskObject,callback) ->
sessionIDs = _.pluck taskObject.sessions, 'sessionID'
async.map sessionIDs, retrieveOldScoreMetrics, (err, oldScores) ->
async.map sessionIDs, retrieveOldSessionData, (err, oldScores) ->
callback err, {"error": "There was an error retrieving the old scores"} if err?
oldScoreArray = _.toArray putRankingFromMetricsIntoScoreObject taskObject, oldScores
@ -245,7 +268,7 @@ putRankingFromMetricsIntoScoreObject = (taskObject,scoreObject) ->
scoreObject
retrieveOldScoreMetrics = (sessionID, callback) ->
retrieveOldSessionData = (sessionID, callback) ->
sessionQuery =
"_id":sessionID
@ -267,68 +290,3 @@ retrieveOldScoreMetrics = (sessionID, callback) ->
###Sample Messages
sampleQueueMessage =
{
"sessions": ["52dea9b77e486eeb97000001","52d981a73cf02dcf260003cb"]
}
sampleUndoneTaskObject =
"taskID": "507f191e810c19729de860ea"
"sessions" : [
{
"ID":"52dfeb17c8b5f435c7000025"
"sessionChangedTime": "2014-01-22T16:28:12.450Z"
"team":"humans"
"code": "code goes here"
},
{
"ID":"51eb2714fa058cb20d00fedg"
"sessionChangedTime": "2014-01-22T16:28:12.450Z"
"team":"ogres"
"code": "code goes here"
}
]
sampleResponseObject =
"taskID": "507f191e810c19729de860ea"
"calculationTime":3201
"sessions": [
{
"ID":"52dfeb17c8b5f435c7000025"
"sessionChangedTime": "2014-01-22T16:28:12.450Z"
"metrics": {
"rank":2
}
},
{
"ID":"51eb2714fa058cb20d00fedg"
"sessionChangedTime": "2014-01-22T16:28:12.450Z"
"metrics": {
"rank":1
}
}
]
sampleTaskLogObject=
{
"_id":ObjectId("507f191e810c19729de860ea") #datestamp is built into objectId
"calculatedBy":ObjectId("51eb2714fa058cb20d0006ef")
"calculationTime":3201
timedOut: false
"sessions":[
{
"ID":ObjectId("52dfeb17c8b5f435c7000025")
"metrics": {
"rank":2
}
},
{
"ID":ObjectId("51eb2714fa058cb20d00feda")
"metrics": {
"rank":1
}
}
]
}
###