More progress with queues and client-side simulation

This commit is contained in:
Michael Schmatz 2014-02-10 13:18:39 -08:00
parent 824af18e39
commit 465ea22b1a
8 changed files with 110 additions and 65 deletions

View file

@ -21,6 +21,7 @@ module.exports = class LevelBus extends Bus
'thang-code-ran': 'onCodeRan'
'level-show-victory': 'onVictory'
'tome:spell-changed': 'onSpellChanged'
'tome:spell-created': 'onSpellCreated'
constructor: ->
super(arguments...)
@ -88,15 +89,28 @@ module.exports = class LevelBus extends Bus
onSpellChanged: (e) ->
return unless @onPoint()
console.log "Spell Changed:",e.spell.spellKey
code = @session.get('code')
code ?= {}
parts = e.spell.spellKey.split('/')
code[parts[0]] ?= {}
code[parts[0]][parts[1]] = e.spell.getSource()
@changedSessionProperties.code = true
@session.set({'code': code})
@saveSession()
onSpellCreated: (e) ->
#return unless @onPoint()
teamSpells = @session.get 'teamSpells'
spellTeam = e.spell.team
teamSpells[spellTeam] ?= []
unless e.spell.spellKey in teamSpells[spellTeam]
teamSpells[spellTeam].push e.spell.spellKey
console.log "Assigned spell #{e.spell.spellKey} to team #{spellTeam}"
onScriptStateChanged: (e) ->
return unless @onPoint()
@fireScriptsRef?.update(e)

View file

@ -26,6 +26,8 @@ module.exports = class Spell
@view.render() # Get it ready and code loaded in advance
@tabView = new SpellListTabEntryView spell: @, supermodel: @supermodel
@tabView.render()
@team = @permissions.readwrite[0] ? "common"
Backbone.Mediator.publish 'tome:spell-created', spell: @
addThang: (thang) ->
if @thangs[thang.id]

View file

@ -8,11 +8,10 @@ testing = '--unittest' in process.argv
module.exports.connect = () ->
address = module.exports.generateMongoConnectionString()
winston.info "Connecting to Mongo with connection string #{address}"
winston.info "Connecting to standalone server #{address}"
mongoose.connect address
mongoose.connection.once 'open', ->
Grid.gfs = Grid(mongoose.connection.db, mongoose.mongo)
mongoose.connection.once 'open', -> Grid.gfs = Grid(mongoose.connection.db, mongoose.mongo)
module.exports.generateMongoConnectionString = ->

View file

@ -21,7 +21,8 @@ module.exports.initializeQueueClient = (cb) ->
generateQueueClient = ->
if config.queue.accessKeyId
#if config.queue.accessKeyId
if false #TODO: revert this
queueClient = new SQSQueueClient()
else
queueClient = new MongoQueueClient()
@ -172,7 +173,7 @@ class MongoQueueClient
scheduledVisibilityTime: {type: Date, index: true}
receiptHandle: {type: String, index: true}
@mongooseConnection.model 'MessageQueue',schema
@mongooseConnection.model 'messageQueue',schema
class MongoQueue extends events.EventEmitter
@ -185,7 +186,7 @@ class MongoQueue extends events.EventEmitter
unsubscribe: (eventName, callback) -> @removeListener eventName, callback
receieveMessage: (callback) ->
receiveMessage: (callback) ->
conditions =
queue: @queueName
scheduledVisibilityTime:
@ -199,9 +200,12 @@ class MongoQueue extends events.EventEmitter
receiptHandle: @_generateRandomReceiptHandle()
scheduledVisibilityTime: @_constructDefaultVisibilityTimeoutDate()
@Message.findOneAndUpdate conditions, update, options, =>
if err? then @emit 'error',err,data else @emit 'message',err,data
@Message.findOneAndUpdate conditions, update, options, (err, data) =>
return @emit 'error',err,data if err?
originalData = data
data = new MongoMessage originalData, this
@emit 'message',err,data
callback? err,data

