|
@@ -13,14 +13,13 @@ import {
|
|
DataObjectInfo,
|
|
DataObjectInfo,
|
|
StorageNodeDownloadResponse,
|
|
StorageNodeDownloadResponse,
|
|
DownloadData,
|
|
DownloadData,
|
|
- PendingDownloadData,
|
|
|
|
- PendingDownloadStatus,
|
|
|
|
} from '../../types'
|
|
} from '../../types'
|
|
import queue from 'queue'
|
|
import queue from 'queue'
|
|
import { DistributionBucketOperatorStatus } from './query-node/generated/schema'
|
|
import { DistributionBucketOperatorStatus } from './query-node/generated/schema'
|
|
import http from 'http'
|
|
import http from 'http'
|
|
import https from 'https'
|
|
import https from 'https'
|
|
import { parseAxiosError } from '../parsers/errors'
|
|
import { parseAxiosError } from '../parsers/errors'
|
|
|
|
+import { PendingDownload, PendingDownloadStatusType } from './PendingDownload'
|
|
|
|
|
|
// Concurrency limits
|
|
// Concurrency limits
|
|
export const MAX_CONCURRENT_AVAILABILITY_CHECKS_PER_OBJECT = 10
|
|
export const MAX_CONCURRENT_AVAILABILITY_CHECKS_PER_OBJECT = 10
|
|
@@ -61,6 +60,9 @@ export class NetworkingService {
|
|
}
|
|
}
|
|
)
|
|
)
|
|
this.downloadQueue = queue({ concurrency: config.limits.maxConcurrentStorageNodeDownloads, autostart: true })
|
|
this.downloadQueue = queue({ concurrency: config.limits.maxConcurrentStorageNodeDownloads, autostart: true })
|
|
|
|
+ this.downloadQueue.on('error', (err) => {
|
|
|
|
+ this.logger.error('Data object download failed', { err })
|
|
|
|
+ })
|
|
}
|
|
}
|
|
|
|
|
|
private validateNodeEndpoint(endpoint: string): void {
|
|
private validateNodeEndpoint(endpoint: string): void {
|
|
@@ -182,6 +184,13 @@ export class NetworkingService {
|
|
})
|
|
})
|
|
})
|
|
})
|
|
|
|
|
|
|
|
+ availabilityQueue.on('error', () => {
|
|
|
|
+ /*
|
|
|
|
+ Do nothing.
|
|
|
|
+ The handler is needed to avoid unhandled promise rejection
|
|
|
|
+ */
|
|
|
|
+ })
|
|
|
|
+
|
|
return availabilityQueue
|
|
return availabilityQueue
|
|
}
|
|
}
|
|
|
|
|
|
@@ -233,13 +242,6 @@ export class NetworkingService {
|
|
return resolve(endpoint)
|
|
return resolve(endpoint)
|
|
})
|
|
})
|
|
|
|
|
|
- availabilityQueue.on('error', () => {
|
|
|
|
- /*
|
|
|
|
- Do nothing.
|
|
|
|
- The handler is needed to avoid unhandled promise rejection
|
|
|
|
- */
|
|
|
|
- })
|
|
|
|
-
|
|
|
|
availabilityQueue.on('end', () => {
|
|
availabilityQueue.on('end', () => {
|
|
return reject(new Error('Failed to find data object download source'))
|
|
return reject(new Error('Failed to find data object download source'))
|
|
})
|
|
})
|
|
@@ -247,7 +249,7 @@ export class NetworkingService {
|
|
}
|
|
}
|
|
|
|
|
|
private downloadJob(
|
|
private downloadJob(
|
|
- pendingDownload: PendingDownloadData,
|
|
|
|
|
|
+ pendingDownload: PendingDownload,
|
|
downloadData: DownloadData,
|
|
downloadData: DownloadData,
|
|
onSourceFound: (response: StorageNodeDownloadResponse) => void,
|
|
onSourceFound: (response: StorageNodeDownloadResponse) => void,
|
|
onError: (error: Error) => void,
|
|
onError: (error: Error) => void,
|
|
@@ -258,7 +260,7 @@ export class NetworkingService {
|
|
startAt,
|
|
startAt,
|
|
} = downloadData
|
|
} = downloadData
|
|
|
|
|
|
- pendingDownload.status = PendingDownloadStatus.LookingForSource
|
|
|
|
|
|
+ pendingDownload.setStatus({ type: PendingDownloadStatusType.LookingForSource })
|
|
|
|
|
|
return new Promise<void>((resolve, reject) => {
|
|
return new Promise<void>((resolve, reject) => {
|
|
// Handlers:
|
|
// Handlers:
|
|
@@ -268,9 +270,13 @@ export class NetworkingService {
|
|
reject(new Error(message))
|
|
reject(new Error(message))
|
|
}
|
|
}
|
|
|
|
|
|
- const sourceFound = (response: StorageNodeDownloadResponse) => {
|
|
|
|
- this.logger.info('Download source chosen', { objectId, source: response.config.url })
|
|
|
|
- pendingDownload.status = PendingDownloadStatus.Downloading
|
|
|
|
|
|
+ const sourceFound = (endpoint: string, response: StorageNodeDownloadResponse) => {
|
|
|
|
+ this.logger.info('Download source chosen', { objectId, source: endpoint })
|
|
|
|
+ pendingDownload.setStatus({
|
|
|
|
+ type: PendingDownloadStatusType.Downloading,
|
|
|
|
+ source: endpoint,
|
|
|
|
+ contentType: response.headers['content-type'],
|
|
|
|
+ })
|
|
onSourceFound(response)
|
|
onSourceFound(response)
|
|
}
|
|
}
|
|
|
|
|
|
@@ -291,7 +297,7 @@ export class NetworkingService {
|
|
})),
|
|
})),
|
|
})
|
|
})
|
|
if (!storageEndpoints.length) {
|
|
if (!storageEndpoints.length) {
|
|
- return fail('No storage endpoints available to download the data object from')
|
|
|
|
|
|
+ return fail(`No storage endpoints available to download the data object: ${objectId}`)
|
|
}
|
|
}
|
|
|
|
|
|
const availabilityQueue = this.createDataObjectAvailabilityCheckQueue(objectId, storageEndpoints)
|
|
const availabilityQueue = this.createDataObjectAvailabilityCheckQueue(objectId, storageEndpoints)
|
|
@@ -302,21 +308,14 @@ export class NetworkingService {
|
|
const job = async () => {
|
|
const job = async () => {
|
|
const api = new StorageNodeApi(endpoint, this.logging, this.config)
|
|
const api = new StorageNodeApi(endpoint, this.logging, this.config)
|
|
const response = await api.downloadObject(objectId, startAt)
|
|
const response = await api.downloadObject(objectId, startAt)
|
|
- return response
|
|
|
|
|
|
+ return [endpoint, response]
|
|
}
|
|
}
|
|
objectDownloadQueue.push(job)
|
|
objectDownloadQueue.push(job)
|
|
})
|
|
})
|
|
|
|
|
|
- availabilityQueue.on('error', () => {
|
|
|
|
- /*
|
|
|
|
- Do nothing.
|
|
|
|
- The handler is needed to avoid unhandled promise rejection
|
|
|
|
- */
|
|
|
|
- })
|
|
|
|
-
|
|
|
|
availabilityQueue.on('end', () => {
|
|
availabilityQueue.on('end', () => {
|
|
if (!objectDownloadQueue.length) {
|
|
if (!objectDownloadQueue.length) {
|
|
- fail('Failed to download the object from any availablable storage provider')
|
|
|
|
|
|
+ fail(`Failed to download object ${objectId} from any availablable storage provider`)
|
|
}
|
|
}
|
|
})
|
|
})
|
|
|
|
|
|
@@ -328,15 +327,15 @@ export class NetworkingService {
|
|
if (availabilityQueue.length) {
|
|
if (availabilityQueue.length) {
|
|
availabilityQueue.start()
|
|
availabilityQueue.start()
|
|
} else {
|
|
} else {
|
|
- fail('Failed to download the object from any availablable storage provider')
|
|
|
|
|
|
+ fail(`Failed to download object ${objectId} from any availablable storage provider`)
|
|
}
|
|
}
|
|
})
|
|
})
|
|
|
|
|
|
- objectDownloadQueue.on('success', (response: StorageNodeDownloadResponse) => {
|
|
|
|
|
|
+ objectDownloadQueue.on('success', ([endpoint, response]: [string, StorageNodeDownloadResponse]) => {
|
|
availabilityQueue.removeAllListeners().end()
|
|
availabilityQueue.removeAllListeners().end()
|
|
objectDownloadQueue.removeAllListeners().end()
|
|
objectDownloadQueue.removeAllListeners().end()
|
|
response.data.on('close', finish).on('error', finish).on('end', finish)
|
|
response.data.on('close', finish).on('error', finish).on('end', finish)
|
|
- sourceFound(response)
|
|
|
|
|
|
+ sourceFound(endpoint, response)
|
|
})
|
|
})
|
|
})
|
|
})
|
|
}
|
|
}
|
|
@@ -345,23 +344,17 @@ export class NetworkingService {
|
|
const {
|
|
const {
|
|
objectData: { objectId, size },
|
|
objectData: { objectId, size },
|
|
} = downloadData
|
|
} = downloadData
|
|
-
|
|
|
|
if (this.stateCache.getPendingDownload(objectId)) {
|
|
if (this.stateCache.getPendingDownload(objectId)) {
|
|
// Already downloading
|
|
// Already downloading
|
|
return null
|
|
return null
|
|
}
|
|
}
|
|
-
|
|
|
|
- let resolveDownload: (response: StorageNodeDownloadResponse) => void, rejectDownload: (err: Error) => void
|
|
|
|
- const downloadPromise = new Promise<StorageNodeDownloadResponse>((resolve, reject) => {
|
|
|
|
- resolveDownload = resolve
|
|
|
|
- rejectDownload = reject
|
|
|
|
|
|
+ const pendingDownload = this.stateCache.addPendingDownload(new PendingDownload(objectId, size))
|
|
|
|
+ return new Promise<StorageNodeDownloadResponse>((resolve, reject) => {
|
|
|
|
+ const onSourceFound = resolve
|
|
|
|
+ const onError = reject
|
|
|
|
+ // Queue the download
|
|
|
|
+ this.downloadQueue.push(() => this.downloadJob(pendingDownload, downloadData, onSourceFound, onError))
|
|
})
|
|
})
|
|
-
|
|
|
|
- // Queue the download
|
|
|
|
- const pendingDownload = this.stateCache.newPendingDownload(objectId, size, downloadPromise)
|
|
|
|
- this.downloadQueue.push(() => this.downloadJob(pendingDownload, downloadData, resolveDownload, rejectDownload))
|
|
|
|
-
|
|
|
|
- return downloadPromise
|
|
|
|
}
|
|
}
|
|
|
|
|
|
async fetchSupportedDataObjects(): Promise<Map<string, DataObjectData>> {
|
|
async fetchSupportedDataObjects(): Promise<Map<string, DataObjectData>> {
|