@@ -1,18 +1,33 @@
import ContentDirectoryCommandBase from './ContentDirectoryCommandBase'
-import { VideoFFProbeMetadata, VideoFileMetadata, AssetType, InputAsset, InputAssetDetails } from '../Types'
-import { ContentId, ContentParameters } from '@joystream/types/storage'
+import {
+ AssetToUpload,
+ ResolvedAsset,
+ StorageNodeInfo,
+ TokenRequest,
+ TokenRequestData,
+ VideoFFProbeMetadata,
+ VideoFileMetadata,
+} from '../Types'
import { MultiBar, Options, SingleBar } from 'cli-progress'
-import { Assets } from '../json-schemas/typings/Assets.schema'
import ExitCodes from '../ExitCodes'
-import ipfsHash from 'ipfs-only-hash'
import fs from 'fs'
import _ from 'lodash'
-import axios, { AxiosRequestConfig } from 'axios'
+import axios from 'axios'
import ffprobeInstaller from '@ffprobe-installer/ffprobe'
import ffmpeg from 'fluent-ffmpeg'
import path from 'path'
-import chalk from 'chalk'
import mimeTypes from 'mime-types'
+import { Assets } from '../schemas/typings/Assets.schema'
+import chalk from 'chalk'
+import { DataObjectCreationParameters } from '@joystream/types/storage'
+import { createHash } from 'blake3-wasm'
+import * as multihash from 'multihashes'
+import { u8aToHex, formatBalance } from '@polkadot/util'
+import { KeyringPair } from '@polkadot/keyring/types'
+import FormData from 'form-data'
+import BN from 'bn.js'
+import { createTypeFromConstructor } from '@joystream/types'
+import { StorageAssets } from '@joystream/types/content'
@@ -21,19 +36,19 @@ ffmpeg.setFfprobePath(ffprobeInstaller.path)
export default abstract class UploadCommandBase extends ContentDirectoryCommandBase {
private fileSizeCache: Map<string, number> = new Map<string, number>()
+ private maxFileSize: undefined | BN = undefined
private progressBarOptions: Options = {
+ noTTYOutput: true,
format: `{barTitle} | {bar} | {value}/{total} KB processed`,
+ protected requiresQueryNode = true
getFileSize(path: string): number {
const cachedSize = this.fileSizeCache.get(path)
return cachedSize !== undefined ? cachedSize : fs.statSync(path).size
- normalizeEndpoint(endpoint: string) {
- return endpoint.endsWith('/') ? endpoint : endpoint + '/'
- }
filePath: string,
barTitle: string,
@@ -49,9 +64,13 @@ export default abstract class UploadCommandBase extends ContentDirectoryCommandB
let processedKB = 0
const fileSizeKB = Math.ceil(fileSize / 1024)
const progress = multiBar
- ? multiBar.create(fileSizeKB, processedKB, { barTitle })
+ ? (multiBar.create(fileSizeKB, processedKB, { barTitle }) as SingleBar | undefined)
: new SingleBar(this.progressBarOptions)
+ if (!progress) {
+ throw new Error('Provided multibar does not support noTTY mode!')
+ }
progress.start(fileSizeKB, processedKB, { barTitle })
return {
fileStream: fs
@@ -103,7 +122,7 @@ export default abstract class UploadCommandBase extends ContentDirectoryCommandB
try {
ffProbeMetadata = await this.getVideoFFProbeMetadata(filePath)
} catch (e) {
- const message = e.message || e
+ const message = e instanceof Error ? e.message : e
this.warn(`Failed to get video metadata via ffprobe (${message})`)
@@ -118,114 +137,183 @@ export default abstract class UploadCommandBase extends ContentDirectoryCommandB
- async calculateFileIpfsHash(filePath: string): Promise<string> {
+ async calculateFileHash(filePath: string): Promise<string> {
const { fileStream } = this.createReadStreamWithProgressBar(filePath, 'Calculating file hash')
- const hash: string = await ipfsHash.of(fileStream)
- return hash
+ let blake3Hash: Uint8Array
+ return new Promise<string>((resolve, reject) => {
+ fileStream
+ .pipe(createHash())
+ .on('data', (data) => (blake3Hash = data))
+ .on('end', () => resolve(multihash.toB58String(multihash.encode(blake3Hash, 'blake3'))))
+ .on('error', (err) => reject(err))
+ })
- validateFile(filePath: string): void {
+ async validateFile(filePath: string): Promise<void> {
// Basic file validation
if (!fs.existsSync(filePath)) {
this.error(`${filePath} - file does not exist under provided path!`, { exit: ExitCodes.FileNotFound })
+ if (!this.maxFileSize) {
+ this.maxFileSize = await this.getOriginalApi().consts.storage.maxDataObjectSize
+ }
+ if (this.maxFileSize.ltn(this.getFileSize(filePath))) {
+ this.error(`${filePath} - file is too big. Max file size is ${this.maxFileSize.toString()} bytes`)
+ }
- assetUrl(endpointRoot: string, contentId: ContentId): string {
- // This will also make sure the resulting url is a valid url
- return new URL(`asset/v0/${contentId.encode()}`, this.normalizeEndpoint(endpointRoot)).toString()
- }
- async getRandomProviderEndpoint(): Promise<string | null> {
- const endpoints = _.shuffle(await this.getApi().allStorageProviderEndpoints())
- for (const endpoint of endpoints) {
- try {
- const url = new URL('swagger.json', this.normalizeEndpoint(endpoint)).toString()
- await axios.head(url)
- return endpoint
- } catch (e) {
- continue
+ async getRandomActiveStorageNodeInfo(bagId: string, retryTime = 6, retryCount = 5): Promise<StorageNodeInfo | null> {
+ for (let i = 0; i <= retryCount; ++i) {
+ const nodesInfo = _.shuffle(await this.getQNApi().storageNodesInfoByBagId(bagId))
+ for (const info of nodesInfo) {
+ try {
+ await axios.get(info.apiEndpoint + '/version', {
+ headers: {
+ connection: 'close',
+ },
+ })
+ return info
+ } catch (err) {
+ continue
+ }
+ }
+ if (i !== retryCount) {
+ this.log(`No storage provider can serve the request yet, retrying in ${retryTime}s (${i + 1}/${retryCount})...`)
+ await new Promise((resolve) => setTimeout(resolve, retryTime * 1000))
return null
- async generateContentParameters(filePath: string, type: AssetType): Promise<ContentParameters> {
- return this.createType('ContentParameters', {
- content_id: ContentId.generate(this.getTypesRegistry()),
- type_id: type,
+ async generateDataObjectParameters(filePath: string): Promise<DataObjectCreationParameters> {
+ return createTypeFromConstructor(DataObjectCreationParameters, {
size: this.getFileSize(filePath),
- ipfs_content_id: await this.calculateFileIpfsHash(filePath),
+ ipfsContentId: await this.calculateFileHash(filePath),
- async prepareInputAssets(paths: string[], basePath?: string): Promise<InputAssetDetails[]> {
- // Resolve assets
- if (basePath) {
- paths = paths.map((p) => basePath && path.resolve(path.dirname(basePath), p))
+ async resolveAndValidateAssets<T extends Record<string, string | null | undefined>>(
+ paths: T,
+ basePath: string
+ ): Promise<[ResolvedAsset[], { [K in keyof T]?: number }]> {
+ const assetIndices: { [K in keyof T]?: number } = {}
+ const resolvedAssets: ResolvedAsset[] = []
+ for (let [assetKey, assetPath] of Object.entries(paths)) {
+ const assetType = assetKey as keyof T
+ if (!assetPath) {
+ assetIndices[assetType] = undefined
+ continue
+ }
+ if (basePath) {
+ assetPath = path.resolve(path.dirname(basePath), assetPath)
+ }
+ await this.validateFile(assetPath)
+ const parameters = await this.generateDataObjectParameters(assetPath)
+ assetIndices[assetType] = resolvedAssets.length
+ resolvedAssets.push({
+ path: assetPath,
+ parameters,
+ })
+ }
+ return [resolvedAssets, assetIndices]
+ }
+ async getStorageNodeUploadToken(
+ storageNodeInfo: StorageNodeInfo,
+ account: KeyringPair,
+ memberId: number,
+ objectId: BN,
+ bagId: string
+ ): Promise<string> {
+ const data: TokenRequestData = {
+ storageBucketId: storageNodeInfo.bucketId,
+ accountId: account.address,
+ bagId,
+ memberId,
+ dataObjectId: objectId.toNumber(),
+ }
+ const message = JSON.stringify(data)
+ const signature = u8aToHex(account.sign(message))
+ const postData: TokenRequest = { data, signature }
+ const {
+ data: { token },
+ } = await axios.post(`${storageNodeInfo.apiEndpoint}/authToken`, postData)
+ if (!token) {
+ this.error('Recieved empty token from the storage node!', { exit: ExitCodes.StorageNodeError })
- // Validate assets
- paths.forEach((p) => this.validateFile(p))
- // Return data
- return await Promise.all(
- paths.map(async (path) => {
- const parameters = await this.generateContentParameters(path, AssetType.AnyAsset)
- return {
- path,
- contentId: parameters.content_id,
- parameters,
- }
- })
- )
+ return token
- async uploadAsset(contentId: ContentId, filePath: string, endpoint?: string, multiBar?: MultiBar): Promise<void> {
- const providerEndpoint = endpoint || (await this.getRandomProviderEndpoint())
- if (!providerEndpoint) {
- this.error('No active provider found!', { exit: ExitCodes.ActionCurrentlyUnavailable })
+ async uploadAsset(
+ account: KeyringPair,
+ memberId: number,
+ objectId: BN,
+ bagId: string,
+ filePath: string,
+ storageNode?: StorageNodeInfo,
+ multiBar?: MultiBar
+ ): Promise<void> {
+ const storageNodeInfo = storageNode || (await this.getRandomActiveStorageNodeInfo(bagId))
+ if (!storageNodeInfo) {
+ this.error('No active storage node found!', { exit: ExitCodes.ActionCurrentlyUnavailable })
- const uploadUrl = this.assetUrl(providerEndpoint, contentId)
- const fileSize = this.getFileSize(filePath)
+ this.log(`Chosen storage node endpoint: ${storageNodeInfo.apiEndpoint}`)
const { fileStream, progressBar } = this.createReadStreamWithProgressBar(
- `Uploading ${contentId.encode()}`,
+ `Uploading ${filePath}`,
fileStream.on('end', () => {
// Temporarly disable because with Promise.all it breaks the UI
// cli.action.start('Waiting for the file to be processed...')
+ const formData = new FormData()
+ formData.append('dataObjectId', objectId.toString())
+ formData.append('storageBucketId', storageNodeInfo.bucketId)
+ formData.append('bagId', bagId)
+ formData.append('file', fileStream, {
+ filename: path.basename(filePath),
+ filepath: filePath,
+ knownLength: this.getFileSize(filePath),
+ })
+ this.log(`Uploading object ${objectId.toString()} (${filePath})`)
try {
- const config: AxiosRequestConfig = {
+ await axios.post(`${storageNodeInfo.apiEndpoint}/files`, formData, {
+ maxBodyLength: Infinity,
+ maxContentLength: Infinity,
headers: {
- 'Content-Type': '', // https://github.com/Joystream/storage-node-joystream/issues/16
- 'Content-Length': fileSize.toString(),
+ 'content-type': 'multipart/form-data',
+ ...formData.getHeaders(),
- maxBodyLength: fileSize,
- }
- await axios.put(uploadUrl, fileStream, config)
+ })
} catch (e) {
- const msg = (e.response && e.response.data && e.response.data.message) || e.message || e
- this.error(`Unexpected error when trying to upload a file: ${msg}`, {
- exit: ExitCodes.ExternalInfrastructureError,
- })
+ if (axios.isAxiosError(e)) {
+ const msg = e.response && e.response.data ? JSON.stringify(e.response.data) : e.message
+ this.error(`Unexpected error when trying to upload a file: ${msg}`, {
+ exit: ExitCodes.StorageNodeError,
+ })
+ } else {
+ throw e
+ }
async uploadAssets(
- assets: InputAsset[],
+ account: KeyringPair,
+ memberId: number,
+ bagId: string,
+ assets: AssetToUpload[],
inputFilePath: string,
outputFilePostfix = '__rejectedContent'
): Promise<void> {
- const endpoint = await this.getRandomProviderEndpoint()
- if (!endpoint) {
+ const storageNodeInfo = await this.getRandomActiveStorageNodeInfo(bagId)
+ if (!storageNodeInfo) {
this.warn('No storage provider is currently available!')
+ bagId,
assets.map(() => false),
@@ -234,39 +322,67 @@ export default abstract class UploadCommandBase extends ContentDirectoryCommandB
const multiBar = new MultiBar(this.progressBarOptions)
+ const errors: [string, string][] = []
// Workaround replacement for Promise.allSettled (which is only available in ES2020)
const results = await Promise.all(
assets.map(async (a) => {
try {
- await this.uploadAsset(a.contentId, a.path, endpoint, multiBar)
+ await this.uploadAsset(account, memberId, a.dataObjectId, bagId, a.path, storageNodeInfo, multiBar)
return true
} catch (e) {
+ errors.push([a.dataObjectId.toString(), e instanceof Error ? e.message : 'Unknown error'])
return false
- this.handleRejectedUploads(assets, results, inputFilePath, outputFilePostfix)
+ errors.forEach(([objectId, message]) => this.warn(`Upload of object ${objectId} failed: ${message}`))
+ this.handleRejectedUploads(bagId, assets, results, inputFilePath, outputFilePostfix)
- public assetsIndexes(originalPaths: (string | undefined)[], filteredPaths: string[]): (number | undefined)[] {
- let lastIndex = -1
- return originalPaths.map((path) => (filteredPaths.includes(path as string) ? ++lastIndex : undefined))
+ async prepareAssetsForExtrinsic(resolvedAssets: ResolvedAsset[]): Promise<StorageAssets | undefined> {
+ const feePerMB = await this.getOriginalApi().query.storage.dataObjectPerMegabyteFee()
+ const { dataObjectDeletionPrize } = this.getOriginalApi().consts.storage
+ if (resolvedAssets.length) {
+ const totalBytes = resolvedAssets
+ .reduce((a, b) => {
+ return a.add(b.parameters.getField('size'))
+ }, new BN(0))
+ .toNumber()
+ const totalStorageFee = feePerMB.muln(Math.ceil(totalBytes / 1024 / 1024))
+ const totalDeletionPrize = dataObjectDeletionPrize.muln(resolvedAssets.length)
+ await this.requireConfirmation(
+ `Some additional costs will be associated with this operation:\n` +
+ `Total data storage fee: ${chalk.cyan(formatBalance(totalStorageFee))}\n` +
+ `Total deletion prize: ${chalk.cyan(
+ formatBalance(totalDeletionPrize)
+ )} (recoverable on data object(s) removal)\n` +
+ `Are you sure you want to continue?`
+ )
+ return createTypeFromConstructor(StorageAssets, {
+ expected_data_size_fee: feePerMB,
+ object_creation_list: resolvedAssets.map((a) => a.parameters),
+ })
+ }
+ return undefined
private handleRejectedUploads(
- assets: InputAsset[],
+ bagId: string,
+ assets: AssetToUpload[],
results: boolean[],
inputFilePath: string,
outputFilePostfix: string
): void {
// Try to save rejected contentIds and paths for reupload purposes
- const rejectedAssetsOutput: Assets = []
+ const rejectedAssetsOutput: Assets = { bagId, assets: [] }
(r, i) =>
- r === false && rejectedAssetsOutput.push({ contentId: assets[i].contentId.encode(), path: assets[i].path })
+ r === false &&
+ rejectedAssetsOutput.assets.push({ objectId: assets[i].dataObjectId.toString(), path: assets[i].path })
- if (rejectedAssetsOutput.length) {
+ if (rejectedAssetsOutput.assets.length) {
`Some assets were not uploaded successfully. Try reuploading them with ${chalk.magentaBright(