|
@@ -8,6 +8,8 @@ import { CHANNEL_AVATAR_TARGET_SIZE, CHANNEL_COVER_TARGET_SIZE } from './ImageRe
|
|
import { ChannelId } from '@joystream/types/common'
|
|
import { ChannelId } from '@joystream/types/common'
|
|
import _ from 'lodash'
|
|
import _ from 'lodash'
|
|
import { MigrationResult } from './BaseMigration'
|
|
import { MigrationResult } from './BaseMigration'
|
|
|
|
+import { Logger } from 'winston'
|
|
|
|
+import { createLogger } from '../logging'
|
|
|
|
|
|
export type ChannelsMigrationConfig = AssetsMigrationConfig & {
|
|
export type ChannelsMigrationConfig = AssetsMigrationConfig & {
|
|
channelIds: number[]
|
|
channelIds: number[]
|
|
@@ -29,11 +31,13 @@ export class ChannelMigration extends AssetsMigration {
|
|
protected config: ChannelsMigrationConfig
|
|
protected config: ChannelsMigrationConfig
|
|
protected videoIds: number[] = []
|
|
protected videoIds: number[] = []
|
|
protected forcedChannelOwner: { id: string; controllerAccount: string } | undefined
|
|
protected forcedChannelOwner: { id: string; controllerAccount: string } | undefined
|
|
|
|
+ protected logger: Logger
|
|
|
|
|
|
public constructor(params: ChannelsMigrationParams) {
|
|
public constructor(params: ChannelsMigrationParams) {
|
|
super(params)
|
|
super(params)
|
|
this.config = params.config
|
|
this.config = params.config
|
|
this.forcedChannelOwner = params.forcedChannelOwner
|
|
this.forcedChannelOwner = params.forcedChannelOwner
|
|
|
|
+ this.logger = createLogger(this.name)
|
|
}
|
|
}
|
|
|
|
|
|
private getChannelOwnerMember({ id, ownerMember }: ChannelFieldsFragment) {
|
|
private getChannelOwnerMember({ id, ownerMember }: ChannelFieldsFragment) {
|
|
@@ -48,21 +52,46 @@ export class ChannelMigration extends AssetsMigration {
|
|
return ownerMember
|
|
return ownerMember
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ protected async migrateBatch(channels: ChannelFieldsFragment[]): Promise<void> {
|
|
|
|
+ const { api } = this
|
|
|
|
+ const txs = _.flatten(await Promise.all(channels.map((c) => this.prepareChannel(c))))
|
|
|
|
+ const result = await api.sendExtrinsic(this.sudo, api.tx.utility.batch(txs))
|
|
|
|
+ const channelCreatedEvents = api.findEvents(result, 'content', 'ChannelCreated')
|
|
|
|
+ const newChannelIds: ChannelId[] = channelCreatedEvents.map((e) => e.data[1])
|
|
|
|
+ if (channelCreatedEvents.length !== channels.length) {
|
|
|
|
+ this.extractFailedMigrations(result, channels)
|
|
|
|
+ }
|
|
|
|
+ const newChannelMapEntries: [number, number][] = []
|
|
|
|
+ let newChannelIdIndex = 0
|
|
|
|
+ channels.forEach(({ id }) => {
|
|
|
|
+ if (this.failedMigrations.has(parseInt(id))) {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ const newChannelId = newChannelIds[newChannelIdIndex++].toNumber()
|
|
|
|
+ this.idsMap.set(parseInt(id), newChannelId)
|
|
|
|
+ newChannelMapEntries.push([parseInt(id), newChannelId])
|
|
|
|
+ })
|
|
|
|
+ if (newChannelMapEntries.length) {
|
|
|
|
+ this.logger.info('Channel map entries added!', { newChannelMapEntries })
|
|
|
|
+ const dataObjectsUploadedEvents = this.api.findEvents(result, 'storage', 'DataObjectsUploaded')
|
|
|
|
+ this.assetsManager.queueUploadsFromEvents(dataObjectsUploadedEvents)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
public async run(): Promise<ChannelsMigrationResult> {
|
|
public async run(): Promise<ChannelsMigrationResult> {
|
|
await this.init()
|
|
await this.init()
|
|
const {
|
|
const {
|
|
- api,
|
|
|
|
config: { channelIds, channelBatchSize },
|
|
config: { channelIds, channelBatchSize },
|
|
} = this
|
|
} = this
|
|
const ids = channelIds.sort((a, b) => a - b)
|
|
const ids = channelIds.sort((a, b) => a - b)
|
|
while (ids.length) {
|
|
while (ids.length) {
|
|
const idsBatch = ids.splice(0, channelBatchSize)
|
|
const idsBatch = ids.splice(0, channelBatchSize)
|
|
- console.log(`Fetching a batch of ${idsBatch.length} channels...`)
|
|
|
|
|
|
+ this.logger.info(`Fetching a batch of ${idsBatch.length} channels...`)
|
|
const channelsBatch = (await this.queryNodeApi.getChannelsByIds(idsBatch)).sort(
|
|
const channelsBatch = (await this.queryNodeApi.getChannelsByIds(idsBatch)).sort(
|
|
(a, b) => parseInt(a.id) - parseInt(b.id)
|
|
(a, b) => parseInt(a.id) - parseInt(b.id)
|
|
)
|
|
)
|
|
if (channelsBatch.length < idsBatch.length) {
|
|
if (channelsBatch.length < idsBatch.length) {
|
|
- console.error(
|
|
|
|
|
|
+ this.logger.warn(
|
|
`Some channels were not be found: ${_.difference(
|
|
`Some channels were not be found: ${_.difference(
|
|
idsBatch,
|
|
idsBatch,
|
|
channelsBatch.map((c) => parseInt(c.id))
|
|
channelsBatch.map((c) => parseInt(c.id))
|
|
@@ -71,34 +100,14 @@ export class ChannelMigration extends AssetsMigration {
|
|
}
|
|
}
|
|
const channelsToMigrate = channelsBatch.filter((c) => !this.idsMap.has(parseInt(c.id)))
|
|
const channelsToMigrate = channelsBatch.filter((c) => !this.idsMap.has(parseInt(c.id)))
|
|
if (channelsToMigrate.length < channelsBatch.length) {
|
|
if (channelsToMigrate.length < channelsBatch.length) {
|
|
- console.log(
|
|
|
|
|
|
+ this.logger.info(
|
|
`${channelsToMigrate.length ? 'Some' : 'All'} channels in this batch were already migrated ` +
|
|
`${channelsToMigrate.length ? 'Some' : 'All'} channels in this batch were already migrated ` +
|
|
`(${channelsBatch.length - channelsToMigrate.length}/${channelsBatch.length})`
|
|
`(${channelsBatch.length - channelsToMigrate.length}/${channelsBatch.length})`
|
|
)
|
|
)
|
|
}
|
|
}
|
|
if (channelsToMigrate.length) {
|
|
if (channelsToMigrate.length) {
|
|
- const txs = _.flatten(await Promise.all(channelsToMigrate.map((c) => this.prepareChannel(c))))
|
|
|
|
- const result = await api.sendExtrinsic(this.sudo, api.tx.utility.batch(txs))
|
|
|
|
- const channelCreatedEvents = api.findEvents(result, 'content', 'ChannelCreated')
|
|
|
|
- const newChannelIds: ChannelId[] = channelCreatedEvents.map((e) => e.data[1])
|
|
|
|
- if (channelCreatedEvents.length !== channelsToMigrate.length) {
|
|
|
|
- this.extractFailedSudoAsMigrations(result, channelsToMigrate)
|
|
|
|
- }
|
|
|
|
- const dataObjectsUploadedEvents = api.findEvents(result, 'storage', 'DataObjectsUploaded')
|
|
|
|
- const newChannelMapEntries: [number, number][] = []
|
|
|
|
- let newChannelIdIndex = 0
|
|
|
|
- channelsToMigrate.forEach(({ id }) => {
|
|
|
|
- if (this.failedMigrations.has(parseInt(id))) {
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- const newChannelId = newChannelIds[newChannelIdIndex++].toNumber()
|
|
|
|
- this.idsMap.set(parseInt(id), newChannelId)
|
|
|
|
- newChannelMapEntries.push([parseInt(id), newChannelId])
|
|
|
|
- })
|
|
|
|
- if (newChannelMapEntries.length) {
|
|
|
|
- console.log('Channel map entries added!', newChannelMapEntries)
|
|
|
|
- await this.assetsManager.uploadFromEvents(dataObjectsUploadedEvents)
|
|
|
|
- }
|
|
|
|
|
|
+ await this.executeBatchMigration(channelsToMigrate)
|
|
|
|
+ await this.assetsManager.processQueuedUploads()
|
|
}
|
|
}
|
|
const videoIdsToMigrate: number[] = channelsBatch.reduce<number[]>(
|
|
const videoIdsToMigrate: number[] = channelsBatch.reduce<number[]>(
|
|
(res, { id, videos }) => (this.idsMap.has(parseInt(id)) ? res.concat(videos.map((v) => parseInt(v.id))) : res),
|
|
(res, { id, videos }) => (this.idsMap.has(parseInt(id)) ? res.concat(videos.map((v) => parseInt(v.id))) : res),
|
|
@@ -106,7 +115,7 @@ export class ChannelMigration extends AssetsMigration {
|
|
)
|
|
)
|
|
this.videoIds = this.videoIds.concat(videoIdsToMigrate)
|
|
this.videoIds = this.videoIds.concat(videoIdsToMigrate)
|
|
if (videoIdsToMigrate.length) {
|
|
if (videoIdsToMigrate.length) {
|
|
- console.log(`Added ${videoIdsToMigrate.length} video ids to the list of videos to migrate`)
|
|
|
|
|
|
+ this.logger.info(`Added ${videoIdsToMigrate.length} video ids to the list of videos to migrate`)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return {
|
|
return {
|