codecombat/server/commons/queue.coffee

246 lines
7 KiB
CoffeeScript
Raw Normal View History

2014-02-05 12:39:14 -05:00
config = require '../../server_config'
2014-01-22 20:48:11 -05:00
winston = require 'winston'
mongoose = require 'mongoose'
async = require 'async'
2014-02-05 12:39:14 -05:00
errors = require './errors'
aws = require 'aws-sdk'
2014-02-05 12:39:14 -05:00
db = require './../routes/db'
mongoose = require 'mongoose'
2014-01-23 13:52:43 -05:00
events = require 'events'
# Exported singletons. Both start out undefined and are filled in by
# initializeQueueClient / initializeSendwithusQueue respectively.
module.exports.queueClient = module.exports.sendwithusQueue = undefined
2014-01-23 00:41:29 -05:00
# Creates the shared queue client on the first call and stores it on
# module.exports.queueClient; later calls keep the stored client.
#
# cb - optional function, invoked with no arguments after the client is set.
#
# Returns whatever cb returns (undefined when cb is omitted).
module.exports.initializeQueueClient = (cb) ->
  # The exported property is what must be checked here: a bare `queueClient`
  # is never assigned in this scope, so testing it would always report
  # "missing" and rebuild the client on every call.
  module.exports.queueClient ?= module.exports.generateQueueClient()
  cb?()
2014-01-27 14:14:44 -05:00
# Registers the "sendwithus" queue on the shared queue client and stores the
# resulting queue on module.exports.sendwithusQueue.
#
# cb - optional function (err, queue), called after the queue is stored.
#      err is always null/undefined here because a registration error throws.
#
# Throws an Error (after logging through winston) when registration fails.
module.exports.initializeSendwithusQueue = (cb) ->
  # Reuse the exported client when one exists, otherwise create and keep it.
  # A function-local `queueClient` would always be undefined on entry and
  # force a brand-new client (and its connection) on every call.
  module.exports.queueClient ?= module.exports.generateQueueClient()
  module.exports.queueClient.registerQueue "sendwithus", {}, (err, data) ->
    if err?
      errorString = "There was an error registering the sendwithus queue."
      winston.error errorString
      throw new Error errorString
    module.exports.sendwithusQueue = data
    cb? err, module.exports.sendwithusQueue
2014-01-24 20:25:02 -05:00
2014-01-27 14:14:44 -05:00
# Builds a new queue client: the SQS-backed one when config.queue.accessKeyId
# is truthy, the MongoDB-backed one otherwise.
#
# Returns the freshly constructed client instance.
module.exports.generateQueueClient = ->
  ClientClass = if config.queue.accessKeyId then SQSQueueClient else MongoQueueClient
  new ClientClass()
2014-01-23 00:41:29 -05:00
# Base class describing the message interface. Every method here is a no-op
# that returns undefined; subclasses override them with real behavior.
class MessageObject
  constructor: ->

  # Message payload.
  getBody: -> undefined

  # Message identifier.
  getID: -> undefined

  # Delete this message from its queue.
  removeFromQueue: -> undefined

  # Put this message back on its queue.
  requeue: -> undefined

  # Change how long the message stays hidden, counted from the call.
  changeMessageVisibilityTimeout: (secondsFromFunctionCall) -> undefined
2014-01-24 20:25:02 -05:00
# Queue client backed by Amazon SQS. Construction applies the credentials
# from config.queue to the aws-sdk global config and creates an SQS handle.
class SQSQueueClient
  constructor: ->
    @configure()
    @sqs = @generateSQSInstance()

  ###Public API###
  # Creates the SQS queue named queueName and passes a new SQSQueue wrapper
  # to callback as its second (data) argument. `options` is not used.
  # A creation error is logged and then thrown instead of being passed on.
  registerQueue: (queueName, options, callback) ->
    onQueueCreated = (err, data) =>
      if err?
        winston.error("There was an error creating a new SQS queue, reason: #{JSON.stringify err}")
        throw new Error("Fatal SQS error, see Winston output")
      callback err, new SQSQueue(queueName, data.QueueUrl, @sqs)
    @sqs.createQueue {QueueName: queueName}, onQueueCreated
  ###Public API###

  # Copies the access key, secret key and region from config.queue into the
  # aws-sdk configuration.
  configure: ->
    {accessKeyId, secretAccessKey, region} = config.queue
    aws.config.update {accessKeyId, secretAccessKey, region}

  # Returns a new aws.SQS service object.
  generateSQSInstance: -> new aws.SQS()
