UploadCommandBase.ts 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. import ContentDirectoryCommandBase from './ContentDirectoryCommandBase'
  2. import {
  3. AssetToUpload,
  4. ResolvedAsset,
  5. StorageNodeInfo,
  6. TokenRequest,
  7. TokenRequestData,
  8. VideoFFProbeMetadata,
  9. VideoFileMetadata,
  10. } from '../Types'
  11. import { MultiBar, Options, SingleBar } from 'cli-progress'
  12. import ExitCodes from '../ExitCodes'
  13. import fs from 'fs'
  14. import _ from 'lodash'
  15. import axios from 'axios'
  16. import ffprobeInstaller from '@ffprobe-installer/ffprobe'
  17. import ffmpeg from 'fluent-ffmpeg'
  18. import path from 'path'
  19. import mimeTypes from 'mime-types'
  20. import { Assets } from '../schemas/typings/Assets.schema'
  21. import chalk from 'chalk'
  22. import { DataObjectCreationParameters } from '@joystream/types/storage'
  23. import { createHash } from 'blake3-wasm'
  24. import * as multihash from 'multihashes'
  25. import { u8aToHex, formatBalance } from '@polkadot/util'
  26. import { KeyringPair } from '@polkadot/keyring/types'
  27. import FormData from 'form-data'
  28. import BN from 'bn.js'
  29. import { createType } from '@joystream/types'
  30. import { StorageAssets } from '@joystream/types/content'
  31. ffmpeg.setFfprobePath(ffprobeInstaller.path)
  32. /**
  33. * Abstract base class for commands that require uploading functionality
  34. */
  35. export default abstract class UploadCommandBase extends ContentDirectoryCommandBase {
  36. static flags = {
  37. ...ContentDirectoryCommandBase.flags,
  38. }
  39. private fileSizeCache: Map<string, number> = new Map<string, number>()
  40. private maxFileSize: undefined | BN = undefined
  41. private progressBarOptions: Options = {
  42. noTTYOutput: true,
  43. format: `{barTitle} | {bar} | {value}/{total} KB processed`,
  44. }
  45. protected requiresQueryNode = true
  46. getFileSize(path: string): number {
  47. const cachedSize = this.fileSizeCache.get(path)
  48. return cachedSize !== undefined ? cachedSize : fs.statSync(path).size
  49. }
  50. createReadStreamWithProgressBar(
  51. filePath: string,
  52. barTitle: string,
  53. multiBar?: MultiBar
  54. ): {
  55. fileStream: fs.ReadStream
  56. progressBar: SingleBar
  57. } {
  58. // Progress CLI UX:
  59. // https://github.com/oclif/cli-ux#cliprogress
  60. // https://www.npmjs.com/package/cli-progress
  61. const fileSize = this.getFileSize(filePath)
  62. let processedKB = 0
  63. const fileSizeKB = Math.ceil(fileSize / 1024)
  64. const progress = multiBar
  65. ? (multiBar.create(fileSizeKB, processedKB, { barTitle }) as SingleBar | undefined)
  66. : new SingleBar(this.progressBarOptions)
  67. if (!progress) {
  68. throw new Error('Provided multibar does not support noTTY mode!')
  69. }
  70. progress.start(fileSizeKB, processedKB, { barTitle })
  71. return {
  72. fileStream: fs
  73. .createReadStream(filePath)
  74. .pause() // Explicitly pause to prevent switching to flowing mode (https://nodejs.org/api/stream.html#stream_event_data)
  75. .on('error', () => {
  76. progress.stop()
  77. this.error(`Error while trying to read data from: ${filePath}!`, {
  78. exit: ExitCodes.FsOperationFailed,
  79. })
  80. })
  81. .on('data', (data) => {
  82. processedKB += data.length / 1024
  83. progress.update(processedKB)
  84. })
  85. .on('end', () => {
  86. progress.update(fileSizeKB)
  87. progress.stop()
  88. }),
  89. progressBar: progress,
  90. }
  91. }
  92. async getVideoFFProbeMetadata(filePath: string): Promise<VideoFFProbeMetadata> {
  93. return new Promise<VideoFFProbeMetadata>((resolve, reject) => {
  94. ffmpeg.ffprobe(filePath, (err, data) => {
  95. if (err) {
  96. reject(err)
  97. return
  98. }
  99. const videoStream = data.streams.find((s) => s.codec_type === 'video')
  100. if (videoStream) {
  101. resolve({
  102. width: videoStream.width,
  103. height: videoStream.height,
  104. codecName: videoStream.codec_name,
  105. codecFullName: videoStream.codec_long_name,
  106. duration: videoStream.duration !== undefined ? Math.ceil(Number(videoStream.duration)) || 0 : undefined,
  107. })
  108. } else {
  109. reject(new Error('No video stream found in file'))
  110. }
  111. })
  112. })
  113. }
  114. async getVideoFileMetadata(filePath: string): Promise<VideoFileMetadata> {
  115. let ffProbeMetadata: VideoFFProbeMetadata = {}
  116. try {
  117. ffProbeMetadata = await this.getVideoFFProbeMetadata(filePath)
  118. } catch (e) {
  119. const message = e instanceof Error ? e.message : e
  120. this.warn(`Failed to get video metadata via ffprobe (${message})`)
  121. }
  122. const size = this.getFileSize(filePath)
  123. const container = path.extname(filePath).slice(1)
  124. const mimeType = mimeTypes.lookup(container) || `unknown`
  125. return {
  126. size,
  127. container,
  128. mimeType,
  129. ...ffProbeMetadata,
  130. }
  131. }
  132. async calculateFileHash(filePath: string): Promise<string> {
  133. const { fileStream } = this.createReadStreamWithProgressBar(filePath, 'Calculating file hash')
  134. let blake3Hash: Uint8Array
  135. return new Promise<string>((resolve, reject) => {
  136. fileStream
  137. .pipe(createHash())
  138. .on('data', (data) => (blake3Hash = data))
  139. .on('end', () => resolve(multihash.toB58String(multihash.encode(blake3Hash, 'blake3'))))
  140. .on('error', (err) => reject(err))
  141. })
  142. }
  143. async validateFile(filePath: string): Promise<void> {
  144. // Basic file validation
  145. if (!fs.existsSync(filePath)) {
  146. this.error(`${filePath} - file does not exist under provided path!`, { exit: ExitCodes.FileNotFound })
  147. }
  148. if (!this.maxFileSize) {
  149. this.maxFileSize = await this.getOriginalApi().consts.storage.maxDataObjectSize
  150. }
  151. if (this.maxFileSize.ltn(this.getFileSize(filePath))) {
  152. this.error(`${filePath} - file is too big. Max file size is ${this.maxFileSize.toString()} bytes`)
  153. }
  154. }
  155. async getRandomActiveStorageNodeInfo(bagId: string, retryTime = 6, retryCount = 5): Promise<StorageNodeInfo | null> {
  156. for (let i = 0; i <= retryCount; ++i) {
  157. const nodesInfo = _.shuffle(await this.getQNApi().storageNodesInfoByBagId(bagId))
  158. for (const info of nodesInfo) {
  159. try {
  160. await axios.get(info.apiEndpoint + '/version', {
  161. headers: {
  162. connection: 'close',
  163. },
  164. })
  165. return info
  166. } catch (err) {
  167. continue
  168. }
  169. }
  170. if (i !== retryCount) {
  171. this.log(`No storage provider can serve the request yet, retrying in ${retryTime}s (${i + 1}/${retryCount})...`)
  172. await new Promise((resolve) => setTimeout(resolve, retryTime * 1000))
  173. }
  174. }
  175. return null
  176. }
  177. async generateDataObjectParameters(filePath: string): Promise<DataObjectCreationParameters> {
  178. return createType<DataObjectCreationParameters, 'DataObjectCreationParameters'>('DataObjectCreationParameters', {
  179. size: this.getFileSize(filePath),
  180. ipfsContentId: await this.calculateFileHash(filePath),
  181. })
  182. }
  183. async resolveAndValidateAssets<T extends Record<string, string | null | undefined>>(
  184. paths: T,
  185. basePath: string
  186. ): Promise<[ResolvedAsset[], { [K in keyof T]?: number }]> {
  187. const assetIndices: { [K in keyof T]?: number } = {}
  188. const resolvedAssets: ResolvedAsset[] = []
  189. for (let [assetKey, assetPath] of Object.entries(paths)) {
  190. const assetType = assetKey as keyof T
  191. if (!assetPath) {
  192. assetIndices[assetType] = undefined
  193. continue
  194. }
  195. if (basePath) {
  196. assetPath = path.resolve(path.dirname(basePath), assetPath)
  197. }
  198. await this.validateFile(assetPath)
  199. const parameters = await this.generateDataObjectParameters(assetPath)
  200. assetIndices[assetType] = resolvedAssets.length
  201. resolvedAssets.push({
  202. path: assetPath,
  203. parameters,
  204. })
  205. }
  206. return [resolvedAssets, assetIndices]
  207. }
  208. async getStorageNodeUploadToken(
  209. storageNodeInfo: StorageNodeInfo,
  210. account: KeyringPair,
  211. memberId: number,
  212. objectId: BN,
  213. bagId: string
  214. ): Promise<string> {
  215. const data: TokenRequestData = {
  216. storageBucketId: storageNodeInfo.bucketId,
  217. accountId: account.address,
  218. bagId,
  219. memberId,
  220. dataObjectId: objectId.toNumber(),
  221. }
  222. const message = JSON.stringify(data)
  223. const signature = u8aToHex(account.sign(message))
  224. const postData: TokenRequest = { data, signature }
  225. const {
  226. data: { token },
  227. } = await axios.post(`${storageNodeInfo.apiEndpoint}/authToken`, postData)
  228. if (!token) {
  229. this.error('Recieved empty token from the storage node!', { exit: ExitCodes.StorageNodeError })
  230. }
  231. return token
  232. }
  233. async uploadAsset(
  234. account: KeyringPair,
  235. memberId: number,
  236. objectId: BN,
  237. bagId: string,
  238. filePath: string,
  239. storageNode?: StorageNodeInfo,
  240. multiBar?: MultiBar
  241. ): Promise<void> {
  242. const storageNodeInfo = storageNode || (await this.getRandomActiveStorageNodeInfo(bagId))
  243. if (!storageNodeInfo) {
  244. this.error('No active storage node found!', { exit: ExitCodes.ActionCurrentlyUnavailable })
  245. }
  246. this.log(`Chosen storage node endpoint: ${storageNodeInfo.apiEndpoint}`)
  247. const { fileStream, progressBar } = this.createReadStreamWithProgressBar(
  248. filePath,
  249. `Uploading ${filePath}`,
  250. multiBar
  251. )
  252. fileStream.on('end', () => {
  253. // Temporarly disable because with Promise.all it breaks the UI
  254. // cli.action.start('Waiting for the file to be processed...')
  255. })
  256. const formData = new FormData()
  257. formData.append('dataObjectId', objectId.toString())
  258. formData.append('storageBucketId', storageNodeInfo.bucketId)
  259. formData.append('bagId', bagId)
  260. formData.append('file', fileStream, {
  261. filename: path.basename(filePath),
  262. filepath: filePath,
  263. knownLength: this.getFileSize(filePath),
  264. })
  265. this.log(`Uploading object ${objectId.toString()} (${filePath})`)
  266. try {
  267. await axios.post(`${storageNodeInfo.apiEndpoint}/files`, formData, {
  268. maxBodyLength: Infinity,
  269. maxContentLength: Infinity,
  270. headers: {
  271. 'content-type': 'multipart/form-data',
  272. ...formData.getHeaders(),
  273. },
  274. })
  275. } catch (e) {
  276. progressBar.stop()
  277. if (axios.isAxiosError(e)) {
  278. const msg = e.response && e.response.data ? JSON.stringify(e.response.data) : e.message
  279. this.error(`Unexpected error when trying to upload a file: ${msg}`, {
  280. exit: ExitCodes.StorageNodeError,
  281. })
  282. } else {
  283. throw e
  284. }
  285. }
  286. }
  287. async uploadAssets(
  288. account: KeyringPair,
  289. memberId: number,
  290. bagId: string,
  291. assets: AssetToUpload[],
  292. inputFilePath: string,
  293. outputFilePostfix = '__rejectedContent'
  294. ): Promise<void> {
  295. const storageNodeInfo = await this.getRandomActiveStorageNodeInfo(bagId)
  296. if (!storageNodeInfo) {
  297. this.warn('No storage provider is currently available!')
  298. this.handleRejectedUploads(
  299. bagId,
  300. assets,
  301. assets.map(() => false),
  302. inputFilePath,
  303. outputFilePostfix
  304. )
  305. this.exit(ExitCodes.ActionCurrentlyUnavailable)
  306. }
  307. const multiBar = new MultiBar(this.progressBarOptions)
  308. const errors: [string, string][] = []
  309. // Workaround replacement for Promise.allSettled (which is only available in ES2020)
  310. const results = await Promise.all(
  311. assets.map(async (a) => {
  312. try {
  313. await this.uploadAsset(account, memberId, a.dataObjectId, bagId, a.path, storageNodeInfo, multiBar)
  314. return true
  315. } catch (e) {
  316. errors.push([a.dataObjectId.toString(), e instanceof Error ? e.message : 'Unknown error'])
  317. return false
  318. }
  319. })
  320. )
  321. errors.forEach(([objectId, message]) => this.warn(`Upload of object ${objectId} failed: ${message}`))
  322. this.handleRejectedUploads(bagId, assets, results, inputFilePath, outputFilePostfix)
  323. multiBar.stop()
  324. }
  325. async prepareAssetsForExtrinsic(resolvedAssets: ResolvedAsset[]): Promise<StorageAssets | undefined> {
  326. const feePerMB = await this.getOriginalApi().query.storage.dataObjectPerMegabyteFee()
  327. const { dataObjectDeletionPrize } = this.getOriginalApi().consts.storage
  328. if (resolvedAssets.length) {
  329. const totalBytes = resolvedAssets
  330. .reduce((a, b) => {
  331. return a.add(b.parameters.getField('size'))
  332. }, new BN(0))
  333. .toNumber()
  334. const totalStorageFee = feePerMB.muln(Math.ceil(totalBytes / 1024 / 1024))
  335. const totalDeletionPrize = dataObjectDeletionPrize.muln(resolvedAssets.length)
  336. await this.requireConfirmation(
  337. `Some additional costs will be associated with this operation:\n` +
  338. `Total data storage fee: ${chalk.cyan(formatBalance(totalStorageFee))}\n` +
  339. `Total deletion prize: ${chalk.cyan(
  340. formatBalance(totalDeletionPrize)
  341. )} (recoverable on data object(s) removal)\n` +
  342. `Are you sure you want to continue?`
  343. )
  344. return createType<StorageAssets, 'StorageAssets'>('StorageAssets', {
  345. expected_data_size_fee: feePerMB,
  346. object_creation_list: resolvedAssets.map((a) => a.parameters),
  347. })
  348. }
  349. return undefined
  350. }
  351. private handleRejectedUploads(
  352. bagId: string,
  353. assets: AssetToUpload[],
  354. results: boolean[],
  355. inputFilePath: string,
  356. outputFilePostfix: string
  357. ): void {
  358. // Try to save rejected contentIds and paths for reupload purposes
  359. const rejectedAssetsOutput: Assets = { bagId, assets: [] }
  360. results.forEach(
  361. (r, i) =>
  362. r === false &&
  363. rejectedAssetsOutput.assets.push({ objectId: assets[i].dataObjectId.toString(), path: assets[i].path })
  364. )
  365. if (rejectedAssetsOutput.assets.length) {
  366. this.warn(
  367. `Some assets were not uploaded successfully. Try reuploading them with ${chalk.magentaBright(
  368. 'content:reuploadAssets'
  369. )}!`
  370. )
  371. console.log(rejectedAssetsOutput)
  372. const outputPath = inputFilePath.replace('.json', `${outputFilePostfix}.json`)
  373. try {
  374. fs.writeFileSync(outputPath, JSON.stringify(rejectedAssetsOutput, null, 4))
  375. this.log(`Rejected content ids successfully saved to: ${chalk.magentaBright(outputPath)}!`)
  376. } catch (e) {
  377. console.error(e)
  378. this.warn(
  379. `Could not write rejected content output to ${outputPath}. Try copying the output above and creating the file manually!`
  380. )
  381. }
  382. }
  383. }
  384. }