2014-01-22 20:48:11 -05:00
|
|
|
config = require '../server_config'
|
|
|
|
winston = require 'winston'
|
|
|
|
mongoose = require 'mongoose'
|
|
|
|
async = require 'async'
|
|
|
|
errors = require './errors'
|
2014-01-22 21:29:02 -05:00
|
|
|
aws = require 'aws-sdk'
|
2014-01-23 13:52:43 -05:00
|
|
|
db = require './db'
|
2014-01-23 15:37:18 -05:00
|
|
|
mongoose = require 'mongoose'
|
2014-01-23 13:52:43 -05:00
|
|
|
events = require 'events'
|
2014-01-22 20:48:11 -05:00
|
|
|
|
2014-01-23 13:52:43 -05:00
|
|
|
# Module-level state, both slots start out empty.
#   queueClient     - overwritten by module.exports.generateQueueClient.
#   simulationQueue - only mentioned by the disabled wiring in setupRoutes;
#                     no live code visible in this section assigns it.
queueClient = simulationQueue = null
|
2014-01-22 21:29:02 -05:00
|
|
|
|
2014-01-23 00:41:29 -05:00
|
|
|
# Hook for attaching queue-related behaviour to the application.
#
# app - accepted for interface compatibility; currently unused.
#
# Returns nothing. The function is a deliberate no-op: the only thing it ever
# contained is the disabled wiring reproduced below.
module.exports.setupRoutes = (app) ->
  # Disabled example: register "simulationQueue", log every received message
  # and delete it straight away.
  #
  #   queueClient.registerQueue "simulationQueue", {}, (err,data) ->
  #     simulationQueue = data
  #     simulationQueue.subscribe 'message', (err, data) ->
  #       if data.Messages?
  #         winston.info "Receieved message #{data.Messages?[0].Body}"
  #         simulationQueue.deleteMessage data.Messages?[0].ReceiptHandle, ->
  #           winston.info "Deleted message"
  return
|
2014-01-22 21:29:02 -05:00
|
|
|
|
2014-01-23 00:41:29 -05:00
|
|
|
|
2014-01-22 21:29:02 -05:00
|
|
|
|
2014-01-24 20:25:02 -05:00
|
|
|
|
|
|
|
# Builds a fresh queue client and stores it in the module-level `queueClient`.
#
# An SQSQueueClient is created when `config.isProduction` is truthy, a
# MongoQueueClient otherwise. Every call constructs a new client.
#
# Returns the newly created client.
module.exports.generateQueueClient = ->
  ClientClass = if config.isProduction then SQSQueueClient else MongoQueueClient
  queueClient = new ClientClass()
  queueClient
|
2014-01-23 00:41:29 -05:00
|
|
|
|
|
|
|
|
2014-01-24 20:25:02 -05:00
|
|
|
# Queue client that creates queues through the aws-sdk SQS service object.
class SQSQueueClient
  # Pushes the credentials from server config into aws-sdk, then creates the
  # SQS handle that is passed to every queue registered via this client.
  constructor: ->
    @configure()
    @sqs = @generateSQSInstance()

  # ---- Public API ----

  # Creates the SQS queue `queueName` and wraps it in an SQSQueue.
  #
  # queueName - name sent to SQS as QueueName.
  # options   - accepted for interface parity; not used here.
  # callback  - called as callback(err, queue) once SQS answers successfully.
  #
  # A createQueue failure is logged and then thrown from inside the SQS
  # callback; it is NOT delivered to `callback`.
  registerQueue: (queueName, options, callback) ->
    request = QueueName: queueName
    @sqs.createQueue request, (err, data) =>
      if err?
        winston.error("There was an error creating a new SQS queue, reason: #{JSON.stringify err}")
        throw new Error("Fatal SQS error, see Winston output")
      callback err, new SQSQueue(queueName, data.QueueUrl, @sqs)

  # ---- End of public API ----

  # Copies access key, secret and region from `config.queue` into the global
  # aws-sdk configuration.
  configure: ->
    queueSettings = config.queue
    aws.config.update
      accessKeyId: queueSettings.accessKeyId
      secretAccessKey: queueSettings.secretAccessKey
      region: queueSettings.region

  # Returns a new aws-sdk SQS service object.
  generateSQSInstance: -> new aws.SQS()
