codecombat/server/commons/queue.coffee

258 lines
8 KiB
CoffeeScript
Raw Normal View History

2014-02-05 12:39:14 -05:00
config = require '../../server_config'
log = require 'winston'
2014-01-22 20:48:11 -05:00
mongoose = require 'mongoose'
async = require 'async'
aws = require 'aws-sdk'
db = require './database'
mongoose = require 'mongoose'
2014-01-23 13:52:43 -05:00
events = require 'events'
crypto = require 'crypto'
# Shared queue client exposed to other modules; stays undefined until
# initializeQueueClient assigns it.
module.exports.queueClient = undefined
2014-02-13 15:59:21 -05:00
# Fallback number of seconds added to "now" when a visibility time is computed
# without an explicit timeout (see _constructDefaultVisibilityTimeoutDate).
defaultMessageVisibilityTimeoutInSeconds = 500
# Sent to SQS as WaitTimeSeconds when receiving a message.
defaultMessageReceiptTimeout = 10
# Lazily creates the shared queue client and stores it on module.exports.
#
# cb - optional function, invoked with no arguments once the client is in place.
module.exports.initializeQueueClient = (cb) ->
  # Test the exported slot itself: a bare `queueClient` is never defined in this
  # scope, so the previous guard was always true and a brand-new client was
  # generated on every call.
  module.exports.queueClient ?= generateQueueClient()
  cb?()
2014-01-27 14:14:44 -05:00
# Builds the client object that backs this module's queues.
# The SQS branch is switched off by a hard-coded `false`, so a
# MongoQueueClient is what gets returned for now.
generateQueueClient = ->
  #if config.queue.accessKeyId
  useSQS = false #TODO: Change this in production
  return new SQSQueueClient() if useSQS
  new MongoQueueClient()
# Client that creates SQS-backed queues through the aws-sdk.
class SQSQueueClient
  # Applies the aws credentials/region, then builds the SQS service object.
  constructor: ->
    @_configure()
    @sqs = @_generateSQSInstance()

  # Asks SQS to create the queue called `queueName` and hands
  # (err, SQSQueue) to `callback` when one is supplied.
  # A creation error is logged and thrown before the callback is reached.
  # `options` is accepted but not read here.
  registerQueue: (queueName, options, callback) ->
    @sqs.createQueue {QueueName: queueName}, (err, data) =>
      if err?
        @_logAndThrowFatalException "There was an error creating a new SQS queue, reason: #{JSON.stringify err}"
      sqsQueue = new SQSQueue queueName, data.QueueUrl, @sqs
      callback? err, sqsQueue

  # Copies the queue credentials and region from the server config into aws.config.
  _configure: ->
    {accessKeyId, secretAccessKey, region} = config.queue
    aws.config.update {accessKeyId, secretAccessKey, region}

  # Returns a fresh aws.SQS instance.
  _generateSQSInstance: ->
    new aws.SQS()

  # Logs `errorMessage` at error level and then throws it as an Error.
  _logAndThrowFatalException: (errorMessage) ->
    log.error errorMessage
    throw new Error errorMessage
2014-01-23 13:52:43 -05:00
# A single SQS queue wrapped in an EventEmitter.
# Every operation both emits an event and invokes the optional node-style
# callback with (err, data).
class SQSQueue extends events.EventEmitter
  # queueName - name the queue was registered under
  # queueUrl  - URL sent as QueueUrl on every SQS request
  # sqs       - SQS service object used to issue the requests
  constructor: (@queueName, @queueUrl, @sqs) ->

  # Thin aliases over EventEmitter's on / removeListener.
  subscribe: (eventName, callback) -> @on eventName, callback
  unsubscribe: (eventName, callback) -> @removeListener eventName, callback

  # Requests a message from SQS, waiting up to defaultMessageReceiptTimeout seconds.
  # Emits 'error' (err, raw data) on failure, otherwise 'message' (err, SQSMessage).
  # `callback` receives the same arguments as the emitted event.
  receiveMessage: (callback) ->
    queueReceiveOptions =
      QueueUrl: @queueUrl
      WaitTimeSeconds: defaultMessageReceiptTimeout
    @sqs.receiveMessage queueReceiveOptions, (err, data) =>
      if err?
        # Pass along the response SQS actually returned; the previous code
        # referenced a variable that is only assigned on the success path.
        @emit 'error', err, data
      else
        data = new SQSMessage data, this
        @emit 'message', err, data
      callback? err, data

  # Deletes the message identified by `receiptHandle`.
  # Emits 'error' on failure and 'message' on success.
  deleteMessage: (receiptHandle, callback) ->
    queueDeletionOptions =
      QueueUrl: @queueUrl
      ReceiptHandle: receiptHandle
    @sqs.deleteMessage queueDeletionOptions, (err, data) =>
      if err? then @emit 'error', err, data else @emit 'message', err, data
      callback? err, data

  # Sets the visibility timeout of the message identified by `receiptHandle`
  # to `secondsFromNow`. Emits 'error' on failure and 'edited' on success.
  changeMessageVisibilityTimeout: (secondsFromNow, receiptHandle, callback) ->
    messageVisibilityTimeoutOptions =
      QueueUrl: @queueUrl
      ReceiptHandle: receiptHandle
      VisibilityTimeout: secondsFromNow
    @sqs.changeMessageVisibility messageVisibilityTimeoutOptions, (err, data) =>
      if err? then @emit 'error', err, data else @emit 'edited', err, data
      callback? err, data

  # Sends `messageBody` with a delivery delay of `delaySeconds`.
  # Emits 'error' on failure and 'sent' on success.
  sendMessage: (messageBody, delaySeconds, callback) ->
    queueSendingOptions =
      QueueUrl: @queueUrl
      MessageBody: messageBody
      DelaySeconds: delaySeconds
    @sqs.sendMessage queueSendingOptions, (err, data) =>
      if err? then @emit 'error', err, data else @emit 'sent', err, data
      callback? err, data

  # Calls receiveMessage in an endless async.forever loop. Receive errors are
  # not forwarded to the loop (it is always continued with null), so consumers
  # observe results through the emitted events.
  listenForever: => async.forever (asyncCallback) => @receiveMessage (err, data) -> asyncCallback(null)
