
Cache cleanup, intervals configuration, and a fix for duplicated objectIds in fetchSupportedDataObjects

Leszek Wiesner · 3 years ago · commit dbd9112604

+ 5 - 1
distributor-node/config.yml

@@ -9,13 +9,17 @@ directories:
   logs: ./local/logs
 log:
   file: debug
-  console: info
+  console: verbose
   # elastic: info
 limits:
   storage: 100G
   maxConcurrentStorageNodeDownloads: 100
   maxConcurrentOutboundConnections: 300
   outboundRequestsTimeout: 5000
+intervals:
+  saveCacheState: 60
+  checkStorageNodeResponseTimes: 60
+  cacheCleanup: 60
 port: 3334
 keys: [//Alice]
 buckets: 'all'
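
The new intervals values are bare integers; judging from the App changes further down, where each value is multiplied by 1000 before being passed to setInterval, they are interpreted as seconds. A minimal TypeScript sketch of that conversion (the callback is a stub standing in for stateCache.save()):

// Sketch only: a configured value of 60 means "run every 60 seconds".
const saveCacheStateSeconds = 60
const timer = setInterval(
  () => { /* stateCache.save() in the real service */ },
  saveCacheStateSeconds * 1000 // setInterval expects milliseconds
)
clearInterval(timer) // cleared right away only so this sketch exits immediately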

+ 4 - 0
distributor-node/config/docker/config.docker.yml

@@ -15,6 +15,10 @@ limits:
   maxConcurrentStorageNodeDownloads: 100
   maxConcurrentOutboundConnections: 300
   outboundRequestsTimeout: 5000
+intervals:
+  saveCacheState: 60
+  checkStorageNodeResponseTimes: 60
+  cacheCleanup: 60
 port: 3334
 keys: [//Alice]
 buckets: 'all'

+ 24 - 6
distributor-node/src/app/index.ts

@@ -7,6 +7,7 @@ import { ServerService } from '../services/server/ServerService'
 import { Logger } from 'winston'
 import fs from 'fs'
 import nodeCleanup from 'node-cleanup'
+import { AppIntervals } from '../types/app'
 
 export class App {
   private config: ReadonlyConfig
@@ -16,17 +17,35 @@ export class App {
   private server: ServerService
   private logging: LoggingService
   private logger: Logger
+  private intervals: AppIntervals | undefined
 
   constructor(config: ReadonlyConfig) {
     this.config = config
     this.logging = LoggingService.withAppConfig(config)
     this.stateCache = new StateCacheService(config, this.logging)
-    this.content = new ContentService(config, this.logging, this.stateCache)
     this.networking = new NetworkingService(config, this.stateCache, this.logging)
+    this.content = new ContentService(config, this.logging, this.networking, this.stateCache)
     this.server = new ServerService(config, this.stateCache, this.content, this.logging, this.networking)
     this.logger = this.logging.createLogger('App')
   }
 
+  private setIntervals() {
+    this.intervals = {
+      saveCacheState: setInterval(() => this.stateCache.save(), this.config.intervals.saveCacheState * 1000),
+      checkStorageNodeResponseTimes: setInterval(
+        () => this.networking.checkActiveStorageNodeEndpoints(),
+        this.config.intervals.checkStorageNodeResponseTimes * 1000
+      ),
+      cacheCleanup: setInterval(() => this.content.cacheCleanup(), this.config.intervals.cacheCleanup * 1000),
+    }
+  }
+
+  private clearIntervals() {
+    if (this.intervals) {
+      Object.values(this.intervals).forEach((interval) => clearInterval(interval))
+    }
+  }
+
   private checkConfigDirectories(): void {
     Object.entries(this.config.directories).forEach(([name, path]) => {
       const dirInfo = `${name} directory (${path})`
@@ -51,12 +70,12 @@ export class App {
   }
 
   public async start(): Promise<void> {
-    this.logger.info('Starting the app')
+    this.logger.info('Starting the app', { config: this.config })
     try {
       this.checkConfigDirectories()
       this.stateCache.load()
-      const dataObjects = await this.networking.fetchSupportedDataObjects()
-      await this.content.startupInit(dataObjects)
+      await this.content.startupInit()
+      this.setIntervals()
       this.server.start()
     } catch (err) {
       this.logger.error('Node initialization failed!', { err })
@@ -102,8 +121,7 @@ export class App {
   private exitHandler(exitCode: number | null, signal: string | null): boolean | undefined {
     this.logger.info('Exiting...')
     // Clear intervals
-    this.stateCache.clearInterval()
-    this.networking.clearIntervals()
+    this.clearIntervals()
     // Stop the server
     this.server.stop()
     // Save cache
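
Centralizing all timers behind a single setIntervals/clearIntervals pair also matters for shutdown: in Node.js an active interval keeps the event loop alive, so a process that forgets to clear one of its timers will not exit cleanly. A tiny self-contained illustration of what the clearIntervals() call in exitHandler prevents:

// Sketch: the pending timer below keeps the event loop alive, so without
// the clearInterval call the process would keep running after the
// top-level code finishes.
const timer = setInterval(() => console.log('tick'), 60 * 1000)
clearInterval(timer) // analogous to this.clearIntervals() in App.exitHandler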

+ 1 - 7
distributor-node/src/services/cache/StateCacheService.ts

@@ -32,7 +32,6 @@ export class StateCacheService {
   private logger: Logger
   private config: ReadonlyConfig
   private cacheFilePath: string
-  private saveInterval: NodeJS.Timeout
 
   private memoryState = {
     pendingDownloadsByObjectId: new Map<string, PendingDownloadData>(),
@@ -45,11 +44,10 @@ export class StateCacheService {
     mimeTypeByObjectId: new Map<string, string>(),
   }
 
-  public constructor(config: ReadonlyConfig, logging: LoggingService, saveIntervalMs = 60 * 1000) {
+  public constructor(config: ReadonlyConfig, logging: LoggingService) {
     this.logger = logging.createLogger('StateCacheService')
     this.cacheFilePath = `${config.directories.cache}/cache.json`
     this.config = config
-    this.saveInterval = setInterval(() => this.save(), saveIntervalMs)
   }
 
   public setContentMimeType(objectId: string, mimeType: string): void {
@@ -289,8 +287,4 @@ export class StateCacheService {
       this.logger.warn(`Cache file (${this.cacheFilePath}) is empty. Starting from scratch`)
     }
   }
-
-  public clearInterval(): void {
-    clearInterval(this.saveInterval)
-  }
 }

+ 38 - 9
distributor-node/src/services/content/ContentService.ts

@@ -1,12 +1,12 @@
 import fs from 'fs'
-import { ReadonlyConfig, DataObjectData } from '../../types'
+import { ReadonlyConfig } from '../../types'
 import { StateCacheService } from '../cache/StateCacheService'
 import { LoggingService } from '../logging'
 import { Logger } from 'winston'
 import { FileContinousReadStream, FileContinousReadStreamOptions } from './FileContinousReadStream'
 import FileType from 'file-type'
-import _ from 'lodash'
 import { Readable, pipeline } from 'stream'
+import { NetworkingService } from '../networking'
 
 export const DEFAULT_CONTENT_TYPE = 'application/octet-stream'
 
@@ -15,6 +15,7 @@ export class ContentService {
   private dataDir: string
   private logger: Logger
   private stateCache: StateCacheService
+  private networking: NetworkingService
 
   private contentSizeSum = 0
 
@@ -26,22 +27,50 @@ export class ContentService {
     return this.config.limits.storage - this.contentSizeSum
   }
 
-  public constructor(config: ReadonlyConfig, logging: LoggingService, stateCache: StateCacheService) {
+  public constructor(
+    config: ReadonlyConfig,
+    logging: LoggingService,
+    networking: NetworkingService,
+    stateCache: StateCacheService
+  ) {
     this.config = config
     this.logger = logging.createLogger('ContentService')
     this.stateCache = stateCache
+    this.networking = networking
     this.dataDir = config.directories.data
   }
 
-  public async startupInit(supportedObjects: DataObjectData[]): Promise<void> {
-    const dataObjectsById = _.groupBy(supportedObjects, (o) => o.objectId)
+  public async cacheCleanup(): Promise<void> {
+    const supportedObjects = await this.networking.fetchSupportedDataObjects()
+    const cachedObjectsIds = this.stateCache.getCachedObjectsIds()
+    let droppedObjects = 0
+
+    this.logger.verbose('Performing cache cleanup...', {
+      supportedObjects: supportedObjects.size,
+      objectsInCache: cachedObjectsIds.length,
+    })
+
+    for (const objectId of cachedObjectsIds) {
+      if (!supportedObjects.has(objectId)) {
+        this.drop(objectId, 'No longer supported')
+        ++droppedObjects
+      }
+    }
+
+    this.logger.verbose('Cache cleanup finished', {
+      droppedObjects,
+    })
+  }
+
+  public async startupInit(): Promise<void> {
+    const supportedObjects = await this.networking.fetchSupportedDataObjects()
     const dataDirFiles = fs.readdirSync(this.dataDir)
     const filesCountOnStartup = dataDirFiles.length
     const cachedObjectsIds = this.stateCache.getCachedObjectsIds()
     const cacheItemsCountOnStartup = cachedObjectsIds.length
 
     this.logger.info('ContentService initializing...', {
-      supportedObjects: supportedObjects.length,
+      supportedObjects: supportedObjects.size,
       filesCountOnStartup,
       cacheItemsCountOnStartup,
     })
@@ -53,14 +82,14 @@ export class ContentService {
       this.contentSizeSum += fileSize
 
       // Drop files that are not part of current chain assignment
-      const [objectById] = dataObjectsById[objectId] || []
-      if (!objectById) {
+      const dataObject = supportedObjects.get(objectId)
+      if (!dataObject) {
         this.drop(objectId, 'Not supported')
         continue
       }
 
       // Compare file size to expected one
-      const { size: dataObjectSize } = objectById
+      const { size: dataObjectSize } = dataObject
       if (fileSize !== dataObjectSize) {
         // Existing file size does not match the expected one
         const msg = `Unexpected file size. Expected: ${dataObjectSize}, actual: ${fileSize}`
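
The new cacheCleanup pass is essentially a set difference between the locally cached object ids and the ids the query node still reports as supported. A stripped-down TypeScript sketch of the same logic over plain data (names mirror the diff; the drop call is replaced by collecting ids):

// Sketch: drop every cached object id that is no longer in the supported map.
const supportedObjects = new Map<string, { size: number }>([
  ['1', { size: 1024 }],
  ['2', { size: 2048 }],
])
const cachedObjectsIds = ['1', '2', '3']

const toDrop = cachedObjectsIds.filter((objectId) => !supportedObjects.has(objectId))
console.log(toDrop) // ['3'] -- these are the ids handed to this.drop(objectId, 'No longer supported')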

+ 4 - 12
distributor-node/src/services/networking/NetworkingService.ts

@@ -20,9 +20,9 @@ import http from 'http'
 import https from 'https'
 import { parseAxiosError } from '../parsers/errors'
 
+// Concurrency limits
 const MAX_CONCURRENT_AVAILABILITY_CHECKS_PER_DOWNLOAD = 10
 const MAX_CONCURRENT_RESPONSE_TIME_CHECKS = 10
-const STORAGE_NODE_ENDPOINTS_CHECK_INTERVAL_MS = 60000
 
 export class NetworkingService {
   private config: ReadonlyConfig
@@ -32,7 +32,6 @@ export class NetworkingService {
   private stateCache: StateCacheService
   private logger: Logger
 
-  private storageNodeEndpointsCheckInterval: NodeJS.Timeout
   private testLatencyQueue: queue
   private downloadQueue: queue
 
@@ -52,9 +51,6 @@ export class NetworkingService {
     this.queryNodeApi = new QueryNodeApi(config.endpoints.queryNode, this.logging)
     // this.runtimeApi = new RuntimeApi(config.endpoints.substrateNode)
     void this.checkActiveStorageNodeEndpoints()
-    this.storageNodeEndpointsCheckInterval = setInterval(async () => {
-      await this.checkActiveStorageNodeEndpoints()
-    }, STORAGE_NODE_ENDPOINTS_CHECK_INTERVAL_MS)
     // Queues
     this.testLatencyQueue = queue({ concurrency: MAX_CONCURRENT_RESPONSE_TIME_CHECKS, autostart: true }).on(
       'end',
@@ -67,10 +63,6 @@ export class NetworkingService {
     this.downloadQueue = queue({ concurrency: config.limits.maxConcurrentStorageNodeDownloads, autostart: true })
   }
 
-  public clearIntervals(): void {
-    clearInterval(this.storageNodeEndpointsCheckInterval)
-  }
-
   private validateNodeEndpoint(endpoint: string): void {
     const endpointUrl = new URL(endpoint)
     if (endpointUrl.protocol !== 'http:' && endpointUrl.protocol !== 'https:') {
@@ -284,17 +276,17 @@ export class NetworkingService {
     return downloadPromise
   }
 
-  async fetchSupportedDataObjects(): Promise<DataObjectData[]> {
+  async fetchSupportedDataObjects(): Promise<Map<string, DataObjectData>> {
     const data =
       this.config.buckets === 'all'
         ? await this.queryNodeApi.getDistributionBucketsWithObjectsByWorkerId(this.config.workerId)
         : await this.queryNodeApi.getDistributionBucketsWithObjectsByIds(this.config.buckets.map((id) => id.toString()))
-    const objectsData: DataObjectData[] = []
+    const objectsData = new Map<string, DataObjectData>()
     data.forEach((bucket) => {
       bucket.bagAssignments.forEach((a) => {
         a.storageBag.objects.forEach((object) => {
           const { ipfsHash, id, size } = object
-          objectsData.push({ contentHash: ipfsHash, objectId: id, size: parseInt(size) })
+          objectsData.set(id, { contentHash: ipfsHash, objectId: id, size: parseInt(size) })
         })
       })
     })
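
Returning a Map keyed by object id is what addresses the duplicated objectIds mentioned in the commit message: the same object can evidently show up under more than one bucket or bag assignment, and where the old array pushed one entry per occurrence, Map.set simply overwrites the previous one. A short sketch with hypothetical input data:

// Sketch: the same object id arriving twice, e.g. via two bag assignments.
type DataObjectData = { contentHash: string; objectId: string; size: number }

const incoming = [
  { ipfsHash: 'QmHashA', id: '7', size: '1000' },
  { ipfsHash: 'QmHashA', id: '7', size: '1000' },
]

const objectsData = new Map<string, DataObjectData>()
incoming.forEach(({ ipfsHash, id, size }) => {
  objectsData.set(id, { contentHash: ipfsHash, objectId: id, size: parseInt(size) })
})

console.log(objectsData.size) // 1 -- the previous array-based version would have held 2 entries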

+ 1 - 2
distributor-node/src/services/server/controllers/public.ts

@@ -39,8 +39,7 @@ export class PublicApiController {
     next: express.NextFunction,
     objectId: string
   ): void {
-    // TODO: FIXME: Actually check if we are still supposed to serve it and just remove after responding if not
-    // TODO: Limit the number of times useContent is trigerred for similar requests
+    // TODO: Limit the number of times useContent is triggered for similar requests?
     // (for example: same ip, 3 different request within a minute = 1 request)
     this.stateCache.useContent(objectId)
 

+ 6 - 1
distributor-node/src/services/validation/schemas/configSchema.ts

@@ -7,7 +7,7 @@ export const bytesizeRegex = new RegExp(`^[0-9]+(${bytesizeUnits.join('|')})$`)
 
 export const configSchema: JSONSchema4 = {
   type: 'object',
-  required: ['id', 'endpoints', 'directories', 'buckets', 'keys', 'port', 'workerId', 'limits'],
+  required: ['id', 'endpoints', 'directories', 'buckets', 'keys', 'port', 'workerId', 'limits', 'intervals'],
   additionalProperties: false,
   properties: {
     id: { type: 'string' },
@@ -41,6 +41,11 @@ export const configSchema: JSONSchema4 = {
       maxConcurrentOutboundConnections: { type: 'integer', minimum: 1 },
       outboundRequestsTimeout: { type: 'integer', minimum: 1 },
     }),
+    intervals: strictObject({
+      saveCacheState: { type: 'integer', minimum: 1 },
+      checkStorageNodeResponseTimes: { type: 'integer', minimum: 1 },
+      cacheCleanup: { type: 'integer', minimum: 1 },
+    }),
     port: { type: 'integer', minimum: 0 },
     keys: { type: 'array', items: { type: 'string' }, minItems: 1 },
     buckets: {
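
With intervals added to required and declared via strictObject, a config that omits the section (or one of its keys) should now be rejected at validation time, matching the generated ConfigJson change below. A minimal sketch of the enforced shape, expressed as a plain TypeScript type:

// Sketch only: all three keys are required integers >= 1, interpreted as seconds.
type Intervals = {
  saveCacheState: number
  checkStorageNodeResponseTimes: number
  cacheCleanup: number
}

const ok: Intervals = { saveCacheState: 60, checkStorageNodeResponseTimes: 60, cacheCleanup: 60 }

// @ts-expect-error -- missing cacheCleanup; the runtime schema rejects this too
const missing: Intervals = { saveCacheState: 60, checkStorageNodeResponseTimes: 60 }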

+ 5 - 0
distributor-node/src/types/app.ts

@@ -0,0 +1,5 @@
+export type AppIntervals = {
+  saveCacheState: NodeJS.Timeout
+  checkStorageNodeResponseTimes: NodeJS.Timeout
+  cacheCleanup: NodeJS.Timeout
+}

+ 5 - 0
distributor-node/src/types/generated/ConfigJson.d.ts

@@ -28,6 +28,11 @@ export interface ConfigJson {
     maxConcurrentOutboundConnections: number
     outboundRequestsTimeout: number
   }
+  intervals: {
+    saveCacheState: number
+    checkStorageNodeResponseTimes: number
+    cacheCleanup: number
+  }
   port: number
   keys: [string, ...string[]]
   buckets: [number, ...number[]] | 'all'