浏览代码

Merge pull request #3399 from ondratra/olympia_query_node_generalized_update_calculations

query node - active video counters generalization
Mokhtar Naamani 3 年之前
父节点
当前提交
f51eb5f332

+ 5 - 0
query-node/mappings/src/common.ts

@@ -17,6 +17,11 @@ export const INT32MAX = 2147483647
 // See: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Date
 export const TIMESTAMPMAX = 8640000000000000
 
+// definition of generic type for Hydra DatabaseManager's methods
+export type EntityType<T> = {
+  new (...args: any[]): T
+}
+
 /*
   Simple logger enabling error and informational reporting.
 

+ 7 - 19
query-node/mappings/src/content/channel.ts

@@ -3,17 +3,13 @@ eslint-disable @typescript-eslint/naming-convention
 */
 import { EventContext, StoreContext } from '@joystream/hydra-common'
 import { Content } from '../../generated/types'
-import {
-  convertContentActorToChannelOwner,
-  processChannelMetadata,
-  updateChannelCategoryVideoActiveCounter,
-  unsetAssetRelations,
-} from './utils'
+import { convertContentActorToChannelOwner, processChannelMetadata, unsetAssetRelations } from './utils'
 import { Channel, ChannelCategory, StorageDataObject, Membership } from 'query-node/dist/model'
 import { deserializeMetadata, inconsistentState, logger } from '../common'
 import { ChannelCategoryMetadata, ChannelMetadata } from '@joystream/metadata-protobuf'
 import { integrateMeta } from '@joystream/metadata-protobuf/utils'
 import { In } from 'typeorm'
+import { getAllManagers } from '../derivedPropertiesManager/applications'
 
 export async function content_ChannelCreated(ctx: EventContext & StoreContext): Promise<void> {
   const { store, event } = ctx
@@ -71,8 +67,6 @@ export async function content_ChannelUpdated(ctx: EventContext & StoreContext):
     return inconsistentState('Non-existing channel update requested', channelId)
   }
 
-  const originalCategory = channel.category
-
   // prepare changed metadata
   const newMetadataBytes = channelUpdateParameters.new_meta.unwrapOr(null)
 
@@ -103,12 +97,12 @@ export async function content_ChannelUpdated(ctx: EventContext & StoreContext):
   // set last update time
   channel.updatedAt = new Date(event.blockTimestamp)
 
+  // transfer video active counter value to new category
+  await getAllManagers(store).channels.onMainEntityUpdate(channel)
+
   // save channel
   await store.save<Channel>(channel)
 
-  // transfer video active counter value to new category
-  await updateChannelCategoryVideoActiveCounter(store, originalCategory, channel.category, channel.activeVideosCounter)
-
   // emit log event
   logger.info('Channel has been updated', { id: channel.id })
 }