2014-01-23 13:52:43 -05:00
# Event-emitting wrapper around a single SQS queue URL. Each operation
# forwards to the SQS handle, emits 'error' on failure or an operation
# specific event on success, then invokes the optional callback.
class SQSQueue extends events.EventEmitter
  constructor: (@queueName, @queueUrl, @sqs) ->

  # Thin aliases over the EventEmitter listener API.
  subscribe: (eventName, callback) ->
    @on eventName, callback

  unsubscribe: (eventName, callback) ->
    @removeListener eventName, callback

  # Same as sendMessage.
  publish: (messageBody, delayInSeconds, callback) ->
    @sendMessage messageBody, delayInSeconds, callback

  # Polls the queue (WaitTimeSeconds: 20) and wraps the raw response in an
  # SQSMessage. Emits 'message' with the wrapper, or 'error' with the raw
  # response; the callback always receives the wrapper.
  receiveMessage: (callback) ->
    request = {QueueUrl: @queueUrl, WaitTimeSeconds: 20}
    @sqs.receiveMessage request, (err, rawResponse) =>
      wrapped = new SQSMessage rawResponse, this
      if err?
        @emit 'error', err, rawResponse
      else
        @emit 'message', err, wrapped
      callback? err, wrapped

  # Deletes the message identified by receiptHandle; emits 'message' on success.
  deleteMessage: (receiptHandle, callback) ->
    request = {QueueUrl: @queueUrl, ReceiptHandle: receiptHandle}
    @sqs.deleteMessage request, (err, data) =>
      if err?
        @emit 'error', err, data
      else
        @emit 'message', err, data
      callback? err, data

  # Sets the visibility timeout of a received message; emits 'edited' on success.
  changeMessageVisibilityTimeout: (secondsFromNow, receiptHandle, callback) ->
    request =
      QueueUrl: @queueUrl
      ReceiptHandle: receiptHandle
      VisibilityTimeout: secondsFromNow
    @sqs.changeMessageVisibility request, (err, data) =>
      if err?
        @emit 'error', err, data
      else
        @emit 'edited', err, data
      callback? err, data

  # Sends messageBody with the given delay; emits 'sent' on success.
  sendMessage: (messageBody, delaySeconds, callback) ->
    request =
      QueueUrl: @queueUrl
      MessageBody: messageBody
      DelaySeconds: delaySeconds
    @sqs.sendMessage request, (err, data) =>
      if err?
        @emit 'error', err, data
      else
        @emit 'sent', err, data
      callback? err, data

  # Calls receiveMessage in an endless async.forever loop. Results reach
  # consumers only through the emitted events; errors never stop the loop
  # because the iteration callback is always given null.
  listenForever: =>
    pollOnce = (next) =>
      @receiveMessage (err, data) ->
        next(null)
    async.forever pollOnce
# Wraps the raw response of an SQS receive call together with the queue it
# came from. Only the first entry of the response's Messages array is used.
class SQSMessage extends MessageObject
  constructor: (@originalMessage, @parentQueue) ->

  # True when the response carries no first message.
  isEmpty: ->
    firstMessage = @originalMessage.Messages?[0]
    not firstMessage?

  # Body of the first message; throws when the response has no messages.
  getBody: ->
    {Body} = @originalMessage.Messages[0]
    Body

  # MessageId of the first message.
  getID: ->
    {MessageId} = @originalMessage.Messages[0]
    MessageId

  # Deletes the message from the parent queue.
  removeFromQueue: (callback) ->
    handle = @getReceiptHandle()
    @parentQueue.deleteMessage handle, callback

  # Makes the message visible again immediately (visibility timeout of 0).
  requeue: (callback) ->
    handle = @getReceiptHandle()
    @parentQueue.changeMessageVisibilityTimeout 0, handle, callback

  # Sets the visibility timeout, in seconds counted from this call.
  changeMessageVisibilityTimeout: (secondsFromFunctionCall, callback) ->
    handle = @getReceiptHandle()
    @parentQueue.changeMessageVisibilityTimeout secondsFromFunctionCall, handle, callback

  # ReceiptHandle of the first message.
  getReceiptHandle: ->
    {ReceiptHandle} = @originalMessage.Messages[0]
    ReceiptHandle
