
Implement pendingDownloadTimeout and maxCachedItemSize

Leszek Wiesner · 3 years ago
commit e1d1049b58

+ 3 - 1
distributor-node/config.yml

@@ -15,7 +15,9 @@ limits:
   storage: 100G
   maxConcurrentStorageNodeDownloads: 100
   maxConcurrentOutboundConnections: 300
-  outboundRequestsTimeout: 5000
+  outboundRequestsTimeoutMs: 5000
+  pendingDownloadTimeoutSec: 3600
+  maxCachedItemSize: 1G
 intervals:
   saveCacheState: 60
   checkStorageNodeResponseTimes: 60

+ 3 - 1
distributor-node/config/docker/config.docker.yml

@@ -14,7 +14,9 @@ limits:
   storage: 100G
   maxConcurrentStorageNodeDownloads: 100
   maxConcurrentOutboundConnections: 300
-  outboundRequestsTimeout: 5000
+  outboundRequestsTimeoutMs: 5000
+  pendingDownloadTimeoutSec: 3600
+  maxCachedItemSize: 1G
 intervals:
   saveCacheState: 60
   checkStorageNodeResponseTimes: 60
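
Both config files rename `outboundRequestsTimeout` to `outboundRequestsTimeoutMs` and add the two new limits. A minimal sketch of how the resulting `limits` object might look once `ConfigParserService` has parsed it (byte sizes become numbers; the 1024-based conversion is an assumption, the exact behaviour is defined by `parseBytesize`):

// Illustration only - parsed counterpart of the YAML limits above
const parsedLimits = {
  storage: 100 * 1024 ** 3,               // '100G' parsed to bytes
  maxConcurrentStorageNodeDownloads: 100,
  maxConcurrentOutboundConnections: 300,
  outboundRequestsTimeoutMs: 5000,        // renamed from outboundRequestsTimeout
  pendingDownloadTimeoutSec: 3600,        // new: 1h allowance for pending downloads
  maxCachedItemSize: 1 * 1024 ** 3,       // new: '1G' parsed to bytes, optional
}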

+ 21 - 3
distributor-node/src/schemas/configSchema.ts

@@ -87,7 +87,8 @@ export const configSchema: JSONSchema4 = {
         'storage',
         'maxConcurrentStorageNodeDownloads',
         'maxConcurrentOutboundConnections',
-        'outboundRequestsTimeout',
+        'outboundRequestsTimeoutMs',
+        'pendingDownloadTimeoutSec',
       ],
       description: 'Specifies node limits w.r.t. storage, outbound connections etc.',
       additionalProperties: false,
@@ -103,13 +104,30 @@ export const configSchema: JSONSchema4 = {
           minimum: 1,
         },
         maxConcurrentOutboundConnections: {
-          description: 'Maximum number of total simultaneous outbound connections to storage node(s)',
+          description:
+            'Maximum number of total simultaneous outbound connections to storage node(s) (excluding proxy connections)',
           type: 'integer',
           minimum: 1,
         },
-        outboundRequestsTimeout: {
+        outboundRequestsTimeoutMs: {
          description: 'Timeout for all outbound storage node http requests in milliseconds',
           type: 'integer',
+          minimum: 1000,
+        },
+        pendingDownloadTimeoutSec: {
+          description: 'Timeout for pending storage node downloads in seconds',
+          type: 'integer',
+          minimum: 60,
+        },
+        maxCachedItemSize: {
+          description: 'Maximum size of a data object allowed to be cached by the node',
+          type: 'string',
+          pattern: bytesizeRegex.source,
+        },
+        dataObjectSourceByObjectIdTTL: {
+          description:
+            'TTL (in seconds) for dataObjectSourceByObjectId cache used when proxying objects of size greater than maxCachedItemSize to the right storage node. Defaults to `60` if not specified.',
+          type: 'integer',
           minimum: 1,
         },
       },
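
The schema now requires `pendingDownloadTimeoutSec`, enforces minimums on both timeouts, and accepts the two optional cache-related keys. A quick sketch of values that pass the new constraints (the '512M' form is an assumed example of what `bytesizeRegex` accepts):

// Illustration only - limits values relative to the new schema constraints
const limits = {
  outboundRequestsTimeoutMs: 5000,      // integer, minimum 1000
  pendingDownloadTimeoutSec: 3600,      // integer, minimum 60, now required
  maxCachedItemSize: '512M',            // optional string matching bytesizeRegex
  dataObjectSourceByObjectIdTTL: 120,   // optional integer >= 1, defaults to 60
}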

+ 26 - 0
distributor-node/src/services/cache/StateCacheService.ts

@@ -3,12 +3,15 @@ import { PendingDownloadData, PendingDownloadStatus, ReadonlyConfig, StorageNode
 import { LoggingService } from '../logging'
 import _ from 'lodash'
 import fs from 'fs'
+import NodeCache from 'node-cache'
 
 // LRU-SP cache parameters
 // Since size is in KB, these parameters should be enough for grouping objects of size up to 2^24 KB = 16 GB
 export const CACHE_GROUP_LOG_BASE = 2
 export const CACHE_GROUPS_COUNT = 24
 
+export const DEFAULT_DATA_OBJECT_SOURCE_CACHE_TTL = 60
+
 export interface StorageNodeEndpointData {
   last10ResponseTimes: number[]
 }
@@ -28,6 +31,9 @@ export class StateCacheService {
     pendingDownloadsByObjectId: new Map<string, PendingDownloadData>(),
     storageNodeEndpointDataByEndpoint: new Map<string, StorageNodeEndpointData>(),
     groupNumberByObjectId: new Map<string, number>(),
+    dataObjectSourceByObjectId: new NodeCache({
+      deleteOnExpire: true,
+    }),
   }
 
   private storedState = {
@@ -201,6 +207,26 @@ export class StateCacheService {
     ])
   }
 
+  public cacheDataObjectSource(objectId: string, source: string): void {
+    this.memoryState.dataObjectSourceByObjectId.set<string>(
+      objectId,
+      source,
+      this.config.limits.dataObjectSourceByObjectIdTTL || DEFAULT_DATA_OBJECT_SOURCE_CACHE_TTL
+    )
+  }
+
+  public getCachedDataObjectSource(objectId: string): string | undefined {
+    return this.memoryState.dataObjectSourceByObjectId.get<string | undefined>(objectId)
+  }
+
+  public dropCachedDataObjectSource(objectId: string, expectedSource?: string): void {
+    const cachedSource = this.memoryState.dataObjectSourceByObjectId.get<string | undefined>(objectId)
+    if (!expectedSource || cachedSource === expectedSource) {
+      this.logger.info('Force-dropping cached dataObjectSource', { objectId, cachedSource, expectedSource })
+      this.memoryState.dataObjectSourceByObjectId.del(objectId)
+    }
+  }
+
   private serializeData() {
     const { lruCacheGroups, mimeTypeByObjectId } = this.storedState
     return JSON.stringify(
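
The new `dataObjectSourceByObjectId` entry is a `node-cache` instance, so each remembered source expires on its own TTL instead of living in the LRU groups. A standalone sketch of the TTL behaviour the three new methods rely on (the object id and endpoint are illustrative):

import NodeCache from 'node-cache'

const sourceCache = new NodeCache({ deleteOnExpire: true })

// cacheDataObjectSource(): remember the endpoint for 60s (the default TTL)
sourceCache.set<string>('221', 'https://storage-node.example/api/v1', 60)

// getCachedDataObjectSource(): returns the endpoint until the TTL elapses,
// after which deleteOnExpire evicts the entry and undefined is returned
const source = sourceCache.get<string | undefined>('221')

// dropCachedDataObjectSource(): explicit eviction, e.g. after a failed proxy request
sourceCache.del('221')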

+ 25 - 0
distributor-node/src/services/httpApi/controllers/public.ts

@@ -45,6 +45,31 @@ export class PublicApiController {
   ): Promise<void> {
     const { objectId, size, contentHash } = objectData
 
+    const { maxCachedItemSize } = this.config.limits
+    if (maxCachedItemSize && size > maxCachedItemSize) {
+      this.logger.info(`Requested object is above maxCachedItemSize: ${size}/${maxCachedItemSize}`, {
+        objectId,
+        size,
+        maxCachedItemSize,
+      })
+      const sourceRootApiEndpoint = await this.networking.getDataObjectDownloadSource(objectData)
+      const sourceUrl = new URL(`files/${objectId}`, `${sourceRootApiEndpoint}/`)
+      res.setHeader('x-cache', 'miss')
+      res.setHeader('x-data-source', 'external')
+      this.logger.info(`Proxying request to ${sourceUrl.toString()}`, { objectId, sourceUrl: sourceUrl.toString() })
+      return proxy(sourceUrl.origin, {
+        proxyReqPathResolver: () => sourceUrl.pathname,
+        proxyErrorHandler: (err, res, next) => {
+          this.logger.error(`Proxy request to ${sourceUrl} failed!`, {
+            objectId,
+            sourceUrl: sourceUrl.toString(),
+          })
+          this.stateCache.dropCachedDataObjectSource(objectId, sourceRootApiEndpoint)
+          next(err)
+        },
+      })(req, res, next)
+    }
+
     const downloadResponse = await this.networking.downloadDataObject({ objectData })
 
     if (downloadResponse) {
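
The `proxy` call here appears to be `express-http-proxy` middleware invoked per request; a minimal sketch of that pattern under that assumption (the route and storage node host are illustrative):

import express from 'express'
import proxy from 'express-http-proxy'

const app = express()

// Forward a single asset request to an upstream storage node, resolving the
// target path ourselves - mirrors the proxy(...)(req, res, next) call above
app.get('/api/v1/assets/:objectId', (req, res, next) => {
  const sourceUrl = new URL(`files/${req.params.objectId}`, 'https://storage-node.example/api/v1/')
  proxy(sourceUrl.origin, {
    proxyReqPathResolver: () => sourceUrl.pathname,
    proxyErrorHandler: (err, errorRes, errorNext) => errorNext(err),
  })(req, res, next)
})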

+ 93 - 23
distributor-node/src/services/networking/NetworkingService.ts

@@ -5,7 +5,7 @@ import { LoggingService } from '../logging'
 import { StorageNodeApi } from './storage-node/api'
 import { StateCacheService } from '../cache/StateCacheService'
 import { DataObjectDetailsFragment } from './query-node/generated/queries'
-import axios, { AxiosRequestConfig } from 'axios'
+import axios from 'axios'
 import {
   StorageNodeEndpointData,
   DataObjectAccessPoints,
@@ -23,7 +23,7 @@ import https from 'https'
 import { parseAxiosError } from '../parsers/errors'
 
 // Concurrency limits
-export const MAX_CONCURRENT_AVAILABILITY_CHECKS_PER_DOWNLOAD = 10
+export const MAX_CONCURRENT_AVAILABILITY_CHECKS_PER_OBJECT = 10
 export const MAX_CONCURRENT_RESPONSE_TIME_CHECKS = 10
 
 export class NetworkingService {
@@ -37,10 +37,10 @@ export class NetworkingService {
   private downloadQueue: queue
 
   constructor(config: ReadonlyConfig, stateCache: StateCacheService, logging: LoggingService) {
-    axios.defaults.timeout = config.limits.outboundRequestsTimeout
+    axios.defaults.timeout = config.limits.outboundRequestsTimeoutMs
     const httpConfig: http.AgentOptions | https.AgentOptions = {
       keepAlive: true,
-      timeout: config.limits.outboundRequestsTimeout,
+      timeout: config.limits.outboundRequestsTimeoutMs,
       maxSockets: config.limits.maxConcurrentOutboundConnections,
     }
     axios.defaults.httpAgent = new http.Agent(httpConfig)
@@ -161,6 +161,91 @@ export class NetworkingService {
     )
   }
 
+  private async checkObjectAvailability(objectId: string, endpoint: string): Promise<void> {
+    const api = new StorageNodeApi(endpoint, this.logging, this.config)
+    const available = await api.isObjectAvailable(objectId)
+    if (!available) {
+      throw new Error('Not available')
+    }
+  }
+
+  private createDataObjectAvailabilityCheckQueue(objectId: string, storageEndpoints: string[]) {
+    const availabilityQueue = queue({
+      concurrency: MAX_CONCURRENT_AVAILABILITY_CHECKS_PER_OBJECT,
+      autostart: true,
+    })
+
+    storageEndpoints.forEach(async (endpoint) => {
+      availabilityQueue.push(async () => {
+        await this.checkObjectAvailability(objectId, endpoint)
+        return endpoint
+      })
+    })
+
+    return availabilityQueue
+  }
+
+  public async getDataObjectDownloadSource(objectData: DataObjectData): Promise<string> {
+    const { objectId } = objectData
+    const cachedSource = await this.checkCachedDataObjectSource(objectId)
+    if (cachedSource) {
+      this.logger.info(`Found active download source for object ${objectId} in cache`, { objectId, cachedSource })
+      return cachedSource
+    }
+    return this.findDataObjectDownloadSource(objectData)
+  }
+
+  private async checkCachedDataObjectSource(objectId: string): Promise<string | undefined> {
+    const cachedSource = this.stateCache.getCachedDataObjectSource(objectId)
+    if (cachedSource) {
+      try {
+        await this.checkObjectAvailability(objectId, cachedSource)
+      } catch (err) {
+        this.stateCache.dropCachedDataObjectSource(objectId, cachedSource)
+        return undefined
+      }
+      return cachedSource
+    }
+  }
+
+  private findDataObjectDownloadSource({ objectId, accessPoints }: DataObjectData): Promise<string> {
+    return new Promise((resolve, reject) => {
+      const storageEndpoints = this.sortEndpointsByMeanResponseTime(
+        accessPoints?.storageNodes.map((n) => n.endpoint) || []
+      )
+
+      this.logger.info('Looking for data object source', {
+        objectId,
+        possibleSources: storageEndpoints.map((e) => ({
+          endpoint: e,
+          meanResponseTime: this.stateCache.getStorageNodeEndpointMeanResponseTime(e),
+        })),
+      })
+      if (!storageEndpoints.length) {
+        return reject(new Error('No storage endpoints available to download the data object from'))
+      }
+
+      const availabilityQueue = this.createDataObjectAvailabilityCheckQueue(objectId, storageEndpoints)
+
+      availabilityQueue.on('success', (endpoint) => {
+        availabilityQueue.stop()
+        this.stateCache.cacheDataObjectSource(objectId, endpoint)
+        return resolve(endpoint)
+      })
+
+      availabilityQueue.on('error', () => {
+        /*
+        Do nothing.
+        The handler is needed to avoid unhandled promise rejection
+        */
+      })
+
+      availabilityQueue.on('end', () => {
+        return reject(new Error('Failed to find data object download source'))
+      })
+    })
+  }
+
   private downloadJob(
     pendingDownload: PendingDownloadData,
     downloadData: DownloadData,
@@ -209,27 +294,13 @@ export class NetworkingService {
         return fail('No storage endpoints available to download the data object from')
       }
 
-      const availabilityQueue = queue({
-        concurrency: MAX_CONCURRENT_AVAILABILITY_CHECKS_PER_DOWNLOAD,
-        autostart: true,
-      })
+      const availabilityQueue = this.createDataObjectAvailabilityCheckQueue(objectId, storageEndpoints)
       const objectDownloadQueue = queue({ concurrency: 1, autostart: true })
 
-      storageEndpoints.forEach(async (endpoint) => {
-        availabilityQueue.push(async () => {
-          const api = new StorageNodeApi(endpoint, this.logging)
-          const available = await api.isObjectAvailable(objectId)
-          if (!available) {
-            throw new Error('Not available')
-          }
-          return endpoint
-        })
-      })
-
       availabilityQueue.on('success', (endpoint) => {
         availabilityQueue.stop()
         const job = async () => {
-          const api = new StorageNodeApi(endpoint, this.logging)
+          const api = new StorageNodeApi(endpoint, this.logging, this.config)
           const response = await api.downloadObject(objectId, startAt)
           return response
         }
@@ -336,9 +407,8 @@ export class NetworkingService {
     const start = Date.now()
     this.logger.debug(`Sending storage node response-time check request to: ${endpoint}`, { endpoint })
     try {
-      const api = new StorageNodeApi(endpoint, this.logging)
-      const reqConfig: AxiosRequestConfig = { headers: { connection: 'close' } }
-      await api.stateApi.stateApiGetVersion(reqConfig)
+      const api = new StorageNodeApi(endpoint, this.logging, this.config)
+      await api.getVersion()
       const responseTime = Date.now() - start
       this.logger.debug(`${endpoint} check request response time: ${responseTime}`, { endpoint, responseTime })
       this.stateCache.setStorageNodeEndpointResponseTime(endpoint, responseTime)
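
`findDataObjectDownloadSource` resolves with the first endpoint whose availability check succeeds and rejects once the queue drains without one. A standalone sketch of that first-success pattern with the `queue` package (the concurrency value and check function are illustrative):

import queue from 'queue'

function firstAvailable(endpoints: string[], check: (endpoint: string) => Promise<void>): Promise<string> {
  return new Promise((resolve, reject) => {
    const q = queue({ concurrency: 10, autostart: true })
    endpoints.forEach((endpoint) =>
      q.push(async () => {
        await check(endpoint) // throws if the endpoint does not serve the object
        return endpoint
      })
    )
    // First job to return without throwing wins; stop the remaining checks
    q.on('success', (endpoint) => {
      q.stop()
      resolve(endpoint)
    })
    // Individual failures are expected - the handler only prevents unhandled rejections
    q.on('error', () => undefined)
    // Queue drained without a single success
    q.on('end', () => reject(new Error('Failed to find data object download source')))
  })
}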

+ 63 - 14
distributor-node/src/services/networking/storage-node/api.ts

@@ -1,31 +1,50 @@
-import { Configuration } from './generated'
+import { Configuration, VersionResponse } from './generated'
 import { FilesApi, StateApi } from './generated/api'
-import axios, { AxiosRequestConfig } from 'axios'
+import axios, { AxiosRequestConfig, AxiosResponse } from 'axios'
 import { LoggingService } from '../../logging'
 import { Logger } from 'winston'
-import { StorageNodeDownloadResponse } from '../../../types'
+import { ReadonlyConfig, StorageNodeDownloadResponse } from '../../../types'
 import { parseAxiosError } from '../../parsers/errors'
 
 export class StorageNodeApi {
   private logger: Logger
-  public filesApi: FilesApi
-  public stateApi: StateApi
+  private filesApi: FilesApi
+  private stateApi: StateApi
+  private config: ReadonlyConfig
   public endpoint: string
 
-  public constructor(endpoint: string, logging: LoggingService) {
-    const config = new Configuration({
+  public constructor(endpoint: string, logging: LoggingService, config: ReadonlyConfig) {
+    this.config = config
+    const apiConfig = new Configuration({
       basePath: endpoint,
     })
-    this.filesApi = new FilesApi(config)
-    this.stateApi = new StateApi(config)
+    this.filesApi = new FilesApi(apiConfig)
+    this.stateApi = new StateApi(apiConfig)
     this.endpoint = new URL(endpoint).toString()
     this.logger = logging.createLogger('StorageNodeApi', { endpoint })
   }
 
+  // Adds a timeout for the request which can additionally take into account response processing time.
+  private reqConfigWithTimeout(options: AxiosRequestConfig, timeoutMs: number): [AxiosRequestConfig, NodeJS.Timeout] {
+    const source = axios.CancelToken.source()
+    const timeout = setTimeout(() => {
+      this.logger.error(`Request timeout of ${timeoutMs}ms reached`, { timeoutMs })
+      source.cancel('Request timeout')
+    }, timeoutMs)
+    return [
+      {
+        ...options,
+        cancelToken: source.token,
+      },
+      timeout,
+    ]
+  }
+
   public async isObjectAvailable(objectId: string): Promise<boolean> {
+    const [options, timeout] = this.reqConfigWithTimeout({}, this.config.limits.outboundRequestsTimeoutMs)
    this.logger.debug('Checking object availability', { objectId })
     try {
-      await this.filesApi.publicApiGetFileHeaders(objectId)
+      await this.filesApi.publicApiGetFileHeaders(objectId, options)
       this.logger.debug('Data object available', { objectId })
       return true
     } catch (err) {
@@ -35,17 +54,47 @@ export class StorageNodeApi {
       }
       this.logger.error('Unexpected error while requesting data object', { objectId, err })
       throw err
+    } finally {
+      clearTimeout(timeout)
     }
   }
 
   public async downloadObject(objectId: string, startAt?: number): Promise<StorageNodeDownloadResponse> {
     this.logger.verbose('Sending download request', { objectId, startAt })
-    const options: AxiosRequestConfig = {
-      responseType: 'stream',
-    }
+    const [options, timeout] = this.reqConfigWithTimeout(
+      {
+        responseType: 'stream',
+      },
+      this.config.limits.pendingDownloadTimeoutSec * 1000
+    )
     if (startAt) {
       options.headers.Range = `bytes=${startAt}-`
     }
-    return this.filesApi.publicApiGetFile(objectId, options)
+    try {
+      const response: StorageNodeDownloadResponse = await this.filesApi.publicApiGetFile(objectId, options)
+      response.data.on('end', () => clearTimeout(timeout))
+      response.data.on('error', () => clearTimeout(timeout))
+      return response
+    } catch (err) {
+      clearTimeout(timeout)
+      throw err
+    }
+  }
+
+  public async getVersion(): Promise<AxiosResponse<VersionResponse>> {
+    const [options, timeout] = this.reqConfigWithTimeout(
+      {
+        headers: {
+          connection: 'close',
+        },
+      },
+      this.config.limits.outboundRequestsTimeoutMs
+    )
+    try {
+      const response = await this.stateApi.stateApiGetVersion(options)
+      return response
+    } finally {
+      clearTimeout(timeout)
+    }
   }
 }
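
`reqConfigWithTimeout` pairs each request with a cancel token and a timer, so the deadline can also cover response processing (for downloads the timer is only cleared on stream end/error). A standalone sketch of the same pattern (the URL is illustrative; newer axios releases would use AbortController instead of the deprecated CancelToken):

import axios, { AxiosRequestConfig } from 'axios'

async function getWithHardTimeout(url: string, timeoutMs: number) {
  const source = axios.CancelToken.source()
  // Cancel the in-flight request once the deadline passes
  const timer = setTimeout(() => source.cancel('Request timeout'), timeoutMs)
  const config: AxiosRequestConfig = { cancelToken: source.token }
  try {
    return await axios.get(url, config)
  } finally {
    // Success or failure, make sure the timer does not fire later
    clearTimeout(timer)
  }
}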

+ 12 - 3
distributor-node/src/services/parsers/ConfigParserService.ts

@@ -7,7 +7,8 @@ import _ from 'lodash'
 import configSchema, { bytesizeUnits } from '../../schemas/configSchema'
 import { JSONSchema4, JSONSchema4TypeName } from 'json-schema'
 
-const MIN_CACHE_SIZE = 20 * Math.pow(1024, 3)
+const MIN_CACHE_SIZE = '20G'
+const MIN_MAX_CACHED_ITEM_SIZE = '1M'
 
 export class ConfigParserService {
   validator: ValidationService
@@ -143,9 +144,16 @@ export class ConfigParserService {
     const directories = this.resolveConfigDirectoryPaths(configJson.directories, configPath)
     const keys = this.resolveConfigKeysPaths(configJson.keys, configPath)
     const storageLimit = this.parseBytesize(configJson.limits.storage)
+    const maxCachedItemSize = configJson.limits.maxCachedItemSize
+      ? this.parseBytesize(configJson.limits.maxCachedItemSize)
+      : undefined
 
-    if (storageLimit < MIN_CACHE_SIZE) {
-      throw new Error('Cache storage limit should be at least 20G!')
+    if (storageLimit < this.parseBytesize(MIN_CACHE_SIZE)) {
+      throw new Error(`Config.limits.storage should be at least ${MIN_CACHE_SIZE}!`)
+    }
+
+    if (maxCachedItemSize && maxCachedItemSize < this.parseBytesize(MIN_MAX_CACHED_ITEM_SIZE)) {
+      throw new Error(`Config.limits.maxCachedItemSize should be at least ${MIN_MAX_CACHED_ITEM_SIZE}!`)
     }
 
     const parsedConfig: Config = {
@@ -155,6 +163,7 @@ export class ConfigParserService {
       limits: {
         ...configJson.limits,
         storage: storageLimit,
+        maxCachedItemSize,
       },
     }
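
`MIN_CACHE_SIZE` is now expressed as a byte-size string and compared through `parseBytesize`, the same path used for the new `maxCachedItemSize` check. A hypothetical sketch of what such a parser could look like (the real implementation lives in ConfigParserService and uses `bytesizeUnits` from configSchema; the unit list and 1024 base are assumptions):

// Hypothetical helper - not the actual ConfigParserService implementation
const UNITS: Record<string, number> = { B: 1, K: 1024, M: 1024 ** 2, G: 1024 ** 3, T: 1024 ** 4 }

function parseBytesize(size: string): number {
  const match = size.match(/^(\d+)([BKMGT])$/)
  if (!match) {
    throw new Error(`Invalid bytesize: ${size}`)
  }
  const [, amount, unit] = match
  return parseInt(amount, 10) * UNITS[unit]
}

// parseBytesize('20G') >= parseBytesize('1M') // true: 20 GiB vs 1 MiB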
 

+ 2 - 1
distributor-node/src/types/config.ts

@@ -2,8 +2,9 @@ import { DistributorNodeConfiguration as ConfigJson } from './generated/ConfigJs
 import { DeepReadonly } from './common'
 
 export type Config = Omit<ConfigJson, 'limits'> & {
-  limits: Omit<ConfigJson['limits'], 'storage'> & {
+  limits: Omit<ConfigJson['limits'], 'storage' | 'maxCachedItemSize'> & {
     storage: number
+    maxCachedItemSize?: number
   }
 }
 export type ReadonlyConfig = DeepReadonly<Config>

+ 14 - 2
distributor-node/src/types/generated/ConfigJson.d.ts

@@ -86,13 +86,25 @@ export interface DistributorNodeConfiguration {
      */
     maxConcurrentStorageNodeDownloads: number
     /**
-     * Maximum number of total simultaneous outbound connections to storage node(s)
+     * Maximum number of total simultaneous outbound connections to storage node(s) (excluding proxy connections)
      */
     maxConcurrentOutboundConnections: number
     /**
     * Timeout for all outbound storage node http requests in milliseconds
      */
-    outboundRequestsTimeout: number
+    outboundRequestsTimeoutMs: number
+    /**
+     * Timeout for pending storage node downloads in seconds
+     */
+    pendingDownloadTimeoutSec: number
+    /**
+     * Maximum size of a data object allowed to be cached by the node
+     */
+    maxCachedItemSize?: string
+    /**
+     * TTL (in seconds) for dataObjectSourceByObjectId cache used when proxying objects of size greater than maxCachedItemSize to the right storage node
+     */
+    dataObjectSourceByObjectIdTTL?: number
   }
   /**
    * Specifies how often periodic tasks (for example cache cleanup) are executed by the node.