From 34207d28c90c8c1593a213498395d4a8e5f2b238 Mon Sep 17 00:00:00 2001 From: Scott Erickson Date: Thu, 14 Aug 2014 16:40:35 -0700 Subject: [PATCH] Streamifying and optimizing the recalculating of stats scripts. --- scripts/recalculateStatistics.coffee | 4 + scripts/setupAchievements.coffee | 10 +- server/users/user_handler.coffee | 189 ++++++++++++++++++++------- 3 files changed, 148 insertions(+), 55 deletions(-) diff --git a/scripts/recalculateStatistics.coffee b/scripts/recalculateStatistics.coffee index 7177acf9d..7c54478af 100644 --- a/scripts/recalculateStatistics.coffee +++ b/scripts/recalculateStatistics.coffee @@ -28,15 +28,19 @@ whenAllFinished = -> async.parallel [ # Misc (c) -> report UserHandler.recalculateStats, 'gamesCompleted', c + # Edits (c) -> report UserHandler.recalculateStats, 'articleEdits', c (c) -> report UserHandler.recalculateStats, 'levelEdits', c (c) -> report UserHandler.recalculateStats, 'levelComponentEdits', c (c) -> report UserHandler.recalculateStats, 'levelSystemEdits', c (c) -> report UserHandler.recalculateStats, 'thangTypeEdits', c + # Patches (c) -> report UserHandler.recalculateStats, 'patchesContributed', c (c) -> report UserHandler.recalculateStats, 'patchesSubmitted', c + + # Patches in memory (c) -> report UserHandler.recalculateStats, 'totalTranslationPatches', c (c) -> report UserHandler.recalculateStats, 'totalMiscPatches', c diff --git a/scripts/setupAchievements.coffee b/scripts/setupAchievements.coffee index 42cfb4e9f..54e7a6f5f 100644 --- a/scripts/setupAchievements.coffee +++ b/scripts/setupAchievements.coffee @@ -31,7 +31,7 @@ unlockableAchievements = worth: 10 collection: 'users' userField: '_id' - category: 'miscellaneous' + category: 'misc' difficulty: 1 recalculable: true @@ -42,7 +42,7 @@ unlockableAchievements = worth: 20 collection: 'users' userField: '_id' - category: 'levels' + category: 'level' difficulty: 1 recalculable: true @@ -53,7 +53,7 @@ unlockableAchievements = worth: 50 collection: 'users' userField: '_id' - category: 'levels' + category: 'level' difficulty: 2 recalculable: true @@ -64,7 +64,7 @@ unlockableAchievements = worth: 500 collection: 'users' userField: '_id' - category: 'levels' + category: 'level' difficulty: 3 recalculable: true @@ -189,7 +189,7 @@ repeatableAchievements = worth: 1 collection: 'users' userField: '_id' - category: 'miscellaneous' + category: 'ladder' difficulty: 1 proportionalTo: 'simulatedBy' function: diff --git a/server/users/user_handler.coffee b/server/users/user_handler.coffee index c87073cd8..3fe79fde9 100644 --- a/server/users/user_handler.coffee +++ b/server/users/user_handler.coffee @@ -391,22 +391,45 @@ UserHandler = class UserHandler extends Handler countEdits = (model, done) -> statKey = User.statsMapping.edits[model.modelName] return done(new Error 'Could not resolve statKey for model') unless statKey? - User.find {}, (err, users) -> - async.eachSeries users, ((user, doneWithUser) -> - userObjectID = user.get('_id') - userStringID = userObjectID.toHexString() + + total = 100000 + User.count {anonymous:false}, (err, count) -> total = count + + stream = User.find({anonymous:false}).sort('_id').limit(10).stream() + numberRunning = 0 + numberRan = 0 + streamClosed = false + t0 = new Date().getTime() + + stream.on 'close', -> streamClosed = true + + stream.on 'data', (user) -> + numberRunning += 1 + stream.pause() if numberRunning > 20 + userObjectID = user.get('_id') + userStringID = userObjectID.toHexString() - model.count {$or: [creator: userObjectID, creator: userStringID]}, (err, count) -> - if count - update = $set: {} - update.$set[statKey] = count - else - update = $unset: {} - update.$unset[statKey] = '' - User.findByIdAndUpdate user.get('_id'), update, (err) -> - log.error err if err? - doneWithUser() - ), done + model.count {$or: [creator: userObjectID, creator: userStringID]}, (err, count) -> + if count + update = $set: {} + update.$set[statKey] = count + else + update = $unset: {} + update.$unset[statKey] = '' + User.findByIdAndUpdate user.get('_id'), update, (err) -> + log.error err if err? + numberRan += 1 + pctDone = (100 * numberRan / total).toFixed(2) + console.log "Counted #{statKey} edits for user #{user.get('name') or '???'} (#{user.get('_id')}) (#{pctDone}%)" + numberRunning -= 1 + + if streamClosed and not numberRunning + t1 = new Date().getTime() + runningTime = ((t1-t0)/1000/60/60).toFixed(2) + console.log "we finished in #{runningTime} hours" + return done() + + stream.resume() # I don't like leaking big variables, could remove this for readability # Meant for passing into MongoDB @@ -431,53 +454,119 @@ UserHandler = class UserHandler extends Handler update[method][statName] = count or '' User.findByIdAndUpdate user.get('_id'), update, doneUpdatingUser - User.find {}, (err, users) -> - async.eachSeries users, ((user, doneWithUser) -> - userObjectID = user.get '_id' - userStringID = userObjectID.toHexString() - # Extend query with a patch ownership test - _.extend query, {$or: [{creator: userObjectID}, {creator: userStringID}]} + total = 100000 + User.count {anonymous:false}, (err, count) -> total = count - count = 0 - stream = Patch.where(query).stream() - stream.on 'data', (doc) -> ++count if filter doc - stream.on 'error', (err) -> - updateUser user, count, doneWithUser - log.error "Recalculating #{statName} for user #{user} stopped prematurely because of error" - stream.on 'close', -> - updateUser user, count, doneWithUser - ), done + userStream = User.find({anonymous:false}).sort('_id').stream() + numberRunning = 0 + numberRan = 0 + streamClosed = false + t0 = new Date().getTime() + + userStream.on 'close', -> streamClosed = true + + userStream.on 'data', (user) -> + numberRunning += 1 + userStream.pause() if numberRunning > 8 + userObjectID = user.get '_id' + userStringID = userObjectID.toHexString() + # Extend query with a patch ownership test + _.extend query, {creator: userObjectID} + + count = 0 + patchStream = Patch.where(query).stream() + patchStream.on 'data', (doc) -> ++count if filter doc +# stream.on 'error', (err) -> +# updateUser user, count, doneWithUser +# log.error "Recalculating #{statName} for user #{user} stopped prematurely because of error" + patchStream.on 'close', -> + updateUser user, count, -> + numberRan += 1 + numberRunning -= 1 + pctDone = (100 * numberRan / total).toFixed(2) + console.log "Counted #{count} #{statName} for user #{user.get('name') or '???'} (#{user.get('_id')}) (#{pctDone}%)" + if streamClosed and not numberRunning + t1 = new Date().getTime() + runningTime = ((t1-t0)/1000/60/60).toFixed(2) + console.log "we finished in #{runningTime} hours" + return done() + userStream.resume() + countPatchesByUsers = (query, statName, done) -> Patch = require '../patches/Patch' - User.find {}, (err, users) -> - async.eachSeries users, ((user, doneWithUser) -> - userObjectID = user.get '_id' - userStringID = userObjectID.toHexString() - # Extend query with a patch ownership test - _.extend query, {$or: [{creator: userObjectID}, {creator: userStringID}]} + total = 100000 + User.count {anonymous:false}, (err, count) -> total = count + + stream = User.find({anonymous:false}).sort('_id').stream() + numberRunning = 0 + numberRan = 0 + streamClosed = false + t0 = new Date().getTime() + + stream.on 'close', -> streamClosed = true + + stream.on 'data', (user) -> + numberRunning += 1 + stream.pause() if numberRunning > 50 + userObjectID = user.get '_id' + userStringID = userObjectID.toHexString() + # Extend query with a patch ownership test + _.extend query, {$or: [{creator: userObjectID}, {creator: userStringID}]} + + Patch.count query, (err, count) -> + method = if count then '$set' else '$unset' + update = {} + update[method] = {} + update[method][statName] = count or '' + User.findByIdAndUpdate user.get('_id'), update, -> + numberRan += 1 + numberRunning -= 1 + pctDone = (100 * numberRan / total).toFixed(2) + console.log "Counted #{statName} patches for user #{user.get('name') or '???'} (#{user.get('_id')}) (#{pctDone}%)" + if streamClosed and not numberRunning + t1 = new Date().getTime() + runningTime = ((t1-t0)/1000/60/60).toFixed(2) + console.log "we finished in #{runningTime} hours" + return done() + stream.resume() - Patch.count query, (err, count) -> - method = if count then '$set' else '$unset' - update = {} - update[method] = {} - update[method][statName] = count or '' - User.findByIdAndUpdate user.get('_id'), update, doneWithUser - ), done statRecalculators: gamesCompleted: (done) -> LevelSession = require '../levels/sessions/LevelSession' - User.find {}, (err, users) -> - async.eachSeries users, ((user, doneWithUser) -> - userID = user.get('_id').toHexString() + total = 1000000 + User.count {}, (err, count) -> total = count - LevelSession.count {creator: userID, 'state.completed': true}, (err, count) -> - update = if count then {$set: 'stats.gamesCompleted': count} else {$unset: 'stats.gamesCompleted': ''} - User.findByIdAndUpdate user.get('_id'), update, doneWithUser - ), done + stream = User.find().sort('_id').stream() + numberRunning = 0 + numberRan = 0 + streamClosed = false + t0 = new Date().getTime() + + stream.on 'close', -> streamClosed = true + + stream.on 'data', (user) -> + numberRunning += 1 + stream.pause() if numberRunning > 100 + userID = user.get('_id').toHexString() + + LevelSession.count {creator: userID, 'state.complete': true}, (err, count) -> + update = if count then {$set: 'stats.gamesCompleted': count} else {$unset: 'stats.gamesCompleted': ''} + User.findByIdAndUpdate user.get('_id'), update, -> + numberRan += 1 + numberRunning -= 1 + pctDone = (100 * numberRan / total).toFixed(2) + console.log "Counted #{count} levels played for user #{user.get('name') or '???'} (#{user.get('_id')}) (#{pctDone}%)" + if streamClosed and not numberRunning + t1 = new Date().getTime() + runningTime = ((t1-t0)/1000/60/60).toFixed(2) + console.log "we finished in #{runningTime} hours" + return done() + stream.resume() + articleEdits: (done) -> Article = require '../articles/Article'