2014-01-23 13:52:43 -05:00
2014-01-23 00:41:29 -05:00
2014-01-24 20:25:02 -05:00
# Queue client backed by MongoDB through mongoose. Construction resolves the
# database address and name, opens a dedicated mongoose connection and
# compiles the 'Message' model on it.
class MongoQueueClient
  constructor: ->
    @configure()
    @createMongoConnection()
    @messageModel = @generateMessageModel()

  ###Public API###
  # Builds a MongoQueue for queueName that shares this client's message model
  # and passes it to callback as (null, queue).
  registerQueue: (queueName, options, callback) ->
    newQueue = new MongoQueue queueName, options, @messageModel
    callback(null, newQueue)
  ###Public API###

  # Reads the database address from the db helper and the queue database
  # name from config.mongoQueue.
  configure: ->
    @databaseAddress = db.generateDatabaseAddress()
    @mongoDatabaseName = config.mongoQueue.queueDatabaseName

  # Opens the connection and logs its 'error' and first 'open' events.
  createMongoConnection: ->
    @mongooseConnection = mongoose.createConnection "mongodb://#{@databaseAddress}/#{@mongoDatabaseName}"
    @mongooseConnection.on 'error', ->
      winston.error "There was an error connecting to the queue in MongoDB"
    @mongooseConnection.once 'open', ->
      winston.info "Successfully connected to MongoDB queue!"

  # Defines the message schema and returns the model bound to this client's
  # connection.
  generateMessageModel: ->
    #do find something like: messages not processing, queue as current queue, visibility time before now, sort by insertion time, findOne
    schema = new mongoose.Schema
      messageBody: Object
      # Declared as a real Boolean path; the literal `false` is a value, not
      # a schema type, so it cannot describe the field.
      processing: {type: Boolean, default: false}
      insertionTime: {type: Date, default: Date.now}
      queue: String
      scheduledVisibilityTime: Date
    @mongooseConnection.model 'Message', schema
# Event-emitting queue stored in MongoDB. Messages are documents of the
# supplied mongoose model, tagged with this queue's name.
#
# queueName    - value stored in / matched against each document's `queue`.
# options      - accepted for signature parity with registerQueue; unused.
# messageModel - model used to create, claim and remove messages.
class MongoQueue extends events.EventEmitter
  constructor: (queueName, options, messageModel) ->
    @Message = messageModel
    @queueName = queueName

  # Thin aliases over the EventEmitter listener API.
  subscribe: (eventName, callback) -> @on eventName, callback
  unsubscribe: (eventName, callback) -> @removeListener eventName, callback

  # Same as sendMessage.
  publish: (messageBody, delayInSeconds, callback) ->
    @sendMessage messageBody, delayInSeconds, callback

  # Claims one message of this queue that is not being processed and whose
  # scheduled visibility time has passed, marking it as processing.
  # Emits 'message' (or 'error') and then calls the optional callback with
  # (err, data). Named like SQSQueue's receiveMessage.
  receiveMessage: (callback) ->
    conditions = {queue: @queueName, processing: false, scheduledVisibilityTime: {$lt: Date.now()}}
    options = {sort: 'scheduledVisibilityTime'}
    update = {$set: {processing: true}}
    # The callback must declare (err, data); without the parameters both
    # names are undeclared and forwarding them fails.
    @Message.findOneAndUpdate conditions, update, options, (err, data) =>
      if err? then @emit 'error', err, data else @emit 'message', err, data
      callback? err, data

  # Backward-compatible alias for the original (misspelled) method name.
  receieveMessage: (callback) ->
    @receiveMessage callback

  # Removes the message whose _id is receiptHandle from this queue.
  # Emits 'message' (or 'error') and then calls the optional callback.
  deleteMessage: (receiptHandle, callback) ->
    #receiptHandle in this case is an ID
    conditions = {queue: @queueName, _id: receiptHandle}
    @Message.findOneAndRemove conditions, {}, (err, data) =>
      if err? then @emit 'error', err, data else @emit 'message', err, data
      callback? err, data

  # Stores a new message that becomes visible delaySeconds from now.
  # Emits 'sent' (or 'error') and then calls the optional callback.
  sendMessage: (messageBody, delaySeconds, callback) ->
    # A missing delay means "visible immediately"; without the default the
    # arithmetic below would produce NaN.
    delayMilliseconds = (delaySeconds ? 0) * 1000
    messageToSend = new @Message
      messageBody: messageBody
      processing: false
      queue: @queueName
      scheduledVisibilityTime: Date.now() + delayMilliseconds
    messageToSend.save (err, data) =>
      if err? then @emit 'error', err, data else @emit 'sent', err, data
      callback? err, data

  # Calls receiveMessage in an endless async.forever loop; results reach
  # consumers through the emitted events.
  listenForever: =>
    async.forever (asyncCallback) =>
      @receiveMessage (err, data) ->
        asyncCallback(null)
2014-01-23 13:52:43 -05:00
2014-01-24 14:20:44 -05:00