View file

@ -55,12 +55,18 @@ _.extend LevelSessionSchema.properties,
# TODO: specify this more
code: { type: 'object' }
teamSpells:
type: 'object'
additionalProperties:
type: 'array'
players: { type: 'object' }
chat: { type: 'array' }
meanStrength: {type: 'number', default: 25}
standardDeviation: {type:'number', default:25/3, minimum: 0}
totalScore: {type: 'number'}
totalScore: {type: 'number', default: 10}
c.extendBasicProperties LevelSessionSchema, 'level.session'

View file

@ -1,5 +1,5 @@
config = require '../../server_config'
winston = require 'winston'
log = require 'winston'
mongoose = require 'mongoose'
async = require 'async'
errors = require '../commons/errors'
@ -14,29 +14,35 @@ bayes = new (require 'bayesian-battle')()
scoringTaskQueue = undefined
scoringTaskTimeoutInSeconds = 400
module.exports.setup = (app) -> connectToScoringQueue()
connectToScoringQueue = ->
queues.initializeQueueClient ->
queues.queueClient.registerQueue "scoring", {}, (err,data) ->
throwScoringQueueRegistrationError(err) if err?
scoringTaskQueue = data
winston.info "Connected to scoring task queue!"
log.info "Connected to scoring task queue!"
throwScoringQueueRegistrationError = (error) ->
winston.error "There was an error registering the scoring queue: #{error}"
log.error "There was an error registering the scoring queue: #{error}"
throw new Error "There was an error registering the scoring queue."
module.exports.createNewTask = (req, res) ->
scoringTaskQueue.sendMessage req.body, 0, (err, data) ->
return errors.badInput res, "There was an error creating the message, reason: #{err}" if err?
module.exports.setup = (app) -> connectToScoringQueue()
res.send data
res.end()
module.exports.dispatchTaskToConsumer = (req, res) ->
userID = getUserIDFromRequest req
return errors.forbidden res, "You need to be logged in to simulate games" unless userID?
userID = getUserIDFromRequest req,res
return errors.forbidden res, "You need to be logged in to simulate games" if isUserAnonymous req
scoringTaskQueue.receiveMessage (taskQueueReceiveError, message) ->
if message.isEmpty() or taskQueueReceiveError?
return errors.gatewayTimeoutError res, "No messages were receieved from the queue.#{taskQueueReceiveError}"
if (not message?) or message.isEmpty() or taskQueueReceiveError?
return errors.gatewayTimeoutError res, "No messages were receieved from the queue. Msg:#{taskQueueReceiveError}"
messageBody = parseTaskQueueMessage req, res, message
return errors.serverError res, "There was an error parsing the queue message" unless messageBody?
@ -44,8 +50,7 @@ module.exports.dispatchTaskToConsumer = (req, res) ->
constructTaskObject messageBody, (taskConstructionError, taskObject) ->
return errors.serverError res, "There was an error constructing the scoring task" if taskConstructionError?
taskProcessingTimeInSeconds = 10
message.changeMessageVisibilityTimeout 10
message.changeMessageVisibilityTimeout scoringTaskTimeoutInSeconds
constructTaskLogObject userID,message.getReceiptHandle(), (taskLogError, taskLogObject) ->
return errors.serverError res, "There was an error creating the task log object." if taskLogError?
@ -55,31 +60,21 @@ module.exports.dispatchTaskToConsumer = (req, res) ->
sendResponseObject req, res, taskObject
getUserIDFromRequest = (req) -> if req.user? then return req.user._id else return null
isUserAnonymous = (req) -> if req.user? then return req.user.anonymous else return true
setTaskObjectTaskLogID = (taskObject, taskLogObjectID) -> taskObject.taskID = taskLogObjectID
parseTaskQueueMessage = (req, res, message) ->
try
if typeof message.getBody() is "object" then return message.getBody()
return messageBody = JSON.parse message.getBody()
catch e
sendResponseObject req, res, {"error":"There was an error parsing the task.Error: #{e}" }
return null
getUserIDFromRequest = (req) ->
if req.user? and req.user._id?
return req.user._id
else
return null
constructTaskLogObject = (calculatorUserID, messageIdentifierString, callback) ->
taskLogObject = new TaskLog
"createdAt": new Date()
"calculator":calculatorUserID
"sentDate": Date.now()
"messageIdentifierString":messageIdentifierString
taskLogObject.save callback
constructTaskObject = (taskMessageBody, callback) ->
async.map taskMessageBody.sessions, getSessionInformation, (err, sessions) ->
return callback err, data if err?
@ -114,6 +109,19 @@ getSessionInformation = (sessionIDString, callback) ->
callback err, sessionInformation
constructTaskLogObject = (calculatorUserID, messageIdentifierString, callback) ->
taskLogObject = new TaskLog
"createdAt": new Date()
"calculator":calculatorUserID
"sentDate": Date.now()
"messageIdentifierString":messageIdentifierString
taskLogObject.save callback
setTaskObjectTaskLogID = (taskObject, taskLogObjectID) -> taskObject.taskID = taskLogObjectID
sendResponseObject = (req,res,object) ->
res.setHeader('Content-Type', 'application/json')
res.send(object)
@ -128,13 +136,17 @@ module.exports.processTaskResult = (req, res) ->
taskLogJSON = taskLog.toObject()
#return errors.badInput res, "That computational task has already been performed" if taskLogJSON.calculationTimeMS
#return handleTimedOutTask req, res, clientResponseObject if hasTaskTimedOut taskLogJSON.sentDate
return errors.badInput res, "That computational task has already been performed" if taskLogJSON.calculationTimeMS
return handleTimedOutTask req, res, clientResponseObject if hasTaskTimedOut taskLogJSON.sentDate
logTaskComputation clientResponseObject, taskLog, (loggingError) ->
return errors.serverError res, "There as a problem logging the task computation: #{loggingError}" if loggingError?
if loggingError?
return errors.serverError res, "There as a problem logging the task computation: #{loggingError}"
updateScores clientResponseObject, (updatingScoresError, newScores) ->
return errors.serverError res, "There was an error updating the scores.#{updatingScoresError}" if updatingScoresError?
if updatingScoresError?
return errors.serverError res, "There was an error updating the scores.#{updatingScoresError}"
sendResponseObject req, res, {"message":"The scores were updated successfully!"}
@ -142,30 +154,30 @@ module.exports.processTaskResult = (req, res) ->
hasTaskTimedOut = (taskSentTimestamp) -> taskSentTimestamp + scoringTaskTimeoutInSeconds * 1000 < Date.now()
handleTimedOutTask = (req, res, taskBody) ->
errors.clientTimeout res, "The task results were not provided within a timely manner"
handleTimedOutTask = (req, res, taskBody) -> errors.clientTimeout res, "The results weren't provided within the timeout"
verifyClientResponse = (responseObject, res) ->
unless typeof responseObject is "object"
errors.badInput res, "The response to that query is required to be a JSON object."
return null
return responseObject
null
else
responseObject
logTaskComputation = (taskObject,taskLogObject, callback) ->
taskLogObject.calculationTimeMS = taskObject.calculationTimeMS
taskLogObject.sessions = taskObject.sessions
taskLogObject.save (err) -> callback err
taskLogObject.save callback
updateScores = (taskObject,callback) ->
winston.info "Updating scores"
sessionIDs = _.pluck taskObject.sessions, 'sessionID'
async.map sessionIDs, retrieveOldScoreMetrics, (err, oldScores) ->
callback err, {"error": "There was an error retrieving the old scores"} if err?
oldScoreArray = _.toArray putRankingFromMetricsIntoScoreObject taskObject, oldScores
newScoreArray = bayes.updatePlayerSkills oldScoreArray
@ -175,36 +187,43 @@ updateScores = (taskObject,callback) ->
saveNewScoresToDatabase = (newScoreArray, callback) ->
async.eachSeries newScoreArray, updateScoreInSession, (err) ->
return callback err, null if err?
callback err, {"message":"All scores were saved successfully."}
if err? then callback err, null else callback err, {"message":"All scores were saved successfully."}
updateScoreInSession = (scoreObject,callback) ->
LevelSession.findOne {"_id": scoreObject.id}, (err, session) ->
sessionObjectQuery =
"_id": scoreObject.id
LevelSession.findOne sessionObjectQuery, (err, session) ->
return callback err, null if err?
session.meanStrength = scoreObject.meanStrength
session.standardDeviation = scoreObject.standardDeviation
session.totalScore = scoreObject.meanStrength - 1.8 * scoreObject.standardDeviation
winston.info "Saving session!"
log.info "Saving session #{session._id}!"
session.save callback
putRankingFromMetricsIntoScoreObject = (taskObject,scoreObject) ->
scoreObject = _.indexBy scoreObject, 'id'
for session in taskObject.sessions
scoreObject[session.sessionID].gameRanking = session.metrics.rank
return scoreObject
scoreObject
retrieveOldScoreMetrics = (sessionID, callback) ->
LevelSession.findOne {"_id":sessionID}, (err, session) ->
sessionQuery =
"_id":sessionID
LevelSession.findOne sessionQuery, (err, session) ->
return callback err, {"error":"There was an error retrieving the session."} if err?
defaultScore = (25 - 1.8*(25/3))
defaultStandardDeviation = 25/3
oldScoreObject =
"standardDeviation":session.standardDeviation ? defaultStandardDeviation
"meanStrength":session.meanStrength ? 25
@ -216,14 +235,11 @@ retrieveOldScoreMetrics = (sessionID, callback) ->
###Sample Messages
sampleQueueMessage =
sessions: [
"52dfeb17c8b5f435c7000025"
"52dfe03ac8b5f435c7000009"
]
{
"sessions": ["52dea9b77e486eeb97000001","52d981a73cf02dcf260003cb"]
}
sampleUndoneTaskObject =
"taskID": "507f191e810c19729de860ea"

