Rewriting mongo queue

This commit is contained in:
Michael Schmatz 2014-01-24 11:20:44 -08:00
parent 7c6974955a
commit 750a2f6d0d

View file

@ -35,13 +35,7 @@ class SQSQueueClient extends AbstractQueueClient
constructor: ->
@configure()
@sqs = @generateSQSInstance()
configure: ->
aws.config.update
accessKeyId: config.queue.accessKeyId
secretAccessKey: config.queue.secretAccessKey
region: config.queue.region
###Public API###
registerQueue: (queueName, options, callback) ->
#returns new queue in data argument of callback
@sqs.createQueue {QueueName: queueName}, (err,data) =>
@ -50,6 +44,12 @@ class SQSQueueClient extends AbstractQueueClient
throw new Error("Fatal SQS error, see Winston output")
newQueue = new SQSQueue(queueName, data.QueueUrl, @sqs)
callback err, newQueue
###Public API###
configure: ->
aws.config.update
accessKeyId: config.queue.accessKeyId
secretAccessKey: config.queue.secretAccessKey
region: config.queue.region
generateSQSInstance: ->
new aws.SQS()
@ -96,15 +96,18 @@ class SQSQueue extends events.EventEmitter
class MongoQueueClient extends AbstractQueueClient
constructor: ->
@configure()
@createMongoConnection()
@messageModel = @generateMessageModel()
###Public API###
registerQueue: (queueName, options, callback) ->
channel = new MongoQueue queueName,options,this
callback(null, channel)
newQueue = new MongoQueue queueName,options,this
callback(null, newQueue)
###Public API###
configure: ->
@databaseAddress = db.generateDatabaseAddress()
@mongoDatabaseName = config.mongoQueue.queueDatabaseName;
@createMongoConnection()
createMongoConnection: ->
@mongooseConnection = mongoose.createConnection "mongodb://#{@databaseAddress}/#{@mongoDatabaseName}"
@ -113,12 +116,20 @@ class MongoQueueClient extends AbstractQueueClient
@mongooseConnection.once 'open', ->
winston.info "Successfully connected to MongoDB queue!"
generateMessageModel: ->
#do find something like: messages not processing, queue as current queue, visibility time before now, sort by insertion time, findOne
schema = new mongoose.Schema
messageBody : Object,
processing: false,
insertionTime: {type: Date, default: Date.now }
queue: String
scheduledVisibilityTime: Date
@mongooseConnection.model 'Message',schema
class MongoQueue extends events.EventEmitter
constructor: (@queueName, options, mubSubClient) ->
@channel = mubSubClient.channel queueName, options
@subscribe 'message', receieveMessage
constructor: (@queueName, options, @Message) ->
subscribe: (eventName, callback) ->
@channel.subscribe eventName, callback
@ -126,11 +137,34 @@ class MongoQueue extends events.EventEmitter
subscriptionObject.unsubscribe()
publish: (messageBody, delayInSeconds, callback) ->
#TODO: Mongo-based persistence of delayed messages
setTimeout @channel.publish.bind(this), delayInSeconds * 1000, @queueName, messageBody, callback
@sendMessage messageBody, delayInSeconds, callback
receieveMessage: (callback) ->
throw new Error "MongoQueue does not support fetching one message, it continually listens"
conditions = {queue: @queueName, processing: false, scheduledVisibilityTime: {$lt:Date.now()}}
options = {sort: 'insertionTime'}
update = {$set:{processing: true}}
@Message.findOneAndUpdate conditions, update, options, callback
deleteMessage: (receiptHandle, callback) ->
#receiptHandle in this case is an ID
conditions = {queue: @queueName, processing: false, _id : receiptHandle}
@Message.findOneAndRemove conditions, {}, callback
sendMessage: (messageBody, delaySeconds, callback) ->
messageContents =
messageBody: messageBody
processing: false
queue: @queueName
scheduledVisibilityTime: Date.now() + (delaySeconds * 1000)
messageToSend = new @Message messageContents
messageToSend.save (err) =>
callback(err,messageContents)
deleteMessage: (callback) ->
throw new Error "MongoQueue "
@ -141,3 +175,12 @@ class MongoQueue extends events.EventEmitter