mirror of
https://github.com/codeninjasllc/codecombat.git
synced 2025-02-17 08:50:58 -05:00
Improved Mongo queueing and various refactors
This commit is contained in:
parent
8c2926c300
commit
824af18e39
4 changed files with 193 additions and 135 deletions
|
@ -7,18 +7,23 @@ testing = '--unittest' in process.argv
|
|||
|
||||
|
||||
module.exports.connect = () ->
|
||||
address = module.exports.generateMongoConnectionString()
|
||||
|
||||
winston.info "Connecting to standalone server #{address}"
|
||||
mongoose.connect address
|
||||
mongoose.connection.once 'open', ->
|
||||
Grid.gfs = Grid(mongoose.connection.db, mongoose.mongo)
|
||||
|
||||
|
||||
module.exports.generateMongoConnectionString = ->
|
||||
if config.mongo.mongoose_replica_string
|
||||
address = config.mongo.mongoose_replica_string
|
||||
winston.info "Connecting to replica set: #{address}"
|
||||
else
|
||||
dbName = config.mongo.db
|
||||
dbName += '_unittest' if testing
|
||||
address = config.mongo.host + ":" + config.mongo.port
|
||||
if config.mongo.username and config.mongo.password
|
||||
address = config.mongo.username + ":" + config.mongo.password + "@" + address
|
||||
# address = config.mongo.username + "@" + address # if connecting to production server
|
||||
address = "mongodb://#{address}/#{dbName}"
|
||||
winston.info "Connecting to standalone server #{address}"
|
||||
mongoose.connect address
|
||||
mongoose.connection.once 'open', ->
|
||||
Grid.gfs = Grid(mongoose.connection.db, mongoose.mongo)
|
||||
|
||||
return address
|
|
@ -1,131 +1,129 @@
|
|||
config = require '../../server_config'
|
||||
winston = require 'winston'
|
||||
log = require 'winston'
|
||||
mongoose = require 'mongoose'
|
||||
async = require 'async'
|
||||
errors = require './errors'
|
||||
aws = require 'aws-sdk'
|
||||
db = require './../routes/db'
|
||||
db = require './database'
|
||||
mongoose = require 'mongoose'
|
||||
events = require 'events'
|
||||
crypto = require 'crypto'
|
||||
|
||||
module.exports.queueClient = undefined
|
||||
|
||||
module.exports.sendwithusQueue = undefined
|
||||
|
||||
defaultMessageVisibilityTimeoutInSeconds = 20
|
||||
defaultMessageReceiptTimeout = 10
|
||||
|
||||
|
||||
module.exports.initializeQueueClient = (cb) ->
|
||||
module.exports.queueClient = module.exports.generateQueueClient() unless queueClient?
|
||||
module.exports.queueClient = generateQueueClient() unless queueClient?
|
||||
|
||||
cb?()
|
||||
|
||||
module.exports.initializeSendwithusQueue = (cb) ->
|
||||
queueClient = module.exports.generateQueueClient() unless queueClient?
|
||||
queueClient.registerQueue "sendwithus", {}, (err,data) ->
|
||||
if err?
|
||||
errorString = "There was an error registering the sendwithus queue."
|
||||
winston.error errorString
|
||||
throw new Error errorString
|
||||
module.exports.sendwithusQueue = data
|
||||
cb? err, module.exports.sendwithusQueue
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
module.exports.generateQueueClient = ->
|
||||
generateQueueClient = ->
|
||||
if config.queue.accessKeyId
|
||||
queueClient = new SQSQueueClient()
|
||||
else
|
||||
queueClient = new MongoQueueClient()
|
||||
return queueClient
|
||||
|
||||
|
||||
class MessageObject
|
||||
constructor: () ->
|
||||
return
|
||||
|
||||
getBody: ->
|
||||
return
|
||||
|
||||
getID: ->
|
||||
return
|
||||
removeFromQueue: ->
|
||||
return
|
||||
|
||||
requeue: ->
|
||||
return
|
||||
|
||||
changeMessageVisibilityTimeout: (secondsFromFunctionCall) ->
|
||||
return
|
||||
|
||||
|
||||
|
||||
|
||||
class SQSQueueClient
|
||||
constructor: ->
|
||||
@configure()
|
||||
@sqs = @generateSQSInstance()
|
||||
|
||||
###Public API###
|
||||
registerQueue: (queueName, options, callback) ->
|
||||
#returns new queue in data argument of callback
|
||||
@sqs.createQueue {QueueName: queueName}, (err,data) =>
|
||||
if err?
|
||||
winston.error("There was an error creating a new SQS queue, reason: #{JSON.stringify err}")
|
||||
throw new Error("Fatal SQS error, see Winston output")
|
||||
newQueue = new SQSQueue(queueName, data.QueueUrl, @sqs)
|
||||
callback err, newQueue
|
||||
###Public API###
|
||||
configure: ->
|
||||
queueCreationOptions =
|
||||
QueueName: queueName
|
||||
|
||||
@sqs.createQueue queueCreationOptions, (err,data) =>
|
||||
@_logAndThrowFatalException "There was an error creating a new SQS queue, reason: #{JSON.stringify err}" if err?
|
||||
|
||||
newQueue = new SQSQueue queueName, data.QueueUrl, @sqs
|
||||
|
||||
callback? err, newQueue
|
||||
|
||||
|
||||
constructor: ->
|
||||
@_configure()
|
||||
@sqs = @_generateSQSInstance()
|
||||
|
||||
|
||||
_configure: ->
|
||||
aws.config.update
|
||||
accessKeyId: config.queue.accessKeyId
|
||||
secretAccessKey: config.queue.secretAccessKey
|
||||
region: config.queue.region
|
||||
|
||||
generateSQSInstance: ->
|
||||
new aws.SQS()
|
||||
|
||||
_generateSQSInstance: -> new aws.SQS()
|
||||
|
||||
|
||||
_logAndThrowFatalException: (errorMessage) ->
|
||||
log.error errorMessage
|
||||
throw new Error errorMessage
|
||||
|
||||
|
||||
|
||||
class SQSQueue extends events.EventEmitter
|
||||
constructor: (@queueName, @queueUrl, @sqs) ->
|
||||
|
||||
|
||||
subscribe: (eventName, callback) -> @on eventName, callback
|
||||
unsubscribe: (eventName, callback) -> @removeListener eventName, callback
|
||||
|
||||
|
||||
publish: (messageBody,delayInSeconds, callback) ->
|
||||
@sendMessage messageBody, delayInSeconds, callback
|
||||
|
||||
receiveMessage: (callback) ->
|
||||
@sqs.receiveMessage {QueueUrl: @queueUrl, WaitTimeSeconds: 20}, (err, data) =>
|
||||
originalData = data
|
||||
data = new SQSMessage originalData, this
|
||||
queueReceiveOptions =
|
||||
QueueUrl: @queueUrl
|
||||
WaitTimeSeconds: defaultMessageReceiptTimeout
|
||||
|
||||
@sqs.receiveMessage queueReceiveOptions, (err, data) =>
|
||||
if err?
|
||||
@emit 'error',err,originalData
|
||||
else
|
||||
originalData = data
|
||||
data = new SQSMessage originalData, this
|
||||
@emit 'message',err,data
|
||||
|
||||
if err? then @emit 'error',err,originalData else @emit 'message',err,data
|
||||
callback? err,data
|
||||
|
||||
|
||||
deleteMessage: (receiptHandle, callback) ->
|
||||
@sqs.deleteMessage {QueueUrl: @queueUrl, ReceiptHandle: receiptHandle}, (err, data) =>
|
||||
queueDeletionOptions =
|
||||
QueueUrl: @queueUrl
|
||||
ReceiptHandle: receiptHandle
|
||||
|
||||
@sqs.deleteMessage queueDeletionOptions, (err, data) =>
|
||||
if err? then @emit 'error',err,data else @emit 'message',err,data
|
||||
|
||||
callback? err,data
|
||||
|
||||
|
||||
changeMessageVisibilityTimeout: (secondsFromNow, receiptHandle, callback) ->
|
||||
@sqs.changeMessageVisibility {QueueUrl: @queueUrl, ReceiptHandle: receiptHandle, VisibilityTimeout: secondsFromNow}, (err, data) =>
|
||||
messageVisibilityTimeoutOptions =
|
||||
QueueUrl: @queueUrl
|
||||
ReceiptHandle: receiptHandle
|
||||
VisibilityTimeout: secondsFromNow
|
||||
|
||||
@sqs.changeMessageVisibility messageVisibilityTimeoutOptions, (err, data) =>
|
||||
if err? then @emit 'error',err,data else @emit 'edited',err,data
|
||||
|
||||
callback? err,data
|
||||
|
||||
|
||||
sendMessage: (messageBody, delaySeconds, callback) ->
|
||||
@sqs.sendMessage {QueueUrl: @queueUrl, MessageBody: messageBody, DelaySeconds: delaySeconds}, (err, data) =>
|
||||
queueSendingOptions =
|
||||
QueueUrl: @queueUrl
|
||||
MessageBody: messageBody
|
||||
DelaySeconds: delaySeconds
|
||||
|
||||
@sqs.sendMessage queueSendingOptions, (err, data) =>
|
||||
if err? then @emit 'error',err,data else @emit 'sent',err, data
|
||||
|
||||
callback? err,data
|
||||
|
||||
listenForever: =>
|
||||
async.forever (asyncCallback) =>
|
||||
@receiveMessage (err, data) ->
|
||||
asyncCallback(null)
|
||||
|
||||
class SQSMessage extends MessageObject
|
||||
listenForever: => async.forever (asyncCallback) => @receiveMessage (err, data) -> asyncCallback(null)
|
||||
|
||||
|
||||
class SQSMessage
|
||||
constructor: (@originalMessage, @parentQueue) ->
|
||||
|
||||
isEmpty: -> not @originalMessage.Messages?[0]?
|
||||
|
@ -143,44 +141,38 @@ class SQSMessage extends MessageObject
|
|||
|
||||
getReceiptHandle: -> @originalMessage.Messages[0].ReceiptHandle
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class MongoQueueClient
|
||||
constructor: ->
|
||||
@configure()
|
||||
@createMongoConnection()
|
||||
@messageModel = @generateMessageModel()
|
||||
|
||||
###Public API###
|
||||
registerQueue: (queueName, options, callback) ->
|
||||
newQueue = new MongoQueue queueName,options,@messageModel
|
||||
callback(null, newQueue)
|
||||
###Public API###
|
||||
|
||||
configure: ->
|
||||
@databaseAddress = db.generateDatabaseAddress()
|
||||
@mongoDatabaseName = config.mongoQueue.queueDatabaseName;
|
||||
|
||||
createMongoConnection: ->
|
||||
@mongooseConnection = mongoose.createConnection "mongodb://#{@databaseAddress}/#{@mongoDatabaseName}"
|
||||
@mongooseConnection.on 'error', ->
|
||||
winston.error "There was an error connecting to the queue in MongoDB"
|
||||
@mongooseConnection.once 'open', ->
|
||||
winston.info "Successfully connected to MongoDB queue!"
|
||||
constructor: ->
|
||||
@_configure()
|
||||
@_createMongoConnection()
|
||||
@messageModel = @_generateMessageModel()
|
||||
|
||||
generateMessageModel: ->
|
||||
#do find something like: messages not processing, queue as current queue, visibility time before now, sort by insertion time, findOne
|
||||
|
||||
_configure: -> @databaseAddress = db.generateMongoConnectionString()
|
||||
|
||||
|
||||
_createMongoConnection: ->
|
||||
@mongooseConnection = mongoose.createConnection @databaseAddress
|
||||
@mongooseConnection.on 'error', -> log.error "There was an error connecting to the queue in MongoDB"
|
||||
@mongooseConnection.once 'open', -> log.info "Successfully connected to MongoDB queue!"
|
||||
|
||||
|
||||
_generateMessageModel: ->
|
||||
schema = new mongoose.Schema
|
||||
messageBody: Object,
|
||||
processing: false,
|
||||
insertionTime: {type: Date, default: Date.now }
|
||||
queue: String
|
||||
scheduledVisibilityTime: Date
|
||||
@mongooseConnection.model 'Message',schema
|
||||
queue: {type: String, index:true}
|
||||
scheduledVisibilityTime: {type: Date, index: true}
|
||||
receiptHandle: {type: String, index: true}
|
||||
|
||||
@mongooseConnection.model 'MessageQueue',schema
|
||||
|
||||
|
||||
class MongoQueue extends events.EventEmitter
|
||||
|
@ -192,54 +184,96 @@ class MongoQueue extends events.EventEmitter
|
|||
subscribe: (eventName, callback) -> @on eventName, callback
|
||||
unsubscribe: (eventName, callback) -> @removeListener eventName, callback
|
||||
|
||||
publish: (messageBody, delayInSeconds, callback) ->
|
||||
@sendMessage messageBody, delayInSeconds, callback
|
||||
|
||||
receieveMessage: (callback) ->
|
||||
conditions = {queue: @queueName, processing: false, scheduledVisibilityTime: {$lt:Date.now()}}
|
||||
options = {sort: 'scheduledVisibilityTime'}
|
||||
update = {$set: {processing: true}}
|
||||
conditions =
|
||||
queue: @queueName
|
||||
scheduledVisibilityTime:
|
||||
$lt: new Date()
|
||||
|
||||
options =
|
||||
sort: 'scheduledVisibilityTime'
|
||||
|
||||
update =
|
||||
$set:
|
||||
receiptHandle: @_generateRandomReceiptHandle()
|
||||
scheduledVisibilityTime: @_constructDefaultVisibilityTimeoutDate()
|
||||
|
||||
@Message.findOneAndUpdate conditions, update, options, =>
|
||||
if err? then @emit 'error',err,data else @emit 'message',err,data
|
||||
|
||||
callback? err,data
|
||||
|
||||
|
||||
deleteMessage: (receiptHandle, callback) ->
|
||||
#receiptHandle in this case is an ID
|
||||
conditions = {queue: @queueName, _id: receiptHandle}
|
||||
@Message.findOneAndRemove conditions, {}, =>
|
||||
if err? then @emit 'error',err,data else @emit 'message',err,data
|
||||
conditions =
|
||||
queue: @queueName
|
||||
receiptHandle: receiptHandle
|
||||
scheduledVisibilityTime:
|
||||
$lt: new Date()
|
||||
|
||||
@Message.findOneAndRemove conditions, {}, (err, data) =>
|
||||
if err? then @emit 'error',err,data else @emit 'delete',err,data
|
||||
|
||||
callback? err,data
|
||||
|
||||
|
||||
sendMessage: (messageBody, delaySeconds, callback) ->
|
||||
messageToSend = new @Message
|
||||
messageBody: messageBody
|
||||
processing: false
|
||||
queue: @queueName
|
||||
scheduledVisibilityTime: Date.now() + (delaySeconds * 1000)
|
||||
scheduledVisibilityTime: @_constructDefaultVisibilityTimeoutDate delaySeconds
|
||||
|
||||
messageToSend.save (err,data) =>
|
||||
if err? then @emit 'error',err,data else @emit 'sent',err, data
|
||||
callback? err,data
|
||||
|
||||
listenForever: =>
|
||||
async.forever (asyncCallback) =>
|
||||
@recieveMessage (err, data) ->
|
||||
asyncCallback(null)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
changeMessageVisibilityTimeout: (secondsFromNow, receiptHandle, callback) ->
|
||||
conditions =
|
||||
queue: @queueName
|
||||
receiptHandle: receiptHandle
|
||||
scheduledVisibilityTime:
|
||||
$lt: new Date()
|
||||
|
||||
update =
|
||||
$set:
|
||||
scheduledVisibilityTime: @_constructDefaultVisibilityTimeoutDate secondsFromNow
|
||||
|
||||
@Message.findOneAndUpdate conditions, update, (err, data) =>
|
||||
if err? then @emit 'error',err,data else @emit 'update',err,data
|
||||
|
||||
callback? err, data
|
||||
|
||||
|
||||
listenForever: => async.forever (asyncCallback) => @recieveMessage (err, data) -> asyncCallback(null)
|
||||
|
||||
|
||||
_constructDefaultVisibilityTimeoutDate: (timeoutSeconds) ->
|
||||
timeoutSeconds ?= defaultMessageVisibilityTimeoutInSeconds
|
||||
newDate = new Date()
|
||||
newDate = new Date(newDate.getTime() + 1000 * timeoutSeconds)
|
||||
|
||||
newDate
|
||||
|
||||
|
||||
_generateRandomReceiptHandle: -> crypto.randomBytes(20).toString('hex')
|
||||
|
||||
|
||||
|
||||
class MongoMessage
|
||||
constructor: (@originalMessage, @parentQueue) ->
|
||||
|
||||
isEmpty: -> not @originalMessage
|
||||
|
||||
getBody: -> @originalMessage.messageBody
|
||||
|
||||
getID: -> @originalMesage._id
|
||||
|
||||
removeFromQueue: (callback) -> @parentQueue.deleteMessage @getReceiptHandle(), callbacks
|
||||
|
||||
requeue: (callback) -> @parentQueue.changeMessageVisibilityTimeout 0, @getReceiptHandle(), callback
|
||||
|
||||
changeMessageVisibilityTimeout: (secondsFromFunctionCall, callback) ->
|
||||
@parentQueue.changeMessageVisibilityTimeout secondsFromFunctionCall,@getReceiptHandle(), callback
|
||||
|
||||
getReceiptHandle: -> @originalMessage.receiptHandle
|
|
@ -45,7 +45,7 @@ module.exports.dispatchTaskToConsumer = (req, res) ->
|
|||
return errors.serverError res, "There was an error constructing the scoring task" if taskConstructionError?
|
||||
|
||||
taskProcessingTimeInSeconds = 10
|
||||
message.changeMessageVisibilityTimeout scoringTaskTimeoutInSeconds + taskProcessingTimeInSeconds
|
||||
message.changeMessageVisibilityTimeout 10
|
||||
|
||||
constructTaskLogObject userID,message.getReceiptHandle(), (taskLogError, taskLogObject) ->
|
||||
return errors.serverError res, "There was an error creating the task log object." if taskLogError?
|
||||
|
@ -135,7 +135,8 @@ module.exports.processTaskResult = (req, res) ->
|
|||
return errors.serverError res, "There as a problem logging the task computation: #{loggingError}" if loggingError?
|
||||
updateScores clientResponseObject, (updatingScoresError, newScores) ->
|
||||
return errors.serverError res, "There was an error updating the scores.#{updatingScoresError}" if updatingScoresError?
|
||||
sendResponseObject req, res, newScores
|
||||
sendResponseObject req, res, {"message":"The scores were updated successfully!"}
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -168,9 +169,29 @@ updateScores = (taskObject,callback) ->
|
|||
oldScoreArray = _.toArray putRankingFromMetricsIntoScoreObject taskObject, oldScores
|
||||
|
||||
newScoreArray = bayes.updatePlayerSkills oldScoreArray
|
||||
|
||||
#TODO: database persistence here
|
||||
callback err, newScoreArray
|
||||
|
||||
saveNewScoresToDatabase newScoreArray, callback
|
||||
|
||||
|
||||
saveNewScoresToDatabase = (newScoreArray, callback) ->
|
||||
async.eachSeries newScoreArray, updateScoreInSession, (err) ->
|
||||
return callback err, null if err?
|
||||
|
||||
callback err, {"message":"All scores were saved successfully."}
|
||||
|
||||
|
||||
updateScoreInSession = (scoreObject,callback) ->
|
||||
LevelSession.findOne {"_id": scoreObject.id}, (err, session) ->
|
||||
return callback err, null if err?
|
||||
|
||||
session.meanStrength = scoreObject.meanStrength
|
||||
session.standardDeviation = scoreObject.standardDeviation
|
||||
session.totalScore = scoreObject.meanStrength - 1.8 * scoreObject.standardDeviation
|
||||
|
||||
winston.info "Saving session!"
|
||||
|
||||
session.save callback
|
||||
|
||||
|
||||
putRankingFromMetricsIntoScoreObject = (taskObject,scoreObject) ->
|
||||
scoreObject = _.indexBy scoreObject, 'id'
|
||||
|
|
|
@ -3,8 +3,6 @@ sendwithusAPI = require 'sendwithus'
|
|||
swuAPIKey = config.mail.sendwithusAPIKey
|
||||
queues = require './commons/queue'
|
||||
|
||||
module.exports.setupQueue = -> queues.initializeSendwithusQueue() unless queues.sendwithusQueue?
|
||||
|
||||
module.exports.setupRoutes = (app) ->
|
||||
return
|
||||
|
||||
|
|
Loading…
Reference in a new issue