sync.js 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. /*
  2. * This file is part of the storage node for the Joystream project.
  3. * Copyright (C) 2019 Joystream Contributors
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. */
  18. 'use strict'
  19. const debug = require('debug')('joystream:sync')
  20. const _ = require('lodash')
  21. const { ContentId } = require('@joystream/types/storage')
  22. const { nextTick } = require('@joystream/storage-utils/sleep')
  23. // Time to wait between sync runs. The lower the better chance to consume all
  24. // available sync sessions allowed.
  25. const INTERVAL_BETWEEN_SYNC_RUNS_MS = 3000
  /**
   * Performs one sync pass: asks the runtime API for the accepted content ids,
   * drops the ids already tracked in either map, and starts pinning the rest
   * (in shuffled order) until the concurrency cap is reached or no ids remain.
   *
   * @param {object} context
   * @param {object} context.api - `api.assets.getAcceptedIpfsHashes()` supplies the candidate ids.
   * @param {object} context.storage - `storage.pin(id, callback)` is called once per selected id;
   *   the callback receives `(err, status)`.
   * @param {Map} context.contentBeingSynced - ids with a pin in flight (mutated; only keys are used).
   * @param {Map} context.contentCompletedSync - ids whose pin reported `status.synced` (mutated).
   * @param {object} context.flags - `flags.maxSync` caps the number of concurrent pins.
   * @returns {Promise<void>}
   */
  async function syncRun({ api, storage, contentBeingSynced, contentCompletedSync, flags }) {
    // The number of concurrent items to attempt to fetch.
    // Number() applies the same coercion Math.max() would. A missing or
    // non-numeric flags.maxSync becomes NaN, and since `size < NaN` is always
    // false the loop below would never run; fall back to one item at a time.
    const requestedMaxSync = Number(flags.maxSync)
    const MAX_CONCURRENT_SYNC_ITEMS = Number.isNaN(requestedMaxSync) ? 1 : Math.max(1, requestedMaxSync)

    const contentIds = api.assets.getAcceptedIpfsHashes()

    // Select ids which may need to be synced: neither completed nor in flight.
    const idsNotSynced = contentIds.filter((id) => !contentCompletedSync.has(id) && !contentBeingSynced.has(id))

    // We are limiting how many content ids can be synced concurrently, so to ensure
    // better distribution of content across storage nodes during a potentially long
    // sync process we don't want all nodes to replicate items in the same order, so
    // we simply shuffle.
    const idsToSync = _.shuffle(idsNotSynced)

    while (contentBeingSynced.size < MAX_CONCURRENT_SYNC_ITEMS && idsToSync.length) {
      const id = idsToSync.shift()

      try {
        // Mark as in flight before pinning so the id counts against the cap.
        contentBeingSynced.set(id)
        await storage.pin(id, (err, status) => {
          if (err) {
            contentBeingSynced.delete(id)
            debug(`Error Syncing ${err}`)
          } else if (status.synced) {
            contentBeingSynced.delete(id)
            contentCompletedSync.set(id)
          }
        })
      } catch (err) {
        // Most likely failed to resolve the content id
        debug(`Failed calling synchronize ${err}`)
        contentBeingSynced.delete(id)
      }

      // Yield so that callbacks handed to storage.pin() can run during this sync
      // run; a callback that completes frees a slot before the next size check.
      await nextTick()
    }
  }
  /**
   * Runs one sync attempt and then schedules itself to run again after
   * INTERVAL_BETWEEN_SYNC_RUNS_MS with the same context object fields.
   *
   * The attempt is skipped while `api.chainIsSyncing()` resolves truthy.
   * Any error raised during the attempt is logged and never propagated.
   *
   * @param {object} context
   * @param {object} context.api - must provide `chainIsSyncing()`; passed on to syncRun.
   * @param {object} context.flags - passed on to syncRun.
   * @param {object} context.storage - passed on to syncRun.
   * @param {Map} context.contentBeingSynced - passed on to syncRun.
   * @param {Map} context.contentCompletedSync - passed on to syncRun.
   * @returns {Promise<void>}
   */
  async function syncRunner({ api, flags, storage, contentBeingSynced, contentCompletedSync }) {
    try {
      if (await api.chainIsSyncing()) {
        debug('Chain is syncing. Postponing sync.')
      } else {
        await syncRun({
          api,
          storage,
          contentBeingSynced,
          contentCompletedSync,
          flags,
        })
      }
    } catch (err) {
      // A rejection value is not guaranteed to be an Error (it may even be
      // null/undefined), so only read `.stack` when it is actually there.
      debug(`Error during sync ${err && err.stack ? err.stack : err}`)
    } finally {
      // Schedule the next sync run from `finally` so that nothing thrown above,
      // including from the catch handler, can break the periodic loop.
      setTimeout(syncRunner, INTERVAL_BETWEEN_SYNC_RUNS_MS, {
        api,
        flags,
        storage,
        contentBeingSynced,
        contentCompletedSync,
      })
    }
  }
  /**
   * Entry point of the sync module: creates the two tracking maps, starts the
   * self-rescheduling sync runner, and installs a periodic progress log.
   *
   * @param {object} api - handed to the sync runner unchanged.
   * @param {object} flags - handed to the sync runner unchanged.
   * @param {object} storage - handed to the sync runner unchanged.
   * @returns {void}
   */
  function startSyncing(api, flags, storage) {
    // Tracking state shared by every sync run started from here:
    // ids currently being synced, and ids that completed sync.
    const inFlight = new Map()
    const completed = new Map()

    syncRunner({
      api,
      flags,
      storage,
      contentBeingSynced: inFlight,
      contentCompletedSync: completed,
    })

    // Report the size of both maps once a minute.
    const reportProgress = () => {
      debug(`objects syncing: ${inFlight.size}`)
      debug(`objects local: ${completed.size}`)
    }
    setInterval(reportProgress, 60000)
  }
  // Public API of this module: only the sync entry point is exported.
  module.exports = { startSyncing }