2014-02-05 09:39:14 -08:00
config = require ' ../../server_config '
2014-02-10 10:30:08 -08:00
log = require ' winston '
2014-01-22 17:48:11 -08:00
mongoose = require ' mongoose '
async = require ' async '
2014-01-22 18:29:02 -08:00
aws = require ' aws-sdk '
2014-02-10 10:30:08 -08:00
db = require ' ./database '
2014-01-23 12:37:18 -08:00
mongoose = require ' mongoose '
2014-01-23 10:52:43 -08:00
events = require ' events '
2014-02-10 10:30:08 -08:00
crypto = require ' crypto '
2014-02-05 15:07:15 -08:00
2014-02-06 14:32:35 -08:00
# Exported handle to the shared queue client. Starts out undefined and is
# assigned by initializeQueueClient.
module.exports.queueClient = undefined
2014-02-05 15:07:15 -08:00
2014-02-13 12:59:21 -08:00
# Fallback number of seconds used by
# MongoQueue::_constructDefaultVisibilityTimeoutDate when no timeout is given.
defaultMessageVisibilityTimeoutInSeconds = 500
2014-02-10 10:30:08 -08:00
# Passed as WaitTimeSeconds when SQSQueue::receiveMessage polls SQS.
defaultMessageReceiptTimeout = 10
2014-01-22 18:29:02 -08:00
2014-01-22 21:41:29 -08:00
2014-02-06 14:32:35 -08:00
# Creates the shared queue client on first use and stores it on
# module.exports.queueClient; later calls reuse the stored client.
#
# cb - optional function, invoked with no arguments once the client is set.
#
# The previous guard tested a bare `queueClient` identifier that is never
# assigned in this scope, so the check always passed and a brand new client
# was generated on every call. The guard now inspects the exported property.
module.exports.initializeQueueClient = (cb) ->
  module.exports.queueClient ?= generateQueueClient()
  cb?()
2014-01-27 11:14:44 -08:00
2014-02-10 10:30:08 -08:00
# Builds and returns the queue client implementation.
# The SQS branch is switched off by a hard-coded flag, so a MongoQueueClient
# is what gets constructed today.
generateQueueClient = ->
  #if config.queue.accessKeyId
  useSQS = false #TODO: Change this in production
  queueClient = if useSQS then new SQSQueueClient() else new MongoQueueClient()
2014-02-05 15:07:15 -08:00
2014-02-10 10:30:08 -08:00
# Queue client backed by Amazon SQS (via the aws-sdk module).
class SQSQueueClient
  # Pushes the credentials from config into the AWS SDK, then creates the
  # SQS handle used by every queue registered through this client.
  constructor: ->
    @_configure()
    @sqs = @_generateSQSInstance()

  # Asks SQS to create the queue named queueName and passes an SQSQueue
  # wrapping it to callback(err, queue).
  #
  # queueName - name sent to SQS as QueueName.
  # options   - accepted but not used by this implementation.
  # callback  - optional; only reached when creation succeeded, because a
  #             creation error is logged and thrown inside the SQS callback.
  registerQueue: (queueName, options, callback) ->
    creationParams = QueueName: queueName

    @sqs.createQueue creationParams, (err, data) =>
      if err?
        @_logAndThrowFatalException " There was an error creating a new SQS queue, reason: #{JSON.stringify err} "

      queue = new SQSQueue queueName, data.QueueUrl, @sqs
      callback? err, queue

  # Copies access key, secret and region from config.queue into aws.config.
  _configure: ->
    credentials =
      accessKeyId: config.queue.accessKeyId
      secretAccessKey: config.queue.secretAccessKey
      region: config.queue.region
    aws.config.update credentials

  # Returns a new aws.SQS instance.
  _generateSQSInstance: ->
    new aws.SQS()

  # Logs errorMessage at error level and then throws it as an Error.
  _logAndThrowFatalException: (errorMessage) ->
    log.error errorMessage
    throw new Error errorMessage
