Explorar el Código

Merge branch 'giza-protobuf-and-query-node' into distributor-node

Leszek Wiesner hace 3 años
padre
commit
82913e551e

+ 3 - 0
query-node/manifest.yml

@@ -83,6 +83,7 @@ typegen:
     - content.VideoDeleted
     - content.VideoCensorshipStatusUpdated
     - content.FeaturedVideosSet
+    - content.ChannelDeleted
 
     # working groups (we're using "storage_working_group" as a reference module)
     - storage_working_group.WorkerStorageUpdated
@@ -172,6 +173,8 @@ mappings:
       handler: content_VideoCensorshipStatusUpdated
     - event: content.FeaturedVideosSet
       handler: content_FeaturedVideosSet
+    - event: content.ChannelDeleted
+      handler: content_ChannelDeleted
 
     # working groups
     ## storage - workers

+ 17 - 7
query-node/mappings/content/channel.ts

@@ -4,7 +4,7 @@ eslint-disable @typescript-eslint/naming-convention
 import { EventContext, StoreContext } from '@joystream/hydra-common'
 import { Content } from '../generated/types'
 import { convertContentActorToChannelOwner, processChannelMetadata } from './utils'
-import { AssetNone, Channel, ChannelCategory, StorageDataObject } from 'query-node/dist/model'
+import { Channel, ChannelCategory, StorageDataObject } from 'query-node/dist/model'
 import { deserializeMetadata, inconsistentState, logger } from '../common'
 import { ChannelCategoryMetadata, ChannelMetadata } from '@joystream/metadata-protobuf'
 import { integrateMeta } from '@joystream/metadata-protobuf/utils'
@@ -27,9 +27,6 @@ export async function content_ChannelCreated(ctx: EventContext & StoreContext):
     createdInBlock: event.blockNumber,
     rewardAccount: channelCreationParameters.reward_account.unwrapOr(undefined)?.toString(),
     deletionPrizeDestAccount: runtimeChannel.deletion_prize_source_account_id.toString(),
-    // assets
-    coverPhoto: new AssetNone(),
-    avatarPhoto: new AssetNone(),
     // fill in auto-generated fields
     createdAt: new Date(event.blockTimestamp),
     updatedAt: new Date(event.blockTimestamp),
@@ -38,8 +35,10 @@ export async function content_ChannelCreated(ctx: EventContext & StoreContext):
   })
 
   // deserialize & process metadata
-  const metadata = deserializeMetadata(ChannelMetadata, channelCreationParameters.meta) || {}
-  await processChannelMetadata(ctx, channel, metadata, channelCreationParameters.assets)
+  if (channelCreationParameters.meta.isSome) {
+    const metadata = deserializeMetadata(ChannelMetadata, channelCreationParameters.meta.unwrap()) || {}
+    await processChannelMetadata(ctx, channel, metadata, channelCreationParameters.assets.unwrapOr(undefined))
+  }
 
   // save entity
   await store.save<Channel>(channel)
@@ -67,7 +66,12 @@ export async function content_ChannelUpdated(ctx: EventContext & StoreContext):
   //  update metadata if it was changed
   if (newMetadataBytes) {
     const newMetadata = deserializeMetadata(ChannelMetadata, newMetadataBytes) || {}
-    await processChannelMetadata(ctx, channel, newMetadata, channelUpdateParameters.assets.unwrapOr(undefined))
+    await processChannelMetadata(
+      ctx,
+      channel,
+      newMetadata,
+      channelUpdateParameters.assets_to_upload.unwrapOr(undefined)
+    )
   }
 
   // prepare changed reward account
@@ -209,3 +213,9 @@ export async function content_ChannelCategoryDeleted({ store, event }: EventCont
   // emit log event
   logger.info('Channel category has been deleted', { id: channelCategory.id })
 }
+
+export async function content_ChannelDeleted({ store, event }: EventContext & StoreContext): Promise<void> {
+  const [, channelId] = new Content.ChannelDeletedEvent(event).params
+
+  await store.remove<Channel>(new Channel({ id: channelId.toString() }))
+}

+ 119 - 113
query-node/mappings/content/utils.ts

@@ -8,7 +8,7 @@ import {
   IChannelMetadata,
 } from '@joystream/metadata-protobuf'
 import { integrateMeta, isSet, isValidLanguageCode } from '@joystream/metadata-protobuf/utils'
