sync.js 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. /*
  2. * This file is part of the storage node for the Joystream project.
  3. * Copyright (C) 2019 Joystream Contributors
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. */
  18. 'use strict'
  19. const debug = require('debug')('joystream:sync')
  20. const _ = require('lodash')
  21. const { ContentId } = require('@joystream/types/storage')
  22. const { nextTick } = require('@joystream/storage-utils/sleep')
// Maximum number of content items synced concurrently. A sync pass stops
// starting new items once this many are in flight (tracked in
// contentBeingSynced). Must be greater than zero, otherwise nothing is synced.
const MAX_CONCURRENT_SYNC_ITEMS = 30
  25. async function syncContent({ api, storage, contentBeingSynced, contentCompleteSynced }) {
  26. if (contentBeingSynced.size === MAX_CONCURRENT_SYNC_ITEMS) return
  27. const knownEncodedContentIds = (await api.assets.getAcceptedContentIds()).map((id) => id.encode())
  28. // Select ids which we have not yet fully synced
  29. const needsSync = knownEncodedContentIds
  30. .filter((id) => !contentCompleteSynced.has(id))
  31. .filter((id) => !contentBeingSynced.has(id))
  32. // Since we are limiting concurrent content ids being synced, to ensure
  33. // better distribution of content across storage nodes during a potentially long
  34. // sync process we don't want all nodes to replicate items in the same order, so
  35. // we simply shuffle.
  36. const candidatesForSync = _.shuffle(needsSync)
  37. debug(`${candidatesForSync.length} items remaining to process`)
  38. let syncedItemsCount = 0
  39. while (contentBeingSynced.size < MAX_CONCURRENT_SYNC_ITEMS && candidatesForSync.length) {
  40. const id = candidatesForSync.shift()
  41. // Log progress
  42. if (syncedItemsCount % 100 === 0) {
  43. debug(`${candidatesForSync.length} items remaining to process`)
  44. }
  45. try {
  46. contentBeingSynced.set(id)
  47. const contentId = ContentId.decode(api.api.registry, id)
  48. await storage.synchronize(contentId, (err, status) => {
  49. if (err) {
  50. contentBeingSynced.delete(id)
  51. debug(`Error Syncing ${err}`)
  52. } else if (status.synced) {
  53. syncedItemsCount++
  54. contentBeingSynced.delete(id)
  55. contentCompleteSynced.set(id)
  56. }
  57. })
  58. // Allow short time for checking if content is already stored locally.
  59. // So we can handle more new items per run.
  60. await nextTick()
  61. } catch (err) {
  62. // Most likely failed to resolve the content id
  63. debug(`Failed calling synchronize ${err}`)
  64. contentBeingSynced.delete(id)
  65. }
  66. }
  67. debug(`Items processed in this sync run ${syncedItemsCount}`)
  68. }
  69. async function syncPeriodic({ api, flags, storage, contentBeingSynced, contentCompleteSynced }) {
  70. const retry = () => {
  71. setTimeout(syncPeriodic, flags.syncPeriod, {
  72. api,
  73. flags,
  74. storage,
  75. contentBeingSynced,
  76. contentCompleteSynced,
  77. })
  78. }
  79. try {
  80. const chainIsSyncing = await api.chainIsSyncing()
  81. if (chainIsSyncing) {
  82. debug('Chain is syncing. Postponing sync.')
  83. } else {
  84. await syncContent({ api, storage, contentBeingSynced, contentCompleteSynced })
  85. }
  86. } catch (err) {
  87. debug(`Error during sync ${err.stack}`)
  88. }
  89. // always try again
  90. retry()
  91. }
  92. function startSyncing(api, flags, storage) {
  93. // ids of content currently being synced
  94. const contentBeingSynced = new Map()
  95. // ids of content that completed sync and may require creating a new relationship
  96. const contentCompleteSynced = new Map()
  97. syncPeriodic({ api, flags, storage, contentBeingSynced, contentCompleteSynced })
  98. }
  99. module.exports = {
  100. startSyncing,
  101. }