|
|
|
|
|
2014-01-23 02:41:26 -05:00
|
|
|
|
2014-01-23 13:52:43 -05:00
|
|
|
# A single SQS queue exposed as an EventEmitter.
#
# Events emitted:
#   'error'   (err, data) - any SQS call failed.
#   'message' (err, data) - receiveMessage OR deleteMessage succeeded.
#   'sent'    (err, data) - sendMessage succeeded.
#
# Note: Node's EventEmitter throws when 'error' is emitted while no 'error'
# listener is registered, so consumers should subscribe to 'error'.
class SQSQueue extends events.EventEmitter
  # queueName - name the queue was registered under.
  # queueUrl  - URL sent as QueueUrl with every request.
  # sqs       - SQS service object used to perform the requests.
  constructor: (@queueName, @queueUrl, @sqs) ->
    # Run the EventEmitter constructor so the emitter state is initialised.
    super()

  # Registers `callback` as a listener for `eventName`.
  subscribe: (eventName, callback) -> @on eventName, callback

  # Removes a listener previously added with subscribe.
  unsubscribe: (eventName, callback) -> @removeListener eventName, callback

  # Alias for sendMessage.
  publish: (messageBody, delayInSeconds, callback) ->
    @sendMessage messageBody, delayInSeconds, callback

  # Asks SQS for messages, waiting up to 20 seconds (WaitTimeSeconds).
  # Emits 'message' or 'error', then calls the optional callback(err, data).
  receiveMessage: (callback) ->
    request = {QueueUrl: @queueUrl, WaitTimeSeconds: 20}
    @sqs.receiveMessage request, (err, data) =>
      if err? then @emit 'error', err, data else @emit 'message', err, data
      callback? err, data

  # Deletes the message identified by `receiptHandle`.
  # Emits 'message' or 'error', then calls the optional callback(err, data).
  # NOTE(review): success is reported on the 'message' event, the same event
  # used for received messages - confirm this is intended before relying on it.
  deleteMessage: (receiptHandle, callback) ->
    request = {QueueUrl: @queueUrl, ReceiptHandle: receiptHandle}
    @sqs.deleteMessage request, (err, data) =>
      if err? then @emit 'error', err, data else @emit 'message', err, data
      callback? err, data

  # Sends `messageBody` with DelaySeconds set to `delaySeconds`.
  # Emits 'sent' or 'error', then calls the optional callback(err, data).
  sendMessage: (messageBody, delaySeconds, callback) ->
    request =
      QueueUrl: @queueUrl
      MessageBody: messageBody
      DelaySeconds: delaySeconds
    @sqs.sendMessage request, (err, data) =>
      if err? then @emit 'error', err, data else @emit 'sent', err, data
      callback? err, data

  # Calls receiveMessage in an endless loop via async.forever. Results reach
  # consumers only through the emitted events; a failed receive does not stop
  # the loop because the iteration callback is always given null.
  listenForever: =>
    async.forever (asyncCallback) =>
      @receiveMessage (err, data) ->
        asyncCallback(null)
|
|
|
|
|
2014-01-23 13:52:43 -05:00
|
|
|
|
2014-01-23 00:41:29 -05:00
|
|
|
|
|
|
|
|
2014-01-24 20:25:02 -05:00
|
|
|
# Queue client that stores messages in MongoDB through Mongoose.
class MongoQueueClient
  # Reads connection settings, opens the Mongoose connection and builds the
  # message model that is handed to every queue registered via this client.
  constructor: ->
    @configure()
    @createMongoConnection()
    @messageModel = @generateMessageModel()

  # ---- Public API ----

  # Creates a MongoQueue named `queueName`.
  #
  # queueName - stored on each message so several queues can share a model.
  # options   - forwarded to the MongoQueue constructor.
  # callback  - called synchronously as callback(null, queue).
  registerQueue: (queueName, options, callback) ->
    # MongoQueue uses its third argument as a Mongoose model
    # (findOneAndUpdate / findOneAndRemove / new), so it must receive the
    # model itself rather than this client.
    newQueue = new MongoQueue queueName, options, @messageModel
    callback(null, newQueue)

  # ---- End of public API ----

  # Stores the database address (from db.generateDatabaseAddress) and the
  # queue database name (from config.mongoQueue) on the instance.
  configure: ->
    @databaseAddress = db.generateDatabaseAddress()
    @mongoDatabaseName = config.mongoQueue.queueDatabaseName

  # Opens a dedicated Mongoose connection for the queue database and logs
  # connection errors and the first successful open.
  createMongoConnection: ->
    @mongooseConnection = mongoose.createConnection "mongodb://#{@databaseAddress}/#{@mongoDatabaseName}"
    @mongooseConnection.on 'error', ->
      winston.error "There was an error connecting to the queue in MongoDB"
    @mongooseConnection.once 'open', ->
      winston.info "Successfully connected to MongoDB queue!"

  # Builds the 'Message' model on the queue connection.
  #
  # The fields support the lookup a consumer needs: messages that are not
  # being processed, belong to a given queue, whose visibility time has
  # passed, ordered by insertion time.
  generateMessageModel: ->
    schema = new mongoose.Schema
      messageBody: Object
      # Declared as a real Boolean path; a bare `false` is a value, not a type.
      processing: {type: Boolean, default: false}
      insertionTime: {type: Date, default: Date.now}
      queue: String
      scheduledVisibilityTime: Date
    @mongooseConnection.model 'Message', schema
