
Improve offchain data migration process (#129)

* Improve offchain data migration process

* OffchainState.import: Add error in case import file does not exist
Leszek Wiesner 1 year ago
parent commit c3b02f87f1

+ 2 - 1
.dockerignore

@@ -2,4 +2,5 @@
 /node_modules
 /lib
 /*Versions.jsonl
-/db/persisted
+/db/export.json*
+/db/persisted

+ 2 - 1
.gitignore

@@ -9,4 +9,5 @@
 src/model/generated
 /schema.graphql
 /db/persisted
-/scripts/orion-v1-migration/data
+/scripts/orion-v1-migration/data
+/db/export.json*

+ 2 - 1
.prettierignore

@@ -7,4 +7,5 @@ src/types
 src/model/generated
 db/migrations/*.js
 schema.graphql
-/scripts/orion-v1-migration/data
+/scripts/orion-v1-migration/data
+/db/export.json*

+ 2 - 3
Makefile

@@ -28,7 +28,6 @@ typegen:
 	@npx squid-substrate-typegen typegen.json
 
 prepare: install codegen build
-	@mkdir db/persisted || true
 
 up-squid:
 	@docker network create joystream_default || true
@@ -41,8 +40,8 @@ up-archive:
 up: up-archive up-squid
 
 down-squid:
-	@./db/export.sh
-	@docker-compose down -v
+	@docker-compose stop orion_processor
+	@npm run offchain-state:export && docker-compose down -v
 	
 down-archive:
 	@docker-compose -f archive/docker-compose.yml down -v

+ 0 - 19
db/export.sh

@@ -1,19 +0,0 @@
-#!/bin/bash
-
-SCRIPT_PATH="$(dirname "${BASH_SOURCE[0]}")"
-cd $SCRIPT_PATH
-
-source ../.env
-
-DOCKER_COMPOSE="docker-compose -f ../docker-compose.yml"
-
-echo "Exporting video views..."
-$DOCKER_COMPOSE exec orion_db psql -p ${DB_PORT} -U postgres -d ${DB_NAME} -c "\copy processor.video_view_event to '/persisted_data/video_view_event' csv;"
-echo "Exporting channel follows..."
-$DOCKER_COMPOSE exec orion_db psql -p ${DB_PORT} -U postgres -d ${DB_NAME} -c "\copy processor.channel_follow to '/persisted_data/channel_follow' csv;"
-echo "Exporting reports..."
-$DOCKER_COMPOSE exec orion_db psql -p ${DB_PORT} -U postgres -d ${DB_NAME} -c "\copy processor.report to '/persisted_data/report' csv;"
-echo "Exporting gateway config..."
-$DOCKER_COMPOSE exec orion_db psql -p ${DB_PORT} -U postgres -d ${DB_NAME} -c "\copy gateway_config to '/persisted_data/gateway_config' csv;"
-echo "Exporting NFT featuring requests..."
-$DOCKER_COMPOSE exec orion_db psql -p ${DB_PORT} -U postgres -d ${DB_NAME} -c "\copy processor.nft_featuring_request to '/persisted_data/nft_featuring_request' csv;"

+ 0 - 32
db/migrations/2200000000000-PersistedData.js

@@ -1,32 +0,0 @@
-const fs = require('fs')
-const path = require('path')
-module.exports = class PersistedData2200000000000 {
-    name = 'PersistedData2200000000000'
-
-    tablesToImport = [
-      'video_view_event',
-      'report',
-      'channel_follow',
-      'gateway_config',
-      'nft_featuring_request'
-    ]
-
-    async up(db) {
-      if (process.env.SKIP_IMPORT === '1' || process.env.SKIP_IMPORT === 'true') {
-        return
-      }
-      await db.query('SET LOCAL search_path TO processor,public')
-      for (const tableName of this.tablesToImport) {
-        const sourcePath = path.resolve(__dirname, `../persisted/${tableName}`)
-        const containerPath = `/persisted_data/${tableName}`
-        if (fs.existsSync(sourcePath)) {
-          await db.query(`COPY ${tableName} FROM '${containerPath}' DELIMITER ',' CSV;`)
-        }
-      }
-      await db.query('SET LOCAL search_path TO DEFAULT')
-    }
-
-    async down() {
-        // Do nothing
-    }
-  }

+ 0 - 1
docker-compose.yml

@@ -16,7 +16,6 @@ services:
     shm_size: 1g
     volumes:
       - orion_db_data:/var/lib/postgresql/data
-      - ./db/persisted:/persisted_data
       - ./db/postgres.conf:/etc/postgresql/postgresql.conf
 
   orion_processor:

+ 1 - 1
package.json

@@ -17,7 +17,7 @@
     "tests:codegen": "npx graphql-codegen -c ./src/tests/v1/codegen.yml && npx graphql-codegen -c ./src/tests/v2/codegen.yml",
     "tests:compareState": "npx ts-node ./src/tests/compareState.ts",
     "tests:benchmark": "npx ts-node ./src/tests/benchmarks/index.ts",
-    "v1-migration:prepare": "./scripts/orion-v1-migration/export.sh && npx ts-node ./scripts/orion-v1-migration/prepareData.ts"
+    "offchain-state:export": "npx ts-node ./scripts/offchain-state/export.ts"
   },
   "overrides": {
     "@polkadot/api": "8.9.1",

+ 6 - 0
scripts/config.ts

@@ -0,0 +1,6 @@
+import dotenv from 'dotenv'
+import path from 'path'
+
+dotenv.config({
+  path: path.resolve(__dirname, '../.env'),
+})

+ 16 - 0
scripts/offchain-state/export.ts

@@ -0,0 +1,16 @@
+import '../config'
+import { OffchainState } from '../../src/utils/offchainState'
+import { getEm } from '../../src/utils/orm'
+
+async function main() {
+  const offchainState = new OffchainState()
+  const em = await getEm()
+  await offchainState.export(em)
+}
+
+main()
+  .then(() => process.exit(0))
+  .catch((e) => {
+    console.error(e)
+    process.exit(-1)
+  })
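
For orientation: this script is what `npm run offchain-state:export` (wired up in package.json above and invoked by the Makefile's `down-squid` target) runs. Below is a minimal sketch of the full export/import round-trip using the `OffchainState` API added later in this commit, assuming, as in this commit, that `getEm()` resolves a connected TypeORM `EntityManager`; the `roundTrip()` wrapper is hypothetical, since in the commit the two phases run in separate processes:

import { OffchainState } from '../../src/utils/offchainState'
import { getEm } from '../../src/utils/orm'

async function roundTrip() {
  const offchainState = new OffchainState()
  const em = await getEm()

  // Teardown phase: serialize the offchain entities plus the current
  // processor block height into db/export.json.
  await offchainState.export(em)

  // Re-sync phase: once the processor reaches the export block, restore
  // the entities. import() renames the file to db/export.json.imported,
  // so the same state cannot be applied twice.
  await offchainState.import(em)
}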

+ 0 - 15
scripts/orion-v1-migration/export.sh

@@ -1,15 +0,0 @@
-#!/bin/bash
-
-SCRIPT_PATH="$(dirname "${BASH_SOURCE[0]}")"
-cd $SCRIPT_PATH
-
-docker exec -t mongo mongoexport -d orion -c videoEvents -o videoEvents.json --jsonArray
-docker exec -t mongo mongoexport -d orion -c featuredContent -o featuredContent.json
-docker exec -t mongo mongoexport -d orion -c reportedVideos -o reportedVideos.json --jsonArray
-docker exec -t mongo mongoexport -d orion -c reportedChannels -o reportedChannels.json --jsonArray
-mkdir data 2>/dev/null || true
-docker cp mongo:videoEvents.json data/videoEvents.json
-docker cp mongo:featuredContent.json data/featuredContent.json
-docker cp mongo:reportedVideos.json data/reportedVideos.json
-docker cp mongo:reportedChannels.json data/reportedChannels.json
-echo "OK"

+ 0 - 119
scripts/orion-v1-migration/prepareData.ts

@@ -1,119 +0,0 @@
-// import featuredContentJson from './data/featuredContent.json'
-import reportedChannelsJson from './data/reportedChannels.json'
-import reportedVideosJson from './data/reportedVideos.json'
-import videoEventsJson from './data/videoEvents.json'
-import { Report, VideoViewEvent } from '../../src/model'
-import { stringify } from 'csv-stringify/sync'
-import fs from 'fs'
-import path from 'path'
-import { randomAsHex } from '@polkadot/util-crypto'
-
-const OUTPUT_PATH = path.join(__dirname, '../../db/persisted')
-
-// type FeaturedContent = {
-//   featuredVideosPerCategory: {
-//     [categoryId: string]: {
-//       videoId: string
-//       videoCutUrl: string
-//     }[]
-//   }
-//   videoHero: {
-//     videoId: string
-//     heroTitle: string
-//     heroVideoCutUrl: string
-//     heroPosterUrl: string
-//   }
-// }
-
-type ReportedContent = { reporterIp: string; timestamp: { $date: string }; rationale: string }
-type ReportedChannel = ReportedContent & {
-  channelId: string
-}
-
-type ReportedVideo = ReportedContent & {
-  videoId: string
-}
-
-type VideoEvent = {
-  videoId: string
-  channelId: string
-  timestamp: { $date: string }
-  actorId: string
-  type: string
-}
-
-const reportedChannels: ReportedChannel[] = reportedChannelsJson
-const reportedVideos: ReportedVideo[] = reportedVideosJson
-const videoEvents: VideoEvent[] = videoEventsJson
-
-console.log('Preparing Orion v1 data for import...')
-
-const reports = [...reportedChannels, ...reportedVideos].map(
-  (rc) =>
-    new Report({
-      id: randomAsHex(16).replace('0x', ''),
-      channelId: 'channelId' in rc ? rc.channelId : undefined,
-      videoId: 'videoId' in rc ? rc.videoId : undefined,
-      ip: rc.reporterIp,
-      rationale: rc.rationale,
-      timestamp: new Date(rc.timestamp.$date),
-    })
-)
-
-let views = videoEvents
-  .filter((e) => e.type === 'ADD_VIEW')
-  .map(
-    (v) =>
-      new VideoViewEvent({
-        ip: v.actorId,
-        timestamp: new Date(v.timestamp.$date),
-        videoId: v.videoId,
-      })
-  )
-  .sort((a, b) => a.timestamp.getTime() - b.timestamp.getTime())
-
-if (process.env.EXCLUDE_DUPLICATE_VIEWS === 'true' && process.env.VIDEO_VIEW_PER_IP_TIME_LIMIT) {
-  const timeLimitMs = parseInt(process.env.VIDEO_VIEW_PER_IP_TIME_LIMIT) * 1000
-  const viewsReduced = views.reduce((reduced, v) => {
-    return !reduced.find(
-      (vr) =>
-        vr.timestamp.getTime() > v.timestamp.getTime() - timeLimitMs &&
-        vr.ip === v.ip &&
-        vr.videoId === v.videoId
-    )
-      ? reduced.concat(v)
-      : reduced
-  }, [] as VideoViewEvent[])
-  views = viewsReduced
-}
-
-views.forEach((v, i) => {
-  v.id = `${v.videoId}-${views.slice(0, i).filter((v2) => v2.videoId === v.videoId).length + 1}`
-})
-
-const viewColumns: (keyof VideoViewEvent)[] = ['id', 'videoId', 'ip', 'timestamp']
-const reportColumns: (keyof Report)[] = [
-  'id',
-  'ip',
-  'channelId',
-  'videoId',
-  'timestamp',
-  'rationale',
-]
-
-fs.writeFileSync(
-  `${OUTPUT_PATH}/video_view_event`,
-  stringify(views, { columns: viewColumns, cast: { date: (d) => d.toISOString() } })
-)
-console.log(
-  `${views.length} video views saved to "${OUTPUT_PATH}/video_view_event". ` +
-    `Will be imported during Orion v2 migration step.`
-)
-fs.writeFileSync(
-  `${OUTPUT_PATH}/report`,
-  stringify(reports, { columns: reportColumns, cast: { date: (d) => d.toISOString() } })
-)
-console.log(
-  `${reports.length} reports saved to "${OUTPUT_PATH}/report". ` +
-    `Will be imported during Orion v2 migration step.`
-)

+ 87 - 70
src/processor.ts

@@ -90,6 +90,7 @@ import { EntityManagerOverlay } from './utils/overlay'
 import { EventNames, EventHandler, eventConstructors, EventInstance } from './utils/events'
 import { commentCountersManager, videoRelevanceManager } from './mappings/utils'
 import { EntityManager } from 'typeorm'
+import { OffchainState } from './utils/offchainState'
 
 const defaultEventOptions = {
   data: {
@@ -109,76 +110,78 @@ const maxCachedEntities = parseInt(process.env.MAX_CACHED_ENTITIES || '1000')
 const processor = new SubstrateBatchProcessor()
   .setDataSource({ archive: archiveUrl })
   .addEvent('Content.VideoCreated', defaultEventOptions)
-  .addEvent('Content.VideoUpdated', defaultEventOptions)
-  .addEvent('Content.VideoDeleted', defaultEventOptions)
-  .addEvent('Content.VideoDeletedByModerator', defaultEventOptions)
-  .addEvent('Content.VideoVisibilitySetByModerator', defaultEventOptions)
-  .addEvent('Content.ChannelCreated', defaultEventOptions)
-  .addEvent('Content.ChannelUpdated', defaultEventOptions)
-  .addEvent('Content.ChannelDeleted', defaultEventOptions)
-  .addEvent('Content.ChannelDeletedByModerator', defaultEventOptions)
-  .addEvent('Content.ChannelVisibilitySetByModerator', defaultEventOptions)
-  .addEvent('Content.ChannelOwnerRemarked', defaultEventOptions)
-  .addEvent('Content.ChannelAgentRemarked', defaultEventOptions)
-  .addEvent('Content.OpenAuctionStarted', defaultEventOptions)
-  .addEvent('Content.EnglishAuctionStarted', defaultEventOptions)
-  .addEvent('Content.NftIssued', defaultEventOptions)
-  .addEvent('Content.AuctionBidMade', defaultEventOptions)
-  .addEvent('Content.AuctionBidCanceled', defaultEventOptions)
-  .addEvent('Content.AuctionCanceled', defaultEventOptions)
-  .addEvent('Content.EnglishAuctionSettled', defaultEventOptions)
-  .addEvent('Content.BidMadeCompletingAuction', defaultEventOptions)
-  .addEvent('Content.OpenAuctionBidAccepted', defaultEventOptions)
-  .addEvent('Content.OfferStarted', defaultEventOptions)
-  .addEvent('Content.OfferAccepted', defaultEventOptions)
-  .addEvent('Content.OfferCanceled', defaultEventOptions)
-  .addEvent('Content.NftSellOrderMade', defaultEventOptions)
-  .addEvent('Content.NftBought', defaultEventOptions)
-  .addEvent('Content.BuyNowCanceled', defaultEventOptions)
-  .addEvent('Content.BuyNowPriceUpdated', defaultEventOptions)
-  .addEvent('Content.NftSlingedBackToTheOriginalArtist', defaultEventOptions)
-  .addEvent('Content.ChannelPayoutsUpdated', defaultEventOptions)
-  .addEvent('Content.ChannelRewardUpdated', defaultEventOptions)
-  .addEvent('Content.ChannelFundsWithdrawn', defaultEventOptions)
-  .addEvent('Content.ChannelRewardClaimedAndWithdrawn', defaultEventOptions)
-  .addEvent('Storage.StorageBucketCreated', defaultEventOptions)
-  .addEvent('Storage.StorageBucketInvitationAccepted', defaultEventOptions)
-  .addEvent('Storage.StorageBucketsUpdatedForBag', defaultEventOptions)
-  .addEvent('Storage.StorageOperatorMetadataSet', defaultEventOptions)
-  .addEvent('Storage.StorageBucketVoucherLimitsSet', defaultEventOptions)
-  .addEvent('Storage.PendingDataObjectsAccepted', defaultEventOptions)
-  .addEvent('Storage.StorageBucketInvitationCancelled', defaultEventOptions)
-  .addEvent('Storage.StorageBucketOperatorInvited', defaultEventOptions)
-  .addEvent('Storage.StorageBucketOperatorRemoved', defaultEventOptions)
-  .addEvent('Storage.StorageBucketStatusUpdated', defaultEventOptions)
-  .addEvent('Storage.StorageBucketDeleted', defaultEventOptions)
-  .addEvent('Storage.VoucherChanged', defaultEventOptions)
-  .addEvent('Storage.DynamicBagCreated', defaultEventOptions)
-  .addEvent('Storage.DynamicBagDeleted', defaultEventOptions)
-  .addEvent('Storage.DataObjectsUploaded', defaultEventOptions)
-  .addEvent('Storage.DataObjectsUpdated', defaultEventOptions)
-  .addEvent('Storage.DataObjectsMoved', defaultEventOptions)
-  .addEvent('Storage.DataObjectsDeleted', defaultEventOptions)
-  .addEvent('Storage.DistributionBucketCreated', defaultEventOptions)
-  .addEvent('Storage.DistributionBucketStatusUpdated', defaultEventOptions)
-  .addEvent('Storage.DistributionBucketDeleted', defaultEventOptions)
-  .addEvent('Storage.DistributionBucketsUpdatedForBag', defaultEventOptions)
-  .addEvent('Storage.DistributionBucketModeUpdated', defaultEventOptions)
-  .addEvent('Storage.DistributionBucketOperatorInvited', defaultEventOptions)
-  .addEvent('Storage.DistributionBucketInvitationCancelled', defaultEventOptions)
-  .addEvent('Storage.DistributionBucketInvitationAccepted', defaultEventOptions)
-  .addEvent('Storage.DistributionBucketMetadataSet', defaultEventOptions)
-  .addEvent('Storage.DistributionBucketOperatorRemoved', defaultEventOptions)
-  .addEvent('Storage.DistributionBucketFamilyCreated', defaultEventOptions)
-  .addEvent('Storage.DistributionBucketFamilyMetadataSet', defaultEventOptions)
-  .addEvent('Storage.DistributionBucketFamilyDeleted', defaultEventOptions)
-  .addEvent('Members.MemberCreated', defaultEventOptions)
-  .addEvent('Members.MembershipBought', defaultEventOptions)
-  .addEvent('Members.MembershipGifted', defaultEventOptions)
-  .addEvent('Members.MemberInvited', defaultEventOptions)
-  .addEvent('Members.MemberAccountsUpdated', defaultEventOptions)
-  .addEvent('Members.MemberProfileUpdated', defaultEventOptions)
-  .addEvent('Members.MemberRemarked', defaultEventOptions)
+// By adding other events separately, we sacrifice some type safety,
+// but otherwise the compilation of this file takes forever.
+processor.addEvent('Content.VideoUpdated', defaultEventOptions)
+processor.addEvent('Content.VideoDeleted', defaultEventOptions)
+processor.addEvent('Content.VideoDeletedByModerator', defaultEventOptions)
+processor.addEvent('Content.VideoVisibilitySetByModerator', defaultEventOptions)
+processor.addEvent('Content.ChannelCreated', defaultEventOptions)
+processor.addEvent('Content.ChannelUpdated', defaultEventOptions)
+processor.addEvent('Content.ChannelDeleted', defaultEventOptions)
+processor.addEvent('Content.ChannelDeletedByModerator', defaultEventOptions)
+processor.addEvent('Content.ChannelVisibilitySetByModerator', defaultEventOptions)
+processor.addEvent('Content.ChannelOwnerRemarked', defaultEventOptions)
+processor.addEvent('Content.ChannelAgentRemarked', defaultEventOptions)
+processor.addEvent('Content.OpenAuctionStarted', defaultEventOptions)
+processor.addEvent('Content.EnglishAuctionStarted', defaultEventOptions)
+processor.addEvent('Content.NftIssued', defaultEventOptions)
+processor.addEvent('Content.AuctionBidMade', defaultEventOptions)
+processor.addEvent('Content.AuctionBidCanceled', defaultEventOptions)
+processor.addEvent('Content.AuctionCanceled', defaultEventOptions)
+processor.addEvent('Content.EnglishAuctionSettled', defaultEventOptions)
+processor.addEvent('Content.BidMadeCompletingAuction', defaultEventOptions)
+processor.addEvent('Content.OpenAuctionBidAccepted', defaultEventOptions)
+processor.addEvent('Content.OfferStarted', defaultEventOptions)
+processor.addEvent('Content.OfferAccepted', defaultEventOptions)
+processor.addEvent('Content.OfferCanceled', defaultEventOptions)
+processor.addEvent('Content.NftSellOrderMade', defaultEventOptions)
+processor.addEvent('Content.NftBought', defaultEventOptions)
+processor.addEvent('Content.BuyNowCanceled', defaultEventOptions)
+processor.addEvent('Content.BuyNowPriceUpdated', defaultEventOptions)
+processor.addEvent('Content.NftSlingedBackToTheOriginalArtist', defaultEventOptions)
+processor.addEvent('Content.ChannelPayoutsUpdated', defaultEventOptions)
+processor.addEvent('Content.ChannelRewardUpdated', defaultEventOptions)
+processor.addEvent('Content.ChannelFundsWithdrawn', defaultEventOptions)
+processor.addEvent('Content.ChannelRewardClaimedAndWithdrawn', defaultEventOptions)
+processor.addEvent('Storage.StorageBucketCreated', defaultEventOptions)
+processor.addEvent('Storage.StorageBucketInvitationAccepted', defaultEventOptions)
+processor.addEvent('Storage.StorageBucketsUpdatedForBag', defaultEventOptions)
+processor.addEvent('Storage.StorageOperatorMetadataSet', defaultEventOptions)
+processor.addEvent('Storage.StorageBucketVoucherLimitsSet', defaultEventOptions)
+processor.addEvent('Storage.PendingDataObjectsAccepted', defaultEventOptions)
+processor.addEvent('Storage.StorageBucketInvitationCancelled', defaultEventOptions)
+processor.addEvent('Storage.StorageBucketOperatorInvited', defaultEventOptions)
+processor.addEvent('Storage.StorageBucketOperatorRemoved', defaultEventOptions)
+processor.addEvent('Storage.StorageBucketStatusUpdated', defaultEventOptions)
+processor.addEvent('Storage.StorageBucketDeleted', defaultEventOptions)
+processor.addEvent('Storage.VoucherChanged', defaultEventOptions)
+processor.addEvent('Storage.DynamicBagCreated', defaultEventOptions)
+processor.addEvent('Storage.DynamicBagDeleted', defaultEventOptions)
+processor.addEvent('Storage.DataObjectsUploaded', defaultEventOptions)
+processor.addEvent('Storage.DataObjectsUpdated', defaultEventOptions)
+processor.addEvent('Storage.DataObjectsMoved', defaultEventOptions)
+processor.addEvent('Storage.DataObjectsDeleted', defaultEventOptions)
+processor.addEvent('Storage.DistributionBucketCreated', defaultEventOptions)
+processor.addEvent('Storage.DistributionBucketStatusUpdated', defaultEventOptions)
+processor.addEvent('Storage.DistributionBucketDeleted', defaultEventOptions)
+processor.addEvent('Storage.DistributionBucketsUpdatedForBag', defaultEventOptions)
+processor.addEvent('Storage.DistributionBucketModeUpdated', defaultEventOptions)
+processor.addEvent('Storage.DistributionBucketOperatorInvited', defaultEventOptions)
+processor.addEvent('Storage.DistributionBucketInvitationCancelled', defaultEventOptions)
+processor.addEvent('Storage.DistributionBucketInvitationAccepted', defaultEventOptions)
+processor.addEvent('Storage.DistributionBucketMetadataSet', defaultEventOptions)
+processor.addEvent('Storage.DistributionBucketOperatorRemoved', defaultEventOptions)
+processor.addEvent('Storage.DistributionBucketFamilyCreated', defaultEventOptions)
+processor.addEvent('Storage.DistributionBucketFamilyMetadataSet', defaultEventOptions)
+processor.addEvent('Storage.DistributionBucketFamilyDeleted', defaultEventOptions)
+processor.addEvent('Members.MemberCreated', defaultEventOptions)
+processor.addEvent('Members.MembershipBought', defaultEventOptions)
+processor.addEvent('Members.MembershipGifted', defaultEventOptions)
+processor.addEvent('Members.MemberInvited', defaultEventOptions)
+processor.addEvent('Members.MemberAccountsUpdated', defaultEventOptions)
+processor.addEvent('Members.MemberProfileUpdated', defaultEventOptions)
+processor.addEvent('Members.MemberRemarked', defaultEventOptions)
 
 type Item = BatchProcessorItem<typeof processor>
 type Ctx = BatchContext<Store, Item>
@@ -260,6 +263,9 @@ const eventHandlers: { [E in EventNames]: EventHandler<E> } = {
   'Members.MemberRemarked': processMemberRemarkedEvent,
 }
 
+const offchainState = new OffchainState()
+const exportBlockNumber = offchainState.getExportBlockNumber()
+
 async function processEvent<EventName extends EventNames>(
   ctx: Ctx,
   name: EventName,
@@ -308,6 +314,17 @@ processor.run(new TypeormDatabase({ isolationLevel: 'READ COMMITTED' }), async (
         }
       }
     }
+    // Importing exported offchain state
+    if (block.header.height >= exportBlockNumber && !offchainState.isImported) {
+      ctx.log.info(`Export block ${exportBlockNumber} reached, importing offchain state...`)
+      await overlay.updateDatabase()
+      const em = overlay.getEm()
+      await offchainState.import(em)
+      await commentCountersManager.updateVideoCommentsCounters(em, true)
+      await commentCountersManager.updateParentRepliesCounters(em, true)
+      await videoRelevanceManager.updateVideoRelevanceValue(em, true)
+      ctx.log.info(`Offchain state successfully imported!`)
+    }
   }
 
   ctx.log.info(`Saving database updates...`)
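
The "sacrifice some type safety" comment in the hunk above refers to the builder's accumulating type parameter: each chained `.addEvent(...)` call returns a processor type widened with the new event item, and `type Item = BatchProcessorItem<typeof processor>` is derived from whatever was accumulated. A rough sketch of the tradeoff, assuming the `@subsquid/substrate-processor` v0 API this file is built on:

// Chained style: the return type accumulates each event, so `Item` covers
// all of them -- but type-checking ~70 chained calls makes compilation crawl.
const chained = new SubstrateBatchProcessor()
  .setDataSource({ archive: archiveUrl })
  .addEvent('Content.VideoCreated', defaultEventOptions)

// Statement style (used above): the widened return type is discarded, so
// `typeof processor` only reflects events added via chaining; the compiler
// no longer tracks 'Content.VideoUpdated' items, hence the lost type safety.
processor.addEvent('Content.VideoUpdated', defaultEventOptions)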

+ 1 - 1
src/server-extension/resolvers/AssetsResolver/index.ts

@@ -7,7 +7,7 @@ import {
   DistributionBucketOperatorStatus,
 } from '../../../model'
 import _ from 'lodash'
-import { getEm } from '../../orm'
+import { getEm } from '../../../utils/orm'
 import { performance } from 'perf_hooks'
 import urljoin from 'url-join'
 import { Context } from '@subsquid/openreader/lib/context'

+ 1 - 1
src/server-extension/resolvers/StateResolver/index.ts

@@ -2,7 +2,7 @@ import { Resolver, Root, Subscription } from 'type-graphql'
 import type { EntityManager } from 'typeorm'
 import { ProcessorState } from './types'
 import _, { isObject } from 'lodash'
-import { getEm } from '../../orm'
+import { getEm } from '../../../utils/orm'
 import { has } from '../../../utils/misc'
 
 class ProcessorStateRetriever {

+ 20 - 7
src/utils/CommentsCountersManager.ts

@@ -13,8 +13,8 @@ export class CommentCountersManager {
     id && this.videosToUpdate.add(id)
   }
 
-  async updateVideoCommentsCounters(em: EntityManager) {
-    if (this.videosToUpdate.size) {
+  async updateVideoCommentsCounters(em: EntityManager, forceUpdateAll = false) {
+    if (this.videosToUpdate.size || forceUpdateAll) {
       await em.query(`
         UPDATE "video"
         SET
@@ -25,13 +25,20 @@ export class CommentCountersManager {
               AND "comment"."status" = '${CommentStatus.VISIBLE}'
               AND "comment"."is_excluded" = '0'
           )
-        WHERE "id" IN (${[...this.videosToUpdate.values()].map((id) => `'${id}'`).join(', ')})`)
+        ${
+          forceUpdateAll
+            ? ''
+            : `WHERE "id" IN (${[...this.videosToUpdate.values()]
+                .map((id) => `'${id}'`)
+                .join(', ')})`
+        }
+      `)
       this.videosToUpdate.clear()
     }
   }
 
-  async updateParentRepliesCounters(em: EntityManager) {
-    if (this.commentsToUpdate.size) {
+  async updateParentRepliesCounters(em: EntityManager, forceUpdateAll = false) {
+    if (this.commentsToUpdate.size || forceUpdateAll) {
       await em.query(`
         UPDATE "comment"
         SET
@@ -53,8 +60,14 @@ export class CommentCountersManager {
               WHERE "comment_reaction"."comment_id" = "comment"."id"
             ) AS "reactions_and_replies"
           )
-          WHERE "id" IN (${[...this.commentsToUpdate.values()].map((id) => `'${id}'`).join(', ')})
-        `)
+        ${
+          forceUpdateAll
+            ? ''
+            : `WHERE "id" IN (${[...this.commentsToUpdate.values()]
+                .map((id) => `'${id}'`)
+                .join(', ')})`
+        }
+      `)
       this.commentsToUpdate.clear()
     }
   }
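
As a usage note, the new `forceUpdateAll` flag is exercised from the offchain-state import hook in `src/processor.ts` above; when set, the generated SQL simply drops the `WHERE "id" IN (...)` filter. A sketch of the two call paths, with call sites as in this commit:

// Regular mapping flow: only videos/comments whose ids were previously
// queued get recounted.
await commentCountersManager.updateVideoCommentsCounters(em)

// Import flow: recount every row, since the imported state may have flipped
// is_excluded flags on comments the processor never queued for update.
await commentCountersManager.updateVideoCommentsCounters(em, true)
await commentCountersManager.updateParentRepliesCounters(em, true)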

+ 1 - 1
src/utils/VideoRelevanceManager.ts

@@ -1,6 +1,6 @@
 import { EntityManager } from 'typeorm'
 import { config, ConfigVariable } from './config'
-import { getEm } from '../server-extension/orm'
+import { getEm } from './orm'
 
 // constant used to parse seconds from creation
 export const NEWNESS_SECONDS_DIVIDER = 60 * 60 * 24

+ 162 - 0
src/utils/offchainState.ts

@@ -0,0 +1,162 @@
+import { EntityManager } from 'typeorm'
+import { withHiddenEntities } from './sql'
+import fs from 'fs'
+import path from 'path'
+import { createLogger } from '@subsquid/logger'
+import assert from 'assert'
+
+const DEFAULT_EXPORT_PATH = path.resolve(__dirname, '../../db/export.json')
+
+const exportedStateMap = {
+  VideoViewEvent: true,
+  ChannelFollow: true,
+  Report: true,
+  GatewayConfig: true,
+  NftFeaturingRequest: true,
+  VideoHero: true,
+  VideoFeaturedInCategory: true,
+  Channel: ['is_excluded', 'video_views_num'],
+  Video: ['is_excluded', 'views_num'],
+  Comment: ['is_excluded'],
+  OwnedNft: ['is_featured'],
+  VideoCategory: ['is_supported'],
+}
+
+type ExportedData = {
+  [K in keyof typeof exportedStateMap]: {
+    type: 'insert' | 'update'
+    values: Record<string, unknown>[]
+  }
+}
+
+type ExportedState = {
+  data: ExportedData
+  blockNumber: number
+}
+
+export class OffchainState {
+  private logger = createLogger('offchainState')
+  private _isImported = false
+
+  public get isImported(): boolean {
+    return this._isImported
+  }
+
+  public async export(em: EntityManager, exportFilePath = DEFAULT_EXPORT_PATH): Promise<void> {
+    this.logger.info('Exporting offchain state')
+    const exportedState: ExportedState = await em.transaction(async (em) => {
+      const blockNumberPre: number = (
+        await em.query('SELECT height FROM squid_processor.status WHERE id = 0')
+      )[0].height
+      this.logger.info(`Export block number: ${blockNumberPre}`)
+      const data = await withHiddenEntities(em, async () => {
+        return Object.fromEntries(
+          await Promise.all(
+            Object.entries(exportedStateMap).map(async ([entityName, fields]) => {
+              const type = Array.isArray(fields) ? 'update' : 'insert'
+              const values = Array.isArray(fields)
+                ? await em
+                    .getRepository(entityName)
+                    .createQueryBuilder()
+                    .select(['id', ...fields])
+                    .getRawMany()
+                : await em.getRepository(entityName).find({})
+              this.logger.info(
+                `Exporting ${values.length} ${entityName} entities ` +
+                  `(type: ${type}` +
+                  (Array.isArray(fields) ? `, fields: ${fields.join(', ')})` : ')')
+              )
+              return [entityName, { type, values }]
+            })
+          )
+        )
+      })
+      const blockNumberPost: number = (
+        await em.query('SELECT height FROM squid_processor.status WHERE id = 0')
+      )[0].height
+      assert(blockNumberPre === blockNumberPost, 'Block number changed during export')
+      return { data, blockNumber: blockNumberPost }
+    })
+
+    this.logger.info(`Saving export data to ${exportFilePath}`)
+    fs.writeFileSync(exportFilePath, JSON.stringify(exportedState))
+    this.logger.info('Done')
+  }
+
+  public async import(em: EntityManager, exportFilePath = DEFAULT_EXPORT_PATH): Promise<void> {
+    if (!fs.existsSync(exportFilePath)) {
+      throw new Error(
+        `Cannot perform offchain data import! Export file ${exportFilePath} does not exist!`
+      )
+    }
+    const { data }: ExportedState = JSON.parse(fs.readFileSync(exportFilePath, 'utf-8'))
+    this.logger.info('Importing offchain state')
+    for (const [entityName, { type, values }] of Object.entries(data)) {
+      this.logger.info(
+        `${type === 'update' ? 'Updating' : 'Inserting'} ${values.length} ${entityName} entities...`
+      )
+      if (type === 'update') {
+        // We're using "batched" updates, because otherwise the process becomes extremely slow
+        const meta = em.connection.getMetadata(entityName)
+        const batchSize = 1000
+        let batchNumber = 0
+        const fieldNames = Object.keys(values[0])
+        const fieldTypes = Object.fromEntries(
+          fieldNames.map((fieldName) => {
+            const metaType = meta.columns.find(
+              (c) => c.databaseNameWithoutPrefixes === fieldName
+            )?.type
+            return [fieldName, metaType === String ? 'text' : metaType]
+          })
+        )
+        while (values.length) {
+          ++batchNumber
+          const batch = values.splice(0, batchSize)
+          this.logger.info(
+            `Executing batch #${batchNumber} of ${batch.length} entities (${values.length} entities left)...`
+          )
+          let paramCounter = 1
+          await em.query(
+            `UPDATE "${meta.tableName}"
+            SET ${fieldNames
+              .filter((f) => f !== 'id')
+              .map((f) => `"${f}" = "data"."${f}"`)
+              .join(', ')}
+            FROM (
+              SELECT
+              ${fieldNames
+                .map((fieldName) => {
+                  return `unnest($${paramCounter++}::${fieldTypes[fieldName]}[])
+                  AS "${fieldName}"`
+                })
+                .join(', ')}
+            ) AS "data"
+            WHERE "${meta.tableName}"."id" = "data"."id"`,
+            fieldNames.map((fieldName) => batch.map((v) => v[fieldName]))
+          )
+        }
+      } else {
+        await em.getRepository(entityName).insert(values)
+      }
+      this.logger.info(
+        `Done ${type === 'update' ? 'updating' : 'inserting'} ${entityName} entities`
+      )
+    }
+    const renamedExportFilePath = `${exportFilePath}.imported`
+    this.logger.info(`Renaming export file to ${renamedExportFilePath}...`)
+    fs.renameSync(exportFilePath, renamedExportFilePath)
+    this._isImported = true
+    this.logger.info('Done')
+  }
+
+  public getExportBlockNumber(exportFilePath = DEFAULT_EXPORT_PATH): number {
+    if (!fs.existsSync(exportFilePath)) {
+      this.logger.warn(`Export file ${exportFilePath} does not exist`)
+      this._isImported = true
+      return -1
+    }
+    const { blockNumber }: ExportedState = JSON.parse(fs.readFileSync(exportFilePath, 'utf-8'))
+    this.logger.info(`Last export block number established: ${blockNumber}`)
+    return blockNumber
+  }
+}
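
The "batched updates" comment in `import()` deserves unpacking: rather than issuing one UPDATE per row, each batch of up to 1000 rows is shipped as parallel arrays that Postgres `unnest()`s into a virtual "data" table joined on `id`. A standalone sketch of the same pattern with hypothetical values, using the `Video` columns from `exportedStateMap`; the column types here are illustrative, whereas the real code derives them from TypeORM metadata:

// Three parallel arrays -> one round-trip that updates two rows.
const ids = ['1', '2']
const isExcluded = [false, true]
const viewsNum = [100, 250]

await em.query(
  `UPDATE "video"
   SET "is_excluded" = "data"."is_excluded",
       "views_num" = "data"."views_num"
   FROM (
     SELECT unnest($1::text[]) AS "id",
            unnest($2::boolean[]) AS "is_excluded",
            unnest($3::integer[]) AS "views_num"
   ) AS "data"
   WHERE "video"."id" = "data"."id"`,
  [ids, isExcluded, viewsNum]
)

With equal-length arrays the `unnest()` calls expand in lockstep, so each row of "data" pairs an id with its new column values.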

+ 0 - 0
src/server-extension/orm.ts → src/utils/orm.ts