mirror of
https://github.com/codeninjasllc/codecombat.git
synced 2025-04-27 14:33:59 -04:00
Merge branch 'feature/task-queue' into feature/level-runner-interface
Conflicts: server.coffee server/db.coffee
This commit is contained in:
commit
9401a4370e
7 changed files with 299 additions and 12 deletions
|
@ -59,7 +59,9 @@
|
|||
"express-useragent": "~0.0.9",
|
||||
"gridfs-stream": "0.4.x",
|
||||
"stream-buffers": "0.2.x",
|
||||
"sendwithus": "2.0.x"
|
||||
"sendwithus": "2.0.x",
|
||||
"aws-sdk":"~2.0.0",
|
||||
"mubsub": "~0.2.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"jade": "0.33.x",
|
||||
|
|
|
@ -25,14 +25,3 @@ createAndConfigureApp = ->
|
|||
serverSetup.setupRoutes app
|
||||
app
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
27
server/multiplayer.coffee
Normal file
27
server/multiplayer.coffee
Normal file
|
@ -0,0 +1,27 @@
|
|||
config = require '../server_config'
|
||||
winston = require 'winston'
|
||||
mongoose = require 'mongoose'
|
||||
async = require 'async'
|
||||
errors = require './commons/errors'
|
||||
aws = require 'aws-sdk'
|
||||
db = require './routes/db'
|
||||
mongoose = require 'mongoose'
|
||||
events = require 'events'
|
||||
queues = require 'queue'
|
||||
|
||||
module.exports.connectToScoringQueue = -> queues.initializeScoringTaskQueue() unless queues.scoringTaskQueue?
|
||||
|
||||
module.exports.setupRoutes = (app) ->
|
||||
return
|
||||
|
||||
|
||||
module.exports.setupRoutes = (app) ->
|
||||
app.all '/multiplayer/*', (req, res) ->
|
||||
unless scoringTaskQueue?
|
||||
|
||||
errors.custom res, 503, "Currently initializing scoring queue"
|
||||
|
||||
return
|
||||
|
||||
|
||||
|
209
server/queue.coffee
Normal file
209
server/queue.coffee
Normal file
|
@ -0,0 +1,209 @@
|
|||
config = require '../server_config'
|
||||
winston = require 'winston'
|
||||
mongoose = require 'mongoose'
|
||||
async = require 'async'
|
||||
errors = require './commons/errors'
|
||||
aws = require 'aws-sdk'
|
||||
db = require './routes/db'
|
||||
mongoose = require 'mongoose'
|
||||
events = require 'events'
|
||||
|
||||
queueClient = undefined
|
||||
module.exports.scoringTaskQueue = undefined
|
||||
module.exports.sendwithusQueue = undefined
|
||||
###
|
||||
module.exports.setupRoutes = (app) ->
|
||||
app.get '/multiplayer/'
|
||||
queueClient.registerQueue "simulationQueue", {}, (err,data) ->
|
||||
simulationQueue = data
|
||||
simulationQueue.subscribe 'message', (err, data) ->
|
||||
if data.Messages?
|
||||
winston.info "Receieved message #{data.Messages?[0].Body}"
|
||||
simulationQueue.deleteMessage data.Messages?[0].ReceiptHandle, ->
|
||||
winston.info "Deleted message"
|
||||
###
|
||||
|
||||
module.exports.initializeScoringTaskQueue = (cb) ->
|
||||
queueClient = module.exports.generateQueueClient() unless queueClient?
|
||||
queueClient.registerQueue "scoring", {}, (err,data) ->
|
||||
if err?
|
||||
winston.error "There was an error registering the scoring queue."
|
||||
throw new Error "There was an error registering the scoring queue."
|
||||
module.exports.scoringTaskQueue = data
|
||||
cb? err, module.exports.scoringQueue
|
||||
|
||||
|
||||
module.exports.initializeSendwithusQueue = (cb) ->
|
||||
queueClient = module.exports.generateQueueClient() unless queueClient?
|
||||
queueClient.registerQueue "sendwithus", {}, (err,data) ->
|
||||
if err?
|
||||
errorString = "There was an error registering the sendwithus queue."
|
||||
winston.error errorString
|
||||
throw new Error errorString
|
||||
module.exports.sendwithusQueue = data
|
||||
cb? err, module.exports.sendwithusQueue
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
module.exports.generateQueueClient = ->
|
||||
if config.isProduction
|
||||
queueClient = new SQSQueueClient()
|
||||
else
|
||||
queueClient = new MongoQueueClient()
|
||||
return queueClient
|
||||
|
||||
|
||||
class SQSQueueClient
|
||||
constructor: ->
|
||||
@configure()
|
||||
@sqs = @generateSQSInstance()
|
||||
|
||||
###Public API###
|
||||
registerQueue: (queueName, options, callback) ->
|
||||
#returns new queue in data argument of callback
|
||||
@sqs.createQueue {QueueName: queueName}, (err,data) =>
|
||||
if err?
|
||||
winston.error("There was an error creating a new SQS queue, reason: #{JSON.stringify err}")
|
||||
throw new Error("Fatal SQS error, see Winston output")
|
||||
newQueue = new SQSQueue(queueName, data.QueueUrl, @sqs)
|
||||
callback err, newQueue
|
||||
###Public API###
|
||||
configure: ->
|
||||
aws.config.update
|
||||
accessKeyId: config.queue.accessKeyId
|
||||
secretAccessKey: config.queue.secretAccessKey
|
||||
region: config.queue.region
|
||||
|
||||
generateSQSInstance: ->
|
||||
new aws.SQS()
|
||||
|
||||
|
||||
class SQSQueue extends events.EventEmitter
|
||||
constructor: (@queueName, @queueUrl, @sqs) ->
|
||||
|
||||
subscribe: (eventName, callback) -> @on eventName, callback
|
||||
unsubscribe: (eventName, callback) -> @removeListener eventName, callback
|
||||
|
||||
|
||||
publish: (messageBody,delayInSeconds, callback) ->
|
||||
@sendMessage messageBody, delayInSeconds, callback
|
||||
|
||||
receiveMessage: (callback) ->
|
||||
@sqs.receiveMessage {QueueUrl: @queueUrl, WaitTimeSeconds: 20}, (err, data) =>
|
||||
if err? then @emit 'error',err,data else @emit 'message',err,data
|
||||
callback? err,data
|
||||
|
||||
deleteMessage: (receiptHandle, callback) ->
|
||||
@sqs.deleteMessage {QueueUrl: @queueUrl, ReceiptHandle: receiptHandle}, (err, data) =>
|
||||
if err? then @emit 'error',err,data else @emit 'message',err,data
|
||||
callback? err,data
|
||||
|
||||
|
||||
sendMessage: (messageBody, delaySeconds, callback) ->
|
||||
@sqs.sendMessage {QueueUrl: @queueUrl, MessageBody: messageBody, DelaySeconds: delaySeconds}, (err, data) =>
|
||||
if err? then @emit 'error',err,data else @emit 'sent',err, data
|
||||
callback? err,data
|
||||
|
||||
listenForever: =>
|
||||
async.forever (asyncCallback) =>
|
||||
@receiveMessage (err, data) ->
|
||||
asyncCallback(null)
|
||||
|
||||
|
||||
|
||||
|
||||
class MongoQueueClient
|
||||
constructor: ->
|
||||
@configure()
|
||||
@createMongoConnection()
|
||||
@messageModel = @generateMessageModel()
|
||||
|
||||
###Public API###
|
||||
registerQueue: (queueName, options, callback) ->
|
||||
newQueue = new MongoQueue queueName,options,@messageModel
|
||||
callback(null, newQueue)
|
||||
###Public API###
|
||||
|
||||
configure: ->
|
||||
@databaseAddress = db.generateDatabaseAddress()
|
||||
@mongoDatabaseName = config.mongoQueue.queueDatabaseName;
|
||||
|
||||
createMongoConnection: ->
|
||||
@mongooseConnection = mongoose.createConnection "mongodb://#{@databaseAddress}/#{@mongoDatabaseName}"
|
||||
@mongooseConnection.on 'error', ->
|
||||
winston.error "There was an error connecting to the queue in MongoDB"
|
||||
@mongooseConnection.once 'open', ->
|
||||
winston.info "Successfully connected to MongoDB queue!"
|
||||
|
||||
generateMessageModel: ->
|
||||
#do find something like: messages not processing, queue as current queue, visibility time before now, sort by insertion time, findOne
|
||||
schema = new mongoose.Schema
|
||||
messageBody: Object,
|
||||
processing: false,
|
||||
insertionTime: {type: Date, default: Date.now }
|
||||
queue: String
|
||||
scheduledVisibilityTime: Date
|
||||
@mongooseConnection.model 'Message',schema
|
||||
|
||||
|
||||
class MongoQueue extends events.EventEmitter
|
||||
constructor: (queueName, options, messageModel) ->
|
||||
@Message = messageModel
|
||||
@queueName = queueName
|
||||
|
||||
|
||||
subscribe: (eventName, callback) -> @on eventName, callback
|
||||
unsubscribe: (eventName, callback) -> @removeListener eventName, callback
|
||||
|
||||
publish: (messageBody, delayInSeconds, callback) ->
|
||||
@sendMessage messageBody, delayInSeconds, callback
|
||||
|
||||
receieveMessage: (callback) ->
|
||||
conditions = {queue: @queueName, processing: false, scheduledVisibilityTime: {$lt:Date.now()}}
|
||||
options = {sort: 'scheduledVisibilityTime'}
|
||||
update = {$set: {processing: true}}
|
||||
@Message.findOneAndUpdate conditions, update, options, =>
|
||||
if err? then @emit 'error',err,data else @emit 'message',err,data
|
||||
callback? err,data
|
||||
|
||||
deleteMessage: (receiptHandle, callback) ->
|
||||
#receiptHandle in this case is an ID
|
||||
conditions = {queue: @queueName, _id: receiptHandle}
|
||||
@Message.findOneAndRemove conditions, {}, =>
|
||||
if err? then @emit 'error',err,data else @emit 'message',err,data
|
||||
callback? err,data
|
||||
|
||||
|
||||
sendMessage: (messageBody, delaySeconds, callback) ->
|
||||
messageToSend = new @Message
|
||||
messageBody: messageBody
|
||||
processing: false
|
||||
queue: @queueName
|
||||
scheduledVisibilityTime: Date.now() + (delaySeconds * 1000)
|
||||
messageToSend.save (err,data) =>
|
||||
if err? then @emit 'error',err,data else @emit 'sent',err, data
|
||||
callback? err,data
|
||||
|
||||
listenForever: =>
|
||||
async.forever (asyncCallback) =>
|
||||
@recieveMessage (err, data) ->
|
||||
asyncCallback(null)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
45
server/queue.spec.coffee
Normal file
45
server/queue.spec.coffee
Normal file
|
@ -0,0 +1,45 @@
|
|||
queues = require './queue'
|
||||
config = require '../server_config.js'
|
||||
winston = require 'winston'
|
||||
describe 'Queue', ->
|
||||
describe 'construction interface', ->
|
||||
it 'should construct a MongoQueueClient if not in production', ->
|
||||
config.isProduction = false
|
||||
queue = queues.generateQueueClient()
|
||||
expect(queue.constructor.name).toEqual 'MongoQueueClient'
|
||||
it 'should construct an SQSQueueClient if in production', ->
|
||||
config.isProduction = true
|
||||
queue = queues.generateQueueClient()
|
||||
expect(queue.constructor.name).toEqual 'SQSQueueClient'
|
||||
describe 'registerQueue', ->
|
||||
mongoQueueClient = null
|
||||
sqsQueueClient = null
|
||||
beforeEach ->
|
||||
config.isProduction = false
|
||||
mongoQueueClient = queues.generateQueueClient()
|
||||
|
||||
it 'should generate the correct type of queue', (done) ->
|
||||
mongoQueueClient.registerQueue "TestQueue", {}, (err, data) ->
|
||||
expect(data.constructor.name).toEqual 'MongoQueue'
|
||||
done()
|
||||
|
||||
describe 'messages', ->
|
||||
mongoQueueClient = queues.generateQueueClient()
|
||||
testQueue = null
|
||||
|
||||
messageIdToDelete = null
|
||||
it 'should send a message', (done) ->
|
||||
mongoQueueClient.registerQueue "TestQueue", {}, (err, data) ->
|
||||
testQueue = data
|
||||
testQueue.sendMessage {"Body":"This is a test message"} ,0, (err2, data2) ->
|
||||
done()
|
||||
it 'should receieve a message', (done) ->
|
||||
testQueue.receieveMessage (err,data) ->
|
||||
winston.info "Data body is #{data.Body}"
|
||||
expect(data.Body).toBe "This is a test message"
|
||||
messageIdToDelete = data._id
|
||||
done()
|
||||
it 'should delete a message', (done) ->
|
||||
expect(true).toBeTruthy()
|
||||
done()
|
||||
|
|
@ -1,6 +1,13 @@
|
|||
config = require '../server_config'
|
||||
sendwithusAPI = require 'sendwithus'
|
||||
swuAPIKey = config.mail.sendwithusAPIKey
|
||||
queues = require './queue'
|
||||
|
||||
module.exports.setupQueue = -> queues.initializeSendwithusQueue() unless queues.sendwithusQueue?
|
||||
|
||||
module.exports.setupRoutes = (app) ->
|
||||
return
|
||||
|
||||
|
||||
options = { DEBUG: not config.isProduction }
|
||||
module.exports.api = new sendwithusAPI swuAPIKey, options
|
||||
|
|
|
@ -31,6 +31,14 @@ config.mail.mailchimpAPIKey = process.env.COCO_MAILCHIMP_API_KEY || '';
|
|||
config.mail.mailchimpWebhook = process.env.COCO_MAILCHIMP_WEBHOOK || '/mail/webhook';
|
||||
config.mail.sendwithusAPIKey = process.env.COCO_SENDWITHUS_API_KEY || '';
|
||||
|
||||
config.queue = {};
|
||||
config.queue.accessKeyId = process.env.COCO_AWS_ACCESS_KEY_ID || '';
|
||||
config.queue.secretAccessKey = process.env.COCO_AWS_SECRET_ACCESS_KEY || '';
|
||||
config.queue.region = 'us-east-1';
|
||||
config.queue.simulationQueueName = "simulationQueue";
|
||||
config.mongoQueue = {};
|
||||
config.mongoQueue.queueDatabaseName = "coco_queue";
|
||||
|
||||
config.salt = process.env.COCO_SALT || 'pepper';
|
||||
config.cookie_secret = process.env.COCO_COOKIE_SECRET || 'chips ahoy';
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue