Browse Source

storage-node: fix storage.synchronize to always make callback and refactor in sync for clarity

Mokhtar Naamani 4 years ago
parent
commit
6ec9463ead
2 changed files with 87 additions and 89 deletions
  1. + 65 - 60
      storage-node/packages/colossus/lib/sync.js
  2. + 22 - 29
      storage-node/packages/storage/storage.js

+ 65 - 60
storage-node/packages/colossus/lib/sync.js

@@ -21,72 +21,57 @@
 const debug = require('debug')('joystream:sync')
 const _ = require('lodash')
 
-// TODO: refactor these two values into a new class
 // The number of concurrent sync sessions allowed. Must be greater than zero.
-const MAX_CONCURRENT_SYNC_ITEMS = 15
-const contentBeingSynced = new Map()
+const MAX_CONCURRENT_SYNC_ITEMS = 20
 
-async function syncCallback(api, storage) {
+async function syncContent({ api, storage, contentBeingSynced, contentCompleteSynced }) {
   const knownContentIds = await api.assets.getKnownContentIds()
-  const roleAddress = api.identities.key.address
-  const providerId = api.storageProviderId
 
-  // Iterate over all objects, and start syncing if required.
-  // compile list of already syncedIds (as reported by storage
-  // subsytem). The only async part here is resolving content id
-  // by storage to ipfs cid, maybe we can resolve them locally
-  // and cache result to simplify async code below and reduce
-  // queries
+  // Select ids which we have not yet fully synced
+  const needsSync = knownContentIds
+    .filter((id) => !contentCompleteSynced.has(id))
+    .filter((id) => !contentBeingSynced.has(id))
 
   // Since we are limiting concurrent content ids being synced, to ensure
   // better distribution of content across storage nodes during a potentially long
   // sync process we don't want all nodes to replicate items in the same order, so
-  // we simply shuffle ids around.
-  const shuffledIds = _.shuffle(knownContentIds)
-
-  const syncedIds = (
-    await Promise.all(
-      shuffledIds.map(async (contentId) => {
-        // TODO: get the data object
-        // make sure the data object was Accepted by the liaison,
-        // don't just blindly attempt to fetch them
-
-        try {
-          const { synced, syncing } = await storage.syncStatus(contentId)
-
-          if (synced) {
-            return contentId
-          } else if (!syncing) {
-            if (contentBeingSynced.size < MAX_CONCURRENT_SYNC_ITEMS) {
-              try {
-                contentBeingSynced.set(contentId, true)
-
-                await storage.synchronize(contentId, () => {
-                  contentBeingSynced.delete(contentId)
-                })
-              } catch (err) {
-                debug(`Failed calling synchronize ${err}`)
-                contentBeingSynced.delete(contentId)
-              }
-            } else {
-              // Content needs to be synced, but limit on concurrent syncs reached
-              debug('Deferring, concurrent sessions exhausted.')
-            }
-          }
-        } catch (err) {
-          debug(`Failed getting syncStatus. contnetId: ${contentId} ${err}`)
+  // we simply shuffle.
+  const candidatesForSync = _.shuffle(needsSync)
+
+  // TODO: get the data object
+  // make sure the data object was Accepted by the liaison,
+  // don't just blindly attempt to fetch them
+  while (contentBeingSynced.size < MAX_CONCURRENT_SYNC_ITEMS && candidatesForSync.length) {
+    const contentId = candidatesForSync.shift()
+
+    try {
+      contentBeingSynced.set(contentId)
+      await storage.synchronize(contentId, (err, status) => {
+        if (err) {
+          contentBeingSynced.delete(contentId)
+          debug(`Error Syncing ${err}`)
+        } else if (status.synced) {
+          contentBeingSynced.delete(contentId)
+          contentCompleteSynced.set(contentId)
         }
-
-        return null
       })
-    )
-  ).filter((id) => id !== null)
+    } catch (err) {
+      // Most likely failed to resolve the content id
+      debug(`Failed calling synchronize ${err}`)
+      contentBeingSynced.delete(contentId)
+    }
+  }
+}
+
+async function createNewRelationships({ api, contentCompleteSynced }) {
+  const roleAddress = api.identities.key.address
+  const providerId = api.storageProviderId
 
   // Create new relationships for synced content if required and
   // compose list of relationship ids to be set to ready.
-  const relationshipIds = (
+  return (
     await Promise.all(
-      syncedIds.map(async (contentId) => {
+      [...contentCompleteSynced.keys()].map(async (contentId) => {
         const { relationship, relationshipId } = await api.assets.getStorageRelationshipAndId(providerId, contentId)
 
         if (relationship) {
@@ -108,8 +93,12 @@ async function syncCallback(api, storage) {
       })
     )
   ).filter((id) => id !== null)
+}
+
+async function setRelationshipsReady({ api, relationshipIds }) {
+  const roleAddress = api.identities.key.address
+  const providerId = api.storageProviderId
 
-  // Set relationships to ready state
   return Promise.all(
     relationshipIds.map(async (relationshipId) => {
       try {
@@ -121,14 +110,24 @@ async function syncCallback(api, storage) {
   )
 }
 
-async function syncPeriodic(api, flags, storage) {
+async function syncPeriodic({ api, flags, storage, contentBeingSynced, contentCompleteSynced }) {
+  const retry = () => {
+    setTimeout(syncPeriodic, flags.syncPeriod, {
+      api,
+      flags,
+      storage,
+      contentBeingSynced,
+      contentCompleteSynced,
+    })
+  }
+
   try {
     debug('Starting sync run...')
 
     const chainIsSyncing = await api.chainIsSyncing()
     if (chainIsSyncing) {
       debug('Chain is syncing. Postponing sync run.')
-      return setTimeout(syncPeriodic, flags.syncPeriod, api, flags, storage)
+      return retry()
     }
 
     const recommendedBalance = await api.providerHasMinimumBalance(300)
@@ -139,20 +138,26 @@ async function syncPeriodic(api, flags, storage) {
     const sufficientBalance = await api.providerHasMinimumBalance(100)
     if (!sufficientBalance) {
       debug('Provider role account does not have sufficient balance. Postponing sync run!')
-      return setTimeout(syncPeriodic, flags.syncPeriod, api, flags, storage)
+      return retry()
     }
 
-    await syncCallback(api, storage)
+    await syncContent({ api, storage, contentBeingSynced, contentCompleteSynced })
+    const relationshipIds = await createNewRelationships({ api, contentCompleteSynced })
+    await setRelationshipsReady({ api, relationshipIds })
+
     debug('sync run complete')
   } catch (err) {
-    debug(`Error in syncPeriodic ${err.stack}`)
+    debug(`Error in sync run ${err.stack}`)
   }
+
   // always try again
-  setTimeout(syncPeriodic, flags.syncPeriod, api, flags, storage)
+  retry()
 }
 
 function startSyncing(api, flags, storage) {
-  syncPeriodic(api, flags, storage)
+  const contentBeingSynced = new Map()
+  const contentCompleteSynced = new Map()
+  syncPeriodic({ api, flags, storage, contentBeingSynced, contentCompleteSynced })
 }
 
 module.exports = {

+ 22 - 29
storage-node/packages/storage/storage.js

@@ -369,39 +369,32 @@ class Storage {
 
     // TODO: validate resolved id is proper ipfs_cid, not null or empty string
 
-    // noop if already pinned or pinning (assuming no external forces remove the pin)
-    // note: ipfs GC will NOT remove pinned content. Only explicit call to remove the pin
-    // will result in inconsistency.
-    if (this.pinning[resolved] || this.pinned[resolved]) {
-      return
+    if (!this.pinning[resolved] && !this.pinned[resolved]) {
+      debug(`Pinning hash: ${resolved} content-id: ${contentId}`)
+      this.pinning[resolved] = true
+
+      // Callback passed to add() will be called on error or when the entire file
+      // is retrieved. So on success we consider the content synced.
+      this.ipfs.pin.add(resolved, { quiet: true, pin: true }, (err) => {
+        delete this.pinning[resolved]
+        if (err) {
+          debug(`Error Pinning: ${resolved}`)
+          callback && callback(err)
+        } else {
+          debug(`Pinned ${resolved}`)
+          this.pinned[resolved] = true
+          callback && callback(null, this.syncStatus(resolved))
+        }
+      })
+    } else {
+      callback && callback(null, this.syncStatus(resolved))
     }
-
-    debug(`Pinning hash: ${resolved} content-id: ${contentId}`)
-    this.pinning[resolved] = true
-
-    // Callback passed to add() will be called on error or when the entire file
-    // is retrieved. So on success we consider the content synced.
-    this.ipfs.pin.add(resolved, { quiet: true, pin: true }, (err) => {
-      delete this.pinning[resolved]
-      if (err) {
-        debug(`Error Pinning: ${resolved}`)
-        callback && callback(err)
-      } else {
-        debug(`Pinned ${resolved}`)
-        this.pinned[resolved] = true
-        callback && callback()
-      }
-    })
   }
 
-  /*
-   * Get the syncing status of a content ID
-   */
-  async syncStatus(contentId) {
-    const resolved = await this.resolveContentIdWithTimeout(this._timeout, contentId)
+  async syncStatus(ipfsHash) {
     return {
-      syncing: this.pinning[resolved] === true,
-      synced: this.pinned[resolved] === true,
+      syncing: this.pinning[ipfsHash] === true,
+      synced: this.pinned[ipfsHash] === true,
     }
   }
 }