Ditching mubsub, moving to a slower yet more extensible method of local queueing

This commit is contained in:
Michael Schmatz 2014-01-23 12:37:18 -08:00
parent 1c2eeff399
commit 7c6974955a
3 changed files with 66 additions and 68 deletions

View file

@ -8,7 +8,10 @@ errors = require './errors'
testing = '--unittest' in process.argv
module.exports.connectDatabase = () ->
dbName = config.mongo.db
dbName += '_unittest' if testing
address = exports.generateDatabaseAddress()
address = "mongodb://#{address}/#{dbName}"
console.log "got address:", address
mongoose.connect address
mongoose.connection.once 'open', ->
@ -52,12 +55,10 @@ getSchema = (req, res, moduleName) ->
module.exports.generateDatabaseAddress = ->
dbName = config.mongo.db
dbName += '_unittest' if testing
address = config.mongo.host + ":" + config.mongo.port
if config.mongo.username and config.mongo.password
address = config.mongo.username + ":" + config.mongo.password + "@" + address
# address = config.mongo.username + "@" + address # if connecting to production server
address = "mongodb://#{address}/#{dbName}"
return address

View file

@ -4,8 +4,8 @@ mongoose = require 'mongoose'
async = require 'async'
errors = require './errors'
aws = require 'aws-sdk'
mubsub = require 'mubsub'
db = require './db'
mongoose = require 'mongoose'
events = require 'events'
queueClient = null
@ -13,33 +13,24 @@ simulationQueue = null
module.exports.setupRoutes = (app) ->
generateQueueInstance()
queueClient.registerQueue "simulationQueue", {}, (err,data) ->
###queueClient.registerQueue "simulationQueue", {}, (err,data) ->
simulationQueue = data
simulationQueue.subscribe 'message', (err, data) ->
winston.info "Receieved message #{data.Messages?[0].Body}"
simulationQueue.listenForever()
if data.Messages?
winston.info "Receieved message #{data.Messages?[0].Body}"
simulationQueue.deleteMessage data.Messages?[0].ReceiptHandle, ->
winston.info "Deleted message"
###
generateQueueInstance = ->
if config.isProduction || true
if config.isProduction
queueClient = new SQSQueueClient()
else
queueClient = new MongoQueueClient()
class AbstractQueueClient
registerQueue: (queueName, callback) ->
return
class SQSQueueClient extends AbstractQueueClient
constructor: ->
@configure()
@ -60,55 +51,29 @@ class SQSQueueClient extends AbstractQueueClient
newQueue = new SQSQueue(queueName, data.QueueUrl, @sqs)
callback err, newQueue
generateSQSInstance: ->
new aws.SQS()
class MongoQueueClient extends AbstractQueueClient
constructor: ->
@configure()
@generateMubsubClientInstance()
registerQueue: (queueName, options, callback) ->
channel = @localQueueClient.channel(queueName, options)
callback(null, channel)
configure: ->
@databaseAddress = databaseAddress = db.generateDatabaseAddress()
generateMubsubClientInstance: ->
@localQueueClient = mubsub(@databaseAddress)
class AbstractQueue
configure: ->
throw new Error "Subclasses must override the configure method"
class SQSQueue extends events.EventEmitter
constructor: (@queueName, @queueUrl, @sqs) ->
subscribe: (eventName, callback) ->
this.on eventName, callback
@on eventName, callback
return {eventName, callback}
publish: (eventName) ->
return
unsubscribe: (subscriptionObject) ->
@removeListener subscriptionObject.eventName, subscriptionObject.callback
publish: (messageBody,delayInSeconds, callback) ->
@sendMessage messageBody, delayInSeconds, callback
receiveMessage: (callback) ->
@sqs.receiveMessage {QueueUrl: @queueUrl, WaitTimeSeconds: 20}, (err, data) =>
if err? then @emit 'error',err,data else @emit 'message',err,data
callback? err,data
deleteMessage: (receiptHandle, callback) ->
@sqs.deleteMessage {QueueUrl: @queueUrl, ReceiptHandle: receiptHandle}, (err, data) =>
if err? then @emit 'error',err,data else @emit 'message',err,data
@ -123,26 +88,56 @@ class SQSQueue extends events.EventEmitter
listenForever: =>
async.forever (asyncCallback) =>
@receiveMessage (err, data) ->
asyncCallback(null)
asyncCallback(null)
class MongoQueueClient extends AbstractQueueClient
constructor: ->
@configure()
registerQueue: (queueName, options, callback) ->
channel = new MongoQueue queueName,options,this
callback(null, channel)
configure: ->
@databaseAddress = db.generateDatabaseAddress()
@mongoDatabaseName = config.mongoQueue.queueDatabaseName;
@createMongoConnection()
createMongoConnection: ->
@mongooseConnection = mongoose.createConnection "mongodb://#{@databaseAddress}/#{@mongoDatabaseName}"
@mongooseConnection.on 'error', ->
winston.error "There was an error connecting to the queue in MongoDB"
@mongooseConnection.once 'open', ->
winston.info "Successfully connected to MongoDB queue!"
class MongoQueue extends events.EventEmitter
constructor: (@queueName, options, mubSubClient) ->
@channel = mubSubClient.channel queueName, options
@subscribe 'message', receieveMessage
subscribe: (eventName, callback) ->
@channel.subscribe eventName, callback
unsubscribe: (subscriptionObject) ->
subscriptionObject.unsubscribe()
publish: (messageBody, delayInSeconds, callback) ->
#TODO: Mongo-based persistence of delayed messages
setTimeout @channel.publish.bind(this), delayInSeconds * 1000, @queueName, messageBody, callback
receieveMessage: (callback) ->
throw new Error "MongoQueue does not support fetching one message, it continually listens"
deleteMessage: (callback) ->
throw new Error "MongoQueue "
class LocalQueue extends AbstractQueue
constructor: (queueName)->
return
configure: () ->
@client = @generateMubsubClient
generateMubsubClient: ->
client = mubsub(databaseAddress)
registerQueueAndReturnChannel: (queueName) ->
return

View file

@ -34,6 +34,8 @@ config.queue.accessKeyId = process.env.COCO_AWS_ACCESS_KEY_ID || '';
config.queue.secretAccessKey = process.env.COCO_AWS_SECRET_ACCESS_KEY || '';
config.queue.region = 'us-east-1';
config.queue.simulationQueueName = "simulationQueue";
config.mongoQueue = {};
config.mongoQueue.queueDatabaseName = "coco_queue";
config.salt = process.env.COCO_SALT || 'pepper';
config.cookie_secret = process.env.COCO_COOKIE_SECRET || 'chips ahoy';