More refactoring, added message loop example
This commit is contained in:
parent
ccd94f0f1f
commit
d9b2a36875
1 changed files with 33 additions and 31 deletions
|
@ -16,7 +16,7 @@ module.exports.setupRoutes = (app) ->
|
||||||
|
|
||||||
generateQueueInstance = ->
|
generateQueueInstance = ->
|
||||||
if config.isProduction
|
if config.isProduction
|
||||||
return new RemoteQueue()
|
return new RemoteQueue(config.queue.simulationQueueName)
|
||||||
else
|
else
|
||||||
return new LocalQueue()
|
return new LocalQueue()
|
||||||
|
|
||||||
|
@ -36,45 +36,54 @@ class AbstractQueue
|
||||||
|
|
||||||
|
|
||||||
class RemoteQueue extends AbstractQueue
|
class RemoteQueue extends AbstractQueue
|
||||||
constructor: ->
|
constructor: (queueName) ->
|
||||||
@configure()
|
@configure()
|
||||||
@sqs = @generateSQSInstance()
|
@sqs = @generateSQSInstance()
|
||||||
@createSimulationQueueAndSetUrl (err, data) =>
|
@createSimulationQueueAndSetUrl queueName, (err, data) =>
|
||||||
@sendMessage "This is a new test message",5, =>
|
@sendMessage "This is a new test message",5, (error,data) ->
|
||||||
winston.info "Sent test message!"
|
if err?
|
||||||
setTimeout @receiveMessage.bind(this, (err, data) =>
|
winston.error "#{JSON.stringify error}"
|
||||||
if err?
|
@enterReceieveMessageForeverLoop()
|
||||||
winston.error "Error receiving message, reason: #{JSON.stringify err}"
|
|
||||||
else
|
|
||||||
winston.info "Received message, content: #{JSON.stringify data}"
|
|
||||||
winston.info "Deleting message..."
|
|
||||||
@deleteMessage data.Messages?[0].ReceiptHandle, ->
|
|
||||||
winston.info "Deleted message!"
|
|
||||||
), 5000
|
|
||||||
|
|
||||||
configure: ->
|
configure: ->
|
||||||
aws.config.update @generateAWSConfigurationObject()
|
aws.config.update @generateAWSConfigurationObject()
|
||||||
|
|
||||||
|
enterReceieveMessageForeverLoop: ->
|
||||||
|
async.forever (asyncCallback) =>
|
||||||
|
@receiveMessage (err, data) =>
|
||||||
|
if err?
|
||||||
|
winston.error "Error receiving message, reason: #{JSON.stringify err}"
|
||||||
|
else
|
||||||
|
if data.Messages?
|
||||||
|
winston.info "Received message, content: #{JSON.stringify data.Messages[0].Body}"
|
||||||
|
winston.info "Deleting message..."
|
||||||
|
@deleteMessage data.Messages?[0].ReceiptHandle, ->
|
||||||
|
winston.info "Deleted message!"
|
||||||
|
else
|
||||||
|
winston.info "No messages to receieve"
|
||||||
|
asyncCallback(null)
|
||||||
|
|
||||||
createSimulationQueueAndSetUrl: (callback) ->
|
createSimulationQueueAndSetUrl: (queueName, callback) ->
|
||||||
@sqs.createQueue {QueueName: config.queue.simulationQueueName}, (err, data) =>
|
@sqs.createQueue {QueueName: queueName}, (err, data) =>
|
||||||
if err?
|
if err?
|
||||||
winston.error "Failed to create simulation queue!"
|
throw new Error "Failed to create queue \"#{queueName}\""
|
||||||
throw new Error "Failed to create simulation queue."
|
|
||||||
else
|
else
|
||||||
winston.info "Created simulation queue, URL is #{data.QueueUrl}"
|
winston.info "Created queue, URL is #{data.QueueUrl}"
|
||||||
@simulationQueueUrl = data.QueueUrl
|
@queueUrl = data.QueueUrl
|
||||||
callback?(null,data)
|
callback?(err,data)
|
||||||
|
|
||||||
|
|
||||||
receiveMessage: (callback) ->
|
receiveMessage: (callback) ->
|
||||||
@sqs.receiveMessage {QueueUrl: @simulationQueueUrl, WaitTimeSeconds: 20}, callback
|
@sqs.receiveMessage {QueueUrl: @queueUrl, WaitTimeSeconds: 20}, callback
|
||||||
|
|
||||||
deleteMessage: (receiptHandle, callback) ->
|
deleteMessage: (receiptHandle, callback) ->
|
||||||
@sqs.deleteMessage {QueueUrl: @simulationQueueUrl, ReceiptHandle: receiptHandle}, callback
|
@sqs.deleteMessage {QueueUrl: @queueUrl, ReceiptHandle: receiptHandle}, callback
|
||||||
|
|
||||||
sendMessage: (messageBody, delaySeconds, callback) ->
|
sendMessage: (messageBody, delaySeconds, callback) ->
|
||||||
@sqs.sendMessage {QueueUrl: @simulationQueueUrl, MessageBody: messageBody, DelaySeconds: delaySeconds}, callback
|
@sqs.sendMessage {QueueUrl: @queueUrl, MessageBody: messageBody, DelaySeconds: delaySeconds}, callback
|
||||||
|
|
||||||
|
|
||||||
generateAWSConfigurationObject: ->
|
generateAWSConfigurationObject: ->
|
||||||
|
@ -87,13 +96,6 @@ class RemoteQueue extends AbstractQueue
|
||||||
generateSQSInstance: ->
|
generateSQSInstance: ->
|
||||||
new aws.SQS()
|
new aws.SQS()
|
||||||
|
|
||||||
testConnection: ->
|
|
||||||
@sqs.listQueues {}, (err, data) ->
|
|
||||||
if err?
|
|
||||||
winston.error "Error connecting to SQS, reason: #{err}"
|
|
||||||
throw new Error("Couldn't connect to SQS.")
|
|
||||||
else
|
|
||||||
winston.info "Connected to SQS!"
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Reference in a new issue