2014-01-22 23:48:47 -08:00
2014-01-22 23:41:26 -08:00
2014-01-23 10:52:43 -08:00
# A single SQS queue exposed as an EventEmitter.
#
# Each operation forwards to the SQS handle, emits 'error' (err, data) when
# SQS reports an error or an operation-specific event otherwise, and finally
# invokes the optional callback(err, data).
#
# Events emitted on success: 'message' (receive), 'delete' (delete),
# 'edited' (visibility change), 'sent' (send).
class SQSQueue extends events.EventEmitter
  # queueName - name the queue was registered under.
  # queueUrl  - URL passed as QueueUrl on every SQS request.
  # sqs       - SQS handle used to issue the requests.
  constructor: (@queueName, @queueUrl, @sqs) ->

  # Thin aliases over EventEmitter's on / removeListener.
  subscribe: (eventName, callback) -> @on eventName, callback
  unsubscribe: (eventName, callback) -> @removeListener eventName, callback

  # Polls SQS for a message, waiting up to defaultMessageReceiptTimeout
  # seconds. On success the raw response is wrapped in an SQSMessage.
  receiveMessage: (callback) ->
    queueReceiveOptions =
      QueueUrl: @queueUrl
      WaitTimeSeconds: defaultMessageReceiptTimeout

    @sqs.receiveMessage queueReceiveOptions, (err, data) =>
      if err?
        # Previously this emitted `originalData`, which is only assigned on
        # the success path, so error listeners always received undefined.
        @emit 'error', err, data
      else
        data = new SQSMessage data, this
        @emit 'message', err, data

      callback? err, data

  # Deletes the message identified by receiptHandle.
  deleteMessage: (receiptHandle, callback) ->
    queueDeletionOptions =
      QueueUrl: @queueUrl
      ReceiptHandle: receiptHandle

    @sqs.deleteMessage queueDeletionOptions, (err, data) =>
      # Success used to be announced as 'message', which handed raw delete
      # responses to listeners expecting received SQSMessage objects. It now
      # uses 'delete', the same event MongoQueue emits for this operation.
      if err? then @emit 'error', err, data else @emit 'delete', err, data

      callback? err, data

  # Sets the visibility timeout of the message identified by receiptHandle
  # to secondsFromNow.
  changeMessageVisibilityTimeout: (secondsFromNow, receiptHandle, callback) ->
    messageVisibilityTimeoutOptions =
      QueueUrl: @queueUrl
      ReceiptHandle: receiptHandle
      VisibilityTimeout: secondsFromNow

    @sqs.changeMessageVisibility messageVisibilityTimeoutOptions, (err, data) =>
      if err? then @emit 'error', err, data else @emit 'edited', err, data

      callback? err, data

  # Sends messageBody to the queue with the given DelaySeconds.
  sendMessage: (messageBody, delaySeconds, callback) ->
    queueSendingOptions =
      QueueUrl: @queueUrl
      MessageBody: messageBody
      DelaySeconds: delaySeconds

    @sqs.sendMessage queueSendingOptions, (err, data) =>
      if err? then @emit 'error', err, data else @emit 'sent', err, data

      callback? err, data

  # Calls receiveMessage in an endless async.forever loop. The loop is always
  # continued with null, so a failed receive does not stop it; results are
  # delivered to listeners through the emitted events.
  listenForever: => async.forever (asyncCallback) => @receiveMessage (err, data) -> asyncCallback(null)
# Wraps the raw response of an SQS receive call together with the queue it
# came from. Only the first entry of the response's Messages list is used.
class SQSMessage
  # originalMessage - raw SQS receive response.
  # parentQueue     - queue object used for delete / visibility operations.
  constructor: (@originalMessage, @parentQueue) ->

  # First entry of the Messages list; throws if the list is missing.
  _firstMessage: ->
    @originalMessage.Messages[0]

  # True when the response has no Messages list or its first entry is absent.
  isEmpty: ->
    first = @originalMessage.Messages?[0]
    not first?

  # Body of the first message.
  getBody: ->
    @_firstMessage().Body

  # MessageId of the first message.
  getID: ->
    @_firstMessage().MessageId

  # ReceiptHandle of the first message.
  getReceiptHandle: ->
    @_firstMessage().ReceiptHandle

  # Deletes this message through the parent queue.
  removeFromQueue: (callback) ->
    handle = @getReceiptHandle()
    @parentQueue.deleteMessage handle, callback

  # Makes the message visible again right away (visibility timeout of 0).
  requeue: (callback) ->
    handle = @getReceiptHandle()
    @parentQueue.changeMessageVisibilityTimeout 0, handle, callback

  # Sets the visibility timeout to secondsFromFunctionCall through the
  # parent queue.
  changeMessageVisibilityTimeout: (secondsFromFunctionCall, callback) ->
    handle = @getReceiptHandle()
    @parentQueue.changeMessageVisibilityTimeout secondsFromFunctionCall, handle, callback
