Implemented task logging and new score calculations

This commit is contained in:
Michael Schmatz 2014-02-08 10:11:43 -08:00
parent 350384f5cb
commit b9569a310a
4 changed files with 67 additions and 24 deletions

View file

@ -61,7 +61,7 @@
"stream-buffers": "0.2.x", "stream-buffers": "0.2.x",
"sendwithus": "2.0.x", "sendwithus": "2.0.x",
"aws-sdk":"~2.0.0", "aws-sdk":"~2.0.0",
"mubsub": "~0.2.1" "bayesian-battle":"0.0.x"
}, },
"devDependencies": { "devDependencies": {
"jade": "0.33.x", "jade": "0.33.x",

View file

@ -39,4 +39,5 @@ module.exports.gatewayTimeoutError = (res, message="Gateway timeout") ->
res.end() res.end()
module.exports.clientTimeout = (res, message="The server did not recieve the client response in a timely manner") -> module.exports.clientTimeout = (res, message="The server did not recieve the client response in a timely manner") ->
res.send 408 res.send 408, message
res.end()

View file

@ -9,9 +9,10 @@ mongoose = require 'mongoose'
queues = require '../commons/queue' queues = require '../commons/queue'
LevelSession = require '../levels/sessions/LevelSession' LevelSession = require '../levels/sessions/LevelSession'
TaskLog = require './task/ScoringTask' TaskLog = require './task/ScoringTask'
bayes = new (require 'bayesian-battle')()
scoringTaskQueue = undefined scoringTaskQueue = undefined
scoringTaskTimeoutInSeconds = 20 scoringTaskTimeoutInSeconds = 400
connectToScoringQueue = -> connectToScoringQueue = ->
queues.initializeQueueClient -> queues.initializeQueueClient ->
@ -43,16 +44,20 @@ module.exports.dispatchTaskToConsumer = (req, res) ->
constructTaskObject messageBody, (taskConstructionError, taskObject) -> constructTaskObject messageBody, (taskConstructionError, taskObject) ->
return errors.serverError res, "There was an error constructing the scoring task" if taskConstructionError? return errors.serverError res, "There was an error constructing the scoring task" if taskConstructionError?
message.changeMessageVisibilityTimeout scoringTaskTimeoutInSeconds + 10 #10 seconds processing time taskProcessingTimeInSeconds = 10
message.changeMessageVisibilityTimeout scoringTaskTimeoutInSeconds + taskProcessingTimeInSeconds
constructTaskLogObject userID,message.getReceiptHandle(), (taskLogError, taskLogObject) -> constructTaskLogObject userID,message.getReceiptHandle(), (taskLogError, taskLogObject) ->
return errors.serverError res, "There was an error creating the task log object." if taskLogError? return errors.serverError res, "There was an error creating the task log object." if taskLogError?
taskObject.taskLogID = taskLogObject._id setTaskObjectTaskLogID taskObject, taskLogObject._id
sendResponseObject req, res, taskObject sendResponseObject req, res, taskObject
setTaskObjectTaskLogID = (taskObject, taskLogObjectID) -> taskObject.taskID = taskLogObjectID
parseTaskQueueMessage = (req, res, message) -> parseTaskQueueMessage = (req, res, message) ->
try try
return messageBody = JSON.parse message.getBody() return messageBody = JSON.parse message.getBody()
@ -96,7 +101,7 @@ getSessionInformation = (sessionIDString, callback) ->
LevelSession.findOne {"_id": sessionIDString }, (err, session) -> LevelSession.findOne {"_id": sessionIDString }, (err, session) ->
return callback err, {"error":"There was an error retrieving the session."} if err? return callback err, {"error":"There was an error retrieving the session."} if err?
session = session.toJSON() session = session.toObject()
sessionInformation = sessionInformation =
"sessionID": session._id "sessionID": session._id
"code": _.cloneDeep session.code "code": _.cloneDeep session.code
@ -114,37 +119,75 @@ sendResponseObject = (req,res,object) ->
res.end() res.end()
module.exports.processTaskResult = (req, res) -> module.exports.processTaskResult = (req, res) ->
clientResponseObject = parseClientResponseObject req, res clientResponseObject = verifyClientResponse req.body, res
if clientResponseObject? if clientResponseObject?
TaskLog.findOne {"_id": clientResponseObject.taskID}, (err, taskLog) ->
return errors.serverError res, "There was an error retrieiving the task log object" if err?
return handleTimedOutTask req, res, clientResponseObject if hasTaskTimedOut clientResponseObject taskLogJSON = taskLog.toObject()
logTaskComputation clientResponseObject return errors.badInput res, "That computational task has already been performed" if taskLogJSON.calculationTimeMS
updateScores clientResponseObject return handleTimedOutTask req, res, clientResponseObject if hasTaskTimedOut taskLogJSON.sentDate
logTaskComputation clientResponseObject, taskLog, (loggingError) ->
errors.serverError res, "There as a problem logging the task computation" if loggingError?
updateScores clientResponseObject, (updatingScoresError, newScores) ->
return errors.serverError res, "There was an error updating the scores.#{updatingScoresError}" if updatingScoresError?
sendResponseObject req, res, newScores
hasTaskTimedOut = (taskBody) ->
taskBody.messageGenerated + scoringTaskTimeoutInSeconds < Date.now() hasTaskTimedOut = (taskSentTimestamp) -> taskSentTimestamp + scoringTaskTimeoutInSeconds * 1000 < Date.now()
handleTimedOutTask = (req, res, taskBody) -> handleTimedOutTask = (req, res, taskBody) ->
errors.clientError res, "The task results were not provided within a timely manner" errors.clientTimeout res, "The task results were not provided within a timely manner"
verifyClientResponse = (responseObject, res) ->
parseClientResponseObject = (req, res) -> unless typeof responseObject is "object"
try errors.badInput res, "The response to that query is required to be a JSON object."
return JSON.parse req.body
catch e
errors.badInput res, "Unprocessable task response object."
return null return null
return responseObject
logTaskComputation = (taskObject,taskLogObject, callback) ->
taskLogObject.calculationTimeMS = taskObject.calculationTimeMS
taskLogObject.sessions = taskObject.sessions
taskLogObject.save (err) -> callback err
winston.info "Game successfully simulated in #{taskObject.calculationTimeMS}, logging result"
updateScores = (taskObject,callback) ->
winston.info "Updating scores"
sessionIDs = _.pluck taskObject.sessions, 'sessionID'
async.map sessionIDs, retrieveOldScoreMetrics, (err, oldScores) ->
callback err, {"error": "There was an error retrieving the old scores"} if err?
oldScores = _.indexBy oldScores, 'id'
for sessions in taskObject.sessions
oldScores[session.ID].gameRanking = session.metrics.rank
newScores = bayes.updatePlayerSkills oldScores
callback err, newScores
logTaskComputation = (taskObject) ->
return return
updateScores = (taskObject) -> retrieveOldScoreMetrics = (sessionID, callback) ->
return LevelSession.findOne {"_id":sessionID}, (err, session) ->
return callback err, {"error":"There was an error retrieving the session."} if err?
oldScoreObject =
"standardDeviation":session.standardDeviation ? 25/3
"meanStrength":session.meanStrength ? 25
"totalScore":session.totalScore ? 25 - 1.8*(25/3)
"id" = sessionID
callback err, oldScoreObject
@ -158,7 +201,6 @@ sampleQueueMessage =
sampleUndoneTaskObject = sampleUndoneTaskObject =
"taskID": "507f191e810c19729de860ea" "taskID": "507f191e810c19729de860ea"
"messageGenerated": 1391811773418
"sessions" : [ "sessions" : [
{ {
"ID":"52dfeb17c8b5f435c7000025" "ID":"52dfeb17c8b5f435c7000025"

View file

@ -2,7 +2,7 @@ mongoose = require('mongoose')
ScoringTaskSchema = new mongoose.Schema( ScoringTaskSchema = new mongoose.Schema(
calculator: {type:mongoose.Schema.Types.ObjectId} calculator: {type:mongoose.Schema.Types.ObjectId}
sentDate: {type: Date} sentDate: {type: Number}
messageIdentifierString: {type: String} messageIdentifierString: {type: String}
calculationTimeMS: {type: Number, default: 0} calculationTimeMS: {type: Number, default: 0}
sessions: {type: Array, default: []} sessions: {type: Array, default: []}