Browse Source

Migration scripts: Fix failed uploads/downloads handling

Leszek Wiesner 3 years ago
parent
commit
586b74401d

+ 4 - 1
utils/migration-scripts/src/commands/sumer-giza/retryFailedUploads.ts

@@ -38,7 +38,10 @@ export class RetryFailedUploadsCommand extends Command {
       await api.isReadyOrError
       const assetsManager = await AssetsManager.create({
         api,
-        config: opts,
+        config: {
+          ...opts,
+          migrationStatePath: path.dirname(opts.failedUploadsPath),
+        },
       })
       assetsManager.loadQueue(opts.failedUploadsPath)
       await assetsManager.processQueuedUploads()

+ 1 - 0
utils/migration-scripts/src/logging.ts

@@ -10,6 +10,7 @@ winston.addColors(colors)
 
 export function createLogger(label: string): Logger {
   return winston.createLogger({
+    level: 'debug',
     transports: [new winston.transports.Console()],
     defaultMeta: { label },
     format: winston.format.combine(

+ 20 - 16
utils/migration-scripts/src/sumer-giza/AssetsManager.ts

@@ -25,6 +25,7 @@ export type AssetsManagerConfig = {
   uploadSpBucketId: number
   uploadSpEndpoint: string
   dataDir: string
+  migrationStatePath: string
 }
 
 export type AssetsManagerParams = {
@@ -58,6 +59,7 @@ export class AssetsManager {
   private resizer: ImageResizer
   private queuedUploads: Set<string>
   private isQueueProcessing = false
+  private queueFilePath: string
   private logger: Logger
 
   public get queueSize(): number {
@@ -85,6 +87,8 @@ export class AssetsManager {
     this.logger = createLogger('Assets Manager')
     fs.mkdirSync(this.tmpAssetPath(''), { recursive: true })
     fs.mkdirSync(this.assetPath(''), { recursive: true })
+    this.queueFilePath = path.join(this.config.migrationStatePath, `unprocessedUploads_${Date.now()}.json`)
+    this.logger.info(`Failed/pending uploads will be saved to ${this.queueFilePath}`)
   }
 
   private static async getSumerStorageProviderEndpoints(queryNodeApi: QueryNodeApi): Promise<string[]> {
@@ -126,13 +130,7 @@ export class AssetsManager {
       return
     }
     let objectSize = new BN(data.size).toNumber()
-    let path: string
-    try {
-      path = await this.fetchAssetWithRetry(data.joystreamContentId, objectSize)
-    } catch (e) {
-      this.logger.error(`Data object ${data.joystreamContentId} was not fetched: ${(e as Error).message}`)
-      return
-    }
+    const path = await this.fetchAssetWithRetry(data.joystreamContentId, objectSize)
     if (targetSize) {
       try {
         await this.resizer.resize(path, targetSize)
@@ -264,13 +262,7 @@ export class AssetsManager {
     }
 
     if (uploadSuccesful) {
-      // Remove asset from queuedUploads and temporary storage
-      this.queuedUploads.delete(`${bagId}|${dataObjectId}`)
-      try {
-        fs.rmSync(dataPath)
-      } catch (e) {
-        this.logger.error(`Could not remove file "${dataPath}" after succesful upload...`)
-      }
+      this.finalizeUpload(bagId, dataObjectId, dataPath)
     }
   }
 
@@ -294,13 +286,25 @@ export class AssetsManager {
     this.queuedUploads = new Set(queue)
   }
 
-  public saveQueue(queueFilePath: string): void {
-    fs.writeFileSync(queueFilePath, JSON.stringify(Array.from(this.queuedUploads)))
+  public saveQueue(): void {
+    fs.writeFileSync(this.queueFilePath, JSON.stringify(Array.from(this.queuedUploads)))
+    this.logger.debug(`${this.queueFilePath} updated`, { queueSize: this.queuedUploads.size })
   }
 
   private queueUpload(bagId: BagId, objectId: DataObjectId): void {
     const bagIdStr = `dynamic:channel:${bagId.asType('Dynamic').asType('Channel').toString()}`
     this.queuedUploads.add(`${bagIdStr}|${objectId.toString()}`)
+    this.saveQueue()
+  }
+
+  private finalizeUpload(bagId: string, dataObjectId: number, dataPath: string) {
+    this.queuedUploads.delete(`${bagId}|${dataObjectId}`)
+    this.saveQueue()
+    try {
+      fs.rmSync(dataPath)
+    } catch (e) {
+      this.logger.error(`Could not remove file "${dataPath}" after succesful upload...`)
+    }
   }
 
   public queueUploadsFromEvents(events: IEvent<[Vec<DataObjectId>, UploadParameters, Balance]>[]): void {

+ 5 - 25
utils/migration-scripts/src/sumer-giza/AssetsMigration.ts

@@ -1,40 +1,20 @@
-import { BaseMigration, BaseMigrationConfig, BaseMigrationParams, MigrationResult } from './BaseMigration'
+import { BaseMigration, BaseMigrationConfig, BaseMigrationParams } from './BaseMigration'
 import { AssetsManager, AssetsManagerConfig } from './AssetsManager'
 
 export type AssetsMigrationConfig = BaseMigrationConfig & AssetsManagerConfig
 
 export type AssetsMigrationParams = BaseMigrationParams & {
+  assetsManager: AssetsManager
   config: AssetsMigrationConfig
 }
 
 export abstract class AssetsMigration extends BaseMigration {
   protected config: AssetsMigrationConfig
-  protected assetsManager!: AssetsManager
+  protected assetsManager: AssetsManager
 
-  public constructor({ api, queryNodeApi, config }: AssetsMigrationParams) {
+  public constructor({ api, queryNodeApi, config, assetsManager }: AssetsMigrationParams) {
     super({ api, queryNodeApi, config })
     this.config = config
-  }
-
-  public async init(): Promise<void> {
-    await super.init()
-    this.assetsManager = await AssetsManager.create({
-      api: this.api,
-      queryNodeApi: this.queryNodeApi,
-      config: this.config,
-    })
-  }
-
-  public abstract run(): Promise<MigrationResult>
-
-  protected saveMigrationState(isExitting: boolean): void {
-    super.saveMigrationState(isExitting)
-    if (isExitting && this.assetsManager.queueSize) {
-      const failedUploadsFilePath = this.getMigrationStateFilePath().replace(
-        '.json',
-        `FailedUploads_${Date.now()}.json`
-      )
-      this.assetsManager.saveQueue(failedUploadsFilePath)
-    }
+    this.assetsManager = assetsManager
   }
 }

+ 7 - 0
utils/migration-scripts/src/sumer-giza/ContentMigration.ts

@@ -3,6 +3,7 @@ import { QueryNodeApi } from './sumer-query-node/api'
 import { RuntimeApi } from '../RuntimeApi'
 import { VideosMigration } from './VideosMigration'
 import { ChannelMigration } from './ChannelsMigration'
+import { AssetsManager } from './AssetsManager'
 
 export type ContentMigrationConfig = {
   queryNodeUri: string
@@ -51,11 +52,16 @@ export class ContentMigration {
     const { api, queryNodeApi, config } = this
     await this.api.isReadyOrError
     const forcedChannelOwner = await this.getForcedChannelOwner()
+    const assetsManager = await AssetsManager.create({
+      api,
+      config,
+    })
     const { idsMap: channelsMap, videoIds } = await new ChannelMigration({
       api,
       queryNodeApi,
       config,
       forcedChannelOwner,
+      assetsManager,
     }).run()
     await new VideosMigration({
       api,
@@ -64,6 +70,7 @@ export class ContentMigration {
       channelsMap,
       videoIds,
       forcedChannelOwner,
+      assetsManager,
     }).run()
   }
 }

+ 6 - 6
utils/migration-scripts/src/sumer-giza/VideosMigration.ts

@@ -31,12 +31,12 @@ export class VideosMigration extends AssetsMigration {
   protected forcedChannelOwner: { id: string; controllerAccount: string } | undefined
   protected logger: Logger
 
-  public constructor({ api, queryNodeApi, config, videoIds, channelsMap, forcedChannelOwner }: VideosMigrationParams) {
-    super({ api, queryNodeApi, config })
-    this.config = config
-    this.channelsMap = channelsMap
-    this.videoIds = videoIds
-    this.forcedChannelOwner = forcedChannelOwner
+  public constructor(params: VideosMigrationParams) {
+    super(params)
+    this.config = params.config
+    this.channelsMap = params.channelsMap
+    this.videoIds = params.videoIds
+    this.forcedChannelOwner = params.forcedChannelOwner
     this.logger = createLogger(this.name)
   }