Browse Source

Cache logic, mimeType fix, requests handling optimalization

Leszek Wiesner 3 years ago
parent
commit
c69da4c9eb

+ 2 - 1
distributor-node/config.yml

@@ -7,7 +7,8 @@ directories:
   logs: ./local/logs
 log:
   file: debug
-  console: debug
+  console: info
+storageLimit: 100G
 port: 3334
 keys: [//Alice]
 buckets: [0]

+ 1 - 1
distributor-node/package.json

@@ -40,7 +40,7 @@
     "@openapitools/openapi-generator-cli": "^2.3.6",
     "@types/chai": "^4",
     "@types/mocha": "^5",
-    "@types/node": "^10",
+    "@types/node": "^14",
     "@types/node-cleanup": "^2.1.1",
     "@types/express-http-proxy": "^1.6.2",
     "chai": "^4",

+ 85 - 8
distributor-node/src/services/cache/StateCacheService.ts

@@ -3,6 +3,12 @@ import { ReadonlyConfig, StorageNodeDownloadResponse } from '../../types'
 import { LoggingService } from '../logging'
 import fs from 'fs'
 
+// LRU-SP cache parameters
+// Since size is in KB, these parameters should be enough for grouping objects of size up to 2^24 KB = 16 GB
+// TODO: Intoduce MAX_CACHED_ITEM_SIZE and skip caching for large objects entirely? (ie. 10 GB objects)
+export const CACHE_GROUP_LOG_BASE = 2
+export const CACHE_GROUPS_COUNT = 24
+
 export interface PendingDownloadData {
   objectSize: number
   promise: Promise<StorageNodeDownloadResponse>
@@ -12,6 +18,12 @@ export interface StorageNodeEndpointData {
   responseTimes: number[]
 }
 
+export interface CacheItemData {
+  sizeKB: number
+  popularity: number
+  lastAccessTime: number
+}
+
 export class StateCacheService {
   private logger: Logger
   private config: ReadonlyConfig
@@ -25,7 +37,8 @@ export class StateCacheService {
   }
 
   private storedState = {
-    lruContentHashes: new Set<string>(),
+    groupNumberByContentHash: new Map<string, number>(),
+    lruCacheGroups: Array.from({ length: CACHE_GROUPS_COUNT }).map(() => new Map<string, CacheItemData>()),
     mimeTypeByContentHash: new Map<string, string>(),
   }
 
@@ -52,11 +65,66 @@ export class StateCacheService {
     return this.memoryState.contentHashByObjectId.get(objectId)
   }
 
+  private calcCacheGroup({ sizeKB, popularity }: CacheItemData) {
+    return Math.min(
+      Math.max(Math.ceil(Math.log(sizeKB / popularity) / Math.log(CACHE_GROUP_LOG_BASE)), 0),
+      CACHE_GROUPS_COUNT - 1
+    )
+  }
+
+  public newContent(contentHash: string, sizeInBytes: number): void {
+    const cacheItemData: CacheItemData = {
+      popularity: 1,
+      lastAccessTime: Date.now(),
+      sizeKB: Math.ceil(sizeInBytes / 1024),
+    }
+    const groupNumber = this.calcCacheGroup(cacheItemData)
+    this.storedState.groupNumberByContentHash.set(contentHash, groupNumber)
+    this.storedState.lruCacheGroups[groupNumber].set(contentHash, cacheItemData)
+  }
+
   public useContent(contentHash: string): void {
-    if (this.storedState.lruContentHashes.has(contentHash)) {
-      this.storedState.lruContentHashes.delete(contentHash)
+    const groupNumber = this.storedState.groupNumberByContentHash.get(contentHash)
+    if (groupNumber === undefined) {
+      this.logger.warn('groupNumberByContentHash missing when trying to update LRU of content', { contentHash })
+      return
     }
-    this.storedState.lruContentHashes.add(contentHash)
+    const group = this.storedState.lruCacheGroups[groupNumber]
+    const cacheItemData = group.get(contentHash)
+    if (!cacheItemData) {
+      this.logger.warn('Cache inconsistency: item missing in group retrieved from by groupNumberByContentHash map!', {
+        contentHash,
+        groupNumber,
+      })
+      this.storedState.groupNumberByContentHash.delete(contentHash)
+      return
+    }
+    cacheItemData.lastAccessTime = Date.now()
+    ++cacheItemData.popularity
+    // Move object to the top of the current group / new group
+    const targetGroupNumber = this.calcCacheGroup(cacheItemData)
+    const targetGroup = this.storedState.lruCacheGroups[targetGroupNumber]
+    group.delete(contentHash)
+    targetGroup.set(contentHash, cacheItemData)
+    if (targetGroupNumber !== groupNumber) {
+      this.storedState.groupNumberByContentHash.set(contentHash, targetGroupNumber)
+    }
+  }
+
+  public getCacheEvictCandidateHash(): string | null {
+    let highestCost = 0
+    let bestCandidate: string | null = null
+    for (const group of this.storedState.lruCacheGroups) {
+      const lastItemInGroup = Array.from(group.entries())[0]
+      const [contentHash, objectData] = lastItemInGroup
+      const elapsedSinceLastAccessed = Math.ceil((Date.now() - objectData.lastAccessTime) / 60_000)
+      const itemCost = (elapsedSinceLastAccessed * objectData.sizeKB) / objectData.popularity
+      if (itemCost >= highestCost) {
+        highestCost = itemCost
+        bestCandidate = contentHash
+      }
+    }
+    return bestCandidate
   }
 
   public newPendingDownload(
@@ -82,7 +150,12 @@ export class StateCacheService {
 
   public dropByHash(contentHash: string): void {
     this.storedState.mimeTypeByContentHash.delete(contentHash)
-    this.storedState.lruContentHashes.delete(contentHash)
+    this.memoryState.pendingDownloadsByContentHash.delete(contentHash)
+    const cacheGroupNumber = this.storedState.groupNumberByContentHash.get(contentHash)
+    if (cacheGroupNumber) {
+      this.storedState.groupNumberByContentHash.delete(contentHash)
+      this.storedState.lruCacheGroups[cacheGroupNumber].delete(contentHash)
+    }
   }
 
   public setStorageNodeEndpointResponseTime(endpoint: string, time: number): void {
@@ -95,10 +168,11 @@ export class StateCacheService {
   }
 
   private serializeData() {
-    const { lruContentHashes, mimeTypeByContentHash } = this.storedState
+    const { groupNumberByContentHash, lruCacheGroups, mimeTypeByContentHash } = this.storedState
     return JSON.stringify({
-      lruContentHashes: Array.from(lruContentHashes),
+      lruCacheGroups: lruCacheGroups.map((g) => Array.from(g.entries())),
       mimeTypeByContentHash: Array.from(mimeTypeByContentHash.entries()),
+      groupNumberByContentHash: Array.from(groupNumberByContentHash.entries()),
     })
   }
 
@@ -128,7 +202,10 @@ export class StateCacheService {
     if (fs.existsSync(this.cacheFilePath)) {
       this.logger.info('Loading cache from file', { file: this.cacheFilePath })
       const fileContent = JSON.parse(fs.readFileSync(this.cacheFilePath).toString())
-      this.storedState.lruContentHashes = new Set<string>(fileContent.lruContentHashes || [])
+      this.storedState.lruCacheGroups = (fileContent.lruCacheGroups || []).map(
+        (g: any) => new Map<string, CacheItemData>(g)
+      )
+      this.storedState.groupNumberByContentHash = new Map<string, number>(fileContent.groupNumberByContentHash || [])
       this.storedState.mimeTypeByContentHash = new Map<string, string>(fileContent.mimeTypeByContentHash || [])
     } else {
       this.logger.warn(`Cache file (${this.cacheFilePath}) is empty. Starting from scratch`)

+ 77 - 13
distributor-node/src/services/content/ContentService.ts

@@ -4,9 +4,9 @@ import { StateCacheService } from '../cache/StateCacheService'
 import { LoggingService } from '../logging'
 import { Logger } from 'winston'
 import { FileContinousReadStream, FileContinousReadStreamOptions } from './FileContinousReadStream'
-import readChunk from 'read-chunk'
 import FileType from 'file-type'
 import _ from 'lodash'
+import { Readable } from 'stream'
 
 export const DEFAULT_CONTENT_TYPE = 'application/octet-stream'
 
@@ -16,6 +16,12 @@ export class ContentService {
   private logger: Logger
   private stateCache: StateCacheService
 
+  private contentSizeSum = 0 // TODO: Assign on startup
+
+  private get freeSpace(): number {
+    return this.config.storageLimit - this.contentSizeSum
+  }
+
   public constructor(config: ReadonlyConfig, logging: LoggingService, stateCache: StateCacheService) {
     this.config = config
     this.logger = logging.createLogger('ContentService')
@@ -67,7 +73,7 @@ export class ContentService {
   }
 
   public createWriteStream(contentHash: string): fs.WriteStream {
-    return fs.createWriteStream(this.path(contentHash))
+    return fs.createWriteStream(this.path(contentHash), { autoClose: true, emitClose: true })
   }
 
   public createContinousReadStream(
@@ -78,20 +84,78 @@ export class ContentService {
   }
 
   public async guessMimeType(contentHash: string): Promise<string> {
-    const chunk = await readChunk(this.path(contentHash), 0, 4100)
-    const guessResult = await FileType.fromBuffer(chunk)
+    const guessResult = await FileType.fromFile(this.path(contentHash))
     return guessResult?.mime || DEFAULT_CONTENT_TYPE
   }
 
-  public async handleNewContent(contentHash: string, dataStream: NodeJS.ReadableStream): Promise<void> {
-    const fileStream = this.createWriteStream(contentHash)
-    fileStream.on('ready', () => {
-      dataStream.pipe(fileStream)
-    })
-    fileStream.on('finish', async () => {
-      const mimeType = await this.guessMimeType(contentHash)
-      this.stateCache.setContentMimeType(contentHash, mimeType)
-      this.stateCache.dropPendingDownload(contentHash)
+  private dropCacheItemsUntilFreeSpaceReached(expectedFreeSpace: number): void {
+    let evictCandidateHash: string | null
+    while ((evictCandidateHash = this.stateCache.getCacheEvictCandidateHash())) {
+      this.drop(evictCandidateHash)
+      if (this.freeSpace === expectedFreeSpace) {
+        return
+      }
+    }
+  }
+
+  public handleNewContent(contentHash: string, expectedSize: number, dataStream: Readable): Promise<boolean> {
+    return new Promise((resolve, reject) => {
+      if (this.freeSpace < expectedSize) {
+        this.dropCacheItemsUntilFreeSpaceReached(expectedSize)
+      }
+
+      const fileStream = this.createWriteStream(contentHash)
+
+      let bytesRecieved = 0
+
+      // TODO: Use NodeJS pipeline for easier error handling (https://nodejs.org/es/docs/guides/backpressuring-in-streams/#the-problem-with-data-handling)
+
+      fileStream.on('ready', () => {
+        dataStream.pipe(fileStream)
+        // Attach handler after pipe, otherwise some data will be lost!
+        dataStream.on('data', (chunk) => {
+          bytesRecieved += chunk.length
+          if (bytesRecieved > expectedSize) {
+            dataStream.destroy(new Error('Unexpected content size: Too much data recieved from source!'))
+          }
+        })
+        // Note: The promise is resolved on "ready" event, since that's what's awaited in the current flow
+        resolve(true)
+      })
+
+      dataStream.on('error', (e) => {
+        fileStream.destroy(e)
+      })
+
+      fileStream.on('error', (err) => {
+        reject(err)
+        this.logger.error(`Content data stream error`, {
+          err,
+          contentHash,
+          expectedSize,
+          bytesRecieved,
+        })
+        this.drop(contentHash)
+      })
+
+      fileStream.on('close', async () => {
+        const { bytesWritten } = fileStream
+        if (bytesWritten === bytesRecieved && bytesWritten === expectedSize) {
+          this.logger.info('New content accepted', { contentHash, bytesRecieved, written: bytesWritten })
+          this.stateCache.dropPendingDownload(contentHash)
+          const mimeType = await this.guessMimeType(contentHash)
+          this.stateCache.newContent(contentHash, expectedSize)
+          this.stateCache.setContentMimeType(contentHash, mimeType)
+        } else {
+          this.logger.error('Content rejected: Bytes written/recieved/expected mismatch!', {
+            contentHash,
+            expectedSize,
+            bytesWritten,
+            bytesRecieved,
+          })
+          this.drop(contentHash)
+        }
+      })
     })
   }
 }

+ 45 - 33
distributor-node/src/services/content/FileContinousReadStream.ts

@@ -1,3 +1,4 @@
+import { Readable } from 'stream'
 import fs from 'fs'
 
 export interface FileContinousReadStreamOptions {
@@ -8,20 +9,22 @@ export interface FileContinousReadStreamOptions {
   maxRetries?: number
 }
 
-export class FileContinousReadStream {
+export class FileContinousReadStream extends Readable {
   private fd: number
   private position: number
-  private end: number
-  private chunkSize: number
+  private lastByte: number
   private missingDataRetryTime: number
   private maxRetries: number
   private finished: boolean
+  private interval: NodeJS.Timeout | undefined
 
   public constructor(path: string, options: FileContinousReadStreamOptions) {
+    super({
+      highWaterMark: options.chunkSize || 1 * 1024 * 1024, // default: 1 MB
+    })
     this.fd = fs.openSync(path, 'r')
     this.position = options.start || 0
-    this.end = options.end
-    this.chunkSize = options.chunkSize || 1 * 1024 * 1024 // 1 MB
+    this.lastByte = options.end
     this.missingDataRetryTime = options.missingDataRetryTime || 50 // 50 ms
     this.maxRetries = options.maxRetries || 2400 // 2400 retries x 50 ms = 120s timeout
     this.finished = false
@@ -32,14 +35,14 @@ export class FileContinousReadStream {
     this.finished = true
   }
 
-  private readChunkSync(): Buffer | null {
-    const chunk = Buffer.alloc(this.chunkSize)
-    const readBytes = fs.readSync(this.fd, chunk, 0, this.chunkSize, this.position)
+  private readChunkSync(bytesN: number): Buffer | null {
+    const chunk = Buffer.alloc(bytesN)
+    const readBytes = fs.readSync(this.fd, chunk, 0, bytesN, this.position)
     const newPosition = this.position + readBytes
-    if (readBytes < this.chunkSize && newPosition <= this.end) {
+    if (readBytes < bytesN && newPosition <= this.lastByte) {
       return null
     }
-    if (newPosition > this.end) {
+    if (newPosition > this.lastByte) {
       this.finish()
       return chunk.slice(0, readBytes)
     }
@@ -47,29 +50,38 @@ export class FileContinousReadStream {
     return chunk
   }
 
-  public readChunk(): Promise<Buffer | null> {
-    return new Promise((resolve, reject) => {
-      if (this.finished) {
-        return resolve(null)
-      }
+  // Reason: https://nodejs.org/docs/latest/api/stream.html#stream_implementing_a_readable_stream
+  // eslint-disable-next-line @typescript-eslint/naming-convention
+  _read(bytesN: number): void {
+    if (this.finished) {
+      this.push(null)
+      return
+    }
+    const chunk = this.readChunkSync(bytesN)
+    if (chunk === null) {
+      let retries = 0
+      const interval = setInterval(() => {
+        const chunk = this.readChunkSync(bytesN)
+        if (chunk !== null) {
+          clearInterval(interval)
+          return this.push(chunk)
+        }
+        if (++retries >= this.maxRetries) {
+          clearInterval(interval)
+          this.destroy(new Error('Max missing data retries limit reached'))
+        }
+      }, this.missingDataRetryTime)
+      this.interval = interval
+    } else {
+      this.push(chunk)
+    }
+  }
 
-      const chunk = this.readChunkSync()
-      if (chunk === null) {
-        let retries = 0
-        const interval = setInterval(() => {
-          const chunk = this.readChunkSync()
-          if (chunk !== null) {
-            clearInterval(interval)
-            return resolve(chunk)
-          }
-          if (++retries >= this.maxRetries) {
-            clearInterval(interval)
-            return reject(new Error('Max missing data retries limit reached'))
-          }
-        }, this.missingDataRetryTime)
-      } else {
-        resolve(chunk)
-      }
-    })
+  // Reason: https://nodejs.org/docs/latest/api/stream.html#stream_implementing_a_readable_stream
+  // eslint-disable-next-line @typescript-eslint/naming-convention
+  _destroy(): void {
+    if (this.interval) {
+      clearInterval(this.interval)
+    }
   }
 }

+ 25 - 3
distributor-node/src/services/parsers/ConfigParserService.ts

@@ -4,6 +4,9 @@ import fs from 'fs'
 import path from 'path'
 import YAML from 'yaml'
 import _ from 'lodash'
+import { bytesizeUnits } from '../validation/schemas/configSchema'
+
+const MIN_CACHE_SIZE = 20 * Math.pow(1024, 3)
 
 export class ConfigParserService {
   validator: ValidationService
@@ -16,6 +19,13 @@ export class ConfigParserService {
     return _.mapValues(paths, (v) => path.resolve(path.dirname(configFilePath), v))
   }
 
+  private parseBytesize(bytesize: string) {
+    const intValue = parseInt(bytesize)
+    const unit = bytesize[bytesize.length - 1]
+
+    return intValue * Math.pow(1024, bytesizeUnits.indexOf(unit))
+  }
+
   public loadConfing(configPath: string): Config {
     const fileContent = fs.readFileSync(configPath).toString()
     let inputConfig: unknown
@@ -27,9 +37,21 @@ export class ConfigParserService {
       throw new Error('Unrecognized config format (use .yml or .json)')
     }
 
-    const config = this.validator.validate('Config', inputConfig)
-    config.directories = this.resolveConfigDirectoryPaths(config.directories, configPath)
+    const configJson = this.validator.validate('Config', inputConfig)
+
+    const directories = this.resolveConfigDirectoryPaths(configJson.directories, configPath)
+    const storageLimit = this.parseBytesize(configJson.storageLimit)
+
+    if (storageLimit < MIN_CACHE_SIZE) {
+      throw new Error('Cache storage limit should be at least 20G!')
+    }
+
+    const parsedConfig: Config = {
+      ...configJson,
+      directories,
+      storageLimit,
+    }
 
-    return config
+    return parsedConfig
   }
 }

+ 1 - 0
distributor-node/src/services/server/ServerService.ts

@@ -51,6 +51,7 @@ export class ServerService {
     app.use(
       expressWinston.logger({
         winstonInstance: this.logger,
+        level: 'http',
       })
     )
 

+ 70 - 12
distributor-node/src/services/server/controllers/public.ts

@@ -8,6 +8,8 @@ import { LoggingService } from '../../logging'
 import { ContentService, DEFAULT_CONTENT_TYPE } from '../../content/ContentService'
 import proxy from 'express-http-proxy'
 
+const CACHE_MAX_AGE = 31536000
+
 export class PublicApiController {
   private logger: Logger
   private networking: NetworkingService
@@ -26,20 +28,27 @@ export class PublicApiController {
     this.content = content
   }
 
-  private serveAvailableAsset(
+  private serveAssetFromFilesystem(
     req: express.Request,
     res: express.Response,
     next: express.NextFunction,
     contentHash: string
   ): void {
     // TODO: FIXME: Actually check if we are still supposed to serve it and just remove after responding if not
+    // TODO: Limit the number of times useContent is trigerred for similar requests
+    // (for example: same ip, 3 different request within a minute = 1 request)
     this.stateCache.useContent(contentHash)
 
     const path = this.content.path(contentHash)
-    const stream = send(req, path)
+    const stream = send(req, path, {
+      maxAge: CACHE_MAX_AGE,
+      lastModified: false,
+    })
     const mimeType = this.stateCache.getContentMimeType(contentHash)
 
     stream.on('headers', (res) => {
+      res.setHeader('x-cache', 'hit')
+      res.setHeader('x-data-source', 'cache')
       res.setHeader('content-disposition', 'inline')
       res.setHeader('content-type', mimeType || DEFAULT_CONTENT_TYPE)
     })
@@ -70,16 +79,59 @@ export class PublicApiController {
       throw new Error('Trying to serve pending download asset that is not pending download!')
     }
 
-    const { promise } = pendingDownload
+    const { promise, objectSize } = pendingDownload
     const response = await promise
     const source = new URL(response.config.url!)
+    // TODO: FIXME: This may not be available soon! (may be removed from storage node)
+    const contentType = response.headers['content-type'] || DEFAULT_CONTENT_TYPE
+    res.setHeader('content-type', contentType)
+    // Allow caching pendingDownload reponse only for very short period of time and requite revalidation,
+    // since the data coming from the source may not be valid
+    res.setHeader('cache-control', `max-age=180, must-revalidate`)
+
+    // Handle request using pending download file if this makes sense in current context:
+    if (this.content.exists(contentHash)) {
+      const range = req.range(objectSize)
+      if (!range || range === -1 || range === -2 || range.length !== 1 || range.type !== 'bytes') {
+        // Range is not provided / invalid - serve data from pending download file
+        return this.servePendingDownloadAssetFromFile(req, res, next, contentHash, objectSize)
+      } else if (range[0].start === 0) {
+        // Range starts from the beginning of the content - serve data from pending download file
+        return this.servePendingDownloadAssetFromFile(req, res, next, contentHash, objectSize, range[0].end)
+      }
+    }
 
-    this.logger.info(`Proxying request to ${source.href}`, { source: source.href })
+    // Range doesn't start from the beginning of the content or the file was not found - froward request to source storage node
+    this.logger.info(`Forwarding request to ${source.href}`, { source: source.href })
+    res.setHeader('x-data-source', 'external')
+    return proxy(source.origin, { proxyReqPathResolver: () => source.pathname })(req, res, next)
+  }
 
-    // Proxy the request to download source
-    await proxy(source.origin, {
-      proxyReqPathResolver: () => source.pathname,
-    })(req, res, next)
+  private async servePendingDownloadAssetFromFile(
+    req: express.Request,
+    res: express.Response,
+    next: express.NextFunction,
+    contentHash: string,
+    objectSize: number,
+    rangeEnd?: number
+  ) {
+    const isRange = rangeEnd !== undefined
+    this.logger.info(`Serving pending download asset from file`, { contentHash, isRange, objectSize, rangeEnd })
+    const stream = this.content.createContinousReadStream(contentHash, {
+      end: isRange ? rangeEnd || 0 : objectSize - 1,
+    })
+    req.on('close', () => {
+      res.end()
+      stream.destroy()
+    })
+    res.status(isRange ? 206 : 200)
+    res.setHeader('accept-ranges', 'bytes')
+    res.setHeader('x-data-source', 'partial-cache')
+    res.setHeader('content-disposition', 'inline')
+    if (isRange) {
+      res.setHeader('content-range', `bytes 0-${rangeEnd}/${objectSize}`)
+    }
+    stream.pipe(res)
   }
 
   public async asset(
@@ -100,9 +152,10 @@ export class PublicApiController {
     if (contentHash && !pendingDownload && this.content.exists(contentHash)) {
       this.logger.info('Requested file found in filesystem', { path: this.content.path(contentHash) })
       this.stateCache.useContent(contentHash)
-      return this.serveAvailableAsset(req, res, next, contentHash)
+      return this.serveAssetFromFilesystem(req, res, next, contentHash)
     } else if (contentHash && pendingDownload) {
       this.logger.info('Requested file is in pending download state', { path: this.content.path(contentHash) })
+      res.setHeader('x-cache', 'pending')
       return this.servePendingDownloadAsset(req, res, next, contentHash)
     } else {
       this.logger.info('Requested file not found in filesystem')
@@ -112,7 +165,7 @@ export class PublicApiController {
           message: 'Data object does not exist',
         }
         res.status(404).json(errorRes)
-        // TODO: UNCOMMENT!
+        // TODO: FIXME: UNCOMMENT!
         // } else if (!objectInfo.isSupported) {
         //   const errorRes: ErrorResponse = {
         //     message: 'Data object not served by this node',
@@ -124,11 +177,16 @@ export class PublicApiController {
         if (!objectData) {
           throw new Error('Missing data object data')
         }
-        const { contentHash } = objectData
+        const { contentHash, size } = objectData
+
         const downloadResponse = await this.networking.downloadDataObject(objectData)
 
         if (downloadResponse) {
-          this.content.handleNewContent(contentHash, downloadResponse.data)
+          // Note: Await will only wait unil the file is created, so we may serve the response from it
+          await this.content.handleNewContent(contentHash, size, downloadResponse.data)
+          res.setHeader('x-cache', 'fetch-triggered')
+        } else {
+          res.setHeader('x-cache', 'pending')
         }
         return this.servePendingDownloadAsset(req, res, next, contentHash)
       }

+ 5 - 1
distributor-node/src/services/validation/schemas/configSchema.ts

@@ -2,9 +2,12 @@ import { JSONSchema4 } from 'json-schema'
 import { strictObject } from './utils'
 import winston from 'winston'
 
+export const bytesizeUnits = ['B', 'K', 'M', 'G', 'T']
+export const bytesizeRegex = new RegExp(`^[0-9]+(${bytesizeUnits.join('|')})$`)
+
 export const configSchema: JSONSchema4 = {
   type: 'object',
-  required: ['endpoints', 'directories', 'buckets', 'keys', 'port'],
+  required: ['endpoints', 'directories', 'buckets', 'keys', 'port', 'storageLimit'],
   additionalProperties: false,
   properties: {
     endpoints: strictObject({
@@ -24,6 +27,7 @@ export const configSchema: JSONSchema4 = {
         console: { type: 'string', enum: Object.keys(winston.config.npm.levels) },
       },
     },
+    storageLimit: { type: 'string', pattern: bytesizeRegex.source },
     port: { type: 'number' },
     keys: { type: 'array', items: { type: 'string' }, minItems: 1 },
     buckets: { type: 'array', items: { type: 'number' }, minItems: 1 },

+ 3 - 1
distributor-node/src/types/config.ts

@@ -1,5 +1,7 @@
 import { ConfigJson } from './generated/ConfigJson'
 import { DeepReadonly } from './common'
 
-export type Config = ConfigJson
+export type Config = Omit<ConfigJson, 'storageLimit'> & {
+  storageLimit: number
+}
 export type ReadonlyConfig = DeepReadonly<Config>

+ 1 - 0
distributor-node/src/types/generated/ConfigJson.d.ts

@@ -19,6 +19,7 @@ export interface ConfigJson {
     file?: 'error' | 'warn' | 'info' | 'http' | 'verbose' | 'debug' | 'silly'
     console?: 'error' | 'warn' | 'info' | 'http' | 'verbose' | 'debug' | 'silly'
   }
+  storageLimit: string
   port: number
   keys: [string, ...string[]]
   buckets: [number, ...number[]]

+ 2 - 1
distributor-node/src/types/networking.ts

@@ -1,3 +1,4 @@
 import { AxiosResponse } from 'axios'
+import { Readable } from 'stream'
 
-export type StorageNodeDownloadResponse = AxiosResponse<NodeJS.ReadableStream>
+export type StorageNodeDownloadResponse = AxiosResponse<Readable>

+ 5 - 0
yarn.lock

@@ -6522,6 +6522,11 @@
   resolved "https://registry.yarnpkg.com/@types/node/-/node-12.19.3.tgz#a6e252973214079155f749e8bef99cc80af182fa"
   integrity sha512-8Jduo8wvvwDzEVJCOvS/G6sgilOLvvhn1eMmK3TW8/T217O7u1jdrK6ImKLv80tVryaPSVeKu6sjDEiFjd4/eg==
 
+"@types/node@^14":
+  version "14.17.7"
+  resolved "https://registry.yarnpkg.com/@types/node/-/node-14.17.7.tgz#e106997493e617edeba52fdbd965930da494113b"
+  integrity sha512-SYTdMaW47se8499q8m0fYKZZRlmq0RaRv6oYmlVm6DUm31l0fhOl1D03X8hGxohCKTI2Bg6w7W0TiYB51aJzag==
+
 "@types/normalize-package-data@^2.4.0":
   version "2.4.0"
   resolved "https://registry.yarnpkg.com/@types/normalize-package-data/-/normalize-package-data-2.4.0.tgz#e486d0d97396d79beedd0a6e33f4534ff6b4973e"