|
2014-01-23 15:37:18 -05:00
|
|
|
|
|
|
|
|
|
|
|
# A queue stored in MongoDB, exposed as an EventEmitter with the same method
# names as SQSQueue.
#
# Events emitted:
#   'error'   (err, data) - a database call failed.
#   'message' (err, data) - receiveMessage OR deleteMessage succeeded.
#   'sent'    (err, data) - sendMessage succeeded.
#
# Note: Node's EventEmitter throws when 'error' is emitted while no 'error'
# listener is registered, so consumers should subscribe to 'error'.
class MongoQueue extends events.EventEmitter
  # queueName - value stored in / matched against each message's `queue` field.
  # options   - accepted for interface parity; not used here.
  # Message   - Mongoose model used to create, claim and remove messages.
  constructor: (@queueName, options, @Message) ->
    # Run the EventEmitter constructor so the emitter state is initialised.
    super()

  # Registers `callback` as a listener for `eventName`.
  subscribe: (eventName, callback) -> @on eventName, callback

  # Removes a listener previously added with subscribe.
  unsubscribe: (eventName, callback) -> @removeListener eventName, callback

  # Alias for sendMessage.
  publish: (messageBody, delayInSeconds, callback) ->
    @sendMessage messageBody, delayInSeconds, callback

  # Claims one message of this queue that is not being processed and whose
  # scheduled visibility time has passed (sorted by insertionTime), marking it
  # as processing in the same findOneAndUpdate call.
  # Emits 'message' or 'error', then calls the optional callback(err, data).
  # `data` is whatever findOneAndUpdate yields.
  # NOTE(review): presumably null when nothing matches, in which case
  # 'message' still fires with null data - confirm against Mongoose.
  receiveMessage: (callback) ->
    conditions =
      queue: @queueName
      processing: false
      scheduledVisibilityTime: {$lt: Date.now()}
    options = {sort: 'insertionTime'}
    update = {$set: {processing: true}}
    @Message.findOneAndUpdate conditions, update, options, (err, data) =>
      if err? then @emit 'error', err, data else @emit 'message', err, data
      callback? err, data

  # Backward-compatible alias for the original misspelled method name.
  receieveMessage: (callback) -> @receiveMessage callback

  # Removes the message whose _id equals `receiptHandle` from this queue.
  # Emits 'message' or 'error', then calls the optional callback(err, data).
  # NOTE(review): success is reported on the 'message' event, the same event
  # used for received messages - confirm this is intended before relying on it.
  deleteMessage: (receiptHandle, callback) ->
    conditions = {queue: @queueName, _id: receiptHandle}
    @Message.findOneAndRemove conditions, {}, (err, data) =>
      if err? then @emit 'error', err, data else @emit 'message', err, data
      callback? err, data

  # Stores a new message that becomes visible `delaySeconds` seconds from now.
  # A missing delay is treated as 0 so the visibility time is never NaN.
  # Emits 'sent' or 'error', then calls the optional callback(err, data).
  sendMessage: (messageBody, delaySeconds, callback) ->
    delayMs = (delaySeconds ? 0) * 1000
    fields =
      messageBody: messageBody
      processing: false
      queue: @queueName
      scheduledVisibilityTime: Date.now() + delayMs
    messageToSend = new @Message fields
    messageToSend.save (err, data) =>
      if err? then @emit 'error', err, data else @emit 'sent', err, data
      callback? err, data

  # Calls receiveMessage in an endless loop via async.forever. Results reach
  # consumers only through the emitted events; a failed receive does not stop
  # the loop because the iteration callback is always given null.
  # NOTE(review): there is no pause between iterations, so an empty queue is
  # polled as fast as the database answers - consider adding a delay.
  listenForever: =>
    async.forever (asyncCallback) =>
      @receiveMessage (err, data) ->
        asyncCallback(null)
|
2014-01-22 21:29:02 -05:00
|
|
|
|
2014-01-23 13:52:43 -05:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
2014-01-22 21:29:02 -05:00
|
|
|
|
|
|
|
|
2014-01-24 14:20:44 -05:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|