2014-02-05 15:07:15 -08:00
2014-01-23 10:52:43 -08:00
2014-01-22 21:41:29 -08:00
2014-01-24 17:25:02 -08:00
# Queue client that stores messages in MongoDB through mongoose.
class MongoQueueClient
  # Resolves the connection string, opens a dedicated mongoose connection
  # and builds the message model shared by every queue of this client.
  constructor: ->
    @_configure()
    @_createMongoConnection()
    @messageModel = @_generateMessageModel()

  # Creates a MongoQueue named queueName on top of the shared message model
  # and passes it to callback(null, queue).
  #
  # callback is optional: it used to be invoked unguarded, which threw a
  # TypeError when omitted, unlike SQSQueueClient::registerQueue.
  registerQueue: (queueName, options, callback) ->
    newQueue = new MongoQueue queueName, options, @messageModel
    callback? null, newQueue

  # Stores the connection string produced by the database module.
  _configure: -> @databaseAddress = db.generateMongoConnectionString()

  # Opens the connection and logs connection errors and the first
  # successful open.
  _createMongoConnection: ->
    @mongooseConnection = mongoose.createConnection @databaseAddress
    @mongooseConnection.on 'error', -> log.error " There was an error connecting to the queue in MongoDB "
    @mongooseConnection.once 'open', -> log.info " Successfully connected to MongoDB queue! "

  # Defines the message schema and registers it on this client's connection.
  # queue, scheduledVisibilityTime and receiptHandle are indexed because the
  # MongoQueue queries filter on them.
  _generateMessageModel: ->
    schema = new mongoose.Schema
      messageBody: Object
      queue: {type: String, index: true}
      scheduledVisibilityTime: {type: Date, index: true}
      receiptHandle: {type: String, index: true}
    @mongooseConnection.model 'messageQueue', schema