View file

@ -9,5 +9,4 @@ ScoringTaskSchema = new mongoose.Schema(
sessions: {type: Array, default: []}
)
#ScoringTaskSchema.set 'capped', 104857600 #100MB capped collection
module.exports = mongoose.model('scoringTask', ScoringTaskSchema)

View file

@ -14,13 +14,15 @@ module.exports.setup = (app) ->
handler = loadQueueHandler queueName
if isHTTPMethodGet req
handler.dispatchTaskToConsumer req,res
else if isHTTPMethodPost req
else if isHTTPMethodPut req
handler.processTaskResult req,res
else if isHTTPMethodPost req
handler.createNewTask req, res #TODO: do not use this in production
else
sendMethodNotSupportedError req, res
catch error
log.error error
sendQueueNotFoundError req, res
sendQueueError req, res, error
setResponseHeaderToJSONContentType = (res) -> res.setHeader('Content-Type', 'application/json')
@ -38,8 +40,11 @@ isHTTPMethodGet = (req) -> return req.route.method is 'get'
isHTTPMethodPost = (req) -> return req.route.method is 'post'
isHTTPMethodPut = (req) -> return req.route.method is 'put'
sendMethodNotSupportedError = (req, res) -> errors.badMethod(res,"Queues do not support the HTTP method used." )
sendQueueNotFoundError = (req,res) -> errors.notFound(res, "Route #{req.path} not found.")
sendQueueError = (req,res, error) -> errors.serverError(res, "Route #{req.path} had a problem: #{error}")