Merge pull request #385 from codecombat/redis

Merge redis into master
This commit is contained in:
Michael Schmatz 2014-02-18 12:53:57 -08:00
commit dc70fec977
4 changed files with 157 additions and 200 deletions

View file

@ -70,8 +70,9 @@ module.exports = class Simulator
@sendResultsBackToServer taskResults
sendResultsBackToServer: (results) =>
console.log "Sending result back to server!"
$.ajax
url: @taskURL
url: "/queue/scoring"
data: results
type: "PUT"
success: @handleTaskResultsTransferSuccess
@ -104,7 +105,8 @@ module.exports = class Simulator
for session in @task.getSessions()
sessionResult =
sessionID: session.sessionID
sessionChangedTime: session.sessionChangedTime
submitDate: session.submitDate
creator: session.creator
metrics:
rank: @calculateSessionRank session.sessionID, simulationResults.goalStates, @task.generateTeamToSessionMap()

View file

@ -61,7 +61,9 @@
"stream-buffers": "0.2.x",
"sendwithus": "2.0.x",
"aws-sdk":"~2.0.0",
"bayesian-battle":"0.0.x"
"bayesian-battle":"0.0.x",
"hiredis":"",
"redis": ""
},
"devDependencies": {
"jade": "0.33.x",

View file

@ -175,9 +175,6 @@ _.extend LevelSessionSchema.properties,
title: 'Opponent User ID'
description: 'The user ID of an opponent'
type: ['object','string']
codeSubmitDate: c.date
title: 'Submitted'
description: 'The date which the opponent submitted their session (used to check if the match can be replayed)'
metrics:
type: 'object'
properties:

View file

@ -19,82 +19,165 @@ module.exports.setup = (app) -> connectToScoringQueue()
connectToScoringQueue = ->
queues.initializeQueueClient ->
queues.queueClient.registerQueue "scoring", {}, (err,data) ->
throwScoringQueueRegistrationError(err) if err?
queues.queueClient.registerQueue "scoring", {}, (error,data) ->
if error? then throw new Error "There was an error registering the scoring queue: #{error}"
scoringTaskQueue = data
log.info "Connected to scoring task queue!"
throwScoringQueueRegistrationError = (error) ->
log.error "There was an error registering the scoring queue: #{error}"
throw new Error "There was an error registering the scoring queue."
module.exports.createNewTask = (req, res) ->
return errors.forbidden res, "You need to be logged in to be added to the leaderboard" if isUserAnonymous req
return errors.badInput res, "The session ID is invalid" unless typeof req.body.session is "string"
LevelSession.findOne { "_id": req.body.session}, (err, sessionToScore) ->
return errors.serverError res, "There was an error finding the given session." if err?
sessionToScore = sessionToScore.toJSON()
console.log "Ranking session of team #{sessionToScore.team}"
requestSessionID = req.body.session
if isUserAnonymous req then return errors.forbidden res, "You need to be logged in to be added to the leaderboard"
return errors.badInput res, "The session ID is invalid" unless typeof requestSessionID is "string"
fetchSessionToSubmit requestSessionID, (err, sessionToSubmit) ->
if err? then return errors.serverError res, "There was an error finding the given session."
updateSessionToSubmit sessionToSubmit, (err, data) ->
if err? then return errors.serverError res, "There was an error updating the session"
fetchSessionsToRankAgainst (err, sessionsToRankAgainst) ->
if err? then return errors.serverError res, "There was an error fetching the sessions to rank against"
taskPairs = generateTaskPairs(sessionsToRankAgainst, sessionToSubmit)
sendEachTaskPairToTheQueue taskPairs, (taskPairError) ->
if taskPairError? then return errors.serverError res, "There was an error sending the task pairs to the queue"
LevelSession.update { "_id": req.body.session}, {"submitted":true}, (err, data) ->
return errors.serverError res, "There was an error saving the submitted bool of the session." if err?
LevelSession.find { "levelID": "project-dota", "submitted": true}, (err, submittedSessions) ->
taskPairs = []
for session in submittedSessions
session = session.toObject()
console.log "Attemping to add session of team #{session.team} to taskPairs..."
if String(session._id) isnt req.body.session and session.team isnt sessionToScore.team and session.team in ["ogres","humans"]
console.log "Adding game to taskPairs!"
taskPairs.push [req.body.session,String session._id]
async.each taskPairs, sendTaskPairToQueue, (taskPairError) ->
return errors.serverError res, "There was an error sending the task pairs to the queue" if taskPairError?
sendResponseObject req, res, {"message":"All task pairs were succesfully sent to the queue"}
sendTaskPairToQueue = (taskPair, callback) ->
taskObject =
sessions: taskPair
scoringTaskQueue.sendMessage taskObject, 0, (err,data) ->
callback err,data
module.exports.dispatchTaskToConsumer = (req, res) ->
userID = getUserIDFromRequest req,res
return errors.forbidden res, "You need to be logged in to simulate games" if isUserAnonymous req
scoringTaskQueue.receiveMessage (taskQueueReceiveError, message) ->
if (not message?) or message.isEmpty() or taskQueueReceiveError?
return errors.gatewayTimeoutError res, "No messages were receieved from the queue. Msg:#{taskQueueReceiveError}"
if isUserAnonymous(req) then return errors.forbidden res, "You need to be logged in to simulate games"
scoringTaskQueue.receiveMessage (err, message) ->
if err? or messageIsInvalid(message) then return errors.gatewayTimeoutError res, "Queue Receive Error:#{err}"
console.log "Received Message"
messageBody = parseTaskQueueMessage req, res, message
return errors.serverError res, "There was an error parsing the queue message" unless messageBody?
return unless messageBody?
constructTaskObject messageBody, (taskConstructionError, taskObject) ->
return errors.serverError res, "There was an error constructing the scoring task" if taskConstructionError?
if taskConstructionError? then return errors.serverError res, "There was an error constructing the scoring task"
console.log "Constructed task body"
message.changeMessageVisibilityTimeout scoringTaskTimeoutInSeconds, (err) ->
if err? then return errors.serverError res, "There was an error changing the message visibility timeout."
console.log "Changed visibility timeout"
constructTaskLogObject getUserIDFromRequest(req),message.getReceiptHandle(), (taskLogError, taskLogObject) ->
if taskLogError? then return errors.serverError res, "There was an error creating the task log object."
message.changeMessageVisibilityTimeout scoringTaskTimeoutInSeconds
taskObject.taskID = taskLogObject._id
taskObject.receiptHandle = message.getReceiptHandle()
constructTaskLogObject userID,message.getReceiptHandle(), (taskLogError, taskLogObject) ->
return errors.serverError res, "There was an error creating the task log object." if taskLogError?
sendResponseObject req, res, taskObject
setTaskObjectTaskLogID taskObject, taskLogObject._id
module.exports.processTaskResult = (req, res) ->
clientResponseObject = verifyClientResponse req.body, res
taskObject.receiptHandle = message.getReceiptHandle()
return unless clientResponseObject?
TaskLog.findOne {_id: clientResponseObject.taskID}, (err, taskLog) ->
return errors.serverError res, "There was an error retrieiving the task log object" if err?
sendResponseObject req, res, taskObject
taskLogJSON = taskLog.toObject()
return errors.badInput res, "That computational task has already been performed" if taskLogJSON.calculationTimeMS
return handleTimedOutTask req, res, clientResponseObject if hasTaskTimedOut taskLogJSON.sentDate
scoringTaskQueue.deleteMessage clientResponseObject.receiptHandle, (err) ->
if err? then return errors.badInput res, "The queue message is already back in the queue, rejecting results."
logTaskComputation clientResponseObject, taskLog, (logErr) ->
if logErr? then return errors.serverError res, "There as a problem logging the task computation: #{logErr}"
updateSessions clientResponseObject, (updateError, newScoreArray) ->
if updateError? then return errors.serverError res, "There was an error updating the scores.#{updateError}"
newScoresObject = _.indexBy newScoreArray, 'id'
addMatchToSessions clientResponseObject, newScoresObject, (err, data) ->
if err? then return errors.serverError res, "There was an error updating the sessions with the match! #{JSON.stringify err}"
console.log "Sending response object"
sendResponseObject req, res, {"message":"The scores were updated successfully!"}
addMatchToSessions = (clientResponseObject, newScoreObject, callback) ->
matchObject = {}
matchObject.date = new Date()
matchObject.opponents = {}
for session in clientResponseObject.sessions
sessionID = session.sessionID
matchObject.opponents[sessionID] = {}
matchObject.opponents[sessionID].sessionID = sessionID
matchObject.opponents[sessionID].userID = session.creator
matchObject.opponents[sessionID].metrics = {}
matchObject.opponents[sessionID].metrics.rank = newScoreObject[sessionID].gameRanking
log.info "Match object computed, result: #{matchObject}"
log.info "Writing match object to database..."
#use bind with async to do the writes
sessionIDs = _.pluck clientResponseObject.sessions, 'sessionID'
async.each sessionIDs, updateMatchesInSession.bind(@,matchObject), (err) -> callback err, null
updateMatchesInSession = (matchObject, sessionID, callback) ->
currentMatchObject = {}
currentMatchObject.date = matchObject.date
currentMatchObject.metrics = matchObject.opponents[sessionID].metrics
opponentsClone = _.cloneDeep matchObject.opponents
opponentsClone = _.omit opponentsClone, sessionID
opponentsArray = _.toArray opponentsClone
currentMatchObject.opponents = opponentsArray
sessionUpdateObject =
$push: {matches: currentMatchObject}
log.info "Updating session #{sessionID}"
LevelSession.update {"_id":sessionID}, sessionUpdateObject, callback
messageIsInvalid = (message) -> (not message?) or message.isEmpty()
sendEachTaskPairToTheQueue = (taskPairs, callback) -> async.each taskPairs, sendTaskPairToQueue, callback
fetchSessionToSubmit = (submittedSessionID, callback) ->
LevelSession.findOne {_id: submittedSessionID}, (err, session) -> callback err, session?.toObject()
updateSessionToSubmit = (sessionToUpdate, callback) ->
sessionUpdateObject =
submitted: true
submittedCode: sessionToUpdate.code
submitDate: new Date()
matches: []
meanStrength: 25
standardDeviation: 25/3
totalScore: 10
LevelSession.update {_id: sessionToUpdate._id}, sessionUpdateObject, callback
fetchSessionsToRankAgainst = (callback) ->
submittedSessionsQuery =
levelID: "project-dota"
submitted: true
submittedCode:
$exists: true
LevelSession.find submittedSessionsQuery, callback
generateTaskPairs = (submittedSessions, sessionToScore) ->
taskPairs = []
for session in submittedSessions
session = session.toObject()
teams = ['ogres','humans']
opposingTeams = _.pull teams, sessionToScore.team
if String(session._id) isnt String(sessionToScore._id) and session.team in opposingTeams
console.log "Adding game to taskPairs!"
taskPairs.push [sessionToScore._id,String session._id]
return taskPairs
sendTaskPairToQueue = (taskPair, callback) ->
scoringTaskQueue.sendMessage {sessions: taskPair}, 0, (err,data) -> callback? err,data
getUserIDFromRequest = (req) -> if req.user? then return req.user._id else return null
isUserAnonymous = (req) -> if req.user? then return req.user.get('anonymous') else return true
parseTaskQueueMessage = (req, res, message) ->
try
if typeof message.getBody() is "object" then return message.getBody()
return messageBody = JSON.parse message.getBody()
catch e
sendResponseObject req, res, {"error":"There was an error parsing the task.Error: #{e}" }
@ -110,31 +193,23 @@ constructTaskObject = (taskMessageBody, callback) ->
for session in sessions
sessionInformation =
"sessionID": session.sessionID
"sessionChangedTime": session.changed
"sessionID": session._id
"submitDate": session.submitDate
"team": session.team ? "No team"
"code": session.code
"code": session.submittedCode
"teamSpells": session.teamSpells ? {}
"levelID": session.levelID
"creator": session.creator
taskObject.sessions.push sessionInformation
callback err, taskObject
getSessionInformation = (sessionIDString, callback) ->
LevelSession.findOne {"_id": sessionIDString }, (err, session) ->
return callback err, {"error":"There was an error retrieving the session."} if err?
session = session.toObject()
sessionInformation =
"sessionID": session._id
"code": _.cloneDeep session.code
"changed": session.changed
"creator": session.creator
"team": session.team
"teamSpells": session.teamSpells
"levelID": session.levelID
LevelSession.findOne {_id:sessionIDString}, (err, session) ->
if err? then return callback err, {"error":"There was an error retrieving the session."}
sessionInformation = session.toObject()
callback err, sessionInformation
@ -144,52 +219,17 @@ constructTaskLogObject = (calculatorUserID, messageIdentifierString, callback) -
"calculator":calculatorUserID
"sentDate": Date.now()
"messageIdentifierString":messageIdentifierString
taskLogObject.save callback
setTaskObjectTaskLogID = (taskObject, taskLogObjectID) -> taskObject.taskID = taskLogObjectID
sendResponseObject = (req,res,object) ->
res.setHeader('Content-Type', 'application/json')
res.send(object)
res.end()
module.exports.processTaskResult = (req, res) ->
clientResponseObject = verifyClientResponse req.body, res
if clientResponseObject?
TaskLog.findOne {"_id": clientResponseObject.taskID}, (err, taskLog) ->
return errors.serverError res, "There was an error retrieiving the task log object" if err?
taskLogJSON = taskLog.toObject()
return errors.badInput res, "That computational task has already been performed" if taskLogJSON.calculationTimeMS
return handleTimedOutTask req, res, clientResponseObject if hasTaskTimedOut taskLogJSON.sentDate
destroyQueueMessage clientResponseObject.receiptHandle, (err) ->
return errors.badInput res, "The queue message is already back in the queue, rejecting results." if err?
logTaskComputation clientResponseObject, taskLog, (loggingError) ->
if loggingError?
return errors.serverError res, "There as a problem logging the task computation: #{loggingError}"
updateSessions clientResponseObject, (updatingScoresError, newScores) ->
if updatingScoresError?
return errors.serverError res, "There was an error updating the scores.#{updatingScoresError}"
sendResponseObject req, res, {"message":"The scores were updated successfully!"}
hasTaskTimedOut = (taskSentTimestamp) -> taskSentTimestamp + scoringTaskTimeoutInSeconds * 1000 < Date.now()
handleTimedOutTask = (req, res, taskBody) -> errors.clientTimeout res, "The results weren't provided within the timeout"
destroyQueueMessage = (receiptHandle, callback) -> scoringTaskQueue.deleteMessage receiptHandle, callback
verifyClientResponse = (responseObject, res) ->
unless typeof responseObject is "object"
errors.badInput res, "The response to that query is required to be a JSON object."
@ -197,7 +237,6 @@ verifyClientResponse = (responseObject, res) ->
else
responseObject
logTaskComputation = (taskObject,taskLogObject, callback) ->
taskLogObject.calculationTimeMS = taskObject.calculationTimeMS
taskLogObject.sessions = taskObject.sessions
@ -208,127 +247,44 @@ updateSessions = (taskObject,callback) ->
sessionIDs = _.pluck taskObject.sessions, 'sessionID'
async.map sessionIDs, retrieveOldSessionData, (err, oldScores) ->
callback err, {"error": "There was an error retrieving the old scores"} if err?
if err? then callback err, {"error": "There was an error retrieving the old scores"}
oldScoreArray = _.toArray putRankingFromMetricsIntoScoreObject taskObject, oldScores
newScoreArray = bayes.updatePlayerSkills oldScoreArray
saveNewScoresToDatabase newScoreArray, callback
saveNewScoresToDatabase = (newScoreArray, callback) ->
async.eachSeries newScoreArray, updateScoreInSession, (err) ->
if err? then callback err, null else callback err, {"message":"All scores were saved successfully."}
async.eachSeries newScoreArray, updateScoreInSession, (err) -> callback err,newScoreArray
updateScoreInSession = (scoreObject,callback) ->
sessionObjectQuery =
"_id": scoreObject.id
LevelSession.findOne {"_id": scoreObject.id}, (err, session) ->
if err? then return callback err, null
LevelSession.findOne sessionObjectQuery, (err, session) ->
return callback err, null if err?
session = session.toObject()
updateObject =
meanStrength: scoreObject.meanStrength
standardDeviation: scoreObject.standardDeviation
totalScore: scoreObject.meanStrength - 1.8 * scoreObject.standardDeviation
LevelSession.update {"_id": scoreObject.id}, updateObject, callback
log.info "New total score for session #{scoreObject.id} is #{updateObject.totalScore}"
LevelSession.update sessionObjectQuery, updateObject, callback
putRankingFromMetricsIntoScoreObject = (taskObject,scoreObject) ->
scoreObject = _.indexBy scoreObject, 'id'
for session in taskObject.sessions
scoreObject[session.sessionID].gameRanking = session.metrics.rank
scoreObject
scoreObject[session.sessionID].gameRanking = session.metrics.rank for session in taskObject.sessions
return scoreObject
retrieveOldSessionData = (sessionID, callback) ->
sessionQuery =
"_id":sessionID
LevelSession.findOne sessionQuery, (err, session) ->
LevelSession.findOne {"_id":sessionID}, (err, session) ->
return callback err, {"error":"There was an error retrieving the session."} if err?
session = session.toObject()
defaultScore = (25 - 1.8*(25/3))
defaultStandardDeviation = 25/3
oldScoreObject =
"standardDeviation":session.standardDeviation ? defaultStandardDeviation
"standardDeviation":session.standardDeviation ? 25/3
"meanStrength":session.meanStrength ? 25
"totalScore":session.totalScore ? defaultScore
"totalScore":session.totalScore ? (25 - 1.8*(25/3))
"id": sessionID
callback err, oldScoreObject
###Sample Messages
sampleQueueMessage =
{
"sessions": ["52dea9b77e486eeb97000001","52d981a73cf02dcf260003cb"]
}
sampleUndoneTaskObject =
"taskID": "507f191e810c19729de860ea"
"sessions" : [
{
"ID":"52dfeb17c8b5f435c7000025"
"sessionChangedTime": "2014-01-22T16:28:12.450Z"
"team":"humans"
"code": "code goes here"
},
{
"ID":"51eb2714fa058cb20d00fedg"
"sessionChangedTime": "2014-01-22T16:28:12.450Z"
"team":"ogres"
"code": "code goes here"
}
]
sampleResponseObject =
"taskID": "507f191e810c19729de860ea"
"calculationTime":3201
"sessions": [
{
"ID":"52dfeb17c8b5f435c7000025"
"sessionChangedTime": "2014-01-22T16:28:12.450Z"
"metrics": {
"rank":2
}
},
{
"ID":"51eb2714fa058cb20d00fedg"
"sessionChangedTime": "2014-01-22T16:28:12.450Z"
"metrics": {
"rank":1
}
}
]
sampleTaskLogObject=
{
"_id":ObjectId("507f191e810c19729de860ea") #datestamp is built into objectId
"calculatedBy":ObjectId("51eb2714fa058cb20d0006ef")
"calculationTime":3201
timedOut: false
"sessions":[
{
"ID":ObjectId("52dfeb17c8b5f435c7000025")
"metrics": {
"rank":2
}
},
{
"ID":ObjectId("51eb2714fa058cb20d00feda")
"metrics": {
"rank":1
}
}
]
}
###