|
@@ -20,15 +20,15 @@
|
|
|
|
|
|
const debug = require('debug')('joystream:sync')
|
|
|
const _ = require('lodash')
|
|
|
-
|
|
|
+const { ContentId } = require('@joystream/types/media')
|
|
|
// The number of concurrent sync sessions allowed. Must be greater than zero.
|
|
|
const MAX_CONCURRENT_SYNC_ITEMS = 20
|
|
|
|
|
|
async function syncContent({ api, storage, contentBeingSynced, contentCompleteSynced }) {
|
|
|
- const knownContentIds = await api.assets.getKnownContentIds()
|
|
|
+ const knownEncodedContentIds = (await api.assets.getKnownContentIds()).map((id) => id.encode())
|
|
|
|
|
|
// Select ids which we have not yet fully synced
|
|
|
- const needsSync = knownContentIds
|
|
|
+ const needsSync = knownEncodedContentIds
|
|
|
.filter((id) => !contentCompleteSynced.has(id))
|
|
|
.filter((id) => !contentBeingSynced.has(id))
|
|
|
|
|
@@ -42,23 +42,24 @@ async function syncContent({ api, storage, contentBeingSynced, contentCompleteSy
|
|
|
// make sure the data object was Accepted by the liaison,
|
|
|
// don't just blindly attempt to fetch them
|
|
|
while (contentBeingSynced.size < MAX_CONCURRENT_SYNC_ITEMS && candidatesForSync.length) {
|
|
|
- const contentId = candidatesForSync.shift()
|
|
|
+ const id = candidatesForSync.shift()
|
|
|
|
|
|
try {
|
|
|
- contentBeingSynced.set(contentId)
|
|
|
+ contentBeingSynced.set(id)
|
|
|
+ const contentId = ContentId.decode(api.api.registry, id)
|
|
|
await storage.synchronize(contentId, (err, status) => {
|
|
|
if (err) {
|
|
|
- contentBeingSynced.delete(contentId)
|
|
|
+ contentBeingSynced.delete(id)
|
|
|
debug(`Error Syncing ${err}`)
|
|
|
} else if (status.synced) {
|
|
|
- contentBeingSynced.delete(contentId)
|
|
|
- contentCompleteSynced.set(contentId)
|
|
|
+ contentBeingSynced.delete(id)
|
|
|
+ contentCompleteSynced.set(id)
|
|
|
}
|
|
|
})
|
|
|
} catch (err) {
|
|
|
// Most likely failed to resolve the content id
|
|
|
debug(`Failed calling synchronize ${err}`)
|
|
|
- contentBeingSynced.delete(contentId)
|
|
|
+ contentBeingSynced.delete(id)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -71,7 +72,8 @@ async function createNewRelationships({ api, contentCompleteSynced }) {
|
|
|
// compose list of relationship ids to be set to ready.
|
|
|
return (
|
|
|
await Promise.all(
|
|
|
- [...contentCompleteSynced.keys()].map(async (contentId) => {
|
|
|
+ [...contentCompleteSynced.keys()].map(async (id) => {
|
|
|
+ const contentId = ContentId.decode(api.api.registry, id)
|
|
|
const { relationship, relationshipId } = await api.assets.getStorageRelationshipAndId(providerId, contentId)
|
|
|
|
|
|
if (relationship) {
|
|
@@ -81,11 +83,11 @@ async function createNewRelationships({ api, contentCompleteSynced }) {
|
|
|
}
|
|
|
} else {
|
|
|
// create relationship
|
|
|
- debug(`Creating new storage relationship for ${contentId.encode()}`)
|
|
|
+ debug(`Creating new storage relationship for ${id}`)
|
|
|
try {
|
|
|
return await api.assets.createStorageRelationship(roleAddress, providerId, contentId)
|
|
|
} catch (err) {
|
|
|
- debug(`Error creating new storage relationship ${contentId.encode()}: ${err.stack}`)
|
|
|
+ debug(`Error creating new storage relationship ${id}: ${err.stack}`)
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -122,7 +124,7 @@ async function syncPeriodic({ api, flags, storage, contentBeingSynced, contentCo
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
- debug('Starting sync run...')
|
|
|
+ debug('Sync run started.')
|
|
|
|
|
|
const chainIsSyncing = await api.chainIsSyncing()
|
|
|
if (chainIsSyncing) {
|
|
@@ -145,7 +147,7 @@ async function syncPeriodic({ api, flags, storage, contentBeingSynced, contentCo
|
|
|
const relationshipIds = await createNewRelationships({ api, contentCompleteSynced })
|
|
|
await setRelationshipsReady({ api, relationshipIds })
|
|
|
|
|
|
- debug('sync run complete')
|
|
|
+ debug(`Sync run completed, set ${relationshipIds.length} new relationships to ready`)
|
|
|
} catch (err) {
|
|
|
debug(`Error in sync run ${err.stack}`)
|
|
|
}
|
|
@@ -155,8 +157,11 @@ async function syncPeriodic({ api, flags, storage, contentBeingSynced, contentCo
|
|
|
}
|
|
|
|
|
|
function startSyncing(api, flags, storage) {
|
|
|
+ // ids of content currently being synced
|
|
|
const contentBeingSynced = new Map()
|
|
|
+ // ids of content that completed sync and may require creating a new relationship
|
|
|
const contentCompleteSynced = new Map()
|
|
|
+
|
|
|
syncPeriodic({ api, flags, storage, contentBeingSynced, contentCompleteSynced })
|
|
|
}
|
|
|
|