# Wraps the raw response of an SQS receive call together with the queue it
# came from. Only the first entry of the response's Messages array is used.
class SQSMessage
  constructor: (originalMessage, parentQueue) ->
    @originalMessage = originalMessage
    @parentQueue = parentQueue

  # True when the response carries no first message.
  isEmpty: ->
    firstMessage = @originalMessage.Messages?[0]
    not firstMessage?

  # Body of the first message.
  getBody: ->
    firstMessage = @originalMessage.Messages[0]
    firstMessage.Body

  # MessageId of the first message.
  getID: ->
    firstMessage = @originalMessage.Messages[0]
    firstMessage.MessageId

  # ReceiptHandle of the first message.
  getReceiptHandle: ->
    firstMessage = @originalMessage.Messages[0]
    firstMessage.ReceiptHandle

  # Deletes this message from its parent queue.
  removeFromQueue: (callback) ->
    handle = @getReceiptHandle()
    @parentQueue.deleteMessage handle, callback

  # Sets this message's visibility timeout to 0 on the parent queue.
  requeue: (callback) ->
    handle = @getReceiptHandle()
    @parentQueue.changeMessageVisibilityTimeout 0, handle, callback

  # Sets this message's visibility timeout to `secondsFromFunctionCall`.
  changeMessageVisibilityTimeout: (secondsFromFunctionCall, callback) ->
    handle = @getReceiptHandle()
    @parentQueue.changeMessageVisibilityTimeout secondsFromFunctionCall, handle, callback
2014-01-24 20:25:02 -05:00
# Client that creates queues stored as documents in MongoDB (via mongoose).
class MongoQueueClient
  # Resolves the connection string, opens the connection and prepares the
  # message model shared by every queue this client registers.
  constructor: ->
    @_configure()
    @_createMongoConnection()
    @messageModel = @_generateMessageModel()

  # Creates a MongoQueue named `queueName` and passes it to `callback`
  # as (null, queue). `callback` is required here.
  registerQueue: (queueName, options, callback) ->
    callback null, new MongoQueue(queueName, options, @messageModel)

  # Stores the connection string produced by the database helper module.
  _configure: ->
    @databaseAddress = db.generateMongoConnectionString()

  # Opens a dedicated mongoose connection and logs its error / open events.
  _createMongoConnection: ->
    @mongooseConnection = mongoose.createConnection @databaseAddress
    @mongooseConnection.on 'error', ->
      # Connection errors are not logged when config.proxy is set.
      return if config.proxy
      log.error 'There was an error connecting to the queue in MongoDB'
    @mongooseConnection.once 'open', ->
      log.info 'Successfully connected to MongoDB queue!'

  # Registers and returns the 'messageQueue' model on this client's connection.
  _generateMessageModel: ->
    messageFields =
      messageBody: Object
      queue: {type: String, index: true}
      scheduledVisibilityTime: {type: Date, index: true}
      receiptHandle: {type: String, index: true}
    messageSchema = new mongoose.Schema messageFields
    @mongooseConnection.model 'messageQueue', messageSchema
