Streamifying and optimizing the stats recalculation scripts.

This commit is contained in:
Scott Erickson 2014-08-14 16:40:35 -07:00
parent 4a133f6998
commit 34207d28c9
3 changed files with 148 additions and 55 deletions

View file

@ -28,15 +28,19 @@ whenAllFinished = ->
async.parallel [ async.parallel [
# Misc # Misc
(c) -> report UserHandler.recalculateStats, 'gamesCompleted', c (c) -> report UserHandler.recalculateStats, 'gamesCompleted', c
# Edits # Edits
(c) -> report UserHandler.recalculateStats, 'articleEdits', c (c) -> report UserHandler.recalculateStats, 'articleEdits', c
(c) -> report UserHandler.recalculateStats, 'levelEdits', c (c) -> report UserHandler.recalculateStats, 'levelEdits', c
(c) -> report UserHandler.recalculateStats, 'levelComponentEdits', c (c) -> report UserHandler.recalculateStats, 'levelComponentEdits', c
(c) -> report UserHandler.recalculateStats, 'levelSystemEdits', c (c) -> report UserHandler.recalculateStats, 'levelSystemEdits', c
(c) -> report UserHandler.recalculateStats, 'thangTypeEdits', c (c) -> report UserHandler.recalculateStats, 'thangTypeEdits', c
# Patches # Patches
(c) -> report UserHandler.recalculateStats, 'patchesContributed', c (c) -> report UserHandler.recalculateStats, 'patchesContributed', c
(c) -> report UserHandler.recalculateStats, 'patchesSubmitted', c (c) -> report UserHandler.recalculateStats, 'patchesSubmitted', c
# Patches in memory
(c) -> report UserHandler.recalculateStats, 'totalTranslationPatches', c (c) -> report UserHandler.recalculateStats, 'totalTranslationPatches', c
(c) -> report UserHandler.recalculateStats, 'totalMiscPatches', c (c) -> report UserHandler.recalculateStats, 'totalMiscPatches', c

View file

@ -31,7 +31,7 @@ unlockableAchievements =
worth: 10 worth: 10
collection: 'users' collection: 'users'
userField: '_id' userField: '_id'
category: 'miscellaneous' category: 'misc'
difficulty: 1 difficulty: 1
recalculable: true recalculable: true
@ -42,7 +42,7 @@ unlockableAchievements =
worth: 20 worth: 20
collection: 'users' collection: 'users'
userField: '_id' userField: '_id'
category: 'levels' category: 'level'
difficulty: 1 difficulty: 1
recalculable: true recalculable: true
@ -53,7 +53,7 @@ unlockableAchievements =
worth: 50 worth: 50
collection: 'users' collection: 'users'
userField: '_id' userField: '_id'
category: 'levels' category: 'level'
difficulty: 2 difficulty: 2
recalculable: true recalculable: true
@ -64,7 +64,7 @@ unlockableAchievements =
worth: 500 worth: 500
collection: 'users' collection: 'users'
userField: '_id' userField: '_id'
category: 'levels' category: 'level'
difficulty: 3 difficulty: 3
recalculable: true recalculable: true
@ -189,7 +189,7 @@ repeatableAchievements =
worth: 1 worth: 1
collection: 'users' collection: 'users'
userField: '_id' userField: '_id'
category: 'miscellaneous' category: 'ladder'
difficulty: 1 difficulty: 1
proportionalTo: 'simulatedBy' proportionalTo: 'simulatedBy'
function: function:

View file

@ -391,22 +391,45 @@ UserHandler = class UserHandler extends Handler
# NOTE(review): this span is a flattened side-by-side diff. Lines that appear doubled show the old
# text followed by the new text; single lines belong to only one side. It is annotated, not rewritten.
#
# countEdits(model, done)
#   Looks up statKey in User.statsMapping.edits by model.modelName and calls done with an Error
#   when no key is found.
#   Old side: loaded every user with User.find {} and processed them one by one via async.eachSeries.
#   New side: streams users matching {anonymous:false} sorted by _id, counts `model` documents per
#   user, and $sets statKey to that count (or $unsets it when the count is falsy).
countEdits = (model, done) -> countEdits = (model, done) ->
statKey = User.statsMapping.edits[model.modelName] statKey = User.statsMapping.edits[model.modelName]
return done(new Error 'Could not resolve statKey for model') unless statKey? return done(new Error 'Could not resolve statKey for model') unless statKey?
User.find {}, (err, users) ->
# New side: `total` starts at 100000 and is overwritten when the asynchronous User.count returns.
# It is only used for the percentage printed in the progress log line.
async.eachSeries users, ((user, doneWithUser) -> total = 100000
userObjectID = user.get('_id') User.count {anonymous:false}, (err, count) -> total = count
userStringID = userObjectID.toHexString()
# NOTE(review): `.limit(10)` restricts the stream to ten users, while the sibling recalculators in
# this diff have no limit. This looks like a debugging leftover; confirm before relying on it.
stream = User.find({anonymous:false}).sort('_id').limit(10).stream()
numberRunning = 0
numberRan = 0
streamClosed = false
t0 = new Date().getTime()
# The 'close' handler only records that the stream ended; it never calls done itself.
stream.on 'close', -> streamClosed = true
stream.on 'data', (user) ->
numberRunning += 1
# Stop reading users once more than 20 are in flight; resumed after each user finishes below.
stream.pause() if numberRunning > 20
userObjectID = user.get('_id')
userStringID = userObjectID.toHexString()
# NOTE(review): the $or clauses are not individually braced (compare the braced form used by
# countPatchesByUsers). CoffeeScript appears to read `[creator: a, creator: b]` as one object with a
# repeated `creator` key rather than two alternatives; verify the compiled query.
# NOTE(review): the err argument of model.count is not checked.
model.count {$or: [creator: userObjectID, creator: userStringID]}, (err, count) -> model.count {$or: [creator: userObjectID, creator: userStringID]}, (err, count) ->
if count if count
update = $set: {} update = $set: {}
update.$set[statKey] = count update.$set[statKey] = count
else else
update = $unset: {} update = $unset: {}
update.$unset[statKey] = '' update.$unset[statKey] = ''
User.findByIdAndUpdate user.get('_id'), update, (err) -> User.findByIdAndUpdate user.get('_id'), update, (err) ->
log.error err if err? log.error err if err?
doneWithUser() numberRan += 1
), done pctDone = (100 * numberRan / total).toFixed(2)
console.log "Counted #{statKey} edits for user #{user.get('name') or '???'} (#{user.get('_id')}) (#{pctDone}%)"
numberRunning -= 1
# NOTE(review): this is the only place done() is called on the new side. If the last update finishes
# before 'close' fires, or the stream yields no users, done would never run; confirm.
if streamClosed and not numberRunning
t1 = new Date().getTime()
# Elapsed milliseconds converted to hours for the summary log line.
runningTime = ((t1-t0)/1000/60/60).toFixed(2)
console.log "we finished in #{runningTime} hours"
return done()
stream.resume()
# I don't like leaking big variables, could remove this for readability # I don't like leaking big variables, could remove this for readability
# Meant for passing into MongoDB # Meant for passing into MongoDB
@ -431,53 +454,119 @@ UserHandler = class UserHandler extends Handler
update[method][statName] = count or '' update[method][statName] = count or ''
User.findByIdAndUpdate user.get('_id'), update, doneUpdatingUser User.findByIdAndUpdate user.get('_id'), update, doneUpdatingUser
User.find {}, (err, users) -> total = 100000
async.eachSeries users, ((user, doneWithUser) -> User.count {anonymous:false}, (err, count) -> total = count
userObjectID = user.get '_id'
userStringID = userObjectID.toHexString()
# Extend query with a patch ownership test
_.extend query, {$or: [{creator: userObjectID}, {creator: userStringID}]}
count = 0 userStream = User.find({anonymous:false}).sort('_id').stream()
stream = Patch.where(query).stream() numberRunning = 0
stream.on 'data', (doc) -> ++count if filter doc numberRan = 0
stream.on 'error', (err) -> streamClosed = false
updateUser user, count, doneWithUser t0 = new Date().getTime()
log.error "Recalculating #{statName} for user #{user} stopped prematurely because of error"
stream.on 'close', -> userStream.on 'close', -> streamClosed = true
updateUser user, count, doneWithUser
), done userStream.on 'data', (user) ->
numberRunning += 1
userStream.pause() if numberRunning > 8
userObjectID = user.get '_id'
userStringID = userObjectID.toHexString()
# Extend query with a patch ownership test
_.extend query, {creator: userObjectID}
count = 0
patchStream = Patch.where(query).stream()
patchStream.on 'data', (doc) -> ++count if filter doc
# stream.on 'error', (err) ->
# updateUser user, count, doneWithUser
# log.error "Recalculating #{statName} for user #{user} stopped prematurely because of error"
patchStream.on 'close', ->
updateUser user, count, ->
numberRan += 1
numberRunning -= 1
pctDone = (100 * numberRan / total).toFixed(2)
console.log "Counted #{count} #{statName} for user #{user.get('name') or '???'} (#{user.get('_id')}) (#{pctDone}%)"
if streamClosed and not numberRunning
t1 = new Date().getTime()
runningTime = ((t1-t0)/1000/60/60).toFixed(2)
console.log "we finished in #{runningTime} hours"
return done()
userStream.resume()
# NOTE(review): this span is a flattened side-by-side diff. Lines that appear doubled show the old
# text followed by the new text; single lines belong to only one side. It is annotated, not rewritten.
#
# countPatchesByUsers(query, statName, done)
#   Old side: loaded every user with User.find {} and handled them in series via async.eachSeries.
#   New side: streams users matching {anonymous:false} sorted by _id. For each user it counts Patch
#   documents matching `query` plus a creator test, then $sets statName to that count on the user
#   (or $unsets it when the count is falsy).
countPatchesByUsers = (query, statName, done) -> countPatchesByUsers = (query, statName, done) ->
Patch = require '../patches/Patch' Patch = require '../patches/Patch'
# New side: `total` starts at 100000 and is overwritten when the asynchronous User.count returns.
# It is only used for the percentage printed in the progress log line.
User.find {}, (err, users) -> total = 100000
async.eachSeries users, ((user, doneWithUser) -> User.count {anonymous:false}, (err, count) -> total = count
userObjectID = user.get '_id'
userStringID = userObjectID.toHexString() stream = User.find({anonymous:false}).sort('_id').stream()
# Extend query with a patch ownership test numberRunning = 0
_.extend query, {$or: [{creator: userObjectID}, {creator: userStringID}]} numberRan = 0
streamClosed = false
t0 = new Date().getTime()
# The 'close' handler only records that the stream ended; it never calls done itself.
stream.on 'close', -> streamClosed = true
stream.on 'data', (user) ->
numberRunning += 1
# Stop reading users once more than 50 are in flight; resumed after each user finishes below.
stream.pause() if numberRunning > 50
userObjectID = user.get '_id'
userStringID = userObjectID.toHexString()
# Extend query with a patch ownership test
# NOTE(review): presumably `_` is Underscore/lodash, in which case _.extend writes $or onto the
# caller's `query` object and each user overwrites the previous user's clause; confirm that callers
# do not reuse `query` and that Patch.count reads the conditions before the next 'data' event.
_.extend query, {$or: [{creator: userObjectID}, {creator: userStringID}]}
# NOTE(review): the err argument of Patch.count is not checked, and the update callback below
# takes no arguments, so an update error is not seen either.
Patch.count query, (err, count) ->
method = if count then '$set' else '$unset'
update = {}
update[method] = {}
update[method][statName] = count or ''
User.findByIdAndUpdate user.get('_id'), update, ->
numberRan += 1
numberRunning -= 1
pctDone = (100 * numberRan / total).toFixed(2)
console.log "Counted #{statName} patches for user #{user.get('name') or '???'} (#{user.get('_id')}) (#{pctDone}%)"
# NOTE(review): this is the only place done() is called on the new side. If the last update finishes
# before 'close' fires, or the stream yields no users, done would never run; confirm.
if streamClosed and not numberRunning
t1 = new Date().getTime()
# Elapsed milliseconds converted to hours for the summary log line.
runningTime = ((t1-t0)/1000/60/60).toFixed(2)
console.log "we finished in #{runningTime} hours"
return done()
stream.resume()
# The remaining lines appear once only and call doneWithUser, which only the old side defines;
# they look like the removed tail of the old async.eachSeries implementation.
Patch.count query, (err, count) ->
method = if count then '$set' else '$unset'
update = {}
update[method] = {}
update[method][statName] = count or ''
User.findByIdAndUpdate user.get('_id'), update, doneWithUser
), done
statRecalculators: statRecalculators:
# NOTE(review): this span is a flattened side-by-side diff. Lines that appear doubled show the old
# text followed by the new text; single lines belong to only one side. It is annotated, not rewritten.
#
# gamesCompleted(done)
#   Old side: loaded every user with User.find {} and handled them in series via async.eachSeries.
#   New side: streams all users sorted by _id (no anonymous filter here). For each user it counts
#   LevelSession documents created by that user's hex-string id, then $sets 'stats.gamesCompleted'
#   to that count (or $unsets it when the count is falsy).
gamesCompleted: (done) -> gamesCompleted: (done) ->
LevelSession = require '../levels/sessions/LevelSession' LevelSession = require '../levels/sessions/LevelSession'
# New side: `total` starts at 1000000 and is overwritten when the asynchronous User.count returns.
# It is only used for the percentage printed in the progress log line.
User.find {}, (err, users) -> total = 1000000
async.eachSeries users, ((user, doneWithUser) -> User.count {}, (err, count) -> total = count
userID = user.get('_id').toHexString()
# NOTE(review): the old side (next line, left half) filters on 'state.completed'; the new query
# further down filters on 'state.complete'. Confirm which key the LevelSession schema uses.
LevelSession.count {creator: userID, 'state.completed': true}, (err, count) -> stream = User.find().sort('_id').stream()
update = if count then {$set: 'stats.gamesCompleted': count} else {$unset: 'stats.gamesCompleted': ''} numberRunning = 0
User.findByIdAndUpdate user.get('_id'), update, doneWithUser numberRan = 0
), done streamClosed = false
t0 = new Date().getTime()
# The 'close' handler only records that the stream ended; it never calls done itself.
stream.on 'close', -> streamClosed = true
stream.on 'data', (user) ->
numberRunning += 1
# Stop reading users once more than 100 are in flight; resumed after each user finishes below.
stream.pause() if numberRunning > 100
userID = user.get('_id').toHexString()
# NOTE(review): the err argument of LevelSession.count is not checked, and the update callback
# below takes no arguments, so an update error is not seen either.
LevelSession.count {creator: userID, 'state.complete': true}, (err, count) ->
update = if count then {$set: 'stats.gamesCompleted': count} else {$unset: 'stats.gamesCompleted': ''}
User.findByIdAndUpdate user.get('_id'), update, ->
numberRan += 1
numberRunning -= 1
pctDone = (100 * numberRan / total).toFixed(2)
console.log "Counted #{count} levels played for user #{user.get('name') or '???'} (#{user.get('_id')}) (#{pctDone}%)"
# NOTE(review): this is the only place done() is called on the new side. If the last update finishes
# before 'close' fires, or the stream yields no users, done would never run; confirm.
if streamClosed and not numberRunning
t1 = new Date().getTime()
# Elapsed milliseconds converted to hours for the summary log line.
runningTime = ((t1-t0)/1000/60/60).toFixed(2)
console.log "we finished in #{runningTime} hours"
return done()
stream.resume()
articleEdits: (done) -> articleEdits: (done) ->
Article = require '../articles/Article' Article = require '../articles/Article'