Quellcode durchsuchen

Request queue, improved downloads & storage node proxying, error handling, bug fixes

Leszek Wiesner vor 3 Jahren
Ursprung
Commit
4c1681309c

+ 1 - 0
distributor-node/.gitignore

@@ -6,3 +6,4 @@
 /package-lock.json
 /tmp
 node_modules
+/local

+ 4 - 4
distributor-node/config.yml

@@ -2,12 +2,12 @@ endpoints:
   queryNode: http://localhost:8081/graphql
   substrateNode: ws://localhost:9944
 directories:
-  data: /data
-  cache: /cache
-  logs: /logs
+  data: ./local/data
+  cache: ./local/cache
+  logs: ./local/logs
 log:
   file: debug
   console: debug
 port: 3334
 keys: [//Alice]
-buckets: [1]
+buckets: [0]

+ 22 - 19
distributor-node/package.json

@@ -8,46 +8,49 @@
   },
   "bugs": "https://github.com/Joystream/joystream/issues",
   "dependencies": {
+    "@elastic/ecs-winston-format": "^1.1.0",
     "@oclif/command": "^1",
     "@oclif/config": "^1",
     "@oclif/plugin-help": "^2",
-    "tslib": "^1",
-    "yaml": "^1.10.2",
     "ajv": "^7",
-    "@elastic/ecs-winston-format": "^1.1.0",
+    "axios": "^0.21.1",
     "cross-fetch": "^3.1.4",
-    "lodash": "^4.17.21",
-    "lru-cache": "^6.0.0",
     "express": "^4.17.1",
     "express-winston": "^4.1.0",
+    "file-type": "^16.5.1",
+    "lodash": "^4.17.21",
+    "lru-cache": "^6.0.0",
+    "node-cleanup": "^2.1.2",
     "proper-lockfile": "^4.1.2",
-    "axios": "^0.21.1",
-    "send": "^0.17.1",
     "read-chunk": "^3.2.0",
-    "file-type": "^16.5.1",
-    "node-cleanup": "^2.1.2"
+    "send": "^0.17.1",
+    "tslib": "^1",
+    "yaml": "^1.10.2",
+    "queue": "^6.0.2",
+    "express-http-proxy": "^1.6.2"
   },
   "devDependencies": {
+    "@graphql-codegen/cli": "^1.21.4",
+    "@graphql-codegen/import-types-preset": "^1.18.1",
+    "@graphql-codegen/typescript": "^1.22.0",
+    "@graphql-codegen/typescript-document-nodes": "^1.17.11",
+    "@graphql-codegen/typescript-operations": "^1.17.16",
     "@oclif/dev-cli": "^1",
     "@oclif/test": "^1",
+    "@openapitools/openapi-generator-cli": "^2.3.6",
     "@types/chai": "^4",
     "@types/mocha": "^5",
     "@types/node": "^10",
+    "@types/node-cleanup": "^2.1.1",
+    "@types/express-http-proxy": "^1.6.2",
     "chai": "^4",
     "globby": "^10",
+    "json-schema-to-typescript": "^10.1.4",
     "mocha": "^5",
     "nyc": "^14",
-    "ts-node": "^8",
-    "typescript": "^3.3",
-    "@graphql-codegen/cli": "^1.21.4",
-    "@graphql-codegen/typescript": "^1.22.0",
-    "@graphql-codegen/import-types-preset": "^1.18.1",
-    "@graphql-codegen/typescript-operations": "^1.17.16",
-    "@graphql-codegen/typescript-document-nodes": "^1.17.11",
-    "json-schema-to-typescript": "^10.1.4",
     "openapi-typescript": "^4.0.2",
-    "@openapitools/openapi-generator-cli": "^2.3.6",
-    "@types/node-cleanup": "^2.1.1"
+    "ts-node": "^8",
+    "typescript": "^3.3"
   },
   "engines": {
     "node": ">=14.16.1"

+ 0 - 1
distributor-node/src/command-base/default.ts

@@ -91,7 +91,6 @@ export default abstract class DefaultCommandBase extends Command {
 
   async finally(err: any): Promise<void> {
     if (!err) this.exit(ExitCodes.OK)
-    console.error(err)
     super.finally(err)
   }
 }

+ 4 - 0
distributor-node/src/commands/start.ts

@@ -4,6 +4,10 @@ import { App } from '../app'
 export default class StartNode extends DefaultCommandBase {
   static description = 'Start the node'
 
+  static flags = {
+    ...DefaultCommandBase.flags,
+  }
+
   async run(): Promise<void> {
     const app = new App(this.appConfig)
     app.start()

+ 44 - 29
distributor-node/src/services/cache/StateCacheService.ts

@@ -1,14 +1,15 @@
 import { Logger } from 'winston'
-import { ReadonlyConfig } from '../../types'
+import { ReadonlyConfig, StorageNodeDownloadResponse } from '../../types'
 import { LoggingService } from '../logging'
 import fs from 'fs'
 
 export interface PendingDownloadData {
   objectSize: number
-  availableEndpoints: string[]
-  pendingAvailabilityEndpointsCount: number
-  downloadAttempts: number
-  isAttemptPending: boolean
+  promise: Promise<StorageNodeDownloadResponse>
+}
+
+export interface StorageNodeEndpointData {
+  responseTimes: number[]
 }
 
 export class StateCacheService {
@@ -16,11 +17,16 @@ export class StateCacheService {
   private config: ReadonlyConfig
   private cacheFilePath: string
   private saveInterval: NodeJS.Timeout
-  private cacheData = {
-    lruContentHashes: new Set<string>(),
+
+  private memoryState = {
     pendingDownloadsByContentHash: new Map<string, PendingDownloadData>(),
-    mimeTypeByContentHash: new Map<string, string>(),
     contentHashByObjectId: new Map<string, string>(),
+    storageNodeEndpointDataByEndpoint: new Map<string, StorageNodeEndpointData>(),
+  }
+
+  private storedState = {
+    lruContentHashes: new Set<string>(),
+    mimeTypeByContentHash: new Map<string, string>(),
   }
 
   public constructor(config: ReadonlyConfig, logging: LoggingService, saveIntervalMs = 60 * 1000) {
@@ -31,56 +37,65 @@ export class StateCacheService {
   }
 
   public setContentMimeType(contentHash: string, mimeType: string): void {
-    this.cacheData.mimeTypeByContentHash.set(contentHash, mimeType)
+    this.storedState.mimeTypeByContentHash.set(contentHash, mimeType)
   }
 
   public getContentMimeType(contentHash: string): string | undefined {
-    return this.cacheData.mimeTypeByContentHash.get(contentHash)
+    return this.storedState.mimeTypeByContentHash.get(contentHash)
   }
 
   public setObjectContentHash(objectId: string, hash: string): void {
-    this.cacheData.contentHashByObjectId.set(objectId, hash)
+    this.memoryState.contentHashByObjectId.set(objectId, hash)
   }
 
   public getObjectContentHash(objectId: string): string | undefined {
-    return this.cacheData.contentHashByObjectId.get(objectId)
+    return this.memoryState.contentHashByObjectId.get(objectId)
   }
 
   public useContent(contentHash: string): void {
-    if (this.cacheData.lruContentHashes.has(contentHash)) {
-      this.cacheData.lruContentHashes.delete(contentHash)
+    if (this.storedState.lruContentHashes.has(contentHash)) {
+      this.storedState.lruContentHashes.delete(contentHash)
     }
-    this.cacheData.lruContentHashes.add(contentHash)
+    this.storedState.lruContentHashes.add(contentHash)
   }
 
-  public newPendingDownload(contentHash: string, objectSize: number): PendingDownloadData {
+  public newPendingDownload(
+    contentHash: string,
+    objectSize: number,
+    promise: Promise<StorageNodeDownloadResponse>
+  ): PendingDownloadData {
     const pendingDownload: PendingDownloadData = {
       objectSize,
-      availableEndpoints: [],
-      pendingAvailabilityEndpointsCount: 0,
-      downloadAttempts: 0,
-      isAttemptPending: false,
+      promise,
     }
-    this.cacheData.pendingDownloadsByContentHash.set(contentHash, pendingDownload)
+    this.memoryState.pendingDownloadsByContentHash.set(contentHash, pendingDownload)
     return pendingDownload
   }
 
   public getPendingDownload(contentHash: string): PendingDownloadData | undefined {
-    return this.cacheData.pendingDownloadsByContentHash.get(contentHash)
+    return this.memoryState.pendingDownloadsByContentHash.get(contentHash)
   }
 
   public dropPendingDownload(contentHash: string): void {
-    this.cacheData.pendingDownloadsByContentHash.delete(contentHash)
+    this.memoryState.pendingDownloadsByContentHash.delete(contentHash)
   }
 
   public dropByHash(contentHash: string): void {
-    this.cacheData.mimeTypeByContentHash.delete(contentHash)
-    this.cacheData.lruContentHashes.delete(contentHash)
+    this.storedState.mimeTypeByContentHash.delete(contentHash)
+    this.storedState.lruContentHashes.delete(contentHash)
+  }
+
+  public setStorageNodeEndpointResponseTime(endpoint: string, time: number): void {
+    const data = this.memoryState.storageNodeEndpointDataByEndpoint.get(endpoint) || { responseTimes: [] }
+    data.responseTimes.push(time)
+  }
+
+  public getStorageNodeEndpointData(endpoint: string): StorageNodeEndpointData | undefined {
+    return this.memoryState.storageNodeEndpointDataByEndpoint.get(endpoint)
   }
 
   private serializeData() {
-    // Only serializes data we can't easily reproduce during startup
-    const { lruContentHashes, mimeTypeByContentHash } = this.cacheData
+    const { lruContentHashes, mimeTypeByContentHash } = this.storedState
     return JSON.stringify({
       lruContentHashes: Array.from(lruContentHashes),
       mimeTypeByContentHash: Array.from(mimeTypeByContentHash.entries()),
@@ -113,8 +128,8 @@ export class StateCacheService {
     if (fs.existsSync(this.cacheFilePath)) {
       this.logger.info('Loading cache from file', { file: this.cacheFilePath })
       const fileContent = JSON.parse(fs.readFileSync(this.cacheFilePath).toString())
-      this.cacheData.lruContentHashes = new Set<string>(fileContent.lruContentHashes || [])
-      this.cacheData.mimeTypeByContentHash = new Map<string, string>(fileContent.mimeTypeByContentHash || [])
+      this.storedState.lruContentHashes = new Set<string>(fileContent.lruContentHashes || [])
+      this.storedState.mimeTypeByContentHash = new Map<string, string>(fileContent.mimeTypeByContentHash || [])
     } else {
       this.logger.warn(`Cache file (${this.cacheFilePath}) is empty. Starting from scratch`)
     }

+ 13 - 2
distributor-node/src/services/content/ContentService.ts

@@ -1,10 +1,9 @@
 import fs from 'fs'
-import { ReadonlyConfig } from '../../types'
+import { ReadonlyConfig, DataObjectData } from '../../types'
 import { StateCacheService } from '../cache/StateCacheService'
 import { LoggingService } from '../logging'
 import { Logger } from 'winston'
 import { FileContinousReadStream, FileContinousReadStreamOptions } from './FileContinousReadStream'
-import { DataObjectData } from '../../types/dataObject'
 import readChunk from 'read-chunk'
 import FileType from 'file-type'
 import _ from 'lodash'
@@ -83,4 +82,16 @@ export class ContentService {
     const guessResult = await FileType.fromBuffer(chunk)
     return guessResult?.mime || DEFAULT_CONTENT_TYPE
   }
+
+  public async handleNewContent(contentHash: string, dataStream: NodeJS.ReadableStream): Promise<void> {
+    const fileStream = this.createWriteStream(contentHash)
+    fileStream.on('ready', () => {
+      dataStream.pipe(fileStream)
+    })
+    fileStream.on('finish', async () => {
+      const mimeType = await this.guessMimeType(contentHash)
+      this.stateCache.setContentMimeType(contentHash, mimeType)
+      this.stateCache.dropPendingDownload(contentHash)
+    })
+  }
 }

+ 26 - 24
distributor-node/src/services/logging/LoggingService.ts

@@ -2,6 +2,27 @@ import winston, { Logger, LoggerOptions } from 'winston'
 import escFormat from '@elastic/ecs-winston-format'
 import { ReadonlyConfig } from '../../types'
 
+const cliColors = {
+  error: 'red',
+  warn: 'yellow',
+  info: 'green',
+  http: 'magenta',
+  debug: 'grey',
+}
+
+winston.addColors(cliColors)
+
+const cliFormat = winston.format.combine(
+  winston.format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss:ms' }),
+  winston.format.metadata({ fillExcept: ['label', 'level', 'timestamp', 'message'] }),
+  winston.format.colorize({ all: true }),
+  winston.format.printf(
+    (info) =>
+      `${info.timestamp} ${info.label} ${info.level}: ${info.message}` +
+      (Object.keys(info.metadata).length ? `\n${JSON.stringify(info.metadata, null, 4)}` : '')
+  )
+)
+
 export class LoggingService {
   private loggerOptions: LoggerOptions
 
@@ -14,55 +35,36 @@ export class LoggingService {
       new winston.transports.File({
         filename: `${config.directories.logs}/logs.json`,
         level: config.log?.file || 'debug',
+        format: escFormat(),
       }),
     ]
     if (config.log?.console) {
       transports.push(
         new winston.transports.Console({
           level: config.log.console,
+          format: cliFormat,
         })
       )
     }
     return new LoggingService({
-      format: escFormat(),
       transports,
     })
   }
 
   public static withCLIConfig(): LoggingService {
-    const colors = {
-      error: 'red',
-      warn: 'yellow',
-      info: 'green',
-      http: 'magenta',
-      debug: 'grey',
-    }
-
-    winston.addColors(colors)
-
-    const format = winston.format.combine(
-      winston.format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss:ms' }),
-      winston.format.metadata({ fillExcept: ['label', 'level', 'timestamp', 'message'] }),
-      winston.format.colorize({ all: true }),
-      winston.format.printf(
-        (info) =>
-          `${info.timestamp} ${info.label} ${info.level}: ${info.message}` +
-          (Object.keys(info.metadata).length ? `\n${JSON.stringify(info.metadata, null, 4)}` : '')
-      )
-    )
     return new LoggingService({
       transports: new winston.transports.Console({
         // Log everything to stderr, only the command output value will be written to stdout
         stderrLevels: Object.keys(winston.config.npm.levels),
+        format: cliFormat,
       }),
-      format,
     })
   }
 
-  public createLogger(label: string): Logger {
+  public createLogger(label: string, ...meta: unknown[]): Logger {
     return winston.createLogger({
       ...this.loggerOptions,
-      defaultMeta: { label },
+      defaultMeta: { label, ...meta },
     })
   }
 }

+ 154 - 79
distributor-node/src/services/networking/NetworkingService.ts

@@ -2,11 +2,27 @@ import { ReadonlyConfig } from '../../types/config'
 import { QueryNodeApi } from './query-node/api'
 import { Logger } from 'winston'
 import { LoggingService } from '../logging'
-import { DataObjectAccessPoints, DataObjectData, DataObjectInfo } from '../../types/dataObject'
 import { StorageNodeApi } from './storage-node/api'
 import { StateCacheService } from '../cache/StateCacheService'
 import { DataObjectDetailsFragment } from './query-node/generated/queries'
-import { AxiosResponse } from 'axios'
+import axios from 'axios'
+import {
+  StorageNodeEndpointData,
+  DataObjectAccessPoints,
+  DataObjectData,
+  DataObjectInfo,
+  StorageNodeDownloadResponse,
+} from '../../types'
+import queue from 'queue'
+import _ from 'lodash'
+
+// TODO: Adjust limits and intervals
+const MAX_CONCURRENT_RESPONSE_TIME_CHECKS = 10
+const MAX_CONCURRENT_DOWNLOADS = 10
+const MAX_CONCURRENT_AVAILABILITY_CHECKS_PER_DOWNLOAD = 10 // 10 pending download * 10 availibility checks per download = 100 concurrent requests
+
+const STORAGE_NODE_ENDPOINTS_CHECK_INTERVAL_MS = 60000
+const STORAGE_NODE_ENDPOINT_CHECK_TIMEOUT = 5000
 
 export class NetworkingService {
   private config: ReadonlyConfig
@@ -16,6 +32,9 @@ export class NetworkingService {
   private stateCache: StateCacheService
   private logger: Logger
 
+  private storageNodeEndpointsCheckInterval: NodeJS.Timeout
+  private testLatencyQueue = queue({ concurrency: MAX_CONCURRENT_RESPONSE_TIME_CHECKS, autostart: true })
+
   constructor(config: ReadonlyConfig, stateCache: StateCacheService, logging: LoggingService) {
     this.config = config
     this.logging = logging
@@ -23,6 +42,15 @@ export class NetworkingService {
     this.logger = logging.createLogger('NetworkingManager')
     this.queryNodeApi = new QueryNodeApi(config.endpoints.queryNode)
     // this.runtimeApi = new RuntimeApi(config.endpoints.substrateNode)
+    this.checkActiveStorageNodeEndpoints()
+    this.storageNodeEndpointsCheckInterval = setInterval(
+      this.checkActiveStorageNodeEndpoints.bind(this),
+      STORAGE_NODE_ENDPOINTS_CHECK_INTERVAL_MS
+    )
+  }
+
+  public clearIntervals(): void {
+    clearInterval(this.storageNodeEndpointsCheckInterval)
   }
 
   private validateNodeEndpoint(endpoint: string): void {
@@ -32,8 +60,24 @@ export class NetworkingService {
     }
   }
 
+  private filterStorageNodeEndpoints(input: StorageNodeEndpointData[]): StorageNodeEndpointData[] {
+    return input.filter((b) => {
+      try {
+        this.validateNodeEndpoint(b.endpoint)
+        return true
+      } catch (err) {
+        this.logger.warn('Invalid storage endpoint detected!', {
+          bucketId: b.bucketId,
+          endpoint: b.endpoint,
+          err,
+        })
+        return false
+      }
+    })
+  }
+
   private prepareStorageNodeEndpoints(details: DataObjectDetailsFragment) {
-    return details.storageBag.storedBy
+    const endpointsData = details.storageBag.storedBy
       .filter(
         (b) => b.operatorStatus.__typename === 'StorageBucketOperatorStatusActive' && b.operatorMetadata?.nodeEndpoint
       )
@@ -41,26 +85,13 @@ export class NetworkingService {
         bucketId: b.id,
         endpoint: b.operatorMetadata!.nodeEndpoint!,
       }))
-      .filter((b) => {
-        try {
-          this.validateNodeEndpoint(b.endpoint)
-          return true
-        } catch (err) {
-          this.logger.warn('Invalid storage endpoint detected', {
-            bucketId: b.bucketId,
-            endpoint: b.endpoint,
-            err,
-          })
-          return false
-        }
-      })
+
+    return this.filterStorageNodeEndpoints(endpointsData)
   }
 
   private parseDataObjectAccessPoints(details: DataObjectDetailsFragment): DataObjectAccessPoints {
     return {
       storageNodes: this.prepareStorageNodeEndpoints(details),
-      // TODO:
-      distributorNodes: [],
     }
   }
 
@@ -85,81 +116,94 @@ export class NetworkingService {
     }
   }
 
-  public downloadDataObject(objectData: DataObjectData): Promise<AxiosResponse<NodeJS.ReadableStream>> | null {
+  private sortEndpointsByMeanResponseTime(endpoints: string[]) {
+    return endpoints.sort((a, b) => {
+      const dataA = this.stateCache.getStorageNodeEndpointData(a)
+      const dataB = this.stateCache.getStorageNodeEndpointData(b)
+      return (
+        _.mean(dataA?.responseTimes || [STORAGE_NODE_ENDPOINT_CHECK_TIMEOUT]) -
+        _.mean(dataB?.responseTimes || [STORAGE_NODE_ENDPOINT_CHECK_TIMEOUT])
+      )
+    })
+  }
+
+  public downloadDataObject(objectData: DataObjectData): Promise<StorageNodeDownloadResponse> | null {
     const { contentHash, accessPoints, size } = objectData
 
     if (this.stateCache.getPendingDownload(contentHash)) {
+      // Already downloading
       return null
     }
 
-    const pendingDownload = this.stateCache.newPendingDownload(contentHash, size)
-
-    return new Promise<AxiosResponse<NodeJS.ReadableStream>>((resolve, reject) => {
-      const storageEndpoints = accessPoints?.storageNodes.map((n) => n.endpoint)
+    const downloadPromise = new Promise<StorageNodeDownloadResponse>((resolve, reject) => {
+      const storageEndpoints = this.sortEndpointsByMeanResponseTime(
+        accessPoints?.storageNodes.map((n) => n.endpoint) || []
+      )
 
       this.logger.info('Downloading new data object', { contentHash, storageEndpoints })
-      if (!storageEndpoints || !storageEndpoints.length) {
-        return reject(new Error('No storage endpoints available to download the data object from'))
+      if (!storageEndpoints.length) {
+        reject(new Error('No storage endpoints available to download the data object from'))
+        return
       }
-      const availabilityPromises = storageEndpoints.map(async (endpoint) => {
-        const api = new StorageNodeApi(endpoint, this.logging)
-        const available = await api.isObjectAvailable(contentHash)
-        if (!available) {
-          throw new Error('Not avilable')
+
+      const availabilityQueue = queue({ concurrency: MAX_CONCURRENT_AVAILABILITY_CHECKS_PER_DOWNLOAD, autostart: true })
+      const downloadQueue = queue({ concurrency: 1, autostart: true })
+
+      storageEndpoints.forEach(async (endpoint) => {
+        availabilityQueue.push(async () => {
+          const api = new StorageNodeApi(endpoint, this.logging)
+          const available = await api.isObjectAvailable(contentHash)
+          if (!available) {
+            throw new Error('Not avilable')
+          }
+          return endpoint
+        })
+      })
+
+      availabilityQueue.on('success', (endpoint) => {
+        availabilityQueue.stop()
+        const job = () => {
+          const api = new StorageNodeApi(endpoint, this.logging)
+          return api.downloadObject(contentHash)
         }
-        return endpoint
+        downloadQueue.push(job)
       })
 
-      pendingDownload.pendingAvailabilityEndpointsCount = availabilityPromises.length
-      availabilityPromises.forEach((availableNodePromise) =>
-        availableNodePromise
-          .then(async (endpoint) => {
-            pendingDownload.availableEndpoints.push(endpoint)
-            if (!pendingDownload.isAttemptPending) {
-              this.attemptDataObjectDownload(contentHash)
-                .then(resolve)
-                .catch(() => {
-                  if (!pendingDownload.pendingAvailabilityEndpointsCount && !pendingDownload.isAttemptPending) {
-                    return reject(new Error('Cannot download data object from any node'))
-                  }
-                })
-            }
-          })
-          .finally(() => --pendingDownload.pendingAvailabilityEndpointsCount)
-      )
+      availabilityQueue.on('error', () => {
+        /*
+        Do nothing.
+        The handler is needed to avoid unhandled promise rejection
+        */
+      })
+
+      downloadQueue.on('error', (err) => {
+        this.logger.error('Download attempt from storage node failed after availability was confirmed:', { err })
+      })
+
+      downloadQueue.on('end', () => {
+        if (availabilityQueue.length) {
+          availabilityQueue.start()
+        } else {
+          reject(new Error('Failed to download the object from any availablable storage provider'))
+        }
+      })
+
+      availabilityQueue.on('end', () => {
+        if (!downloadQueue.length) {
+          reject(new Error('Failed to download the object from any availablable storage provider'))
+        }
+      })
+
+      downloadQueue.on('success', (response: StorageNodeDownloadResponse) => {
+        availabilityQueue.removeAllListeners().end()
+        downloadQueue.removeAllListeners().end()
+        resolve(response)
+      })
     })
-  }
 
-  private async attemptDataObjectDownload(contentHash: string): Promise<AxiosResponse<NodeJS.ReadableStream>> {
-    const pendingDownload = this.stateCache.getPendingDownload(contentHash)
-    if (!pendingDownload) {
-      throw new Error('Attempting data object download with missing pending download data')
-    }
-    if (pendingDownload.isAttemptPending) {
-      throw new Error('Attempting data object download during an already pending attempt')
-    }
-    const endpoint = pendingDownload.availableEndpoints.shift()
-    if (!endpoint) {
-      throw new Error('Attempting data object download without any available endpoint')
-    }
-    pendingDownload.isAttemptPending = true
-    this.logger.info('Requesting data object from storage node', { contentHash, endpoint })
-    const api = new StorageNodeApi(endpoint, this.logging)
-    try {
-      const response = await api.downloadObject(contentHash)
-      ++pendingDownload.downloadAttempts
-      pendingDownload.isAttemptPending = false
-      // TODO: Validate reponse? (ie. object size etc.)
-      return response
-    } catch (e) {
-      ++pendingDownload.downloadAttempts
-      pendingDownload.isAttemptPending = false
-      if (pendingDownload.availableEndpoints.length) {
-        return this.attemptDataObjectDownload(contentHash)
-      } else {
-        throw e
-      }
-    }
+    this.stateCache.newPendingDownload(contentHash, size, downloadPromise)
+
+    return downloadPromise
   }
 
   async fetchSupportedDataObjects(): Promise<DataObjectData[]> {
@@ -178,4 +222,35 @@ export class NetworkingService {
 
     return objectsData
   }
+
+  async checkActiveStorageNodeEndpoints(): Promise<void> {
+    const activeStorageOperators = await this.queryNodeApi.getActiveStorageBucketOperatorsData()
+    const endpoints = this.filterStorageNodeEndpoints(
+      activeStorageOperators.map(({ id, operatorMetadata }) => ({
+        bucketId: id,
+        endpoint: operatorMetadata!.nodeEndpoint!,
+      }))
+    )
+    this.logger.verbose('Checking nearby storage nodes...', { validEndpointsCount: endpoints.length })
+
+    endpoints.forEach(({ endpoint }) => this.testLatencyQueue.push(() => this.checkResponseTime(endpoint)))
+  }
+
+  async checkResponseTime(endpoint: string): Promise<void> {
+    const start = Date.now()
+    this.logger.debug(`Sending storage node response-time check request to: ${endpoint}`, { endpoint })
+    try {
+      // TODO: Use a status endpoint once available?
+      await axios.get(endpoint, { timeout: STORAGE_NODE_ENDPOINT_CHECK_TIMEOUT })
+    } catch (err) {
+      if (axios.isAxiosError(err) && err.response?.status === 404) {
+        // This is the expected outcome currently
+        const responseTime = Date.now() - start
+        this.logger.debug(`${endpoint} check request response time: ${responseTime}`, { endpoint, responseTime })
+        this.stateCache.setStorageNodeEndpointResponseTime(endpoint, responseTime)
+      } else {
+        this.logger.warn('Storage node giving unexpected reponse on root endpoint!', { err })
+      }
+    }
+  }
 }

+ 11 - 0
distributor-node/src/services/networking/query-node/api.ts

@@ -9,6 +9,10 @@ import {
   GetDistributionBucketsWithObjectsQuery,
   GetDistributionBucketsWithObjectsQueryVariables,
   GetDistributionBucketsWithObjects,
+  StorageBucketOperatorFieldsFragment,
+  GetActiveStorageBucketOperatorsDataQuery,
+  GetActiveStorageBucketOperatorsDataQueryVariables,
+  GetActiveStorageBucketOperatorsData,
 } from './generated/queries'
 import { Maybe } from './generated/schema'
 
@@ -65,4 +69,11 @@ export class QueryNodeApi {
       GetDistributionBucketsWithObjectsQueryVariables
     >(GetDistributionBucketsWithObjects, { ids }, 'distributionBuckets')
   }
+
+  public getActiveStorageBucketOperatorsData(): Promise<StorageBucketOperatorFieldsFragment[]> {
+    return this.multipleEntitiesQuery<
+      GetActiveStorageBucketOperatorsDataQuery,
+      GetActiveStorageBucketOperatorsDataQueryVariables
+    >(GetActiveStorageBucketOperatorsData, {}, 'storageBuckets')
+  }
 }

+ 23 - 1
distributor-node/src/services/networking/query-node/generated/queries.ts

@@ -19,6 +19,13 @@ export type GetDistributionBucketsWithObjectsQueryVariables = Types.Exact<{
 
 export type GetDistributionBucketsWithObjectsQuery = { distributionBuckets: Array<DistirubtionBucketsWithObjectsFragment> };
 
+export type StorageBucketOperatorFieldsFragment = { id: string, operatorMetadata?: Types.Maybe<{ nodeEndpoint?: Types.Maybe<string> }> };
+
+export type GetActiveStorageBucketOperatorsDataQueryVariables = Types.Exact<{ [key: string]: never; }>;
+
+
+export type GetActiveStorageBucketOperatorsDataQuery = { storageBuckets: Array<StorageBucketOperatorFieldsFragment> };
+
 export const DataObjectDetails = gql`
     fragment DataObjectDetails on StorageDataObject {
   id
@@ -53,6 +60,14 @@ export const DistirubtionBucketsWithObjects = gql`
   }
 }
     `;
+export const StorageBucketOperatorFields = gql`
+    fragment StorageBucketOperatorFields on StorageBucket {
+  id
+  operatorMetadata {
+    nodeEndpoint
+  }
+}
+    `;
 export const GetDataObjectDetails = gql`
     query getDataObjectDetails($id: ID!) {
   storageDataObjectByUniqueInput(where: {id: $id}) {
@@ -66,4 +81,11 @@ export const GetDistributionBucketsWithObjects = gql`
     ...DistirubtionBucketsWithObjects
   }
 }
-    ${DistirubtionBucketsWithObjects}`;
+    ${DistirubtionBucketsWithObjects}`;
+export const GetActiveStorageBucketOperatorsData = gql`
+    query getActiveStorageBucketOperatorsData {
+  storageBuckets(where: {operatorStatus_json: {isTypeOf_eq: "StorageBucketOperatorStatusActive"}, operatorMetadata: {nodeEndpoint_contains: "http"}}, limit: 9999) {
+    ...StorageBucketOperatorFields
+  }
+}
+    ${StorageBucketOperatorFields}`;

+ 19 - 0
distributor-node/src/services/networking/query-node/queries/queries.graphql

@@ -41,3 +41,22 @@ query getDistributionBucketsWithObjects($ids: [ID!]) {
     ...DistirubtionBucketsWithObjects
   }
 }
+
+fragment StorageBucketOperatorFields on StorageBucket {
+  id
+  operatorMetadata {
+    nodeEndpoint
+  }
+}
+
+query getActiveStorageBucketOperatorsData {
+  storageBuckets(
+    where: {
+      operatorStatus_json: { isTypeOf_eq: "StorageBucketOperatorStatusActive" },
+      operatorMetadata: { nodeEndpoint_contains: "http" }
+    },
+    limit: 9999
+  ) {
+    ...StorageBucketOperatorFields
+  }
+}

+ 7 - 5
distributor-node/src/services/networking/storage-node/api.ts

@@ -1,8 +1,9 @@
 import { Configuration } from './generated'
 import { PublicApi } from './generated/api'
-import axios, { AxiosRequestConfig, AxiosResponse } from 'axios'
+import axios, { AxiosRequestConfig } from 'axios'
 import { LoggingService } from '../../logging'
 import { Logger } from 'winston'
+import { StorageNodeDownloadResponse } from '../../../types'
 
 const AXIOS_TIMEOUT = 10000
 
@@ -21,7 +22,7 @@ export class StorageNodeApi {
     })
     this.publicApi = new PublicApi(config)
     this.endpoint = new URL(endpoint).toString()
-    this.logger = logging.createLogger('StorageNodeApi')
+    this.logger = logging.createLogger('StorageNodeApi', { endpoint })
   }
 
   public async isObjectAvailable(contentHash: string): Promise<boolean> {
@@ -30,10 +31,10 @@ export class StorageNodeApi {
         Range: 'bytes=0-0',
       },
     }
-    this.logger.info('Checking object availibility', { endpoint: this.endpoint, contentHash })
+    this.logger.info('Checking object availibility', { contentHash })
     try {
       await this.publicApi.publicApiFiles(contentHash, options)
-      this.logger.info('Data object available', { contentHash, endpoint: this.endpoint })
+      this.logger.info('Data object available', { contentHash })
       return true
     } catch (err) {
       if (axios.isAxiosError(err)) {
@@ -45,7 +46,8 @@ export class StorageNodeApi {
     }
   }
 
-  public async downloadObject(contentHash: string): Promise<AxiosResponse<NodeJS.ReadableStream>> {
+  public async downloadObject(contentHash: string): Promise<StorageNodeDownloadResponse> {
+    this.logger.info('Sending download request', { contentHash })
     const options: AxiosRequestConfig = {
       responseType: 'stream',
     }

+ 1 - 1
distributor-node/src/services/parsers/ConfigParserService.ts

@@ -13,7 +13,7 @@ export class ConfigParserService {
   }
 
   public resolveConfigDirectoryPaths(paths: Config['directories'], configFilePath: string): Config['directories'] {
-    return _.mapValues(paths, (v) => path.resolve(configFilePath, v))
+    return _.mapValues(paths, (v) => path.resolve(path.dirname(configFilePath), v))
   }
 
   public loadConfing(configPath: string): Config {

+ 14 - 2
distributor-node/src/services/server/ServerService.ts

@@ -19,6 +19,18 @@ export class ServerService {
   private logger: Logger
   private expressApp: express.Application
 
+  private routeWrapper<T>(
+    handler: (req: express.Request<T>, res: express.Response, next: express.NextFunction) => Promise<void>
+  ) {
+    return async (req: express.Request<T>, res: express.Response, next: express.NextFunction) => {
+      try {
+        await handler(req, res, next)
+      } catch (err) {
+        next(err)
+      }
+    }
+  }
+
   public constructor(
     config: ReadonlyConfig,
     stateCache: StateCacheService,
@@ -53,7 +65,7 @@ export class ServerService {
     )
 
     // Routes
-    app.use('/api/v1/asset/:objectId', publicController.asset.bind(publicController))
+    app.use('/api/v1/asset/:objectId', this.routeWrapper(publicController.asset.bind(publicController)))
 
     // Error logger
     app.use(
@@ -77,7 +89,7 @@ export class ServerService {
           })
           .end()
       } else {
-        next(err)
+        res.status(err.status || 500).json({ type: 'exception', message: err.message })
       }
     })
 

+ 42 - 59
distributor-node/src/services/server/controllers/public.ts

@@ -6,6 +6,7 @@ import { NetworkingService } from '../../../services/networking'
 import { ErrorResponse, RouteParams } from '../../../types/api'
 import { LoggingService } from '../../logging'
 import { ContentService, DEFAULT_CONTENT_TYPE } from '../../content/ContentService'
+import proxy from 'express-http-proxy'
 
 export class PublicApiController {
   private logger: Logger
@@ -25,7 +26,12 @@ export class PublicApiController {
     this.content = content
   }
 
-  private serveAvailableAsset(req: express.Request, res: express.Response, contentHash: string): void {
+  private serveAvailableAsset(
+    req: express.Request,
+    res: express.Response,
+    next: express.NextFunction,
+    contentHash: string
+  ): void {
     // TODO: FIXME: Actually check if we are still supposed to serve it and just remove after responding if not
     this.stateCache.useContent(contentHash)
 
@@ -53,45 +59,34 @@ export class PublicApiController {
     stream.pipe(res)
   }
 
-  private async servePendingDownloadAsset(req: express.Request, res: express.Response, contentHash: string) {
-    let closed = false
-    req.on('close', () => {
-      closed = true
-    })
+  private async servePendingDownloadAsset(
+    req: express.Request,
+    res: express.Response,
+    next: express.NextFunction,
+    contentHash: string
+  ) {
     const pendingDownload = this.stateCache.getPendingDownload(contentHash)
     if (!pendingDownload) {
       throw new Error('Trying to serve pending download asset that is not pending download!')
     }
-    const { objectSize } = pendingDownload
-    const mimeType = this.stateCache.getContentMimeType(contentHash)
-    const requestedRanges = req.range(objectSize)
-    const range =
-      Array.isArray(requestedRanges) && requestedRanges.type === 'bytes' && requestedRanges.length === 1
-        ? requestedRanges[0]
-        : null
-    const start = range?.start || 0
-    const end = range?.end || objectSize
-
-    res.status(range ? 206 : 200)
-    res.setHeader('content-disposition', 'inline')
-    res.setHeader('content-type', mimeType || DEFAULT_CONTENT_TYPE)
-    res.setHeader('content-length', end - start + 1)
-    if (range) {
-      res.setHeader('content-range', `bytes ${start}-${end}/${objectSize}`)
-    }
-    const stream = this.content.createContinousReadStream(contentHash, { start, end })
-    let chunk = null
-    while ((chunk = await stream.readChunk()) !== null) {
-      if (closed) {
-        break
-      } else {
-        res.write(chunk)
-      }
-    }
-    res.end()
+
+    const { promise } = pendingDownload
+    const response = await promise
+    const source = new URL(response.config.url!)
+
+    this.logger.info(`Proxying request to ${source.href}`, { source: source.href })
+
+    // Proxy the request to download source
+    await proxy(source.origin, {
+      proxyReqPathResolver: () => source.pathname,
+    })(req, res, next)
   }
 
-  public async asset(req: express.Request<RouteParams<'public.asset'>>, res: express.Response): Promise<void> {
+  public async asset(
+    req: express.Request<RouteParams<'public.asset'>>,
+    res: express.Response,
+    next: express.NextFunction
+  ): Promise<void> {
     req.on('close', () => {
       res.end()
     })
@@ -105,10 +100,10 @@ export class PublicApiController {
     if (contentHash && !pendingDownload && this.content.exists(contentHash)) {
       this.logger.info('Requested file found in filesystem', { path: this.content.path(contentHash) })
       this.stateCache.useContent(contentHash)
-      this.serveAvailableAsset(req, res, contentHash)
+      return this.serveAvailableAsset(req, res, next, contentHash)
     } else if (contentHash && pendingDownload) {
       this.logger.info('Requested file is in pending download state', { path: this.content.path(contentHash) })
-      this.servePendingDownloadAsset(req, res, contentHash)
+      return this.servePendingDownloadAsset(req, res, next, contentHash)
     } else {
       this.logger.info('Requested file not found in filesystem')
       const objectInfo = await this.networking.dataObjectInfo(objectId)
@@ -117,12 +112,13 @@ export class PublicApiController {
           message: 'Data object does not exist',
         }
         res.status(404).json(errorRes)
-      } else if (!objectInfo.isSupported) {
-        const errorRes: ErrorResponse = {
-          message: 'Data object not served by this node',
-        }
-        res.status(400).json(errorRes)
-        // TODO: Redirect to other node that supports it?
+        // TODO: UNCOMMENT!
+        // } else if (!objectInfo.isSupported) {
+        //   const errorRes: ErrorResponse = {
+        //     message: 'Data object not served by this node',
+        //   }
+        //   res.status(400).json(errorRes)
+        //   // TODO: Redirect to other node that supports it?
       } else {
         const { data: objectData } = objectInfo
         if (!objectData) {
@@ -130,24 +126,11 @@ export class PublicApiController {
         }
         const { contentHash } = objectData
         const downloadResponse = await this.networking.downloadDataObject(objectData)
-        if (!downloadResponse) {
-          // Object should be already in pending download
-          this.servePendingDownloadAsset(req, res, contentHash)
-          return
+
+        if (downloadResponse) {
+          this.content.handleNewContent(contentHash, downloadResponse.data)
         }
-        const fileStream = this.content.createWriteStream(contentHash)
-        const { data, headers } = downloadResponse
-        fileStream.on('ready', () => {
-          // TODO: Determine mimeType by chunk processing if header not send?
-          const mimeType = headers['content-type'] || DEFAULT_CONTENT_TYPE
-          this.stateCache.setContentMimeType(contentHash, mimeType)
-          data.pipe(fileStream)
-          this.servePendingDownloadAsset(req, res, contentHash)
-        })
-        fileStream.on('finish', () => {
-          // TODO: Validate file?
-          this.stateCache.dropPendingDownload(contentHash)
-        })
+        return this.servePendingDownloadAsset(req, res, next, contentHash)
       }
     }
   }

+ 4 - 1
distributor-node/src/types/index.ts

@@ -1,2 +1,5 @@
-export * from './config'
+export * from './api'
 export * from './common'
+export * from './config'
+export * from './storage'
+export * from './networking'

+ 3 - 0
distributor-node/src/types/networking.ts

@@ -0,0 +1,3 @@
+import { AxiosResponse } from 'axios'
+
+export type StorageNodeDownloadResponse = AxiosResponse<NodeJS.ReadableStream>

+ 6 - 8
distributor-node/src/types/dataObject.ts → distributor-node/src/types/storage.ts

@@ -1,12 +1,10 @@
+export type StorageNodeEndpointData = {
+  bucketId: string
+  endpoint: string
+}
+
 export type DataObjectAccessPoints = {
-  storageNodes: {
-    bucketId: string
-    endpoint: string
-  }[]
-  distributorNodes: {
-    bucketId: string
-    endpoint: string
-  }[]
+  storageNodes: StorageNodeEndpointData[]
 }
 
 export type DataObjectData = {

+ 17 - 1
yarn.lock

@@ -13619,7 +13619,7 @@ es6-iterator@^2.0.3, es6-iterator@~2.0.3:
     es5-ext "^0.10.35"
     es6-symbol "^3.1.1"
 
-es6-promise@^4.0.3, es6-promise@^4.1.0:
+es6-promise@^4.0.3, es6-promise@^4.1.0, es6-promise@^4.1.1:
   version "4.2.8"
   resolved "https://registry.yarnpkg.com/es6-promise/-/es6-promise-4.2.8.tgz#4eb21594c972bc40553d276e510539143db53e0a"
   integrity "sha1-TrIVlMlyvEBVPSduUQU5FD21Pgo= sha512-HJDGx5daxeIvxdBxvG2cb9g4tEvwIk3i8+nhX0yGrYmZUzbkdg8QbDevheDB8gd0//uPj4c1EQua8Q+MViT0/w=="
@@ -14344,6 +14344,15 @@ expect@^26.6.2:
     jest-message-util "^26.6.2"
     jest-regex-util "^26.0.0"
 
+express-http-proxy@^1.6.2:
+  version "1.6.2"
+  resolved "https://registry.yarnpkg.com/express-http-proxy/-/express-http-proxy-1.6.2.tgz#e87152e45958cee4b91da2fdaa20a1ffd581204a"
+  integrity sha512-soP7UXySFdLbeeMYL1foBkEoZj6HELq9BDAOCr1sLRpqjPaFruN5o6+bZeC+7U4USWIl4JMKEiIvTeKJ2WQdlQ==
+  dependencies:
+    debug "^3.0.1"
+    es6-promise "^4.1.1"
+    raw-body "^2.3.0"
+
 express-normalize-query-params-middleware@^0.5.0:
   version "0.5.1"
   resolved "https://registry.yarnpkg.com/express-normalize-query-params-middleware/-/express-normalize-query-params-middleware-0.5.1.tgz#dbe1e8139aecb234fb6adb5c0059c75db9733d2a"
@@ -25874,6 +25883,13 @@ quick-format-unescaped@^4.0.3:
   resolved "https://registry.yarnpkg.com/quick-format-unescaped/-/quick-format-unescaped-4.0.3.tgz#6d6b66b8207aa2b35eef12be1421bb24c428f652"
   integrity sha512-MaL/oqh02mhEo5m5J2rwsVL23Iw2PEaGVHgT2vFt8AAsr0lfvQA5dpXo9TPu0rz7tSBdUPgkbam0j/fj5ZM8yg==
 
+queue@^6.0.2:
+  version "6.0.2"
+  resolved "https://registry.yarnpkg.com/queue/-/queue-6.0.2.tgz#b91525283e2315c7553d2efa18d83e76432fed65"
+  integrity sha512-iHZWu+q3IdFZFX36ro/lKBkSvfkztY5Y7HMiPlOUjhupPcG2JMfst2KKEpu5XndviX/3UhFbRngUPNKtgvtZiA==
+  dependencies:
+    inherits "~2.0.3"
+
 quick-lru@^1.0.0:
   version "1.1.0"
   resolved "https://registry.yarnpkg.com/quick-lru/-/quick-lru-1.1.0.tgz#4360b17c61136ad38078397ff11416e186dcfbb8"