浏览代码

storage-node-v2: Add localDataObjectIDs cache.

Shamil Gadelshin 3 年之前
父节点
当前提交
b5d1bfbb70

+ 6 - 1
storage-node-v2/src/commands/server.ts

@@ -2,6 +2,7 @@ import { flags } from '@oclif/command'
 import { createApp } from '../services/webApi/app'
 import ApiCommandBase from '../command-base/ApiCommandBase'
 import logger, { initElasticLogger } from '../services/logger'
+import { loadDataObjectIdCache } from '../services/caching/localDataObjects'
 import { ApiPromise } from '@polkadot/api'
 import { performSync, TempDirName } from '../services/sync/synchronizer'
 import sleep from 'sleep-promise'
@@ -84,6 +85,10 @@ Supported values: warn, error, debug, info. Default:debug`,
 
     await recreateTempDirectory(flags.uploads, TempDirName)
 
+    if (fs.existsSync(flags.uploads)) {
+      await loadDataObjectIdCache(flags.uploads, TempDirName)
+    }
+
     if (!_.isEmpty(flags.elasticSearchEndpoint)) {
       initElasticLogger(flags.elasticSearchEndpoint ?? '')
     }
@@ -174,7 +179,7 @@ async function runSyncWithInterval(
   syncWorkersNumber: number,
   syncIntervalMinutes: number
 ) {
-  const sleepIntevalInSeconds = syncIntervalMinutes * 60 * 1000
+  const sleepIntevalInSeconds = syncIntervalMinutes * 60 * 10
   while (true) {
     logger.info(`Sync paused for ${syncIntervalMinutes} minute(s).`)
     await sleep(sleepIntevalInSeconds)

+ 84 - 0
storage-node-v2/src/services/caching/localDataObjects.ts

@@ -0,0 +1,84 @@
+import AwaitLock from 'await-lock'
+import path from 'path'
+import fs from 'fs'
+import logger from '../logger'
+const fsPromises = fs.promises
+
+// Local in-memory cache for IDs.
+let idCache = new Set<string>()
+
+const lock = new AwaitLock()
+
+/**
+ * Takes a snapshot of the in-memory ID cache while holding the cache lock.
+ *
+ * @returns a new array holding the IDs cached at the time of the call.
+ */
+export async function getDataObjectIDs(): Promise<string[]> {
+  await lock.acquireAsync()
+  try {
+    return Array.from(idCache)
+  } finally {
+    lock.release()
+  }
+}
+
+/**
+ * Rebuilds the ID cache from the file names found in the uploading directory.
+ * The temp directory entry is excluded. Rejects if the directory cannot be
+ * read; the cache lock is released in that case as well.
+ *
+ * @param uploadDir - uploading directory
+ * @param tempDirName - temp directory name
+ * @returns empty promise.
+ */
+export async function loadDataObjectIdCache(uploadDir: string, tempDirName: string): Promise<void> {
+  await lock.acquireAsync()
+  try {
+    const localIds = await getLocalFileNames(uploadDir)
+    // Filter temporary directory name.
+    const tempDirectoryName = path.parse(tempDirName).name
+    idCache = new Set(localIds.filter((dataObjectId) => dataObjectId !== tempDirectoryName))
+    logger.debug(`Local ID cache loaded.`)
+  } finally {
+    // Release even when readdir fails, otherwise every later cache call would wait forever.
+    lock.release()
+  }
+}
+
+/**
+ * Registers a data object ID in the local in-memory cache.
+ *
+ * @param dataObjectId - ID of the data object to remember
+ * @returns empty promise.
+ */
+export async function addDataObjectIdToCache(dataObjectId: string): Promise<void> {
+  await lock.acquireAsync()
+  try {
+    idCache.add(dataObjectId)
+  } finally {
+    lock.release()
+  }
+}
+
+/**
+ * Removes a data object ID from the local in-memory cache.
+ * @param dataObjectId - ID of the data object to forget
+ */
+export async function deleteDataObjectIdFromCache(dataObjectId: string): Promise<void> {
+  await lock.acquireAsync()
+  try {
+    idCache.delete(dataObjectId)
+  } finally {
+    lock.release()
+  }
+}
+
+/**
+ * Lists the entry names found in a local directory.
+ * @param directory - local directory to get file names from
+ */
+async function getLocalFileNames(directory: string): Promise<string[]> {
+  const entryNames = await fsPromises.readdir(directory)
+  return entryNames
+}

+ 1 - 1
storage-node-v2/src/services/caching/newUploads.ts

@@ -20,7 +20,7 @@ const newDataObjects = new NodeCache({
  *
  * @returns nonce string.
  */
-export function registerNewDataObjectId(dataObjectId: string) {
+export function registerNewDataObjectId(dataObjectId: string): void {
   newDataObjects.set(dataObjectId, null, ExpirationPeriod)
 }
 

+ 0 - 0
storage-node-v2/src/services/runtime/transactionNonceKeeper.ts → storage-node-v2/src/services/caching/transactionNonceKeeper.ts


+ 1 - 1
storage-node-v2/src/services/runtime/api.ts

@@ -6,7 +6,7 @@ import { TypeRegistry } from '@polkadot/types'
 import { KeyringPair } from '@polkadot/keyring/types'
 import { SubmittableExtrinsic, AugmentedEvent } from '@polkadot/api/types'
 import { DispatchError, DispatchResult } from '@polkadot/types/interfaces/system'
-import { getTransactionNonce, resetTransactionNonceCache } from './transactionNonceKeeper'
+import { getTransactionNonce, resetTransactionNonceCache } from '../caching/transactionNonceKeeper'
 import logger from '../../services/logger'
 import ExitCodes from '../../command-base/ExitCodes'
 import { CLIError } from '@oclif/errors'

+ 2 - 28
storage-node-v2/src/services/sync/synchronizer.ts

@@ -1,12 +1,10 @@
 import { getStorageObligationsFromRuntime, DataObligations } from './storageObligations'
 import logger from '../../services/logger'
+import { getDataObjectIDs } from '../../services/caching/localDataObjects'
 import { SyncTask, DownloadFileTask, DeleteLocalFileTask, PrepareDownloadFileTask } from './tasks'
 import { WorkingStack, TaskProcessorSpawner, TaskSink } from './workingProcess'
 import _ from 'lodash'
-import fs from 'fs'
-import path from 'path'
 import { ApiPromise } from '@polkadot/api'
-const fsPromises = fs.promises
 
 /**
  * Temporary directory name for data uploading.
@@ -40,7 +38,7 @@ export async function performSync(
   logger.info('Started syncing...')
   const [model, files] = await Promise.all([
     getStorageObligationsFromRuntime(queryNodeUrl, workerId),
-    getLocalDataObjects(uploadDirectory, TempDirName),
+    getDataObjectIDs(),
   ])
 
   const requiredIds = model.dataObjects.map((obj) => obj.id)
@@ -164,27 +162,3 @@ async function getDownloadTasks(
 
   return addedTasks
 }
-
-/**
- * Returns local data objects info.
- *
- * @param uploadDirectory - local directory to get file names from
- */
-export async function getLocalDataObjects(uploadDirectory: string, tempDirName: string): Promise<string[]> {
-  const localIds = await getLocalFileNames(uploadDirectory)
-
-  // Filter temporary directory name.
-  const tempDirectoryName = path.parse(tempDirName).name
-  const data = localIds.filter((dataObjectId) => dataObjectId !== tempDirectoryName)
-
-  return data
-}
-
-/**
- * Returns file names from the local directory.
- *
- * @param directory - local directory to get file names from
- */
-async function getLocalFileNames(directory: string): Promise<string[]> {
-  return fsPromises.readdir(directory)
-}

+ 11 - 7
storage-node-v2/src/services/sync/tasks.ts

@@ -10,6 +10,7 @@ import _ from 'lodash'
 import { getRemoteDataObjects } from './remoteStorageData'
 import { TaskSink } from './workingProcess'
 import { isNewDataObject } from '../caching/newUploads'
+import { addDataObjectIdToCache, deleteDataObjectIdFromCache } from '../caching/localDataObjects'
 import { hashFile } from '../helpers/hashing'
 import { parseBagId } from '../helpers/bagTypes'
 import { hexToString } from '@polkadot/util'
@@ -50,12 +51,14 @@ export class DeleteLocalFileTask implements SyncTask {
   async execute(): Promise<void> {
     const dataObjectId = this.filename
     if (isNewDataObject(dataObjectId)) {
-      logger.warn('Sync - possible QueryNode update delay (new file) - deleting file canceled: ${this.filename}')
+      logger.warn(`Sync - possible QueryNode update delay (new file) - deleting file canceled: ${this.filename}`)
       return
     }
 
     const fullPath = path.join(this.uploadsDirectory, this.filename)
-    return fsPromises.unlink(fullPath)
+    await fsPromises.unlink(fullPath)
+
+    await deleteDataObjectIdFromCache(dataObjectId)
   }
 }
 
@@ -63,7 +66,7 @@ export class DeleteLocalFileTask implements SyncTask {
  * Download the file from the remote storage node to the local storage.
  */
 export class DownloadFileTask implements SyncTask {
-  id: string
+  dataObjectId: string
   expectedHash?: string
   uploadsDirectory: string
   tempDirectory: string
@@ -71,16 +74,16 @@ export class DownloadFileTask implements SyncTask {
 
   constructor(
     baseUrl: string,
-    id: string,
+    dataObjectId: string,
     expectedHash: string | undefined,
     uploadsDirectory: string,
     tempDirectory: string
   ) {
-    this.id = id
+    this.dataObjectId = dataObjectId
     this.expectedHash = expectedHash
     this.uploadsDirectory = uploadsDirectory
     this.tempDirectory = tempDirectory
-    this.url = urljoin(baseUrl, 'api/v1/files', id)
+    this.url = urljoin(baseUrl, 'api/v1/files', dataObjectId)
   }
 
   description(): string {
@@ -89,7 +92,7 @@ export class DownloadFileTask implements SyncTask {
 
   async execute(): Promise<void> {
     const streamPipeline = promisify(pipeline)
-    const filepath = path.join(this.uploadsDirectory, this.id)
+    const filepath = path.join(this.uploadsDirectory, this.dataObjectId)
     // We create tempfile first to mitigate partial downloads on app (or remote node) crash.
     // This partial downloads will be cleaned up during the next sync iteration.
     const tempFilePath = path.join(this.uploadsDirectory, this.tempDirectory, uuidv4())
@@ -108,6 +111,7 @@ export class DownloadFileTask implements SyncTask {
       await streamPipeline(request, fileStream)
       await this.verifyDownloadedFile(tempFilePath)
       await fsPromises.rename(tempFilePath, filepath)
+      await addDataObjectIdToCache(this.dataObjectId)
     } catch (err) {
       logger.error(`Sync - fetching data error for ${this.url}: ${err}`, { err })
       try {

+ 5 - 3
storage-node-v2/src/services/webApi/controllers/filesApi.ts

@@ -8,6 +8,7 @@ import {
 } from '../../helpers/auth'
 import { hashFile } from '../../helpers/hashing'
 import { registerNewDataObjectId } from '../../caching/newUploads'
+import { addDataObjectIdToCache } from '../../caching/localDataObjects'
 import { createNonce, getTokenExpirationTime } from '../../caching/tokenNonceKeeper'
 import { getFileInfo } from '../../helpers/fileInfo'
 import { BagId } from '@joystream/types/storage'
@@ -99,7 +100,6 @@ export async function uploadFile(req: express.Request, res: express.Response<unk
   try {
     const fileObj = getFileObject(req)
     cleanupFileName = fileObj.path
-
     const queryNodeUrl = getQueryNodeUrl(res)
     const workerId = getWorkerId(res)
 
@@ -113,10 +113,12 @@ export async function uploadFile(req: express.Request, res: express.Response<unk
     const accepted = await verifyDataObjectInfo(api, bagId, uploadRequest.dataObjectId, fileObj.size, hash)
 
     // Prepare new file name
+    const dataObjectId = uploadRequest.dataObjectId.toString()
     const uploadsDir = getUploadsDir(res)
-    const newPath = path.join(uploadsDir, uploadRequest.dataObjectId.toString())
+    const newPath = path.join(uploadsDir, dataObjectId)
 
-    registerNewDataObjectId(uploadRequest.dataObjectId.toString())
+    registerNewDataObjectId(dataObjectId)
+    await addDataObjectIdToCache(dataObjectId)
 
     // Overwrites existing file.
     await fsPromises.rename(fileObj.path, newPath)

+ 3 - 25
storage-node-v2/src/services/webApi/controllers/stateApi.ts

@@ -1,4 +1,4 @@
-import { getLocalDataObjects } from '../../../services/sync/synchronizer'
+import { getDataObjectIDs } from '../../../services/caching/localDataObjects'
 import * as express from 'express'
 import _ from 'lodash'
 import { getDataObjectIDsByBagId } from '../../sync/storageObligations'
@@ -34,10 +34,7 @@ export async function getAllLocalDataObjects(
   res: express.Response<unknown, AppConfig>
 ): Promise<void> {
   try {
-    const uploadsDir = getUploadsDir(res)
-    const tempFileDir = getTempFileUploadingDir(res)
-
-    const ids = await getCachedLocalDataObjects(uploadsDir, tempFileDir)
+    const ids = await getDataObjectIDs()
 
     res.status(200).json(ids)
   } catch (err) {
@@ -101,14 +98,11 @@ export async function getLocalDataObjectsByBagId(
   res: express.Response<unknown, AppConfig>
 ): Promise<void> {
   try {
-    const uploadsDir = getUploadsDir(res)
-    const tempFileDir = getTempFileUploadingDir(res)
-
     const queryNodeUrl = getQueryNodeUrl(res)
     const bagId = getBagId(req)
 
     const [ids, requiredIds] = await Promise.all([
-      getCachedLocalDataObjects(uploadsDir, tempFileDir),
+      getDataObjectIDs(),
       getCachedDataObjectsObligations(queryNodeUrl, bagId),
     ])
 
@@ -153,22 +147,6 @@ function getBagId(req: express.Request): string {
   throw new WebApiError('No bagId provided.', 400)
 }
 
-/**
- * Returns cached data objects IDs from the local data storage. Data could be
- * obsolete until cache expiration.
- *
- */
-async function getCachedLocalDataObjects(uploadsDir: string, tempDirName: string): Promise<string[]> {
-  const entryName = 'local_data_object'
-
-  if (!dataCache.has(entryName)) {
-    const data = await getLocalDataObjects(uploadsDir, tempDirName)
-
-    dataCache.set(entryName, data)
-  }
-  return dataCache.get(entryName) ?? []
-}
-
 /**
  * Returns cached data objects IDs from the local data storage. Data could be
  * obsolete until cache expiration.