@@ -145,17 +139,11 @@ export async function content_ChannelCensorshipStatusUpdated({
   // set last update time
   channel.updatedAt = new Date(event.blockTimestamp)
 
+  await getAllManagers(store).channels.onMainEntityUpdate(channel)
+
   // save channel
   await store.save<Channel>(channel)
 
-  // update active video counter for category (if any)
-  await updateChannelCategoryVideoActiveCounter(
-    store,
-    isCensored.isTrue ? channel.category : undefined,
-    isCensored.isTrue ? undefined : channel.category,
-    channel.activeVideosCounter
-  )
-
   // emit log event
   logger.info('Channel censorship status has been updated', { id: channelId, isCensored: isCensored.isTrue })
 }

+ 1 - 6
query-node/mappings/src/content/nft.ts

@@ -1,7 +1,7 @@
 // TODO: solve events' relations to videos and other entites that can be changed or deleted
 
 import { DatabaseManager, EventContext, StoreContext, SubstrateEvent } from '@joystream/hydra-common'
-import { genericEventFields, inconsistentState, logger } from '../common'
+import { genericEventFields, inconsistentState, logger, EntityType } from '../common'
 import {
   // entities
   Auction,
@@ -45,11 +45,6 @@ import { Content } from '../../generated/types'
 import { FindConditions } from 'typeorm'
 import BN from 'bn.js'
 
-// definition of generic type for Hydra DatabaseManager's methods
-type EntityType<T> = {
-  new (...args: any[]): T
-}
-
 async function getExistingEntity<Type extends Video | Membership>(
   store: DatabaseManager,
   entityType: EntityType<Type>,

+ 0 - 124
query-node/mappings/src/content/utils.ts

@@ -495,9 +495,6 @@ export async function unsetAssetRelations(store: DatabaseManager, dataObject: St
     relations: [...videoAssets, ...videoRelationsForCountersBare],
   })
 
-  // remember if video is fully active before update
-  const wasFullyActive = video && getVideoActiveStatus(video)
-
   if (channel) {
     channelAssets.forEach((assetName) => {
       if (channel[assetName] && channel[assetName]?.id === dataObject.id) {
@@ -521,9 +518,6 @@ export async function unsetAssetRelations(store: DatabaseManager, dataObject: St
     })
     await store.save<Video>(video)
 
-    // update video active counters
-    await updateVideoActiveCounters(store, wasFullyActive, undefined)
-
     // emit log event
     logger.info('Content has been disconnected from Video', {
       videoId: video.id.toString(),
@@ -534,121 +528,3 @@ export async function unsetAssetRelations(store: DatabaseManager, dataObject: St
   // remove data object
   await store.remove<StorageDataObject>(dataObject)
 }
-
-export interface IVideoActiveStatus {
-  isFullyActive: boolean
-  video: Video
-  videoCategory: VideoCategory | undefined
-  channel: Channel
-  channelCategory: ChannelCategory | undefined
-}
-
-export function getVideoActiveStatus(video: Video): IVideoActiveStatus {
-  const isFullyActive =
-    !!video.isPublic && !video.isCensored && !!video.thumbnailPhoto?.isAccepted && !!video.media?.isAccepted
-
-  const videoCategory = video.category
-  const channel = video.channel
-  const channelCategory = channel.category
-
-  return {
-    isFullyActive,
-    video,
-    videoCategory,
-    channel,
-    channelCategory,
-  }
-}
-
-export async function updateVideoActiveCounters(
-  store: DatabaseManager,
-  initialActiveStatus: IVideoActiveStatus | null | undefined,
-  activeStatus: IVideoActiveStatus | null | undefined
-): Promise<void> {
-  async function updateSingleEntity<Entity extends VideoCategory | Channel>(
-    entity: Entity,
-    counterChange: number
-  ): Promise<void> {
-    entity.activeVideosCounter += counterChange
-
-    await store.save(entity)
-  }
-
-  async function reflectUpdate<Entity extends VideoCategory | Channel>(
-    oldEntity: Entity | undefined,
-    newEntity: Entity | undefined,
-    initFullyActive: boolean,
-    nowFullyActive: boolean
-  ): Promise<void> {
-    if (!oldEntity && !newEntity) {
-      return
-    }
-
-    const didEntityChange = oldEntity?.id.toString() !== newEntity?.id.toString()
-    const didFullyActiveChange = initFullyActive !== nowFullyActive
-
-    // escape if nothing changed
-    if (!didEntityChange && !didFullyActiveChange) {
-      return
-    }
-
-    if (!didEntityChange) {
-      // && didFullyActiveChange
-      const counterChange = nowFullyActive ? 1 : -1
-
-      await updateSingleEntity(newEntity as Entity, counterChange)
-
-      return
-    }
-
-    // didEntityChange === true
-
-    if (oldEntity && initFullyActive) {
-      // if video was fully active before, prepare to decrease counter
-      const counterChange = -1
-
-      await updateSingleEntity(oldEntity, counterChange)
-    }
-
-    if (newEntity && nowFullyActive) {
-      // if video is fully active now, prepare to increase counter
-      const counterChange = 1
-
-      await updateSingleEntity(newEntity, counterChange)
-    }
-  }
-
-  const items = ['videoCategory', 'channel', 'channelCategory']
-  const promises = items.map(
-    async (item) =>
-      await reflectUpdate(
-        initialActiveStatus?.[item],
-        activeStatus?.[item],
-        initialActiveStatus?.isFullyActive || false,
-        activeStatus?.isFullyActive || false
-      )
-  )
-  await Promise.all(promises)
-}
-
-export async function updateChannelCategoryVideoActiveCounter(
-  store: DatabaseManager,
-  originalCategory: ChannelCategory | undefined,
-  newCategory: ChannelCategory | undefined,
-  videosCount: number
-): Promise<void> {
-  // escape if no counter change needed
-  if (!videosCount || originalCategory === newCategory) {
-    return
-  }
-
-  if (originalCategory) {
-    originalCategory.activeVideosCounter -= videosCount
-    await store.save<ChannelCategory>(originalCategory)
-  }
-
-  if (newCategory) {
-    newCategory.activeVideosCounter += videosCount
-    await store.save<ChannelCategory>(newCategory)
-  }
-}

+ 11 - 31
query-node/mappings/src/content/video.ts

@@ -5,18 +5,13 @@ import { EventContext, StoreContext } from '@joystream/hydra-common'
 import { In } from 'typeorm'
 import { Content } from '../../generated/types'
 import { deserializeMetadata, inconsistentState, logger } from '../common'
-import {
-  processVideoMetadata,
-  getVideoActiveStatus,
-  updateVideoActiveCounters,
-  videoRelationsForCountersBare,
-  videoRelationsForCounters,
-} from './utils'
+import { processVideoMetadata, videoRelationsForCountersBare, videoRelationsForCounters } from './utils'
 import { Channel, Video, VideoCategory } from 'query-node/dist/model'
 import { VideoMetadata, VideoCategoryMetadata } from '@joystream/metadata-protobuf'
 import { integrateMeta } from '@joystream/metadata-protobuf/utils'
 import _ from 'lodash'
 import { createNft } from './nft'
+import { getAllManagers } from '../derivedPropertiesManager/applications'
 
 export async function content_VideoCategoryCreated({ store, event }: EventContext & StoreContext): Promise<void> {
   // read event data
@@ -137,11 +132,7 @@ export async function content_VideoCreated(ctx: EventContext & StoreContext): Pr
     await createNft(store, video, issuanceParameters, event.blockNumber)
   }
 
-  // update video active counters (if needed)
-  const videoActiveStatus = getVideoActiveStatus(video)
-  if (videoActiveStatus.isFullyActive) {
-    await updateVideoActiveCounters(store, undefined, videoActiveStatus)
-  }
+  await getAllManagers(store).videos.onMainEntityCreation(video)
 
   // emit log event
   logger.info('Video has been created', { id: videoId })
@@ -163,9 +154,6 @@ export async function content_VideoUpdated(ctx: EventContext & StoreContext): Pr
     return inconsistentState('Non-existing video update requested', videoId)
   }
 
-  // remember if video is fully active before update
-  const initialVideoActiveStatus = getVideoActiveStatus(video)
-
   // prepare changed metadata
   const newMetadataBytes = videoUpdateParameters.new_meta.unwrapOr(null)
 
@@ -178,12 +166,12 @@ export async function content_VideoUpdated(ctx: EventContext & StoreContext): Pr
   // set last update time
   video.updatedAt = new Date(event.blockTimestamp)
 
+  // update video active counters
+  await getAllManagers(store).videos.onMainEntityUpdate(video)
+
   // save video
   await store.save<Video>(video)
 
-  // update video active counters
-  await updateVideoActiveCounters(store, initialVideoActiveStatus, getVideoActiveStatus(video))
-
   // emit log event
   logger.info('Video has been updated', { id: videoId })
 }
@@ -203,17 +191,12 @@ export async function content_VideoDeleted({ store, event }: EventContext & Stor
     return inconsistentState('Non-existing video deletion requested', videoId)
   }
 
-  // remember if video is fully active before update
-  const initialVideoActiveStatus = getVideoActiveStatus(video)
+  // update video active counters
+  await getAllManagers(store).videos.onMainEntityDeletion(video)
 
   // remove video
   await store.remove<Video>(video)
 
-  // update video active counters (if needed)
-  if (initialVideoActiveStatus.isFullyActive) {
-    await updateVideoActiveCounters(store, initialVideoActiveStatus, undefined)
-  }
-
   // emit log event
   logger.info('Video has been deleted', { id: videoId })
 }
@@ -236,21 +219,18 @@ export async function content_VideoCensorshipStatusUpdated({
     return inconsistentState('Non-existing video censoring requested', videoId)
   }
 
-  // remember if video is fully active before update
-  const initialVideoActiveStatus = getVideoActiveStatus(video)
-
   // update video
   video.isCensored = isCensored.isTrue
 
   // set last update time
   video.updatedAt = new Date(event.blockTimestamp)
 
+  // update video active counters
+  await getAllManagers(store).videos.onMainEntityUpdate(video)
+
   // save video
   await store.save<Video>(video)
 
-  // update video active counters
-  await updateVideoActiveCounters(store, initialVideoActiveStatus, getVideoActiveStatus(video))
-
   // emit log event
   logger.info('Video censorship status has been updated', { id: videoId, isCensored: isCensored.isTrue })
 }

+ 327 - 0
query-node/mappings/src/derivedPropertiesManager/applications/activeVideoCounters.ts

@@ -0,0 +1,327 @@
+import { DerivedPropertiesManager } from '../classes'
+import { IExecutor, IListener, IChangePair } from '../interfaces'
+import { DatabaseManager } from '@joystream/hydra-common'
+import { Channel, ChannelCategory, Video, VideoCategory, StorageDataObject } from 'query-node/dist/model'
+import { videoRelationsForCountersBare } from '../../content/utils'
+
+export type IVideoDerivedEntites = 'channel' | 'channel.category' | 'category'
+export type IAvcChange = 1 | -1 | [1 | -1, IVideoDerivedEntites[]]
+export type IAvcChannelChange = number
+
+/*
+  Decides if video is considered active.
+*/
+function isVideoActive(video: Video): boolean {
+  return !!video.isPublic && !video.isCensored && !!video.thumbnailPhoto?.isAccepted && !!video.media?.isAccepted
+}
+
+/*
+  Compares original and updated videos and calculates if video active status changed.
+*/
+function hasVideoChanged(
+  oldValue: Video | undefined,
+  newValue: Video | undefined
+): IChangePair<IAvcChange> | undefined {
+  // at least one video should always exists but due to TS type limitations
+  // (can't define at least one-of-two parameters required) this safety condition needs to be here
+  if (!oldValue && !newValue) {
+    return undefined
+  }
+
+  // video is being created?
+  if (!oldValue) {
+    return {
+      old: undefined,
+      new: (Number(isVideoActive(newValue as Video)) as IAvcChange) || undefined,
+    }
+  }
+
+  // video is being deleted?
+  if (!newValue) {
+    return {
+      old: (-Number(isVideoActive(oldValue)) as IAvcChange) || undefined,
+      new: undefined,
+    }
+  }
+
+  // calculate active status
+  const originalState = isVideoActive(oldValue)
+  const newState = isVideoActive(newValue)
+
+  // escape if no change and video is not active
+  if (originalState === newState && !newState) {
+    return undefined
+  }
+
+  // active status stays unchanged but relation(s) changed, return list of changed relations
+  if (originalState === newState) {
+    return {
+      old: [
+        -1,
+        [
+          oldValue.channel && oldValue.channel.id !== newValue.channel?.id && 'channel',
+          oldValue.channel?.category &&
+            oldValue.channel.category?.id !== newValue.channel?.category?.id &&
+            'channel.category',
+          oldValue.category && oldValue.category.id !== newValue.category?.id && 'category',
+        ].filter((item) => item) as IVideoDerivedEntites[],
+      ],
+      new: [
+        1,
+        [
+          newValue.channel && oldValue.channel?.id !== newValue.channel.id && 'channel',
+          newValue.channel?.category &&
+            oldValue.channel?.category?.id !== newValue.channel.category?.id &&
+            'channel.category',
+          newValue.category && oldValue.category?.id !== newValue.category.id && 'category',
+        ].filter((item) => item) as IVideoDerivedEntites[],
+      ],
+    }
+  }
+
+  // calculate change
+  const change = Number(newState) - Number(originalState)
+
+  return {
+    old: (-change as IAvcChange) || undefined,
+    new: (change as IAvcChange) || undefined,
+  }
+}
+
+/*
+  Listener for video events.
+*/
+class VideoUpdateListener implements IListener<Video, IAvcChange> {
+  getRelationDependencies(): string[] {
+    return ['thumbnailPhoto', 'media']
+  }
+
+  hasValueChanged(oldValue: Video | undefined, newValue: Video): IChangePair<IAvcChange> | undefined
+  hasValueChanged(oldValue: Video, newValue: Video | undefined): IChangePair<IAvcChange> | undefined
+  hasValueChanged(oldValue: Video, newValue: Video): IChangePair<IAvcChange> | undefined {
+    return hasVideoChanged(oldValue, newValue)
+  }
+}
+
+/*
+  Listener for channel's category update.
+*/
+class ChannelsCategoryChangeListener implements IListener<Channel, IAvcChannelChange> {
+  getRelationDependencies(): string[] {
+    return ['category']
+  }
+
+  hasValueChanged(oldValue: Channel | undefined, newValue: Channel): IChangePair<IAvcChannelChange> | undefined
+  hasValueChanged(oldValue: Channel, newValue: Channel | undefined): IChangePair<IAvcChannelChange> | undefined
+  hasValueChanged(oldValue: Channel, newValue: Channel): IChangePair<IAvcChannelChange> | undefined {
+    return {
+      old: -oldValue?.activeVideosCounter || undefined,
+      new: newValue?.activeVideosCounter || undefined,
+    }
+  }
+}
+
+/*
+  Listener for thumbnail photo events.
+*/
+class StorageDataObjectChangeListener_ThumbnailPhoto implements IListener<StorageDataObject, IAvcChange> {
+  getRelationDependencies(): string[] {
+    return [
+      'videoThumbnail',
+      'videoThumbnail.thumbnailPhoto',
+      'videoThumbnail.media',
+      'videoThumbnail.category',
+      'videoThumbnail.channel',
+      'videoThumbnail.channel.category',
+    ]
+  }
+
+  hasValueChanged(
+    oldValue: StorageDataObject | undefined,
+    newValue: StorageDataObject
+  ): IChangePair<IAvcChange> | undefined
+
+  hasValueChanged(
+    oldValue: StorageDataObject,
+    newValue: StorageDataObject | undefined
+  ): IChangePair<IAvcChange> | undefined
+
+  hasValueChanged(oldValue: StorageDataObject, newValue: StorageDataObject): IChangePair<IAvcChange> | undefined {
+    const oldVideo = oldValue?.videoThumbnail
+    const newVideo = newValue?.videoThumbnail
+
+    return hasVideoChanged(oldVideo, newVideo)
+  }
+}
+
+/*
+  Listener for media events.
+*/
+class StorageDataObjectChangeListener_Media implements IListener<StorageDataObject, IAvcChange> {
+  getRelationDependencies(): string[] {
+    return [
+      'videoMedia',
+      'videoMedia.thumbnailPhoto',
+      'videoMedia.media',
+      'videoMedia.category',
+      'videoMedia.channel',
+      'videoMedia.channel.category',
+    ]
+  }
+
+  hasValueChanged(
+    oldValue: StorageDataObject | undefined,
+    newValue: StorageDataObject
+  ): IChangePair<IAvcChange> | undefined
+
+  hasValueChanged(
+    oldValue: StorageDataObject,
+    newValue: StorageDataObject | undefined
+  ): IChangePair<IAvcChange> | undefined
+
+  hasValueChanged(oldValue: StorageDataObject, newValue: StorageDataObject): IChangePair<IAvcChange> | undefined {
+    const oldVideo = oldValue?.videoMedia
+    const newVideo = newValue?.videoMedia
+
+    return hasVideoChanged(oldVideo as Video, newVideo as Video)
+  }
+}
+
+/*
+  Adapter for generalizing AVC executor.
+*/
+interface IAvcExecutorAdapter<Entity> {
+  (item: Entity): Video
+}
+
+/*
+  Active video counter executor reflecting changes to channels, channel cateories, or video categories.
+*/
+class ActiveVideoCounterExecutor<
+  Entity extends Video | StorageDataObject,
+  DerivedEntity extends VideoCategory | Channel | ChannelCategory = VideoCategory | Channel | ChannelCategory
+> implements IExecutor<Entity, IAvcChange, DerivedEntity> {
+  private adapter: IAvcExecutorAdapter<Entity>
+
+  constructor(adapter: IAvcExecutorAdapter<Entity>) {
+    this.adapter = adapter
+  }
+
+  async loadDerivedEntities(store: DatabaseManager, entity: Entity): Promise<DerivedEntity[]> {
+    // TODO: find way to reliably decide if channel, etc. are loaded and throw error if not
+
+    const targetEntity = this.adapter(entity)
+
+    // this expects entity has loaded channel, channel category, and video category
+    return [targetEntity.channel, targetEntity.channel?.category, targetEntity.category].filter(
+      (item) => item
+    ) as DerivedEntity[]
+  }
+
+  async saveDerivedEntities(store: DatabaseManager, entities: DerivedEntity[]): Promise<void> {
+    await Promise.all(entities.map((entity) => store.save(entity)))
+  }
+
+  updateOldValue(entity: DerivedEntity, change: IAvcChange): DerivedEntity {
+    entity = this.updateValueCommon(entity, change)
+
+    return entity
+  }
+
+  updateNewValue(entity: DerivedEntity, change: IAvcChange): DerivedEntity {
+    entity = this.updateValueCommon(entity, change)
+    return entity
+  }
+
+  private updateValueCommon(entity: DerivedEntity, change: IAvcChange): DerivedEntity {
+    if (typeof change === 'number') {
+      entity.activeVideosCounter += change
+      return entity
+    }
+
+    const [counterChange, entitiesToChange] = change
+
+    const shouldChange =
+      false ||
+      (entity instanceof Channel && entitiesToChange.includes('channel')) ||
+      (entity instanceof ChannelCategory && entitiesToChange.includes('channel.category')) ||
+      (entity instanceof VideoCategory && entitiesToChange.includes('category'))
+
+    if (shouldChange) {
+      entity.activeVideosCounter += counterChange
+    }
+
+    return entity
+  }
+}
+
+/*
+  Executor reflecting changes to channel's category.
+*/
+class ChannelCategoryActiveVideoCounterExecutor implements IExecutor<Channel, IAvcChannelChange, ChannelCategory> {
+  async loadDerivedEntities(store: DatabaseManager, channel: Channel): Promise<ChannelCategory[]> {
+    // TODO: find way to reliably decide if channel, etc. are loaded and throw error if not
+
+    // this expects entity has category
+    return [channel.category].filter((item) => item) as ChannelCategory[]
+  }
+
+  async saveDerivedEntities(store: DatabaseManager, [entity]: ChannelCategory[]): Promise<void> {
+    await store.save(entity)
+  }
+
+  updateOldValue(entity: ChannelCategory, change: IAvcChannelChange): ChannelCategory {
+    entity.activeVideosCounter += change
+
+    return entity
+  }
+
+  updateNewValue(entity: ChannelCategory, change: IAvcChannelChange): ChannelCategory {
+    entity.activeVideosCounter += change
+
+    return entity
+  }
+}
+
+export function createVideoManager(store: DatabaseManager): DerivedPropertiesManager<Video, IAvcChange> {
+  const manager = new DerivedPropertiesManager<Video, IAvcChange>(store, Video, videoRelationsForCountersBare)
+
+  // listen to video change
+  const listener = new VideoUpdateListener()
+  const executors = [new ActiveVideoCounterExecutor<Video>((video) => video)]
+  manager.registerListener(listener, executors)
+
+  return manager
+}
+
+export function createChannelManager(store: DatabaseManager): DerivedPropertiesManager<Channel, IAvcChannelChange> {
+  const manager = new DerivedPropertiesManager<Channel, IAvcChannelChange>(store, Channel)
+
+  // listen to change of channel's category
+  const channelListener = new ChannelsCategoryChangeListener()
+  const channelExecutors = [new ChannelCategoryActiveVideoCounterExecutor()]
+  manager.registerListener(channelListener, channelExecutors)
+
+  return manager
+}
+
+export function createStorageDataObjectManager(
+  store: DatabaseManager
+): DerivedPropertiesManager<StorageDataObject, IAvcChange> {
+  const manager = new DerivedPropertiesManager<StorageDataObject, IAvcChange>(store, StorageDataObject)
+
+  // listen to change of channel's category
+  const storageDataObjectListener1 = new StorageDataObjectChangeListener_ThumbnailPhoto()
+  const storageDataObjectListener2 = new StorageDataObjectChangeListener_Media()
+  const storageDataObjectExecutors = (adapter) => [new ActiveVideoCounterExecutor<StorageDataObject>(adapter)]
+  manager.registerListener(
+    storageDataObjectListener1,
+    storageDataObjectExecutors((storageDataObject) => storageDataObject.videoThumbnail)
+  )
+  manager.registerListener(
+    storageDataObjectListener2,
+    storageDataObjectExecutors((storageDataObject) => storageDataObject.videoMedia)
+  )
+
+  return manager
+}

+ 34 - 0
query-node/mappings/src/derivedPropertiesManager/applications/index.ts

@@ -0,0 +1,34 @@
+import {
+  createChannelManager,
+  createStorageDataObjectManager,
+  createVideoManager,
+  IAvcChange,
+  IAvcChannelChange,
+} from './activeVideoCounters'
+import { DatabaseManager } from '@joystream/hydra-common'
+import { Channel, Video, StorageDataObject } from 'query-node/dist/model'
+import { DerivedPropertiesManager } from '../classes'
+
+let managers: IAllManagers
+let lastStore: DatabaseManager
+
+export interface IAllManagers {
+  videos: DerivedPropertiesManager<Video, IAvcChange>
+  channels: DerivedPropertiesManager<Channel, IAvcChannelChange>
+  storageDataObjects: DerivedPropertiesManager<StorageDataObject, IAvcChange>
+}
+
+export function getAllManagers(store: DatabaseManager): IAllManagers {
+  // store can sometimes change - depends on Hydra's internal logic
+  // make sure managers are always fresh
+  if (!managers || lastStore !== store) {
+    lastStore = store
+    managers = {
+      videos: createVideoManager(store),
+      channels: createChannelManager(store),
+      storageDataObjects: createStorageDataObjectManager(store),
+    }
+  }
+
+  return managers
+}

+ 107 - 0
query-node/mappings/src/derivedPropertiesManager/classes.ts

@@ -0,0 +1,107 @@
+import { DatabaseManager } from '@joystream/hydra-common'
+import { IExecutor, IListener, IDerivedPropertiesManager } from './interfaces'
+import { EntityType } from '../common'
+
+interface IListenerWithExecutors<Entity, Change> {
+  listener: IListener<Entity, Change>
+  executors: IExecutor<unknown, Change, unknown>[]
+}
+
+export class DerivedPropertiesManager<Entity extends { id: string }, Change>
+  implements IDerivedPropertiesManager<Entity, Change> {
+  private store: DatabaseManager
+  private listeners: IListenerWithExecutors<Entity, Change>[] = []
+  private entityType: EntityType<Entity>
+  private defaultRelations: string[] = []
+
+  constructor(store: DatabaseManager, entityType: EntityType<Entity>, extraDefaultRelations: string[] = []) {
+    this.store = store
+    this.entityType = entityType
+    this.defaultRelations = extraDefaultRelations
+  }
+
+  /// /////////////// IDerivedPropertiesManager ////////////////////////////////
+
+  registerListener(listener: IListener<Entity, Change>, executors: IExecutor<unknown, Change, unknown>[]): void {
+    this.listeners.push({
+      listener,
+      executors,
+    })
+    this.defaultRelations = this.defaultRelations.concat(listener.getRelationDependencies())
+  }
+
+  async onMainEntityCreation(entity: Entity): Promise<void> {
+    for (let i = 0; i < this.listeners.length; i++) {
+      await this.handleListener(undefined, entity, this.listeners[i])
+    }
+  }
+
+  async onMainEntityUpdate(newEntity: Entity, initialEntity?: Entity): Promise<void> {
+    const oldEntity =
+      initialEntity ||
+      (await this.store.get(this.entityType, { where: { id: newEntity.id }, relations: this.defaultRelations }))
+
+    for (let i = 0; i < this.listeners.length; i++) {
+      await this.handleListener(oldEntity, newEntity, this.listeners[i])
+    }
+  }
+
+  async onMainEntityDeletion(initialEntity: Entity): Promise<void> {
+    for (let i = 0; i < this.listeners.length; i++) {
+      await this.handleListener(initialEntity, undefined, this.listeners[i])
+    }
+  }
+
+  /// /////////////// Utils ////////////////////////////////////////////////////
+  private async handleListener(
+    oldEntity: Entity | undefined,
+    newEntity: Entity,
+    listener: IListenerWithExecutors<Entity, Change>
+  ): Promise<void>
+
+  private async handleListener(
+    oldEntity: Entity,
+    newEntity: Entity | undefined,
+    listener: IListenerWithExecutors<Entity, Change>
+  ): Promise<void>
+
+  private async handleListener(
+    oldEntity: Entity,
+    newEntity: Entity,
+    listener: IListenerWithExecutors<Entity, Change>
+  ): Promise<void> {
+    const changePair = listener.listener.hasValueChanged(oldEntity, newEntity)
+
+    if (typeof changePair === 'undefined') {
+      return
+    }
+
+    if (oldEntity && typeof changePair.old !== 'undefined') {
+      await this.callExecutors('updateOldValue', listener, oldEntity, changePair.old)
+    }
+
+    if (newEntity && typeof changePair.new !== 'undefined') {
+      await this.callExecutors('updateNewValue', listener, newEntity, changePair.new)
+    }
+  }
+
+  private async callExecutors(
+    listenerMethod: 'updateOldValue' | 'updateNewValue',
+    listener: IListenerWithExecutors<Entity, Change>,
+    value: Entity,
+    change: Change
+  ): Promise<void> {
+    for (let i = 0; i < listener.executors.length; i++) {
+      const executor = listener.executors[i]
+
+      const dependencies = await executor.loadDerivedEntities(this.store, value)
+      if (!dependencies.length) {
+        continue
+      }
+
+      const changedDependencies = dependencies.map((dependency) => executor[listenerMethod](dependency, change))
+
+      await executor.saveDerivedEntities(this.store, changedDependencies)
+    }
+  }
+}

+ 2 - 0
query-node/mappings/src/derivedPropertiesManager/index.ts

@@ -0,0 +1,2 @@
+export * from './classes'
+export * from './interfaces'

+ 28 - 0
query-node/mappings/src/derivedPropertiesManager/interfaces.ts

@@ -0,0 +1,28 @@
+import { DatabaseManager } from '@joystream/hydra-common'
+
+export type IChangePair<Change> = {
+  old: Change | undefined
+  new: Change | undefined
+}
+
+export interface IListener<Entity, Change> {
+  getRelationDependencies(): string[]
+  hasValueChanged(oldValue: Entity, newValue: Entity): IChangePair<Change> | undefined
+  hasValueChanged(oldValue: Entity | undefined, newValue: Entity): IChangePair<Change> | undefined
+  hasValueChanged(oldValue: Entity, newValue: Entity | undefined): IChangePair<Change> | undefined
+}
+
+export interface IExecutor<Entity, Change, DerivedEntity> {
+  loadDerivedEntities(store: DatabaseManager, entity: Entity): Promise<DerivedEntity[]>
+  saveDerivedEntities(store: DatabaseManager, entities: DerivedEntity[]): Promise<void>
+  updateOldValue(entity: DerivedEntity, change: Change): DerivedEntity
+  updateNewValue(entity: DerivedEntity, change: Change): DerivedEntity
+}
+
+export interface IDerivedPropertiesManager<Entity, Change> {
+  registerListener(listener: IListener<Entity, Change>, executors: IExecutor<unknown, Change, unknown>[]): void
+  onMainEntityCreation(entity: Entity): Promise<void>
+  onMainEntityUpdate(newEntity: Entity): Promise<void>
+  onMainEntityUpdate(initialEntity: Entity, newEntity: Entity): Promise<void>
+  onMainEntityDeletion(initialEntity: Entity): Promise<void>
+}

+ 0 - 1
query-node/mappings/src/forum.ts

@@ -69,7 +69,6 @@ import { MAX_TAGS_PER_FORUM_THREAD } from '@joystream/metadata-protobuf/consts'
 import { Not, In } from 'typeorm'
 import { Bytes } from '@polkadot/types'
 import _ from 'lodash'
-import BN from 'bn.js'
 
 async function getCategory(store: DatabaseManager, categoryId: string, relations?: string[]): Promise<ForumCategory> {
   const category = await store.get(ForumCategory, { where: { id: categoryId }, relations })

+ 55 - 57
query-node/mappings/src/storage/index.ts

@@ -18,16 +18,10 @@ import {
   StorageDataObject,
   StorageSystemParameters,
   GeoCoordinates,
-  Video,
 } from 'query-node/dist/model'
 import BN from 'bn.js'
 import { getById, inconsistentState, INT32MAX, toNumber } from '../common'
-import {
-  getVideoActiveStatus,
-  updateVideoActiveCounters,
-  videoRelationsForCounters,
-  unsetAssetRelations,
-} from '../content/utils'
+import { videoRelationsForCounters, unsetAssetRelations } from '../content/utils'
 import {
   processDistributionBucketFamilyMetadata,
   processDistributionOperatorMetadata,
@@ -48,7 +42,7 @@ import {
   distributionOperatorId,
   distributionBucketIdByFamilyAndIndex,
 } from './utils'
-import { In } from 'typeorm'
+import { getAllManagers } from '../derivedPropertiesManager/applications'
 
 // STORAGE BUCKETS
 
@@ -235,54 +229,66 @@ export async function storage_DataObjectsUploaded({ event, store }: EventContext
 
 export async function storage_PendingDataObjectsAccepted({ event, store }: EventContext & StoreContext): Promise<void> {
   const [, , bagId, dataObjectIds] = new Storage.PendingDataObjectsAcceptedEvent(event).params
-  const dataObjects = await getDataObjectsInBag(store, bagId, dataObjectIds, ['videoThumbnail', 'videoMedia'])
-
-  // get ids of videos that are in relation with accepted data objects
-  const notUniqueVideoIds = dataObjects
-    .map((item) => [item.videoMedia?.id.toString(), item.videoThumbnail?.id.toString()])
-    .flat()
-    .filter((item) => item)
-  const videoIds = [...new Set(notUniqueVideoIds)]
-
-  // load videos
-  const videosPre = await store.getMany(Video, {
-    where: { id: In(videoIds) },
-    relations: videoRelationsForCounters,
-  })
+  const dataObjects = await getDataObjectsInBag(store, bagId, dataObjectIds, [
+    'videoThumbnail',
+    ...videoRelationsForCounters.map((item) => `videoThumbnail.${item}`),
+    'videoMedia',
+    ...videoRelationsForCounters.map((item) => `videoMedia.${item}`),
+  ])
+
+  /*
+    This function helps to workaround `store.get*` functions not return objects
+    shared by mutliple entities (at least now). Because of that when updating for example
+    `dataObject.videoThumnail.channel.activeVideoCounter` on dataObject A, this change is not
+    reflected on `dataObject.videoMedia.channel.activeVideoCounter` on dataObject B.
+  */
+  function applyUpdate<Entity extends { id: { toString(): string } }>(
+    entities: Entity[],
+    updateEntity: (entity: Entity) => void,
+    relations: string[][]
+  ): void {
+    const ids = entities.map((entity) => entity.id.toString())
+
+    for (const entity of entities) {
+      updateEntity(entity)
+
+      for (const relation of relations) {
+        const target = relation.reduce((acc, relationPart) => {
+          if (!acc) {
+            return acc
+          }
+
+          return acc[relationPart]
+        }, entity)
+
+        if (!target || target === entity || !ids.includes(target.id.toString())) {
+          continue
+        }
+
+        updateEntity(target)
+      }
+    }
+  }
 
-  // remember if videos are fully active before data objects update
-  const initialActiveStates = videosPre.map((video) => getVideoActiveStatus(video)).filter((item) => item)
+  // ensure update is reflected in all objects
+  applyUpdate(dataObjects, (dataObject) => (dataObject.isAccepted = true), [
+    ['videoThumbnail', 'thumbnailPhoto'],
+    ['videoThumbnail', 'media'],
+    ['videoMedia', 'thumbnailPhoto'],
+    ['videoMedia', 'media'],
+  ])
 
   // accept storage data objects
   await Promise.all(
     dataObjects.map(async (dataObject) => {
       dataObject.isAccepted = true
+
+      // update video active counters
+      await getAllManagers(store).storageDataObjects.onMainEntityUpdate(dataObject)
+
       await store.save<StorageDataObject>(dataObject)
     })
   )
-
-  /*
-    This approach of reloading videos one by one is not optimal, but it is straightforward algorithm.
-
-    This reduces otherwise complex situation caused by `store.get*` functions not return objects
-    shared by mutliple entities (at least now). Because of that when updating for example
-    `dataObject.videoThumnail.channel.activeVideoCounter` on dataObject A, this change is not
-    reflected on `dataObject.videoMedia.channel.activeVideoCounter` on dataObject B.
-
-    We can upgrade this algorithm in the future if this event mapping proves to have serious
-    performance issues. In that case, a unit test for this mapping will be required.
-  */
-  // load relevant videos one by one and update related active-video-counters
-  for (const initialActiveState of initialActiveStates) {
-    // load refreshed version of videos and related entities (channel, channel category, category)
-
-    const video = (await store.get(Video, {
-      where: { id: initialActiveState.video.id.toString() },
-      relations: videoRelationsForCounters,
-    })) as Video
-
-    await updateVideoActiveCounters(store, initialActiveState, getVideoActiveStatus(video))
-  }
 }
 
 export async function storage_DataObjectsMoved({ event, store }: EventContext & StoreContext): Promise<void> {
@@ -308,18 +314,10 @@ export async function storage_DataObjectsDeleted({ event, store }: EventContext
 
   await Promise.all(
     dataObjects.map(async (dataObject) => {
-      // remember if video is fully active before update
-      const initialVideoActiveStatus =
-        (dataObject.videoThumbnail && getVideoActiveStatus(dataObject.videoThumbnail)) ||
-        (dataObject.videoMedia && getVideoActiveStatus(dataObject.videoMedia)) ||
-        null
+      // update video active counters
+      await getAllManagers(store).storageDataObjects.onMainEntityDeletion(dataObject)
 
       await unsetAssetRelations(store, dataObject)
-
-      // update video active counters
-      if (initialVideoActiveStatus) {
-        await updateVideoActiveCounters(store, initialVideoActiveStatus, undefined)
-      }
     })
   )
 }

+ 1 - 0
query-node/package.json

@@ -19,6 +19,7 @@
     "db:create": "yarn workspace query-node db:create",
     "db:drop": "yarn workspace query-node db:drop",
     "db:prepare": "yarn workspace query-node db:create && yarn workspace query-node db:sync",
+    "db:reset": "yarn db:drop && yarn db:prepare && yarn db:migrate",
     "db:schema:migrate": "yarn workspace query-node db:migrate",
     "db:processor:migrate": "hydra-processor migrate --env ../.env",
     "db:migrate": "yarn db:schema:migrate && yarn db:processor:migrate",

+ 4 - 1
query-node/run-tests.sh

@@ -28,9 +28,12 @@ docker-compose down -v
 docker-compose -f ../docker-compose.yml up -d joystream-node
 ./start.sh
 
+../tests/network-tests/start-storage.sh
+export REUSE_KEYS=true
+
 # pass the scenario name without .ts extension
 SCENARIO=$1
 # fallback if scenario if not specified
-SCENARIO=${SCENARIO:=full}
+SCENARIO=${SCENARIO:="content-directory"}
 
 time yarn workspace network-tests run-test-scenario ${SCENARIO}