MongoQueue nearing completion

This commit is contained in:
Michael Schmatz 2014-01-24 12:57:18 -08:00
parent 750a2f6d0d
commit c00321208d

View file

@ -58,12 +58,8 @@ class SQSQueueClient extends AbstractQueueClient
class SQSQueue extends events.EventEmitter
constructor: (@queueName, @queueUrl, @sqs) ->
subscribe: (eventName, callback) ->
@on eventName, callback
return {eventName, callback}
unsubscribe: (subscriptionObject) ->
@removeListener subscriptionObject.eventName, subscriptionObject.callback
subscribe: (eventName, callback) -> @on eventName, callback
unsubscribe: (eventName, callback) -> @removeListener eventName, callback
publish: (messageBody,delayInSeconds, callback) ->
@ -130,11 +126,8 @@ class MongoQueueClient extends AbstractQueueClient
class MongoQueue extends events.EventEmitter
constructor: (@queueName, options, @Message) ->
subscribe: (eventName, callback) ->
@channel.subscribe eventName, callback
unsubscribe: (subscriptionObject) ->
subscriptionObject.unsubscribe()
subscribe: (eventName, callback) -> @on eventName, callback
unsubscribe: (eventName, callback) -> @removeListener eventName, callback
publish: (messageBody, delayInSeconds, callback) ->
@sendMessage messageBody, delayInSeconds, callback
@ -143,30 +136,33 @@ class MongoQueue extends events.EventEmitter
conditions = {queue: @queueName, processing: false, scheduledVisibilityTime: {$lt:Date.now()}}
options = {sort: 'insertionTime'}
update = {$set:{processing: true}}
@Message.findOneAndUpdate conditions, update, options, callback
@Message.findOneAndUpdate conditions, update, options, =>
if err? then @emit 'error',err,data else @emit 'message',err,data
callback? err,data
deleteMessage: (receiptHandle, callback) ->
#receiptHandle in this case is an ID
conditions = {queue: @queueName, processing: false, _id : receiptHandle}
@Message.findOneAndRemove conditions, {}, callback
conditions = {queue: @queueName, _id : receiptHandle}
@Message.findOneAndRemove conditions, {}, =>
if err? then @emit 'error',err,data else @emit 'message',err,data
callback? err,data
sendMessage: (messageBody, delaySeconds, callback) ->
messageContents =
messageToSend = new @Message
messageBody: messageBody
processing: false
queue: @queueName
scheduledVisibilityTime: Date.now() + (delaySeconds * 1000)
messageToSend = new @Message messageContents
messageToSend.save (err) =>
callback(err,messageContents)
if err? then @emit 'error',err,data else @emit 'sent',err, data
callback? err,data
deleteMessage: (callback) ->
throw new Error "MongoQueue "
listenForever: =>
async.forever (asyncCallback) =>
@recieveMessage (err, data) ->
asyncCallback(null)