Parcourir la source

Merge pull request #1312 from mnaamani/colossus/minor-performance-enhancements

Colossus/minor performance enhancements
Martin il y a 4 ans
Parent
commit
812cd880e1

+ 1 - 1
storage-node/packages/colossus/bin/cli.js

@@ -18,7 +18,7 @@ const debug = require('debug')('joystream:colossus')
 const PROJECT_ROOT = path.resolve(__dirname, '..')
 
 // Number of milliseconds to wait between synchronization runs.
-const SYNC_PERIOD_MS = 300000 // 5min
+const SYNC_PERIOD_MS = 120000 // 2min
 
 // Parse CLI
 const FLAG_DEFINITIONS = {

+ 98 - 60
storage-node/packages/colossus/lib/sync.js

@@ -19,83 +19,115 @@
 'use strict'
 
 const debug = require('debug')('joystream:sync')
+const _ = require('lodash')
 
-async function syncCallback(api, storage) {
-  // The first step is to gather all data objects from chain.
-  // TODO: in future, limit to a configured tranche
-  // FIXME this isn't actually on chain yet, so we'll fake it.
-  const knownContentIds = (await api.assets.getKnownContentIds()) || []
+// The number of concurrent sync sessions allowed. Must be greater than zero.
+const MAX_CONCURRENT_SYNC_ITEMS = 20
 
-  const roleAddress = api.identities.key.address
-  const providerId = api.storageProviderId
+async function syncContent({ api, storage, contentBeingSynced, contentCompleteSynced }) {
+  const knownContentIds = await api.assets.getKnownContentIds()
 
-  // Iterate over all sync objects, and ensure they're synced.
-  const allChecks = knownContentIds.map(async (contentId) => {
-    // eslint-disable-next-line prefer-const
-    let { relationship, relationshipId } = await api.assets.getStorageRelationshipAndId(providerId, contentId)
+  // Select ids which we have not yet fully synced
+  const needsSync = knownContentIds
+    .filter((id) => !contentCompleteSynced.has(id))
+    .filter((id) => !contentBeingSynced.has(id))
 
-    // get the data object
-    // make sure the data object was Accepted by the liaison,
-    // don't just blindly attempt to fetch them
+  // Since we are limiting concurrent content ids being synced, to ensure
+  // better distribution of content across storage nodes during a potentially long
+  // sync process we don't want all nodes to replicate items in the same order, so
+  // we simply shuffle.
+  const candidatesForSync = _.shuffle(needsSync)
+
+  // TODO: get the data object
+  // make sure the data object was Accepted by the liaison,
+  // don't just blindly attempt to fetch them
+  while (contentBeingSynced.size < MAX_CONCURRENT_SYNC_ITEMS && candidatesForSync.length) {
+    const contentId = candidatesForSync.shift()
 
-    let fileLocal
     try {
-      // check if we have content or not
-      const stats = await storage.stat(contentId)
-      fileLocal = stats.local
+      contentBeingSynced.set(contentId)
+      await storage.synchronize(contentId, (err, status) => {
+        if (err) {
+          contentBeingSynced.delete(contentId)
+          debug(`Error Syncing ${err}`)
+        } else if (status.synced) {
+          contentBeingSynced.delete(contentId)
+          contentCompleteSynced.set(contentId)
+        }
+      })
     } catch (err) {
-      // on error stating or timeout
-      debug(err.message)
-      // we don't have content if we can't stat it
-      fileLocal = false
+      // Most likely failed to resolve the content id
+      debug(`Failed calling synchronize ${err}`)
+      contentBeingSynced.delete(contentId)
     }
+  }
+}
 
-    if (!fileLocal) {
-      try {
-        await storage.synchronize(contentId)
-      } catch (err) {
-        // duplicate logging
-        // debug(err.message)
-        return
-      }
-      // why are we returning, if we synced the file
-      return
-    }
+async function createNewRelationships({ api, contentCompleteSynced }) {
+  const roleAddress = api.identities.key.address
+  const providerId = api.storageProviderId
 
-    if (!relationship) {
-      // create relationship
-      debug(`Creating new storage relationship for ${contentId.encode()}`)
-      try {
-        relationshipId = await api.assets.createStorageRelationship(roleAddress, providerId, contentId)
-        await api.assets.toggleStorageRelationshipReady(roleAddress, providerId, relationshipId, true)
-      } catch (err) {
-        debug(`Error creating new storage relationship ${contentId.encode()}: ${err.stack}`)
-      }
-    } else if (!relationship.ready) {
-      debug(`Updating storage relationship to ready for ${contentId.encode()}`)
-      // update to ready. (Why would there be a relationship set to ready: false?)
+  // Create new relationships for synced content if required and
+  // compose list of relationship ids to be set to ready.
+  return (
+    await Promise.all(
+      [...contentCompleteSynced.keys()].map(async (contentId) => {
+        const { relationship, relationshipId } = await api.assets.getStorageRelationshipAndId(providerId, contentId)
+
+        if (relationship) {
+          // maybe prior transaction to set ready failed for some reason..
+          if (!relationship.ready) {
+            return relationshipId
+          }
+        } else {
+          // create relationship
+          debug(`Creating new storage relationship for ${contentId.encode()}`)
+          try {
+            return await api.assets.createStorageRelationship(roleAddress, providerId, contentId)
+          } catch (err) {
+            debug(`Error creating new storage relationship ${contentId.encode()}: ${err.stack}`)
+          }
+        }
+
+        return null
+      })
+    )
+  ).filter((id) => id !== null)
+}
+
+async function setRelationshipsReady({ api, relationshipIds }) {
+  const roleAddress = api.identities.key.address
+  const providerId = api.storageProviderId
+
+  return Promise.all(
+    relationshipIds.map(async (relationshipId) => {
       try {
         await api.assets.toggleStorageRelationshipReady(roleAddress, providerId, relationshipId, true)
       } catch (err) {
-        debug(`Error setting relationship ready ${contentId.encode()}: ${err.stack}`)
+        debug('Error setting relationship ready')
       }
-    } else {
-      // we already have content and a ready relationship set. No need to do anything
-      // debug(`content already stored locally ${contentId.encode()}`);
-    }
-  })
-
-  return Promise.all(allChecks)
+    })
+  )
 }
 
-async function syncPeriodic(api, flags, storage) {
+async function syncPeriodic({ api, flags, storage, contentBeingSynced, contentCompleteSynced }) {
+  const retry = () => {
+    setTimeout(syncPeriodic, flags.syncPeriod, {
+      api,
+      flags,
+      storage,
+      contentBeingSynced,
+      contentCompleteSynced,
+    })
+  }
+
   try {
     debug('Starting sync run...')
 
     const chainIsSyncing = await api.chainIsSyncing()
     if (chainIsSyncing) {
       debug('Chain is syncing. Postponing sync run.')
-      return setTimeout(syncPeriodic, flags.syncPeriod, api, flags, storage)
+      return retry()
     }
 
     const recommendedBalance = await api.providerHasMinimumBalance(300)
@@ -106,20 +138,26 @@ async function syncPeriodic(api, flags, storage) {
     const sufficientBalance = await api.providerHasMinimumBalance(100)
     if (!sufficientBalance) {
       debug('Provider role account does not have sufficient balance. Postponing sync run!')
-      return setTimeout(syncPeriodic, flags.syncPeriod, api, flags, storage)
+      return retry()
     }
 
-    await syncCallback(api, storage)
+    await syncContent({ api, storage, contentBeingSynced, contentCompleteSynced })
+    const relationshipIds = await createNewRelationships({ api, contentCompleteSynced })
+    await setRelationshipsReady({ api, relationshipIds })
+
     debug('sync run complete')
   } catch (err) {
-    debug(`Error in syncPeriodic ${err.stack}`)
+    debug(`Error in sync run ${err.stack}`)
   }
+
   // always try again
-  setTimeout(syncPeriodic, flags.syncPeriod, api, flags, storage)
+  retry()
 }
 
 function startSyncing(api, flags, storage) {
-  syncPeriodic(api, flags, storage)
+  const contentBeingSynced = new Map()
+  const contentCompleteSynced = new Map()
+  syncPeriodic({ api, flags, storage, contentBeingSynced, contentCompleteSynced })
 }
 
 module.exports = {

+ 29 - 17
storage-node/packages/storage/storage.js

@@ -217,7 +217,8 @@ class Storage {
 
     this.ipfs = ipfsClient(this.options.ipfs.connect_options)
 
-    this.pins = {}
+    this.pinned = {}
+    this.pinning = {}
 
     this.ipfs.id((err, identity) => {
       if (err) {
@@ -367,27 +368,38 @@ class Storage {
   /*
    * Synchronize the given content ID
    */
-  async synchronize(contentId) {
+  async synchronize(contentId, callback) {
     const resolved = await this.resolveContentIdWithTimeout(this._timeout, contentId)
 
-    // validate resolved id is proper ipfs_cid, not null or empty string
+    // TODO: validate resolved id is proper ipfs_cid, not null or empty string
 
-    if (this.pins[resolved]) {
-      return
-    }
+    if (!this.pinning[resolved] && !this.pinned[resolved]) {
+      debug(`Pinning hash: ${resolved} content-id: ${contentId}`)
+      this.pinning[resolved] = true
 
-    debug(`Pinning ${resolved}`)
+      // Callback passed to add() will be called on error or when the entire file
+      // is retrieved. So on success we consider the content synced.
+      this.ipfs.pin.add(resolved, { quiet: true, pin: true }, (err) => {
+        delete this.pinning[resolved]
+        if (err) {
+          debug(`Error Pinning: ${resolved}`)
+          callback && callback(err)
+        } else {
+          debug(`Pinned ${resolved}`)
+          this.pinned[resolved] = true
+          callback && callback(null, this.syncStatus(resolved))
+        }
+      })
+    } else {
+      callback && callback(null, this.syncStatus(resolved))
+    }
+  }
 
-    // This call blocks until file is retrieved..
-    this.ipfs.pin.add(resolved, { quiet: true, pin: true }, (err) => {
-      if (err) {
-        debug(`Error Pinning: ${resolved}`)
-        delete this.pins[resolved]
-      } else {
-        debug(`Pinned ${resolved}`)
-        // why aren't we doing this.pins[resolved] = true
-      }
-    })
+  syncStatus(ipfsHash) {
+    return {
+      syncing: this.pinning[ipfsHash] === true,
+      synced: this.pinned[ipfsHash] === true,
+    }
   }
 }
 

+ 7 - 34
storage-node/packages/util/ranges.js

@@ -21,18 +21,20 @@
 const uuid = require('uuid')
 const streamBuf = require('stream-buffers')
 
-const debug = require('debug')('joystream:util:ranges')
-
 /*
  * Range parsing
  */
 
+// Increase performance by "pre-computing" these regex expressions
+const PARSE_RANGE_REGEX = /^(\d+-\d+|\d+-|-\d+|\*)$/u
+const PARSE_RANGE_HEADERS_REGEX = /^(([^\s]+)=)?((?:(?:\d+-\d+|-\d+|\d+-),?)+)$/u
+
 /*
  * Parse a range string, e.g. '0-100' or '-100' or '0-'. Return the values
  * in an array of int or undefined (if not provided).
  */
 function parseRange(range) {
-  const matches = range.match(/^(\d+-\d+|\d+-|-\d+|\*)$/u)
+  const matches = range.match(PARSE_RANGE_REGEX)
   if (!matches) {
     throw new Error(`Not a valid range: ${range}`)
   }
@@ -56,8 +58,7 @@ function parseRange(range) {
  */
 function parse(rangeStr) {
   const res = {}
-  debug('Parse range header value:', rangeStr)
-  const matches = rangeStr.match(/^(([^\s]+)=)?((?:(?:\d+-\d+|-\d+|\d+-),?)+)$/u)
+  const matches = rangeStr.match(PARSE_RANGE_HEADERS_REGEX)
   if (!matches) {
     throw new Error(`Not a valid range header: ${rangeStr}`)
   }
@@ -74,15 +75,12 @@ function parse(rangeStr) {
 
   // Merge ranges into result.
   ranges.forEach((newRange) => {
-    debug('Found range:', newRange)
-
     let isMerged = false
     for (const i in res.ranges) {
       const oldRange = res.ranges[i]
 
       // Skip if the new range is fully separate from the old range.
       if (oldRange[1] + 1 < newRange[0] || newRange[1] + 1 < oldRange[0]) {
-        debug('Range does not overlap with', oldRange)
         continue
       }
 
@@ -92,11 +90,9 @@ function parse(rangeStr) {
       const merged = [Math.min(oldRange[0], newRange[0]), Math.max(oldRange[1], newRange[1])]
       res.ranges[i] = merged
       isMerged = true
-      debug('Merged', newRange, 'into', oldRange, 'as', merged)
     }
 
     if (!isMerged) {
-      debug('Non-overlapping range!')
       res.ranges.push(newRange)
     }
   })
@@ -110,7 +106,6 @@ function parse(rangeStr) {
     return first[0] < second[0] ? -1 : 1
   })
 
-  debug('Result of parse is', res)
   return res
 }
 
@@ -159,11 +154,6 @@ class RangeSender {
     this.handlers = {}
     this.opened = false
 
-    debug('RangeSender:', this)
-    if (opts.ranges) {
-      debug('Parsed ranges:', opts.ranges.ranges)
-    }
-
     // Parameters
     this.response = response
     this.stream = stream
@@ -173,7 +163,6 @@ class RangeSender {
 
   onError(err) {
     // Assume hiding the actual error is best, and default to 404.
-    debug('Error:', err)
     if (!this.response.headersSent) {
       this.response.status(err.code || 404).send({
         message: err.message || `File not found: ${this.name}`,
@@ -185,7 +174,6 @@ class RangeSender {
   }
 
   onEnd() {
-    debug('End of stream.')
     this.response.end()
     if (this.endCallback) {
       this.endCallback()
@@ -195,7 +183,6 @@ class RangeSender {
   // **** No ranges
   onOpenNoRange() {
     // File got opened, so we can set headers/status
-    debug('Open succeeded:', this.name, this.type)
     this.opened = true
 
     this.response.status(200)
@@ -228,7 +215,6 @@ class RangeSender {
     // Next range
     this.rangeIndex += 1
     if (this.rangeIndex >= this.ranges.ranges.length) {
-      debug('Cannot advance range index; we are done.')
       return undefined
     }
 
@@ -276,7 +262,6 @@ class RangeSender {
 
   nextRange() {
     if (this.ranges.ranges.length === 1) {
-      debug('Cannot start new range; only one requested.')
       this.stream.off('data', this.handlers.data)
       return false
     }
@@ -294,20 +279,17 @@ class RangeSender {
       }
       onDataRanges.write('\r\n')
       this.response.write(onDataRanges.getContents())
-      debug('New range started.')
       return true
     }
 
     // No headers means we're finishing the last range.
     this.response.write(`\r\n--${this.rangeBoundary}--\r\n`)
-    debug('End of ranges sent.')
     this.stream.off('data', this.handlers.data)
     return false
   }
 
   onOpenRanges() {
     // File got opened, so we can set headers/status
-    debug('Open succeeded:', this.name, this.type)
     this.opened = true
 
     this.response.header('Accept-Ranges', 'bytes')
@@ -347,34 +329,29 @@ class RangeSender {
     // The simplest optimization would be at ever range start to seek() to the
     // start.
     const chunkRange = [this.readOffset, this.readOffset + chunk.length - 1]
-    debug('= Got chunk with byte range', chunkRange)
     while (true) {
       let reqRange = this.ranges.ranges[this.rangeIndex]
       if (!reqRange) {
         break
       }
-      debug('Current requested range is', reqRange)
+
       if (!reqRange[1]) {
         reqRange = [reqRange[0], Number.MAX_SAFE_INTEGER]
-        debug('Treating as', reqRange)
       }
 
       // No overlap in the chunk and requested range; don't write.
       if (chunkRange[1] < reqRange[0] || chunkRange[0] > reqRange[1]) {
-        debug('Ignoring chunk; it is out of range.')
         break
       }
 
       // Since there is overlap, find the segment that's entirely within the
       // chunk.
       const segment = [Math.max(chunkRange[0], reqRange[0]), Math.min(chunkRange[1], reqRange[1])]
-      debug('Segment to send within chunk is', segment)
 
       // Normalize the segment to a chunk offset
       const start = segment[0] - this.readOffset
       const end = segment[1] - this.readOffset
       const len = end - start + 1
-      debug('Offsets into buffer are', [start, end], 'with length', len)
 
       // Write the slice that we want to write. We first create a buffer from the
       // chunk. Then we slice a new buffer from the same underlying ArrayBuffer,
@@ -385,12 +362,10 @@ class RangeSender {
 
       // If the requested range is finished, we should start the next one.
       if (reqRange[1] > chunkRange[1]) {
-        debug('Chunk is finished, but the requested range is missing bytes.')
         break
       }
 
       if (reqRange[1] <= chunkRange[1]) {
-        debug('Range is finished.')
         if (!this.nextRange(segment)) {
           break
         }
@@ -424,11 +399,9 @@ class RangeSender {
     this.handlers.end = this.onEnd.bind(this)
 
     if (this.ranges) {
-      debug('Preparing to handle ranges.')
       this.handlers.open = this.onOpenRanges.bind(this)
       this.handlers.data = this.onDataRanges.bind(this)
     } else {
-      debug('No ranges, just send the whole file.')
       this.handlers.open = this.onOpenNoRange.bind(this)
       this.handlers.data = this.onDataNoRange.bind(this)
     }