# A queue whose messages are documents of the supplied mongoose model.
# A message is "visible" once its scheduledVisibilityTime has passed; receiving
# it pushes that time into the future and stamps it with a fresh receipt handle.
# Every operation emits an event and invokes the optional node-style callback.
class MongoQueue extends events.EventEmitter
  # queueName    - value stored in / matched against each document's `queue` field
  # options      - accepted but not read here
  # messageModel - mongoose model used for all reads and writes
  constructor: (queueName, options, messageModel) ->
    @Message = messageModel
    @queueName = queueName

  # Thin aliases over EventEmitter's on / removeListener.
  subscribe: (eventName, callback) -> @on eventName, callback
  unsubscribe: (eventName, callback) -> @removeListener eventName, callback

  # Counts documents with an empty filter.
  # NOTE(review): this is not restricted to @queueName, so it counts every
  # document in the model's collection - confirm that is intended.
  totalMessagesInQueue: (callback) -> @Message.count {}, callback

  # Claims one currently-visible message of this queue.
  # Emits 'error' (err, data) on failure, otherwise 'message' (err, MongoMessage);
  # `callback` receives the same arguments in both cases.
  receiveMessage: (callback) ->
    conditions =
      queue: @queueName
      scheduledVisibilityTime:
        $lte: new Date()
    #options =
    #  sort: 'scheduledVisibilityTime'
    update =
      $set:
        receiptHandle: @_generateRandomReceiptHandle()
        scheduledVisibilityTime: @_constructDefaultVisibilityTimeoutDate()
    @Message.findOneAndUpdate conditions, update, (err, data) =>
      if err?
        @emit 'error', err, data
      else
        data = new MongoMessage data, this
        @emit 'message', err, data
      # Always report back, as the sibling methods do; previously an error
      # returned early and the caller (including listenForever) never heard back.
      callback? err, data

  # Removes the message with `receiptHandle`, provided its visibility time has
  # not yet passed. Emits 'error' on failure and 'delete' on success.
  deleteMessage: (receiptHandle, callback) ->
    conditions =
      queue: @queueName
      receiptHandle: receiptHandle
      scheduledVisibilityTime:
        $gte: new Date()
    @Message.findOneAndRemove conditions, {}, (err, data) =>
      if err? then @emit 'error', err, data else @emit 'delete', err, data
      callback? err, data

  # Stores a new message that becomes visible `delaySeconds` from now
  # (or after the default timeout when `delaySeconds` is null/undefined).
  # Emits 'error' on failure and 'sent' on success.
  sendMessage: (messageBody, delaySeconds, callback) ->
    messageToSend = new @Message
      messageBody: messageBody
      queue: @queueName
      scheduledVisibilityTime: @_constructDefaultVisibilityTimeoutDate delaySeconds
    messageToSend.save (err, data) =>
      if err? then @emit 'error', err, data else @emit 'sent', err, data
      callback? err, data

  # Moves the visibility time of the message with `receiptHandle` to
  # `secondsFromNow` seconds from now, provided its current visibility time has
  # not yet passed. Emits 'error' (after logging) on failure, 'update' on success.
  changeMessageVisibilityTimeout: (secondsFromNow, receiptHandle, callback) ->
    conditions =
      queue: @queueName
      receiptHandle: receiptHandle
      scheduledVisibilityTime:
        $gte: new Date()
    update =
      $set:
        scheduledVisibilityTime: @_constructDefaultVisibilityTimeoutDate secondsFromNow
    @Message.findOneAndUpdate conditions, update, (err, data) =>
      if err?
        log.error "There was a problem updating the message visibility timeout:#{err}"
        @emit 'error', err, data
      else
        @emit 'update', err, data
        #log.info 'The message visibility time was updated'
      callback? err, data

  # Calls receiveMessage in an endless async.forever loop. Receive errors are
  # not forwarded to the loop (it is always continued with null).
  # Fixed: the method name was misspelled as `recieveMessage`.
  listenForever: => async.forever (asyncCallback) => @receiveMessage (err, data) -> asyncCallback(null)

  # Returns a Date `timeoutSeconds` seconds from now; null/undefined falls back
  # to defaultMessageVisibilityTimeoutInSeconds (an explicit 0 is kept).
  _constructDefaultVisibilityTimeoutDate: (timeoutSeconds) ->
    timeoutSeconds ?= defaultMessageVisibilityTimeoutInSeconds
    new Date(Date.now() + 1000 * timeoutSeconds)

  # Returns 20 random bytes rendered as a 40-character hex string.
  _generateRandomReceiptHandle: -> crypto.randomBytes(20).toString('hex')
2014-01-24 14:20:44 -05:00
# Wraps a message document returned by the mongoose model together with the
# MongoQueue it was received from.
class MongoMessage
  # originalMessage - the stored document (falsy when nothing was found)
  # parentQueue     - queue used for delete / visibility operations
  constructor: (@originalMessage, @parentQueue) ->

  # True when no document was wrapped.
  isEmpty: -> not @originalMessage

  # The stored messageBody field.
  getBody: -> @originalMessage.messageBody

  # The document's _id. Fixed: the property name was misspelled as
  # `originalMesage`, which made every call throw.
  getID: -> @originalMessage._id

  # Deletes this message from its parent queue. Fixed: the undefined name
  # `callbacks` was passed on instead of the `callback` parameter.
  removeFromQueue: (callback) -> @parentQueue.deleteMessage @getReceiptHandle(), callback

  # Sets this message's visibility timeout to 0 seconds from now.
  requeue: (callback) -> @parentQueue.changeMessageVisibilityTimeout 0, @getReceiptHandle(), callback

  # Sets this message's visibility timeout to `secondsFromFunctionCall` seconds from now.
  changeMessageVisibilityTimeout: (secondsFromFunctionCall, callback) ->
    @parentQueue.changeMessageVisibilityTimeout secondsFromFunctionCall, @getReceiptHandle(), callback

  # The receiptHandle field of the wrapped document.
  getReceiptHandle: -> @originalMessage.receiptHandle