From c00321208d318e33bdd943efc4ee9c553d5e8db2 Mon Sep 17 00:00:00 2001 From: Michael Schmatz Date: Fri, 24 Jan 2014 12:57:18 -0800 Subject: [PATCH] MongoQueue nearing completion --- server/queue.coffee | 42 +++++++++++++++++++----------------------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/server/queue.coffee b/server/queue.coffee index c8d70029b..29406e625 100644 --- a/server/queue.coffee +++ b/server/queue.coffee @@ -58,12 +58,8 @@ class SQSQueueClient extends AbstractQueueClient class SQSQueue extends events.EventEmitter constructor: (@queueName, @queueUrl, @sqs) -> - subscribe: (eventName, callback) -> - @on eventName, callback - return {eventName, callback} - - unsubscribe: (subscriptionObject) -> - @removeListener subscriptionObject.eventName, subscriptionObject.callback + subscribe: (eventName, callback) -> @on eventName, callback + unsubscribe: (eventName, callback) -> @removeListener eventName, callback publish: (messageBody,delayInSeconds, callback) -> @@ -130,11 +126,8 @@ class MongoQueueClient extends AbstractQueueClient class MongoQueue extends events.EventEmitter constructor: (@queueName, options, @Message) -> - subscribe: (eventName, callback) -> - @channel.subscribe eventName, callback - - unsubscribe: (subscriptionObject) -> - subscriptionObject.unsubscribe() + subscribe: (eventName, callback) -> @on eventName, callback + unsubscribe: (eventName, callback) -> @removeListener eventName, callback publish: (messageBody, delayInSeconds, callback) -> @sendMessage messageBody, delayInSeconds, callback @@ -143,30 +136,33 @@ class MongoQueue extends events.EventEmitter conditions = {queue: @queueName, processing: false, scheduledVisibilityTime: {$lt:Date.now()}} options = {sort: 'insertionTime'} update = {$set:{processing: true}} - @Message.findOneAndUpdate conditions, update, options, callback + @Message.findOneAndUpdate conditions, update, options, => + if err? then @emit 'error',err,data else @emit 'message',err,data + callback? err,data deleteMessage: (receiptHandle, callback) -> #receiptHandle in this case is an ID - conditions = {queue: @queueName, processing: false, _id : receiptHandle} - @Message.findOneAndRemove conditions, {}, callback - + conditions = {queue: @queueName, _id : receiptHandle} + @Message.findOneAndRemove conditions, {}, => + if err? then @emit 'error',err,data else @emit 'message',err,data + callback? err,data + sendMessage: (messageBody, delaySeconds, callback) -> - messageContents = + messageToSend = new @Message messageBody: messageBody processing: false queue: @queueName scheduledVisibilityTime: Date.now() + (delaySeconds * 1000) - messageToSend = new @Message messageContents - messageToSend.save (err) => - callback(err,messageContents) + if err? then @emit 'error',err,data else @emit 'sent',err, data + callback? err,data - - - deleteMessage: (callback) -> - throw new Error "MongoQueue " + listenForever: => + async.forever (asyncCallback) => + @recieveMessage (err, data) -> + asyncCallback(null)