Unifying SQS with mubsub interface

This commit is contained in:
Michael Schmatz 2014-01-23 10:52:43 -08:00
parent 671f6d6ffd
commit 1c2eeff399
3 changed files with 111 additions and 51 deletions

View file

@ -60,7 +60,8 @@
"gridfs-stream": "0.4.x",
"stream-buffers": "0.2.x",
"sendwithus": "2.0.x",
"aws-sdk":"~2.0.0"
"aws-sdk":"~2.0.0",
"mubsub": "~0.2.1"
},
"devDependencies": {
"jade": "0.33.x",

View file

@ -8,13 +8,7 @@ errors = require './errors'
testing = '--unittest' in process.argv
module.exports.connectDatabase = () ->
dbName = config.mongo.db
dbName += '_unittest' if testing
address = config.mongo.host + ":" + config.mongo.port
if config.mongo.username and config.mongo.password
address = config.mongo.username + ":" + config.mongo.password + "@" + address
# address = config.mongo.username + "@" + address # if connecting to production server
address = "mongodb://#{address}/#{dbName}"
address = exports.generateDatabaseAddress()
console.log "got address:", address
mongoose.connect address
mongoose.connection.once 'open', ->
@ -55,3 +49,15 @@ getSchema = (req, res, moduleName) ->
catch error
winston.error("Error trying to grab schema from #{name}: #{error}")
errors.notFound(res, "Schema #{moduleName} not found.")
module.exports.generateDatabaseAddress = ->
dbName = config.mongo.db
dbName += '_unittest' if testing
address = config.mongo.host + ":" + config.mongo.port
if config.mongo.username and config.mongo.password
address = config.mongo.username + ":" + config.mongo.password + "@" + address
# address = config.mongo.username + "@" + address # if connecting to production server
address = "mongodb://#{address}/#{dbName}"

View file

@ -4,41 +4,46 @@ mongoose = require 'mongoose'
async = require 'async'
errors = require './errors'
aws = require 'aws-sdk'
mubsub = require 'mubsub'
db = require './db'
events = require 'events'
queueInstance = null
queueClient = null
simulationQueue = null
module.exports.setupRoutes = (app) ->
queueInstance = generateQueueInstance()
generateQueueInstance()
queueClient.registerQueue "simulationQueue", {}, (err,data) ->
simulationQueue = data
simulationQueue.subscribe 'message', (err, data) ->
winston.info "Receieved message #{data.Messages?[0].Body}"
simulationQueue.listenForever()
generateQueueInstance = ->
if config.isProduction
return new RemoteQueue(config.queue.simulationQueueName)
if config.isProduction || true
queueClient = new SQSQueueClient()
else
return new LocalQueue()
queueClient = new MongoQueueClient()
class AbstractQueue
configure: ->
throw new Error "Subclasses must override the configure method"
connect: ->
throw new Error "Subclasses must override the connect method"
createSimulationQueue: ->
throw new Error "Subclasses must override the createSimulationQueue method"
class AbstractQueueClient
registerQueue: (queueName, callback) ->
return
class RemoteQueue extends AbstractQueue
constructor: (queueName) ->
class SQSQueueClient extends AbstractQueueClient
constructor: ->
@configure()
@sqs = @generateSQSInstance()
@createSimulationQueueAndSetUrl queueName, (err, data) =>
@enterReceieveMessageForeverLoop()
configure: ->
aws.config.update
@ -46,50 +51,98 @@ class RemoteQueue extends AbstractQueue
secretAccessKey: config.queue.secretAccessKey
region: config.queue.region
registerQueue: (queueName, options, callback) ->
#returns new queue in data argument of callback
@sqs.createQueue {QueueName: queueName}, (err,data) =>
if err?
winston.error("There was an error creating a new SQS queue, reason: #{JSON.stringify err}")
throw new Error("Fatal SQS error, see Winston output")
newQueue = new SQSQueue(queueName, data.QueueUrl, @sqs)
callback err, newQueue
generateSQSInstance: ->
new aws.SQS()
createSimulationQueueAndSetUrl: (queueName, callback) ->
@sqs.createQueue {QueueName: queueName}, (err, data) =>
if err?
throw new Error "Failed to create queue \"#{queueName}\""
else
winston.info "Created queue, URL is #{data.QueueUrl}"
@queueUrl = data.QueueUrl
callback?(err,data)
class MongoQueueClient extends AbstractQueueClient
constructor: ->
@configure()
@generateMubsubClientInstance()
registerQueue: (queueName, options, callback) ->
channel = @localQueueClient.channel(queueName, options)
callback(null, channel)
configure: ->
@databaseAddress = databaseAddress = db.generateDatabaseAddress()
generateMubsubClientInstance: ->
@localQueueClient = mubsub(@databaseAddress)
enterReceieveMessageForeverLoop: ->
async.forever (asyncCallback) =>
@receiveMessage (err, data) =>
if err?
winston.error "Error receiving message, reason: #{JSON.stringify err}"
else
if data.Messages?
winston.info "Received message, content: #{JSON.stringify data.Messages[0].Body}"
winston.info "Deleting message..."
@deleteMessage data.Messages?[0].ReceiptHandle, ->
winston.info "Deleted message!"
asyncCallback(null)
class AbstractQueue
configure: ->
throw new Error "Subclasses must override the configure method"
class SQSQueue extends events.EventEmitter
constructor: (@queueName, @queueUrl, @sqs) ->
subscribe: (eventName, callback) ->
this.on eventName, callback
publish: (eventName) ->
return
receiveMessage: (callback) ->
@sqs.receiveMessage {QueueUrl: @queueUrl, WaitTimeSeconds: 20}, callback
@sqs.receiveMessage {QueueUrl: @queueUrl, WaitTimeSeconds: 20}, (err, data) =>
if err? then @emit 'error',err,data else @emit 'message',err,data
callback? err,data
deleteMessage: (receiptHandle, callback) ->
@sqs.deleteMessage {QueueUrl: @queueUrl, ReceiptHandle: receiptHandle}, callback
@sqs.deleteMessage {QueueUrl: @queueUrl, ReceiptHandle: receiptHandle}, (err, data) =>
if err? then @emit 'error',err,data else @emit 'message',err,data
callback? err,data
sendMessage: (messageBody, delaySeconds, callback) ->
@sqs.sendMessage {QueueUrl: @queueUrl, MessageBody: messageBody, DelaySeconds: delaySeconds}, callback
@sqs.sendMessage {QueueUrl: @queueUrl, MessageBody: messageBody, DelaySeconds: delaySeconds}, (err, data) =>
if err? then @emit 'error',err,data else @emit 'sent',err, data
callback? err,data
listenForever: =>
async.forever (asyncCallback) =>
@receiveMessage (err, data) ->
asyncCallback(null)
class LocalQueue extends AbstractQueue
constructor: ()->
constructor: (queueName)->
return
configure: () ->
super()
@client = @generateMubsubClient
generateMubsubClient: ->
client = mubsub(databaseAddress)
registerQueueAndReturnChannel: (queueName) ->
return