From 1c2eeff399f90e995e632b8da0cf2bf61ed1f44e Mon Sep 17 00:00:00 2001 From: Michael Schmatz Date: Thu, 23 Jan 2014 10:52:43 -0800 Subject: [PATCH] Unifying SQS with mubsub interface --- package.json | 3 +- server/db.coffee | 20 ++++--- server/queue.coffee | 139 ++++++++++++++++++++++++++++++-------------- 3 files changed, 111 insertions(+), 51 deletions(-) diff --git a/package.json b/package.json index 523c52dae..ea4daa798 100644 --- a/package.json +++ b/package.json @@ -60,7 +60,8 @@ "gridfs-stream": "0.4.x", "stream-buffers": "0.2.x", "sendwithus": "2.0.x", - "aws-sdk":"~2.0.0" + "aws-sdk":"~2.0.0", + "mubsub": "~0.2.1" }, "devDependencies": { "jade": "0.33.x", diff --git a/server/db.coffee b/server/db.coffee index 84a4d34fe..19f637e17 100644 --- a/server/db.coffee +++ b/server/db.coffee @@ -8,13 +8,7 @@ errors = require './errors' testing = '--unittest' in process.argv module.exports.connectDatabase = () -> - dbName = config.mongo.db - dbName += '_unittest' if testing - address = config.mongo.host + ":" + config.mongo.port - if config.mongo.username and config.mongo.password - address = config.mongo.username + ":" + config.mongo.password + "@" + address -# address = config.mongo.username + "@" + address # if connecting to production server - address = "mongodb://#{address}/#{dbName}" + address = exports.generateDatabaseAddress() console.log "got address:", address mongoose.connect address mongoose.connection.once 'open', -> @@ -55,3 +49,15 @@ getSchema = (req, res, moduleName) -> catch error winston.error("Error trying to grab schema from #{name}: #{error}") errors.notFound(res, "Schema #{moduleName} not found.") + + +module.exports.generateDatabaseAddress = -> + dbName = config.mongo.db + dbName += '_unittest' if testing + address = config.mongo.host + ":" + config.mongo.port + if config.mongo.username and config.mongo.password + address = config.mongo.username + ":" + config.mongo.password + "@" + address + # address = config.mongo.username + "@" + address # if connecting to production server + address = "mongodb://#{address}/#{dbName}" + + diff --git a/server/queue.coffee b/server/queue.coffee index c6e15a957..ae5a76379 100644 --- a/server/queue.coffee +++ b/server/queue.coffee @@ -4,41 +4,46 @@ mongoose = require 'mongoose' async = require 'async' errors = require './errors' aws = require 'aws-sdk' +mubsub = require 'mubsub' +db = require './db' +events = require 'events' -queueInstance = null +queueClient = null +simulationQueue = null module.exports.setupRoutes = (app) -> - queueInstance = generateQueueInstance() + generateQueueInstance() + queueClient.registerQueue "simulationQueue", {}, (err,data) -> + simulationQueue = data + simulationQueue.subscribe 'message', (err, data) -> + winston.info "Receieved message #{data.Messages?[0].Body}" + + simulationQueue.listenForever() + + + generateQueueInstance = -> - if config.isProduction - return new RemoteQueue(config.queue.simulationQueueName) + if config.isProduction || true + queueClient = new SQSQueueClient() else - return new LocalQueue() + queueClient = new MongoQueueClient() -class AbstractQueue - configure: -> - throw new Error "Subclasses must override the configure method" - - connect: -> - throw new Error "Subclasses must override the connect method" - - createSimulationQueue: -> - throw new Error "Subclasses must override the createSimulationQueue method" +class AbstractQueueClient + registerQueue: (queueName, callback) -> + return -class RemoteQueue extends AbstractQueue - constructor: (queueName) -> +class SQSQueueClient extends AbstractQueueClient + constructor: -> @configure() @sqs = @generateSQSInstance() - @createSimulationQueueAndSetUrl queueName, (err, data) => - @enterReceieveMessageForeverLoop() configure: -> aws.config.update @@ -46,50 +51,98 @@ class RemoteQueue extends AbstractQueue secretAccessKey: config.queue.secretAccessKey region: config.queue.region + registerQueue: (queueName, options, callback) -> + #returns new queue in data argument of callback + @sqs.createQueue {QueueName: queueName}, (err,data) => + if err? + winston.error("There was an error creating a new SQS queue, reason: #{JSON.stringify err}") + throw new Error("Fatal SQS error, see Winston output") + newQueue = new SQSQueue(queueName, data.QueueUrl, @sqs) + callback err, newQueue + + generateSQSInstance: -> new aws.SQS() - createSimulationQueueAndSetUrl: (queueName, callback) -> - @sqs.createQueue {QueueName: queueName}, (err, data) => - if err? - throw new Error "Failed to create queue \"#{queueName}\"" - else - winston.info "Created queue, URL is #{data.QueueUrl}" - @queueUrl = data.QueueUrl - callback?(err,data) + + +class MongoQueueClient extends AbstractQueueClient + constructor: -> + @configure() + @generateMubsubClientInstance() + + registerQueue: (queueName, options, callback) -> + channel = @localQueueClient.channel(queueName, options) + callback(null, channel) + + configure: -> + @databaseAddress = databaseAddress = db.generateDatabaseAddress() + + generateMubsubClientInstance: -> + @localQueueClient = mubsub(@databaseAddress) - enterReceieveMessageForeverLoop: -> - async.forever (asyncCallback) => - @receiveMessage (err, data) => - if err? - winston.error "Error receiving message, reason: #{JSON.stringify err}" - else - if data.Messages? - winston.info "Received message, content: #{JSON.stringify data.Messages[0].Body}" - winston.info "Deleting message..." - @deleteMessage data.Messages?[0].ReceiptHandle, -> - winston.info "Deleted message!" - asyncCallback(null) + + +class AbstractQueue + configure: -> + throw new Error "Subclasses must override the configure method" + + + +class SQSQueue extends events.EventEmitter + constructor: (@queueName, @queueUrl, @sqs) -> + + + subscribe: (eventName, callback) -> + this.on eventName, callback + + publish: (eventName) -> + return + + receiveMessage: (callback) -> - @sqs.receiveMessage {QueueUrl: @queueUrl, WaitTimeSeconds: 20}, callback + @sqs.receiveMessage {QueueUrl: @queueUrl, WaitTimeSeconds: 20}, (err, data) => + if err? then @emit 'error',err,data else @emit 'message',err,data + callback? err,data + deleteMessage: (receiptHandle, callback) -> - @sqs.deleteMessage {QueueUrl: @queueUrl, ReceiptHandle: receiptHandle}, callback + @sqs.deleteMessage {QueueUrl: @queueUrl, ReceiptHandle: receiptHandle}, (err, data) => + if err? then @emit 'error',err,data else @emit 'message',err,data + callback? err,data + sendMessage: (messageBody, delaySeconds, callback) -> - @sqs.sendMessage {QueueUrl: @queueUrl, MessageBody: messageBody, DelaySeconds: delaySeconds}, callback + @sqs.sendMessage {QueueUrl: @queueUrl, MessageBody: messageBody, DelaySeconds: delaySeconds}, (err, data) => + if err? then @emit 'error',err,data else @emit 'sent',err, data + callback? err,data + + listenForever: => + async.forever (asyncCallback) => + @receiveMessage (err, data) -> + asyncCallback(null) + class LocalQueue extends AbstractQueue - constructor: ()-> + constructor: (queueName)-> return configure: () -> - super() + @client = @generateMubsubClient + + + generateMubsubClient: -> + + client = mubsub(databaseAddress) + + registerQueueAndReturnChannel: (queueName) -> + return +