@@ -3,9 +3,9 @@ import { QueryNodeApi } from './query-node/api'
import { Logger } from 'winston'
import { LoggingService } from '../logging'
import { StorageNodeApi } from './storage-node/api'
-import { PendingDownloadData, StateCacheService } from '../cache/StateCacheService'
+import { StateCacheService } from '../cache/StateCacheService'
import { DataObjectDetailsFragment } from './query-node/generated/queries'
-import axios, { AxiosRequestConfig } from 'axios'
+import axios from 'axios'
import {
@@ -19,15 +19,15 @@ import { DistributionBucketOperatorStatus } from './query-node/generated/schema'
import http from 'http'
import https from 'https'
import { parseAxiosError } from '../parsers/errors'
+import { PendingDownload, PendingDownloadStatusType } from './PendingDownload'
// Concurrency limits
export class NetworkingService {
private config: ReadonlyConfig
private queryNodeApi: QueryNodeApi
- // private runtimeApi: RuntimeApi
private logging: LoggingService
private stateCache: StateCacheService
private logger: Logger
@@ -36,10 +36,10 @@ export class NetworkingService {
private downloadQueue: queue
constructor(config: ReadonlyConfig, stateCache: StateCacheService, logging: LoggingService) {
- axios.defaults.timeout = config.limits.outboundRequestsTimeout
+ axios.defaults.timeout = config.limits.outboundRequestsTimeoutMs
const httpConfig: http.AgentOptions | https.AgentOptions = {
keepAlive: true,
- timeout: config.limits.outboundRequestsTimeout,
+ timeout: config.limits.outboundRequestsTimeoutMs,
maxSockets: config.limits.maxConcurrentOutboundConnections,
axios.defaults.httpAgent = new http.Agent(httpConfig)
@@ -49,7 +49,6 @@ export class NetworkingService {
this.stateCache = stateCache
this.logger = logging.createLogger('NetworkingManager')
this.queryNodeApi = new QueryNodeApi(config.endpoints.queryNode, this.logging)
- // this.runtimeApi = new RuntimeApi(config.endpoints.substrateNode)
void this.checkActiveStorageNodeEndpoints()
// Queues
this.testLatencyQueue = queue({ concurrency: MAX_CONCURRENT_RESPONSE_TIME_CHECKS, autostart: true }).on(
@@ -61,6 +60,9 @@ export class NetworkingService {
this.downloadQueue = queue({ concurrency: config.limits.maxConcurrentStorageNodeDownloads, autostart: true })
+ this.downloadQueue.on('error', (err) => {
+ this.logger.error('Data object download failed', { err })
+ })
private validateNodeEndpoint(endpoint: string): void {
@@ -92,17 +94,13 @@ export class NetworkingService {
private prepareStorageNodeEndpoints(details: DataObjectDetailsFragment) {
- const endpointsData = details.storageBag.storageAssignments
- .filter(
- (a) =>
- a.storageBucket.operatorStatus.__typename === 'StorageBucketOperatorStatusActive' &&
- a.storageBucket.operatorMetadata?.nodeEndpoint
- )
- .map((a) => {
- const rootEndpoint = a.storageBucket.operatorMetadata!.nodeEndpoint!
- const apiEndpoint = this.getApiEndpoint(rootEndpoint)
+ const endpointsData = details.storageBag.storageBuckets
+ .filter((bucket) => bucket.operatorStatus.__typename === 'StorageBucketOperatorStatusActive')
+ .map((bucket) => {
+ const rootEndpoint = bucket.operatorMetadata?.nodeEndpoint
+ const apiEndpoint = rootEndpoint ? this.getApiEndpoint(rootEndpoint) : ''
return {
- bucketId: a.storageBucket.id,
+ bucketId: bucket.id,
endpoint: apiEndpoint,
@@ -116,32 +114,42 @@ export class NetworkingService {
+ private getDataObjectActiveDistributorsSet(objectDetails: DataObjectDetailsFragment): Set<number> {
+ const activeDistributorsSet = new Set<number>()
+ const { distributionBuckets } = objectDetails.storageBag
+ for (const bucket of distributionBuckets) {
+ for (const operator of bucket.operators) {
+ if (operator.status === DistributionBucketOperatorStatus.Active) {
+ activeDistributorsSet.add(operator.workerId)
+ }
+ }
+ }
+ return activeDistributorsSet
+ }
public async dataObjectInfo(objectId: string): Promise<DataObjectInfo> {
const details = await this.queryNodeApi.getDataObjectDetails(objectId)
- return {
- exists: !!details,
- isSupported:
- (this.config.buckets === 'all' &&
- details?.storageBag.distirbutionAssignments.some((d) =>
- d.distributionBucket.operators.some(
- (o) => o.workerId === this.config.workerId && o.status === DistributionBucketOperatorStatus.Active
- )
- )) ||
- (Array.isArray(this.config.buckets) &&
- this.config.buckets.some((bucketId) =>
- details?.storageBag.distirbutionAssignments
- .map((a) => a.distributionBucket.id)
- .includes(bucketId.toString())
- )),
- data: details
- ? {
- objectId,
- accessPoints: this.parseDataObjectAccessPoints(details),
- contentHash: details.ipfsHash,
- size: parseInt(details.size),
- }
- : undefined,
+ let exists = false
+ let isSupported = false
+ let data: DataObjectData | undefined
+ if (details) {
+ exists = true
+ if (!this.config.buckets) {
+ const distributors = this.getDataObjectActiveDistributorsSet(details)
+ isSupported = typeof this.config.workerId === 'number' ? distributors.has(this.config.workerId) : false
+ } else {
+ const supportedBucketIds = this.config.buckets.map((id) => id.toString())
+ isSupported = details.storageBag.distributionBuckets.some((b) => supportedBucketIds.includes(b.id))
+ }
+ data = {
+ objectId,
+ accessPoints: this.parseDataObjectAccessPoints(details),
+ contentHash: details.ipfsHash,
+ size: parseInt(details.size),
+ }
+ return { exists, isSupported, data }
private sortEndpointsByMeanResponseTime(endpoints: string[]) {
@@ -152,8 +160,93 @@ export class NetworkingService {
+ private async checkObjectAvailability(objectId: string, endpoint: string): Promise<void> {
+ const api = new StorageNodeApi(endpoint, this.logging, this.config)
+ const available = await api.isObjectAvailable(objectId)
+ if (!available) {
+ throw new Error('Not available')
+ }
+ }
+ private createDataObjectAvailabilityCheckQueue(objectId: string, storageEndpoints: string[]) {
+ const availabilityQueue = queue({
+ autostart: true,
+ })
+ storageEndpoints.forEach(async (endpoint) => {
+ availabilityQueue.push(async () => {
+ await this.checkObjectAvailability(objectId, endpoint)
+ return endpoint
+ })
+ })
+ availabilityQueue.on('error', () => {
+ /*
+ Do nothing.
+ The handler is needed to avoid unhandled promise rejection
+ */
+ })
+ return availabilityQueue
+ }
+ public async getDataObjectDownloadSource(objectData: DataObjectData): Promise<string> {
+ const { objectId } = objectData
+ const cachedSource = await this.checkCachedDataObjectSource(objectId)
+ if (cachedSource) {
+ this.logger.info(`Found active download source for object ${objectId} in cache`, { objectId, cachedSource })
+ return cachedSource
+ }
+ return this.findDataObjectDownloadSource(objectData)
+ }
+ private async checkCachedDataObjectSource(objectId: string): Promise<string | undefined> {
+ const cachedSource = this.stateCache.getCachedDataObjectSource(objectId)
+ if (cachedSource) {
+ try {
+ await this.checkObjectAvailability(objectId, cachedSource)
+ } catch (err) {
+ this.stateCache.dropCachedDataObjectSource(objectId, cachedSource)
+ return undefined
+ }
+ return cachedSource
+ }
+ }
+ private findDataObjectDownloadSource({ objectId, accessPoints }: DataObjectData): Promise<string> {
+ return new Promise((resolve, reject) => {
+ const storageEndpoints = this.sortEndpointsByMeanResponseTime(
+ accessPoints?.storageNodes.map((n) => n.endpoint) || []
+ )
+ this.logger.info('Looking for data object source', {
+ objectId,
+ possibleSources: storageEndpoints.map((e) => ({
+ endpoint: e,
+ meanResponseTime: this.stateCache.getStorageNodeEndpointMeanResponseTime(e),
+ })),
+ })
+ if (!storageEndpoints.length) {
+ return reject(new Error('No storage endpoints available to download the data object from'))
+ }
+ const availabilityQueue = this.createDataObjectAvailabilityCheckQueue(objectId, storageEndpoints)
+ availabilityQueue.on('success', (endpoint) => {
+ availabilityQueue.stop()
+ this.stateCache.cacheDataObjectSource(objectId, endpoint)
+ return resolve(endpoint)
+ })
+ availabilityQueue.on('end', () => {
+ return reject(new Error('Failed to find data object download source'))
+ })
+ })
+ }
private downloadJob(
- pendingDownload: PendingDownloadData,
+ pendingDownload: PendingDownload,
downloadData: DownloadData,
onSourceFound: (response: StorageNodeDownloadResponse) => void,
onError: (error: Error) => void,
@@ -164,7 +257,7 @@ export class NetworkingService {
} = downloadData
- pendingDownload.status = 'LookingForSource'
+ pendingDownload.setStatus({ type: PendingDownloadStatusType.LookingForSource })
return new Promise<void>((resolve, reject) => {
// Handlers:
@@ -174,9 +267,13 @@ export class NetworkingService {
reject(new Error(message))
- const sourceFound = (response: StorageNodeDownloadResponse) => {
- this.logger.info('Download source chosen', { objectId, source: response.config.url })
- pendingDownload.status = 'Downloading'
+ const sourceFound = (endpoint: string, response: StorageNodeDownloadResponse) => {
+ this.logger.info('Download source chosen', { objectId, source: endpoint })
+ pendingDownload.setStatus({
+ type: PendingDownloadStatusType.Downloading,
+ source: endpoint,
+ contentType: response.headers['content-type'],
+ })
@@ -197,46 +294,25 @@ export class NetworkingService {
if (!storageEndpoints.length) {
- return fail('No storage endpoints available to download the data object from')
+ return fail(`No storage endpoints available to download the data object: ${objectId}`)
- const availabilityQueue = queue({
- autostart: true,
- })
+ const availabilityQueue = this.createDataObjectAvailabilityCheckQueue(objectId, storageEndpoints)
const objectDownloadQueue = queue({ concurrency: 1, autostart: true })
- storageEndpoints.forEach(async (endpoint) => {
- availabilityQueue.push(async () => {
- const api = new StorageNodeApi(endpoint, this.logging)
- const available = await api.isObjectAvailable(objectId)
- if (!available) {
- throw new Error('Not avilable')
- }
- return endpoint
- })
- })
availabilityQueue.on('success', (endpoint) => {
const job = async () => {
- const api = new StorageNodeApi(endpoint, this.logging)
+ const api = new StorageNodeApi(endpoint, this.logging, this.config)
const response = await api.downloadObject(objectId, startAt)
- return response
+ return [endpoint, response]
- availabilityQueue.on('error', () => {
- /*
- Do nothing.
- The handler is needed to avoid unhandled promise rejection
- */
- })
availabilityQueue.on('end', () => {
if (!objectDownloadQueue.length) {
- fail('Failed to download the object from any availablable storage provider')
+ fail(`Failed to download object ${objectId} from any availablable storage provider`)
@@ -248,15 +324,15 @@ export class NetworkingService {
if (availabilityQueue.length) {
} else {
- fail('Failed to download the object from any availablable storage provider')
+ fail(`Failed to download object ${objectId} from any availablable storage provider`)
- objectDownloadQueue.on('success', (response: StorageNodeDownloadResponse) => {
+ objectDownloadQueue.on('success', ([endpoint, response]: [string, StorageNodeDownloadResponse]) => {
response.data.on('close', finish).on('error', finish).on('end', finish)
- sourceFound(response)
+ sourceFound(endpoint, response)
@@ -265,34 +341,29 @@ export class NetworkingService {
const {
objectData: { objectId, size },
} = downloadData
if (this.stateCache.getPendingDownload(objectId)) {
// Already downloading
return null
- let resolveDownload: (response: StorageNodeDownloadResponse) => void, rejectDownload: (err: Error) => void
- const downloadPromise = new Promise<StorageNodeDownloadResponse>((resolve, reject) => {
- resolveDownload = resolve
- rejectDownload = reject
+ const pendingDownload = this.stateCache.addPendingDownload(new PendingDownload(objectId, size))
+ return new Promise<StorageNodeDownloadResponse>((resolve, reject) => {
+ const onSourceFound = resolve
+ const onError = reject
+ // Queue the download
+ this.downloadQueue.push(() => this.downloadJob(pendingDownload, downloadData, onSourceFound, onError))
- // Queue the download
- const pendingDownload = this.stateCache.newPendingDownload(objectId, size, downloadPromise)
- this.downloadQueue.push(() => this.downloadJob(pendingDownload, downloadData, resolveDownload, rejectDownload))
- return downloadPromise
async fetchSupportedDataObjects(): Promise<Map<string, DataObjectData>> {
- const data =
- this.config.buckets === 'all'
- ? await this.queryNodeApi.getDistributionBucketsWithObjectsByWorkerId(this.config.workerId)
- : await this.queryNodeApi.getDistributionBucketsWithObjectsByIds(this.config.buckets.map((id) => id.toString()))
+ const data = this.config.buckets
+ ? await this.queryNodeApi.getDistributionBucketsWithObjectsByIds(this.config.buckets.map((id) => id.toString()))
+ : typeof this.config.workerId === 'number'
+ ? await this.queryNodeApi.getDistributionBucketsWithObjectsByWorkerId(this.config.workerId)
+ : []
const objectsData = new Map<string, DataObjectData>()
data.forEach((bucket) => {
- bucket.bagAssignments.forEach((a) => {
- a.storageBag.objects.forEach((object) => {
+ bucket.bags.forEach((bag) => {
+ bag.objects.forEach((object) => {
const { ipfsHash, id, size } = object
objectsData.set(id, { contentHash: ipfsHash, objectId: id, size: parseInt(size) })
@@ -308,7 +379,7 @@ export class NetworkingService {
const endpoints = this.filterStorageNodeEndpoints(
activeStorageOperators.map(({ id, operatorMetadata }) => ({
bucketId: id,
- endpoint: this.getApiEndpoint(operatorMetadata!.nodeEndpoint!),
+ endpoint: operatorMetadata?.nodeEndpoint ? this.getApiEndpoint(operatorMetadata.nodeEndpoint) : '',
this.logger.verbose('Checking nearby storage nodes...', { validEndpointsCount: endpoints.length })
@@ -327,9 +398,8 @@ export class NetworkingService {
const start = Date.now()
this.logger.debug(`Sending storage node response-time check request to: ${endpoint}`, { endpoint })
try {
- const api = new StorageNodeApi(endpoint, this.logging)
- const reqConfig: AxiosRequestConfig = { headers: { connection: 'close' } }
- await api.stateApi.stateApiGetVersion(reqConfig)
+ const api = new StorageNodeApi(endpoint, this.logging, this.config)
+ await api.getVersion()
const responseTime = Date.now() - start
this.logger.debug(`${endpoint} check request response time: ${responseTime}`, { endpoint, responseTime })
this.stateCache.setStorageNodeEndpointResponseTime(endpoint, responseTime)