-import { invalidMetadata, inconsistentState, unexpectedData, logger } from '../common'
+import { invalidMetadata, inconsistentState, logger } from '../common'
 import {
   // primary entities
   CuratorGroup,
@@ -20,54 +20,121 @@ import {
   License,
   VideoMediaMetadata,
   // asset
-  Asset,
   Membership,
   VideoMediaEncoding,
   ChannelCategory,
-  AssetNone,
-  AssetExternal,
-  AssetJoystreamStorage,
   StorageDataObject,
+  DataObjectTypeChannelAvatar,
+  DataObjectTypeChannelCoverPhoto,
+  DataObjectTypeVideoMedia,
+  DataObjectTypeVideoThumbnail,
 } from 'query-node/dist/model'
 // Joystream types
-import { NewAssets, ContentActor } from '@joystream/types/augment'
+import { ContentActor, StorageAssets } from '@joystream/types/augment'
 import { DecodedMetadataObject } from '@joystream/metadata-protobuf/types'
 import BN from 'bn.js'
 import { getMostRecentlyCreatedDataObjects } from '../storage/utils'
-import { DataObjectCreationParameters as ObjectCreationParams } from '@joystream/types/storage'
-import { registry } from '@joystream/types'
+
+const ASSET_TYPES = {
+  channel: [
+    {
+      DataObjectTypeConstructor: DataObjectTypeChannelCoverPhoto,
+      metaFieldName: 'coverPhoto',
+      schemaFieldName: 'coverPhoto',
+    },
+    {
+      DataObjectTypeConstructor: DataObjectTypeChannelAvatar,
+      metaFieldName: 'avatarPhoto',
+      schemaFieldName: 'avatarPhoto',
+    },
+  ],
+  video: [
+    {
+      DataObjectTypeConstructor: DataObjectTypeVideoMedia,
+      metaFieldName: 'video',
+      schemaFieldName: 'media',
+    },
+    {
+      DataObjectTypeConstructor: DataObjectTypeVideoThumbnail,
+      metaFieldName: 'thumbnailPhoto',
+      schemaFieldName: 'thumbnailPhoto',
+    },
+  ],
+} as const
+
+async function processChannelAssets(
+  { event, store }: EventContext & StoreContext,
+  assets: StorageDataObject[],
+  channel: Channel,
+  meta: DecodedMetadataObject<IChannelMetadata>
+) {
+  await Promise.all(
+    ASSET_TYPES.channel.map(async ({ metaFieldName, schemaFieldName, DataObjectTypeConstructor }) => {
+      const newAssetIndex = meta[metaFieldName]
+      const currentAsset = channel[schemaFieldName]
+      if (isSet(newAssetIndex)) {
+        const asset = findAssetByIndex(assets, newAssetIndex)
+        if (asset) {
+          if (currentAsset) {
+            currentAsset.unsetAt = new Date(event.blockTimestamp)
+            await store.save<StorageDataObject>(currentAsset)
+          }
+          const dataObjectType = new DataObjectTypeConstructor()
+          dataObjectType.channelId = channel.id
+          asset.type = dataObjectType
+          channel[schemaFieldName] = asset
+          await store.save<StorageDataObject>(asset)
+        }
+      }
+    })
+  )
+}
+
+async function processVideoAssets(
+  { event, store }: EventContext & StoreContext,
+  assets: StorageDataObject[],
+  video: Video,
+  meta: DecodedMetadataObject<IVideoMetadata>
+) {
+  await Promise.all(
+    ASSET_TYPES.video.map(async ({ metaFieldName, schemaFieldName, DataObjectTypeConstructor }) => {
+      const newAssetIndex = meta[metaFieldName]
+      const currentAsset = video[schemaFieldName]
+      if (isSet(newAssetIndex)) {
+        const asset = findAssetByIndex(assets, newAssetIndex)
+        if (asset) {
+          if (currentAsset) {
+            currentAsset.unsetAt = new Date(event.blockTimestamp)
+            await store.save<StorageDataObject>(currentAsset)
+          }
+          const dataObjectType = new DataObjectTypeConstructor()
+          dataObjectType.videoId = video.id
+          asset.type = dataObjectType
+          video[schemaFieldName] = asset
+          await store.save<StorageDataObject>(asset)
+        }
+      }
+    })
+  )
+}
 
 export async function processChannelMetadata(
   ctx: EventContext & StoreContext,
   channel: Channel,
   meta: DecodedMetadataObject<IChannelMetadata>,
-  assets?: NewAssets
+  assetsParams?: StorageAssets
 ): Promise<Channel> {
-  const processedAssets = assets ? await processNewAssets(ctx, assets) : []
+  const assets = assetsParams ? await processNewAssets(ctx, assetsParams) : []
 
   integrateMeta(channel, meta, ['title', 'description', 'isPublic'])
 
+  await processChannelAssets(ctx, assets, channel, meta)
+
   // prepare channel category if needed
   if (isSet(meta.category)) {
     channel.category = await processChannelCategory(ctx, channel.category, parseInt(meta.category))
   }
 
-  // prepare cover photo asset if needed
-  if (isSet(meta.coverPhoto)) {
-    const asset = findAssetByIndex(processedAssets, meta.coverPhoto, 'channel cover photo')
-    if (asset) {
-      channel.coverPhoto = asset
-    }
-  }
-
-  // prepare avatar photo asset if needed
-  if (isSet(meta.avatarPhoto)) {
-    const asset = findAssetByIndex(processedAssets, meta.avatarPhoto, 'channel avatar photo')
-    if (asset) {
-      channel.avatarPhoto = asset
-    }
-  }
-
   // prepare language if needed
   if (isSet(meta.language)) {
     channel.language = await processLanguage(ctx, channel.language, meta.language)
@@ -80,21 +147,23 @@ export async function processVideoMetadata(
   ctx: EventContext & StoreContext,
   video: Video,
   meta: DecodedMetadataObject<IVideoMetadata>,
-  assets?: NewAssets
+  assetsParams?: StorageAssets
 ): Promise<Video> {
-  const processedAssets = assets ? await processNewAssets(ctx, assets) : []
+  const assets = assetsParams ? await processNewAssets(ctx, assetsParams) : []
 
   integrateMeta(video, meta, ['title', 'description', 'duration', 'hasMarketing', 'isExplicit', 'isPublic'])
 
+  await processVideoAssets(ctx, assets, video, meta)
+
   // prepare video category if needed
   if (meta.category) {
     video.category = await processVideoCategory(ctx, video.category, parseInt(meta.category))
   }
 
   // prepare media meta information if needed
-  if (isSet(meta.mediaType) || isSet(meta.mediaPixelWidth) || isSet(meta.mediaPixelHeight)) {
+  if (isSet(meta.video) || isSet(meta.mediaType) || isSet(meta.mediaPixelWidth) || isSet(meta.mediaPixelHeight)) {
     // prepare video file size if poosible
-    const videoSize = extractVideoSize(assets, meta.video)
+    const videoSize = extractVideoSize(assets)
     video.mediaMetadata = await processVideoMediaMetadata(ctx, video.mediaMetadata, meta, videoSize)
   }
 
@@ -103,22 +172,6 @@ export async function processVideoMetadata(
     await updateVideoLicense(ctx, video, meta.license)
   }
 
-  // prepare thumbnail photo asset if needed
-  if (isSet(meta.thumbnailPhoto)) {
-    const asset = findAssetByIndex(processedAssets, meta.thumbnailPhoto, 'thumbnail photo')
-    if (asset) {
-      video.thumbnailPhoto = asset
-    }
-  }
-
-  // prepare video asset if needed
-  if (isSet(meta.video)) {
-    const asset = findAssetByIndex(processedAssets, meta.video, 'video')
-    if (asset) {
-      video.media = asset
-    }
-  }
-
   // prepare language if needed
   if (isSet(meta.language)) {
     video.language = await processLanguage(ctx, video.language, meta.language)
@@ -135,7 +188,7 @@ export async function processVideoMetadata(
   return video
 }
 
-function findAssetByIndex(assets: typeof Asset[], index: number, name?: string): typeof Asset | null {
+function findAssetByIndex(assets: StorageDataObject[], index: number, name?: string): StorageDataObject | null {
   if (assets[index]) {
     return assets[index]
   }
@@ -172,7 +225,7 @@ async function processVideoMediaMetadata(
   ctx: StoreContext & EventContext,
   existingVideoMedia: VideoMediaMetadata | undefined,
   metadata: DecodedMetadataObject<IVideoMetadata>,
-  videoSize: number | undefined
+  videoSize: BN | undefined
 ): Promise<VideoMediaMetadata> {
   const { store, event } = ctx
   const videoMedia =
@@ -272,65 +325,16 @@ function processPublishedBeforeJoystream(
   return new Date(timestamp)
 }
 
-async function processNewAssets(ctx: EventContext & StoreContext, assets: NewAssets): Promise<Array<typeof Asset>> {
-  if (assets.isUrls) {
-    return assets.asUrls.map((assetUrls) => {
-      const resultAsset = new AssetExternal()
-      resultAsset.urls = JSON.stringify(assetUrls.map((u) => u.toString()))
-      return resultAsset
-    })
-  }
-
-  if (assets.isUpload) {
-    const assetsUploaded = assets.asUpload.object_creation_list.length
-    // FIXME: Ideally the runtime would provide object ids in ChannelCreated/VideoCreated/ChannelUpdated(...) events
-    const objects = await getMostRecentlyCreatedDataObjects(ctx.store, assetsUploaded)
-    return objects.map((o) => {
-      const resultAsset = new AssetJoystreamStorage()
-      resultAsset.dataObjectId = o.id
-      return resultAsset
-    })
-  }
-
-  unexpectedData('Unrecognized assets type', assets.type)
+async function processNewAssets(ctx: EventContext & StoreContext, assets: StorageAssets): Promise<StorageDataObject[]> {
+  const assetsUploaded = assets.object_creation_list.length
+  // FIXME: Ideally the runtime would provide object ids in ChannelCreated/VideoCreated/ChannelUpdated(...) events
+  const objects = await getMostRecentlyCreatedDataObjects(ctx.store, assetsUploaded)
+  return objects
 }
 
-function extractVideoSize(assets: NewAssets | undefined, assetIndex: number | null | undefined): number | undefined {
-  // escape if no assetIndex is set
-  if (!isSet(assetIndex)) {
-    return undefined
-  }
-
-  // index provided, but there are no assets
-  if (!assets) {
-    invalidMetadata(`Non-existing asset video size extraction requested - no assets were uploaded!`, {
-      assetIndex,
-    })
-    return undefined
-  }
-
-  // cannot extract size from other asset types than "Upload"
-  if (!assets.isUpload) {
-    return undefined
-  }
-
-  const dataObjectsParams = assets.asUpload.object_creation_list
-
-  // ensure asset index is valid
-  if (assetIndex >= dataObjectsParams.length) {
-    invalidMetadata(`Non-existing asset video size extraction requested`, {
-      assetsProvided: dataObjectsParams.length,
-      assetIndex,
-    })
-    return undefined
-  }
-
-  // extract video size from objectParams
-  const objectParams = assets.asUpload.object_creation_list[assetIndex]
-  const params = new ObjectCreationParams(registry, objectParams.toJSON() as any)
-  const videoSize = params.getField('size').toNumber()
-
-  return videoSize
+function extractVideoSize(assets: StorageDataObject[]): BN | undefined {
+  const mediaAsset = assets.find((a) => a.type?.isTypeOf === DataObjectTypeVideoMedia.name)
+  return mediaAsset ? mediaAsset.size : undefined
 }
 
 async function processLanguage(
@@ -476,23 +480,25 @@ export async function unsetAssetRelations(store: DatabaseManager, dataObject: St
   // is allowed to be associated only with one channel/video in runtime
   const channel = await store.get(Channel, {
     where: channelAssets.map((assetName) => ({
-      [assetName]: Raw((alias) => `${alias} ->> 'dataObjectId' = :id`, {
+      [assetName]: {
         id: dataObject.id,
-      }),
+      },
     })),
+    relations: [...channelAssets],
   })
   const video = await store.get(Video, {
     where: videoAssets.map((assetName) => ({
-      [assetName]: Raw((alias) => `${alias} ->> 'dataObjectId' = :id`, {
+      [assetName]: {
         id: dataObject.id,
-      }),
+      },
     })),
+    relations: [...videoAssets],
   })
 
   if (channel) {
     channelAssets.forEach((assetName) => {
-      if (channel[assetName] && (channel[assetName] as AssetJoystreamStorage).dataObjectId === dataObject.id) {
-        channel[assetName] = new AssetNone()
+      if (channel[assetName] && channel[assetName]?.id === dataObject.id) {
+        channel[assetName] = null as any
       }
     })
     await store.save<Channel>(channel)
@@ -506,8 +512,8 @@ export async function unsetAssetRelations(store: DatabaseManager, dataObject: St
 
   if (video) {
     videoAssets.forEach((assetName) => {
-      if (video[assetName] && (video[assetName] as AssetJoystreamStorage).dataObjectId === dataObject.id) {
-        video[assetName] = new AssetNone()
+      if (video[assetName] && video[assetName]?.id === dataObject.id) {
+        video[assetName] = null as any
       }
     })
     await store.save<Video>(video)

+ 6 - 6
query-node/mappings/content/video.ts

@@ -6,7 +6,7 @@ import { In } from 'typeorm'
 import { Content } from '../generated/types'
 import { deserializeMetadata, inconsistentState, logger } from '../common'
 import { processVideoMetadata } from './utils'
-import { Channel, Video, VideoCategory, AssetNone } from 'query-node/dist/model'
+import { Channel, Video, VideoCategory } from 'query-node/dist/model'
 import { VideoMetadata, VideoCategoryMetadata } from '@joystream/metadata-protobuf'
 import { integrateMeta } from '@joystream/metadata-protobuf/utils'
 import _ from 'lodash'
@@ -107,14 +107,14 @@ export async function content_VideoCreated(ctx: EventContext & StoreContext): Pr
     isCensored: false,
     isFeatured: false,
     createdInBlock: event.blockNumber,
-    thumbnailPhoto: new AssetNone(),
-    media: new AssetNone(),
     createdAt: new Date(event.blockTimestamp),
     updatedAt: new Date(event.blockTimestamp),
   })
   // deserialize & process metadata
-  const metadata = deserializeMetadata(VideoMetadata, videoCreationParameters.meta) || {}
-  await processVideoMetadata(ctx, video, metadata, videoCreationParameters.assets)
+  if (videoCreationParameters.meta.isSome) {
+    const metadata = deserializeMetadata(VideoMetadata, videoCreationParameters.meta.unwrap()) || {}
+    await processVideoMetadata(ctx, video, metadata, videoCreationParameters.assets.unwrapOr(undefined))
+  }
 
   // save video
   await store.save<Video>(video)
@@ -145,7 +145,7 @@ export async function content_VideoUpdated(ctx: EventContext & StoreContext): Pr
   // update metadata if it was changed
   if (newMetadataBytes) {
     const newMetadata = deserializeMetadata(VideoMetadata, newMetadataBytes) || {}
-    await processVideoMetadata(ctx, video, newMetadata, videoUpdateParameters.assets.unwrapOr(undefined))
+    await processVideoMetadata(ctx, video, newMetadata, videoUpdateParameters.assets_to_upload.unwrapOr(undefined))
   }
 
   // set last update time

+ 21 - 155
query-node/mappings/storage/index.ts

@@ -1,7 +1,7 @@
 /*
 eslint-disable @typescript-eslint/naming-convention
 */
-import { DatabaseManager, EventContext, StoreContext } from '@joystream/hydra-common'
+import { EventContext, StoreContext } from '@joystream/hydra-common'
 import { Storage } from '../generated/types/storage'
 import {
   DistributionBucket,
@@ -11,11 +11,6 @@ import {
   DistributionBucketOperatorStatus,
   NodeLocationMetadata,
   StorageBag,
-  StorageBagOwner,
-  StorageBagOwnerChannel,
-  StorageBagOwnerCouncil,
-  StorageBagOwnerMember,
-  StorageBagOwnerWorkingGroup,
   StorageBucket,
   StorageBucketOperatorStatusActive,
   StorageBucketOperatorStatusInvited,
@@ -28,156 +23,25 @@ import {
 } from 'query-node/dist/model'
 import BN from 'bn.js'
 import { getById } from '../common'
-import { BTreeSet } from '@polkadot/types'
 import { In } from 'typeorm'
-import _ from 'lodash'
-import { DataObjectId, BagId, DynamicBagId, StaticBagId } from '@joystream/types/augment/all'
 import {
   processDistributionBucketFamilyMetadata,
   processDistributionOperatorMetadata,
   processStorageOperatorMetadata,
 } from './metadata'
-import { createDataObjects, getStorageSystem, removeDataObject } from './utils'
-
-async function getDataObjectsInBag(
-  store: DatabaseManager,
-  bagId: BagId,
-  dataObjectIds: BTreeSet<DataObjectId>
-): Promise<StorageDataObject[]> {
-  const dataObjects = await store.getMany(StorageDataObject, {
-    where: {
-      id: In(Array.from(dataObjectIds).map((id) => id.toString())),
-      storageBag: { id: getBagId(bagId) },
-    },
-  })
-  if (dataObjects.length !== Array.from(dataObjectIds).length) {
-    throw new Error(
-      `Missing data objects: ${_.difference(
-        Array.from(dataObjectIds).map((id) => id.toString()),
-        dataObjects.map((o) => o.id)
-      )} in bag ${getBagId(bagId)}`
-    )
-  }
-  return dataObjects
-}
-
-function getStaticBagOwner(bagId: StaticBagId): typeof StorageBagOwner {
-  if (bagId.isCouncil) {
-    return new StorageBagOwnerCouncil()
-  } else if (bagId.isWorkingGroup) {
-    const owner = new StorageBagOwnerWorkingGroup()
-    owner.workingGroupId = bagId.asWorkingGroup.toString().toLowerCase()
-    return owner
-  } else {
-    throw new Error(`Unexpected static bag type: ${bagId.type}`)
-  }
-}
-
-function getDynamicBagOwner(bagId: DynamicBagId) {
-  if (bagId.isChannel) {
-    const owner = new StorageBagOwnerChannel()
-    owner.channelId = bagId.asChannel.toNumber()
-    return owner
-  } else if (bagId.isMember) {
-    const owner = new StorageBagOwnerMember()
-    owner.memberId = bagId.asMember.toNumber()
-    return owner
-  } else {
-    throw new Error(`Unexpected dynamic bag type: ${bagId.type}`)
-  }
-}
-
-function getStaticBagId(bagId: StaticBagId): string {
-  if (bagId.isCouncil) {
-    return `static:council`
-  } else if (bagId.isWorkingGroup) {
-    return `static:wg:${bagId.asWorkingGroup.type.toLowerCase()}`
-  } else {
-    throw new Error(`Unexpected static bag type: ${bagId.type}`)
-  }
-}
-
-function getDynamicBagId(bagId: DynamicBagId): string {
-  if (bagId.isChannel) {
-    return `dynamic:channel:${bagId.asChannel.toString()}`
-  } else if (bagId.isMember) {
-    return `dynamic:member:${bagId.asMember.toString()}`
-  } else {
-    throw new Error(`Unexpected dynamic bag type: ${bagId.type}`)
-  }
-}
-
-function getBagId(bagId: BagId) {
-  return bagId.isStatic ? getStaticBagId(bagId.asStatic) : getDynamicBagId(bagId.asDynamic)
-}
-
-async function getDynamicBag(
-  store: DatabaseManager,
-  bagId: DynamicBagId,
-  relations?: 'objects'[]
-): Promise<StorageBag> {
-  return getById(store, StorageBag, getDynamicBagId(bagId), relations)
-}
-
-async function getStaticBag(store: DatabaseManager, bagId: StaticBagId, relations?: 'objects'[]): Promise<StorageBag> {
-  const id = getStaticBagId(bagId)
-  const bag = await store.get(StorageBag, { where: { id }, relations })
-  if (!bag) {
-    console.log(`Creating new static bag: ${id}`)
-    const newBag = new StorageBag({
-      id,
-      owner: getStaticBagOwner(bagId),
-    })
-    await store.save<StorageBag>(newBag)
-    return newBag
-  }
-  return bag
-}
-
-async function getBag(store: DatabaseManager, bagId: BagId, relations?: 'objects'[]): Promise<StorageBag> {
-  return bagId.isStatic
-    ? getStaticBag(store, bagId.asStatic, relations)
-    : getDynamicBag(store, bagId.asDynamic, relations)
-}
-
-async function getDistributionBucketOperatorWithMetadata(
-  store: DatabaseManager,
-  id: string
-): Promise<DistributionBucketOperator> {
-  const operator = await store.get(DistributionBucketOperator, {
-    where: { id },
-    relations: ['metadata', 'metadata.nodeLocation', 'metadata.nodeLocation.coordinates'],
-  })
-  if (!operator) {
-    throw new Error(`DistributionBucketOperator not found by id: ${id}`)
-  }
-  return operator
-}
-
-async function getStorageBucketWithOperatorMetadata(store: DatabaseManager, id: string): Promise<StorageBucket> {
-  const bucket = await store.get(StorageBucket, {
-    where: { id },
-    relations: ['operatorMetadata', 'operatorMetadata.nodeLocation', 'operatorMetadata.nodeLocation.coordinates'],
-  })
-  if (!bucket) {
-    throw new Error(`StorageBucket not found by id: ${id}`)
-  }
-  return bucket
-}
-
-async function getDistributionBucketFamilyWithMetadata(
-  store: DatabaseManager,
-  id: string
-): Promise<DistributionBucketFamily> {
-  const family = await store.get(DistributionBucketFamily, {
-    where: { id },
-    relations: ['metadata', 'metadata.areas'],
-  })
-  if (!family) {
-    throw new Error(`DistributionBucketFamily not found by id: ${id}`)
-  }
-  return family
-}
+import {
+  createDataObjects,
+  getStorageSystem,
+  removeDataObject,
+  getStorageBucketWithOperatorMetadata,
+  getBag,
+  getDynamicBagId,
+  getDynamicBagOwner,
+  getDataObjectsInBag,
+  getDynamicBag,
+  getDistributionBucketFamilyWithMetadata,
+  getDistributionBucketOperatorWithMetadata,
+} from './utils'
 
 // STORAGE BUCKETS
 
@@ -363,7 +227,11 @@ export async function storage_DynamicBagCreated({ event, store }: EventContext &
 export async function storage_DynamicBagDeleted({ event, store }: EventContext & StoreContext): Promise<void> {
   const [, bagId] = new Storage.DynamicBagDeletedEvent(event).params
   const storageBag = await getDynamicBag(store, bagId, ['objects'])
-  // The bag should already be empty, so no cascade-remove required
+  // TODO: Cascade remove on db level (would require changes in Hydra / comitting autogenerated files)
+  const storageAssignments = await store.getMany(StorageBagStorageAssignment, { where: { storageBag } })
+  const distributionAssignments = await store.getMany(StorageBagDistributionAssignment, { where: { storageBag } })
+  await Promise.all(storageAssignments.map((a) => store.remove<StorageBagStorageAssignment>(a)))
+  await Promise.all(distributionAssignments.map((a) => store.remove<StorageBagDistributionAssignment>(a)))
   await store.remove<StorageBag>(storageBag)
 }
 
@@ -371,10 +239,8 @@ export async function storage_DynamicBagDeleted({ event, store }: EventContext &
 
 // Note: "Uploaded" here actually means "created" (the real upload happens later)
 export async function storage_DataObjectsUploaded({ event, store }: EventContext & StoreContext): Promise<void> {
-  const [dataObjectIds, uploadParams] = new Storage.DataObjectsUploadedEvent(event).params
-  const { bagId, objectCreationList } = uploadParams
-  const storageBag = await getBag(store, bagId)
-  await createDataObjects(store, objectCreationList, storageBag, dataObjectIds)
+  const [dataObjectIds, uploadParams, deletionPrize] = new Storage.DataObjectsUploadedEvent(event).params
+  await createDataObjects(store, uploadParams, deletionPrize, dataObjectIds)
 }
 
 export async function storage_PendingDataObjectsAccepted({ event, store }: EventContext & StoreContext): Promise<void> {

+ 173 - 7
query-node/mappings/storage/utils.ts

@@ -1,13 +1,175 @@
 import { DatabaseManager } from '@joystream/hydra-common'
-import { DataObjectCreationParameters } from '@joystream/types/augment'
+import { UploadParameters } from '@joystream/types/augment'
 import { registry } from '@joystream/types'
 import { DataObjectCreationParameters as ObjectCreationParams } from '@joystream/types/storage'
-import { StorageBag, StorageDataObject, StorageSystemParameters } from 'query-node/dist/model'
+import {
+  DataObjectTypeUnknown,
+  StorageBag,
+  StorageDataObject,
+  StorageSystemParameters,
+  StorageBagOwner,
+  StorageBagOwnerChannel,
+  StorageBagOwnerCouncil,
+  StorageBagOwnerMember,
+  StorageBagOwnerWorkingGroup,
+  StorageBucket,
+  DistributionBucketOperator,
+  DistributionBucketFamily,
+} from 'query-node/dist/model'
 import BN from 'bn.js'
-import { bytesToString, inconsistentState } from '../common'
+import { bytesToString, inconsistentState, getById } from '../common'
 import { In } from 'typeorm'
 import { unsetAssetRelations } from '../content/utils'
 
+import { BTreeSet } from '@polkadot/types'
+import _ from 'lodash'
+import { DataObjectId, BagId, DynamicBagId, StaticBagId } from '@joystream/types/augment/all'
+import { Balance } from '@polkadot/types/interfaces'
+
+export async function getDataObjectsInBag(
+  store: DatabaseManager,
+  bagId: BagId,
+  dataObjectIds: BTreeSet<DataObjectId>
+): Promise<StorageDataObject[]> {
+  const dataObjects = await store.getMany(StorageDataObject, {
+    where: {
+      id: In(Array.from(dataObjectIds).map((id) => id.toString())),
+      storageBag: { id: getBagId(bagId) },
+    },
+  })
+  if (dataObjects.length !== Array.from(dataObjectIds).length) {
+    throw new Error(
+      `Missing data objects: ${_.difference(
+        Array.from(dataObjectIds).map((id) => id.toString()),
+        dataObjects.map((o) => o.id)
+      )} in bag ${getBagId(bagId)}`
+    )
+  }
+  return dataObjects
+}
+
+export function getStaticBagOwner(bagId: StaticBagId): typeof StorageBagOwner {
+  if (bagId.isCouncil) {
+    return new StorageBagOwnerCouncil()
+  } else if (bagId.isWorkingGroup) {
+    const owner = new StorageBagOwnerWorkingGroup()
+    owner.workingGroupId = bagId.asWorkingGroup.toString().toLowerCase()
+    return owner
+  } else {
+    throw new Error(`Unexpected static bag type: ${bagId.type}`)
+  }
+}
+
+export function getDynamicBagOwner(bagId: DynamicBagId) {
+  if (bagId.isChannel) {
+    const owner = new StorageBagOwnerChannel()
+    owner.channelId = bagId.asChannel.toNumber()
+    return owner
+  } else if (bagId.isMember) {
+    const owner = new StorageBagOwnerMember()
+    owner.memberId = bagId.asMember.toNumber()
+    return owner
+  } else {
+    throw new Error(`Unexpected dynamic bag type: ${bagId.type}`)
+  }
+}
+
+export function getStaticBagId(bagId: StaticBagId): string {
+  if (bagId.isCouncil) {
+    return `static:council`
+  } else if (bagId.isWorkingGroup) {
+    return `static:wg:${bagId.asWorkingGroup.type.toLowerCase()}`
+  } else {
+    throw new Error(`Unexpected static bag type: ${bagId.type}`)
+  }
+}
+
+export function getDynamicBagId(bagId: DynamicBagId): string {
+  if (bagId.isChannel) {
+    return `dynamic:channel:${bagId.asChannel.toString()}`
+  } else if (bagId.isMember) {
+    return `dynamic:member:${bagId.asMember.toString()}`
+  } else {
+    throw new Error(`Unexpected dynamic bag type: ${bagId.type}`)
+  }
+}
+
+export function getBagId(bagId: BagId) {
+  return bagId.isStatic ? getStaticBagId(bagId.asStatic) : getDynamicBagId(bagId.asDynamic)
+}
+
+export async function getDynamicBag(
+  store: DatabaseManager,
+  bagId: DynamicBagId,
+  relations?: 'objects'[]
+): Promise<StorageBag> {
+  return getById(store, StorageBag, getDynamicBagId(bagId), relations)
+}
+
+export async function getStaticBag(
+  store: DatabaseManager,
+  bagId: StaticBagId,
+  relations?: 'objects'[]
+): Promise<StorageBag> {
+  const id = getStaticBagId(bagId)
+  const bag = await store.get(StorageBag, { where: { id }, relations })
+  if (!bag) {
+    console.log(`Creating new static bag: ${id}`)
+    const newBag = new StorageBag({
+      id,
+      owner: getStaticBagOwner(bagId),
+    })
+    await store.save<StorageBag>(newBag)
+    return newBag
+  }
+  return bag
+}
+
+export async function getBag(store: DatabaseManager, bagId: BagId, relations?: 'objects'[]): Promise<StorageBag> {
+  return bagId.isStatic
+    ? getStaticBag(store, bagId.asStatic, relations)
+    : getDynamicBag(store, bagId.asDynamic, relations)
+}
+
+export async function getDistributionBucketOperatorWithMetadata(
+  store: DatabaseManager,
+  id: string
+): Promise<DistributionBucketOperator> {
+  const operator = await store.get(DistributionBucketOperator, {
+    where: { id },
+    relations: ['metadata', 'metadata.nodeLocation', 'metadata.nodeLocation.coordinates'],
+  })
+  if (!operator) {
+    throw new Error(`DistributionBucketOperator not found by id: ${id}`)
+  }
+  return operator
+}
+
+export async function getStorageBucketWithOperatorMetadata(store: DatabaseManager, id: string): Promise<StorageBucket> {
+  const bucket = await store.get(StorageBucket, {
+    where: { id },
+    relations: ['operatorMetadata', 'operatorMetadata.nodeLocation', 'operatorMetadata.nodeLocation.coordinates'],
+  })
+  if (!bucket) {
+    throw new Error(`StorageBucket not found by id: ${id}`)
+  }
+  return bucket
+}
+
+export async function getDistributionBucketFamilyWithMetadata(
+  store: DatabaseManager,
+  id: string
+): Promise<DistributionBucketFamily> {
+  const family = await store.get(DistributionBucketFamily, {
+    where: { id },
+    relations: ['metadata', 'metadata.areas'],
+  })
+  if (!family) {
+    throw new Error(`DistributionBucketFamily not found by id: ${id}`)
+  }
+  return family
+}
+
 export async function getStorageSystem(store: DatabaseManager): Promise<StorageSystemParameters> {
   const storageSystem = await store.get(StorageSystemParameters, {})
   if (!storageSystem) {
@@ -19,13 +181,15 @@ export async function getStorageSystem(store: DatabaseManager): Promise<StorageS
 
 export async function createDataObjects(
   store: DatabaseManager,
-  objectsParams: DataObjectCreationParameters[],
-  storageBag: StorageBag,
+  uploadParams: UploadParameters,
+  deletionPrize: Balance,
   objectIds?: BN[]
 ): Promise<StorageDataObject[]> {
   const storageSystem = await getStorageSystem(store)
+  const { objectCreationList, bagId } = uploadParams
+  const storageBag = await getBag(store, bagId)
 
-  const dataObjects = objectsParams.map((objectParams, i) => {
+  const dataObjects = objectCreationList.map((objectParams, i) => {
     const params = new ObjectCreationParams(registry, objectParams.toJSON() as any)
     const objectId = objectIds ? objectIds[i] : storageSystem.nextDataObjectId
     const object = new StorageDataObject({
@@ -33,6 +197,8 @@ export async function createDataObjects(
       isAccepted: false,
       ipfsHash: bytesToString(objectParams.ipfsContentId),
       size: new BN(params.getField('size').toString()),
+      type: new DataObjectTypeUnknown(),
+      deletionPrize,
       storageBag,
     })
     if (objectId.gte(storageSystem.nextDataObjectId)) {
@@ -67,5 +233,5 @@ export async function getMostRecentlyCreatedDataObjects(
 
 export async function removeDataObject(store: DatabaseManager, object: StorageDataObject): Promise<void> {
   await unsetAssetRelations(store, object)
-  await store.save<StorageDataObject>(object)
+  await store.remove<StorageDataObject>(object)
 }

+ 4 - 25
query-node/schemas/content.graphql

@@ -1,21 +1,3 @@
-type AssetExternal @variant {
-  # FIXME: [String!] currnetly not supported in variants
-  "JSON array of the urls"
-  urls: String!
-}
-
-type AssetJoystreamStorage @variant {
-  "Related data object"
-  dataObject: StorageDataObject!
-}
-
-# FIXME: https://github.com/Joystream/hydra/issues/434
-type AssetNone @variant {
-  _phantom: Int
-}
-
-union Asset = AssetExternal | AssetJoystreamStorage | AssetNone
-
 "Category of media channel"
 type ChannelCategory @entity {
   id: ID!
@@ -62,12 +44,11 @@ type Channel @entity {
   "The description of a Channel"
   description: String
 
-  # FIXME: Due to https://github.com/Joystream/hydra/issues/434, Asset is currently non-optional (use AssetNone to unset it)
   "Channel's cover (background) photo asset. Recommended ratio: 16:9."
-  coverPhoto: Asset!
+  coverPhoto: StorageDataObject
 
   "Channel's avatar photo asset."
-  avatarPhoto: Asset!
+  avatarPhoto: StorageDataObject
 
   ##########################
 
@@ -129,9 +110,8 @@ type Video @entity {
   "Video duration in seconds"
   duration: Int
 
-# FIXME: Due to https://github.com/Joystream/hydra/issues/434, Asset is currently non-optional (use AssetNone to unset it)
   "Video thumbnail asset (recommended ratio: 16:9)"
-  thumbnailPhoto: Asset!
+  thumbnailPhoto: StorageDataObject
 
   ##########################
 
@@ -156,9 +136,8 @@ type Video @entity {
   "License under the video is published"
   license: License
 
-  # FIXME: Due to https://github.com/Joystream/hydra/issues/434, Asset is currently non-optional (use AssetNone to unset it)
   "Video media asset"
-  media: Asset!
+  media: StorageDataObject
 
   ##########################
 

+ 36 - 0
query-node/schemas/storage.graphql

@@ -191,6 +191,32 @@ type StorageBagDistributionAssignment @entity {
   distributionBucketId: ID
 }
 
+type DataObjectTypeChannelAvatar @variant {
+  "Related channel entity"
+  channel: Channel!
+}
+
+type DataObjectTypeChannelCoverPhoto @variant {
+  "Related channel entity"
+  channel: Channel!
+}
+
+type DataObjectTypeVideoMedia @variant {
+  "Related video entity"
+  video: Video!
+}
+
+type DataObjectTypeVideoThumbnail @variant {
+  "Related video entity"
+  video: Video!
+}
+
+type DataObjectTypeUnknown @variant {
+  _phantom: Int
+}
+
+union DataObjectType = DataObjectTypeChannelAvatar | DataObjectTypeChannelCoverPhoto | DataObjectTypeVideoMedia | DataObjectTypeVideoThumbnail | DataObjectTypeUnknown
+
 type StorageDataObject @entity {
   "Data object runtime id"
   id: ID!
@@ -206,6 +232,16 @@ type StorageDataObject @entity {
 
   "IPFS content hash"
   ipfsHash: String!
+
+  # FIXME: Cannot be optional because: https://github.com/Joystream/hydra/issues/434
+  "The type of the asset that the data object represents (if known)"
+  type: DataObjectType!
+
+  "Prize for removing the data object"
+  deletionPrize: BigInt!
+
+  "If the object is no longer used as an asset - the time at which it was unset (if known)"
+  unsetAt: DateTime
 }
 
 type DistributionBucketFamilyGeographicArea @entity {