2014-01-23 12:37:18 -08:00
# A named queue stored as documents of the shared message model, exposed as
# an EventEmitter.
#
# A message is "visible" once its scheduledVisibilityTime is in the past.
# Receiving a message stamps it with a fresh receiptHandle and pushes its
# scheduledVisibilityTime into the future; delete and visibility changes
# only match a message whose handle matches and whose scheduledVisibilityTime
# has not passed yet.
#
# Events emitted on success: 'message' (receive), 'delete' (delete),
# 'sent' (send), 'update' (visibility change); 'error' on failure.
class MongoQueue extends events.EventEmitter
  # queueName    - value stored in / matched against each document's `queue`.
  # options      - accepted but not used by this implementation.
  # messageModel - model used for every read and write.
  constructor: (queueName, options, messageModel) ->
    @Message = messageModel
    @queueName = queueName

  # Thin aliases over EventEmitter's on / removeListener.
  subscribe: (eventName, callback) -> @on eventName, callback
  unsubscribe: (eventName, callback) -> @removeListener eventName, callback

  # Counts the messages stored for this queue and passes the result to
  # callback. The filter used to be empty, which counted the documents of
  # every queue sharing the model rather than this queue's.
  totalMessagesInQueue: (callback) -> @Message.count {queue: @queueName}, callback

  # Claims the visible message with the earliest scheduledVisibilityTime,
  # wraps it in a MongoMessage, emits 'message' and calls callback(err, msg).
  # When nothing is visible the wrapped document is empty (see
  # MongoMessage::isEmpty).
  receiveMessage: (callback) ->
    conditions =
      queue: @queueName
      scheduledVisibilityTime:
        $lte: new Date()
    options =
      sort: 'scheduledVisibilityTime'
    update =
      $set:
        receiptHandle: @_generateRandomReceiptHandle()
        scheduledVisibilityTime: @_constructDefaultVisibilityTimeoutDate()

    @Message.findOneAndUpdate conditions, update, options, (err, data) =>
      if err?
        # The error path used to return right after emitting, so callback was
        # never invoked and listenForever could not schedule its next poll.
        @emit 'error', err, data
      else
        data = new MongoMessage data, this
        @emit 'message', err, data

      callback? err, data

  # Removes the message carrying receiptHandle, provided its
  # scheduledVisibilityTime has not passed yet.
  deleteMessage: (receiptHandle, callback) ->
    conditions =
      queue: @queueName
      receiptHandle: receiptHandle
      scheduledVisibilityTime:
        $gte: new Date()

    @Message.findOneAndRemove conditions, {}, (err, data) =>
      if err? then @emit 'error', err, data else @emit 'delete', err, data
      callback? err, data

  # Stores messageBody in this queue; it becomes visible delaySeconds from
  # now (or after the default timeout when delaySeconds is null/undefined).
  sendMessage: (messageBody, delaySeconds, callback) ->
    messageToSend = new @Message
      messageBody: messageBody
      queue: @queueName
      scheduledVisibilityTime: @_constructDefaultVisibilityTimeoutDate delaySeconds

    messageToSend.save (err, data) =>
      if err? then @emit 'error', err, data else @emit 'sent', err, data
      callback? err, data

  # Moves the scheduledVisibilityTime of the message carrying receiptHandle
  # to secondsFromNow seconds from now.
  changeMessageVisibilityTimeout: (secondsFromNow, receiptHandle, callback) ->
    conditions =
      queue: @queueName
      receiptHandle: receiptHandle
      scheduledVisibilityTime:
        $gte: new Date()
    update =
      $set:
        scheduledVisibilityTime: @_constructDefaultVisibilityTimeoutDate secondsFromNow

    @Message.findOneAndUpdate conditions, update, (err, data) =>
      if err?
        log.error " There was a problem updating the message visibility timeout: #{ err } "
        @emit 'error', err, data
      else
        @emit 'update', err, data
        log.info " The message visibility time was updated "

      callback? err, data

  # Calls receiveMessage in an endless async.forever loop, always continuing
  # with null. The call used to be spelled `recieveMessage`, a method that
  # does not exist, so the loop threw on its first iteration.
  listenForever: => async.forever (asyncCallback) => @receiveMessage (err, data) -> asyncCallback(null)

  # Returns the Date timeoutSeconds from now, falling back to
  # defaultMessageVisibilityTimeoutInSeconds when timeoutSeconds is
  # null/undefined (0 is honoured as "now").
  _constructDefaultVisibilityTimeoutDate: (timeoutSeconds) ->
    timeoutSeconds ?= defaultMessageVisibilityTimeoutInSeconds
    new Date(Date.now() + 1000 * timeoutSeconds)

  # Returns 20 random bytes encoded as a 40-character hex string.
  _generateRandomReceiptHandle: -> crypto.randomBytes(20).toString('hex')
2014-01-24 11:20:44 -08:00
2014-02-10 10:30:08 -08:00
# Wraps a message document returned by MongoQueue together with the queue it
# came from.
class MongoMessage
  # originalMessage - the stored document, or a falsy value when the receive
  #                   query matched nothing.
  # parentQueue     - queue object used for delete / visibility operations.
  constructor: (@originalMessage, @parentQueue) ->

  # True when no document was found.
  isEmpty: -> not @originalMessage

  # The stored messageBody.
  getBody: -> @originalMessage.messageBody

  # The document's _id. The property was previously misspelled
  # (`originalMesage`), so every call threw a TypeError.
  getID: -> @originalMessage._id

  # Deletes this message through the parent queue. The callback used to be
  # forwarded as the undefined identifier `callbacks`, which raised a
  # ReferenceError before the queue was ever called.
  removeFromQueue: (callback) -> @parentQueue.deleteMessage @getReceiptHandle(), callback

  # Makes the message visible again right away (timeout of 0 seconds).
  requeue: (callback) -> @parentQueue.changeMessageVisibilityTimeout 0, @getReceiptHandle(), callback

  # Moves the message's visibility time to secondsFromFunctionCall seconds
  # from now through the parent queue.
  changeMessageVisibilityTimeout: (secondsFromFunctionCall, callback) ->
    @parentQueue.changeMessageVisibilityTimeout secondsFromFunctionCall, @getReceiptHandle(), callback

  # Receipt handle stamped on the document when it was received.
  getReceiptHandle: -> @originalMessage.receiptHandle