// NetworkingService.ts (12 KB)

  1. import { ReadonlyConfig } from '../../types/config'
  2. import { QueryNodeApi } from './query-node/api'
  3. import { Logger } from 'winston'
  4. import { LoggingService } from '../logging'
  5. import { StorageNodeApi } from './storage-node/api'
  6. import { PendingDownloadData, StateCacheService } from '../cache/StateCacheService'
  7. import { DataObjectDetailsFragment } from './query-node/generated/queries'
  8. import axios, { AxiosRequestConfig } from 'axios'
  9. import {
  10. StorageNodeEndpointData,
  11. DataObjectAccessPoints,
  12. DataObjectData,
  13. DataObjectInfo,
  14. StorageNodeDownloadResponse,
  15. DownloadData,
  16. } from '../../types'
  17. import queue from 'queue'
  18. import { DistributionBucketOperatorStatus } from './query-node/generated/schema'
  19. import http from 'http'
  20. import https from 'https'
  21. import { parseAxiosError } from '../parsers/errors'
  // Concurrency limits

  /** Concurrency of the per-download queue that asks storage nodes whether they hold the object. */
  export const MAX_CONCURRENT_AVAILABILITY_CHECKS_PER_DOWNLOAD = 10
  /** Concurrency of the queue that runs storage node response-time checks. */
  export const MAX_CONCURRENT_RESPONSE_TIME_CHECKS = 10
  25. export class NetworkingService {
  26. private config: ReadonlyConfig
  27. private queryNodeApi: QueryNodeApi
  28. // private runtimeApi: RuntimeApi
  29. private logging: LoggingService
  30. private stateCache: StateCacheService
  31. private logger: Logger
  32. private testLatencyQueue: queue
  33. private downloadQueue: queue
  /**
   * Configures outbound HTTP defaults, creates the query node API client and the
   * work queues, then starts an initial storage node response-time check.
   *
   * Note: this mutates the process-wide `axios.defaults` (timeout and http/https agents).
   *
   * @param config Node configuration; `limits` and `endpoints.queryNode` are read here
   * @param stateCache Cache holding pending downloads and endpoint response times
   * @param logging Logging service used to create this service's logger
   */
  constructor(config: ReadonlyConfig, stateCache: StateCacheService, logging: LoggingService) {
    axios.defaults.timeout = config.limits.outboundRequestsTimeout
    const httpConfig: http.AgentOptions | https.AgentOptions = {
      keepAlive: true,
      timeout: config.limits.outboundRequestsTimeout,
      maxSockets: config.limits.maxConcurrentOutboundConnections,
    }
    axios.defaults.httpAgent = new http.Agent(httpConfig)
    axios.defaults.httpsAgent = new https.Agent(httpConfig)

    this.config = config
    this.logging = logging
    this.stateCache = stateCache
    this.logger = logging.createLogger('NetworkingManager')
    this.queryNodeApi = new QueryNodeApi(config.endpoints.queryNode, this.logging)
    // this.runtimeApi = new RuntimeApi(config.endpoints.substrateNode)

    // Queues
    this.testLatencyQueue = queue({ concurrency: MAX_CONCURRENT_RESPONSE_TIME_CHECKS, autostart: true }).on(
      'end',
      () => {
        this.logger.verbose('Mean response times updated', {
          responseTimes: this.stateCache.getStorageNodeEndpointsMeanResponseTimes(),
        })
      }
    )
    // A download job rejects when no source can be found. Without an 'error' listener
    // the queue's 'error' event would be unhandled (the caller is still notified
    // separately through the download promise), so log it here.
    this.downloadQueue = queue({ concurrency: config.limits.maxConcurrentStorageNodeDownloads, autostart: true }).on(
      'error',
      (err) => {
        this.logger.error('Data object download failed', { err })
      }
    )

    // Started only after the queues exist, because the check pushes jobs to testLatencyQueue.
    void this.checkActiveStorageNodeEndpoints()
  }
  /**
   * Checks that an endpoint is a parseable URL using the `http:` or `https:` protocol.
   *
   * @param endpoint Endpoint URL to validate
   * @throws If `new URL(endpoint)` fails to parse, or the protocol is neither http nor https
   */
  private validateNodeEndpoint(endpoint: string): void {
    const { protocol } = new URL(endpoint)
    const isHttp = protocol === 'http:'
    const isHttps = protocol === 'https:'
    if (!(isHttp || isHttps)) {
      throw new Error(`Invalid endpoint protocol: ${protocol}`)
    }
  }
  /**
   * Returns only the entries whose endpoint passes `validateNodeEndpoint`.
   * Each rejected entry is reported with a warning log.
   *
   * @param input Candidate storage node endpoints
   * @returns A new array containing the valid entries, in their original order
   */
  private filterStorageNodeEndpoints(input: StorageNodeEndpointData[]): StorageNodeEndpointData[] {
    const hasValidEndpoint = ({ bucketId, endpoint }: StorageNodeEndpointData): boolean => {
      try {
        this.validateNodeEndpoint(endpoint)
      } catch (err) {
        this.logger.warn(`Invalid storage node endpoint: ${endpoint} for bucket ${bucketId}`, {
          bucketId,
          endpoint,
          err,
          '@pauseFor': 900,
        })
        return false
      }
      return true
    }

    return input.filter((entry) => hasValidEndpoint(entry))
  }
  /**
   * Builds the `api/v1` base URL for a storage node root endpoint, inserting a
   * `/` separator only when the root endpoint does not already end with one.
   *
   * @param rootEndpoint Root endpoint of the node
   * @returns The root endpoint followed by `api/v1`
   */
  private getApiEndpoint(rootEndpoint: string) {
    const separator = rootEndpoint.endsWith('/') ? '' : '/'
    return `${rootEndpoint}${separator}api/v1`
  }
  /**
   * Collects API endpoints of the storage buckets assigned to the object's bag.
   * Only buckets with an active operator status and a non-empty node endpoint are
   * considered; the result is then passed through `filterStorageNodeEndpoints`.
   *
   * @param details Data object details fetched from the query node
   * @returns Validated storage node endpoints together with their bucket ids
   */
  private prepareStorageNodeEndpoints(details: DataObjectDetailsFragment) {
    const candidates: StorageNodeEndpointData[] = []

    for (const { storageBucket } of details.storageBag.storageAssignments) {
      const isActive = storageBucket.operatorStatus.__typename === 'StorageBucketOperatorStatusActive'
      const rootEndpoint = storageBucket.operatorMetadata?.nodeEndpoint
      if (!isActive || !rootEndpoint) {
        continue
      }
      candidates.push({
        bucketId: storageBucket.id,
        endpoint: this.getApiEndpoint(rootEndpoint),
      })
    }

    return this.filterStorageNodeEndpoints(candidates)
  }
  /**
   * Wraps the storage node endpoints prepared for a data object into the
   * `DataObjectAccessPoints` shape.
   *
   * @param details Data object details fetched from the query node
   */
  private parseDataObjectAccessPoints(details: DataObjectDetailsFragment): DataObjectAccessPoints {
    const storageNodes = this.prepareStorageNodeEndpoints(details)
    return { storageNodes }
  }
  /**
   * Looks up a data object in the query node and reports whether it exists and
   * whether this distributor supports it.
   *
   * Support rules:
   * - `config.buckets === 'all'`: some distribution bucket assigned to the object's bag
   *   has an operator with this node's `workerId` and an active status.
   * - `config.buckets` is a list: some configured bucket id is among the bag's
   *   assigned distribution bucket ids.
   *
   * @param objectId Id of the data object
   * @returns Existence/support flags and, when the object exists, its id, access points,
   *   content hash and size
   */
  public async dataObjectInfo(objectId: string): Promise<DataObjectInfo> {
    const details = await this.queryNodeApi.getDataObjectDetails(objectId)
    const { buckets, workerId } = this.config
    const assignments = details?.storageBag.distirbutionAssignments

    let isSupported = false
    if (assignments) {
      if (buckets === 'all') {
        isSupported = assignments.some(({ distributionBucket }) =>
          distributionBucket.operators.some(
            (operator) => operator.workerId === workerId && operator.status === DistributionBucketOperatorStatus.Active
          )
        )
      } else if (Array.isArray(buckets)) {
        const assignedBucketIds = assignments.map(({ distributionBucket }) => distributionBucket.id)
        isSupported = buckets.some((bucketId) => assignedBucketIds.includes(bucketId.toString()))
      }
    }

    if (!details) {
      return { exists: false, isSupported, data: undefined }
    }

    return {
      exists: true,
      isSupported,
      data: {
        objectId,
        accessPoints: this.parseDataObjectAccessPoints(details),
        contentHash: details.ipfsHash,
        size: parseInt(details.size),
      },
    }
  }
  /**
   * Orders endpoints by their cached mean response time, lowest first.
   *
   * The input array is left untouched: `Array.prototype.sort` sorts in place, so a
   * copy is sorted and returned instead of mutating the caller's array.
   *
   * @param endpoints Storage node API endpoints
   * @returns A new array with the same endpoints in ascending mean-response-time order
   */
  private sortEndpointsByMeanResponseTime(endpoints: string[]) {
    return endpoints
      .slice()
      .sort(
        (a, b) =>
          this.stateCache.getStorageNodeEndpointMeanResponseTime(a) -
          this.stateCache.getStorageNodeEndpointMeanResponseTime(b)
      )
  }
  /**
   * Finds a storage node that holds the object and starts downloading from it.
   *
   * Flow:
   * 1. Candidate endpoints are sorted by mean response time.
   * 2. Availability checks for all candidates are queued (`availabilityQueue`).
   * 3. Each endpoint reporting the object as available pauses further availability
   *    checks and queues a download attempt (`objectDownloadQueue`, one at a time).
   * 4. The first successful download response is handed to `onSourceFound`; both queues
   *    are then torn down. The job resolves once the response stream closes, ends or errors.
   * 5. If a download attempt fails, availability checks are resumed when any remain;
   *    otherwise the job fails.
   *
   * On failure the pending download is dropped from the state cache, `onError` is
   * called and the returned promise rejects with the same error.
   *
   * @param pendingDownload Pending download entry whose `status` is updated along the way
   * @param downloadData Object data (id, access points) and optional start offset
   * @param onSourceFound Called with the storage node response once a source is chosen
   * @param onError Called when no source could be used
   * @param onFinished Optional; called once when the response stream is done
   * @returns Promise resolved when the download stream finishes, rejected on failure
   */
  private downloadJob(
    pendingDownload: PendingDownloadData,
    downloadData: DownloadData,
    onSourceFound: (response: StorageNodeDownloadResponse) => void,
    onError: (error: Error) => void,
    onFinished?: () => void
  ): Promise<void> {
    const {
      objectData: { objectId, accessPoints },
      startAt,
    } = downloadData

    pendingDownload.status = 'LookingForSource'

    return new Promise<void>((resolve, reject) => {
      // Handlers:
      const fail = (message: string) => {
        // One error instance is shared by the callback and the rejection.
        const error = new Error(message)
        this.stateCache.dropPendingDownload(objectId)
        onError(error)
        reject(error)
      }

      const sourceFound = (response: StorageNodeDownloadResponse) => {
        this.logger.info('Download source chosen', { objectId, source: response.config.url })
        pendingDownload.status = 'Downloading'
        onSourceFound(response)
      }

      // `finish` is attached to 'close', 'error' and 'end' of the response stream, and more
      // than one of them can fire for the same stream, so make sure it only runs once.
      let finished = false
      const finish = () => {
        if (finished) {
          return
        }
        finished = true
        onFinished?.()
        resolve()
      }

      const storageEndpoints = this.sortEndpointsByMeanResponseTime(
        accessPoints?.storageNodes.map((n) => n.endpoint) || []
      )

      this.logger.info('Downloading new data object', {
        objectId,
        possibleSources: storageEndpoints.map((e) => ({
          endpoint: e,
          meanResponseTime: this.stateCache.getStorageNodeEndpointMeanResponseTime(e),
        })),
      })

      if (!storageEndpoints.length) {
        return fail('No storage endpoints available to download the data object from')
      }

      const availabilityQueue = queue({
        concurrency: MAX_CONCURRENT_AVAILABILITY_CHECKS_PER_DOWNLOAD,
        autostart: true,
      })
      const objectDownloadQueue = queue({ concurrency: 1, autostart: true })

      // Queue one availability check per endpoint, in response-time order.
      storageEndpoints.forEach((endpoint) => {
        availabilityQueue.push(async () => {
          const api = new StorageNodeApi(endpoint, this.logging)
          const available = await api.isObjectAvailable(objectId)
          if (!available) {
            throw new Error('Not avilable')
          }
          return endpoint
        })
      })

      availabilityQueue.on('success', (endpoint) => {
        // Pause further checks while a download from this endpoint is attempted.
        availabilityQueue.stop()
        const job = async () => {
          const api = new StorageNodeApi(endpoint, this.logging)
          const response = await api.downloadObject(objectId, startAt)
          return response
        }
        objectDownloadQueue.push(job)
      })

      availabilityQueue.on('error', () => {
        /*
        Do nothing.
        The handler is needed to avoid unhandled promise rejection
        */
      })

      availabilityQueue.on('end', () => {
        // All checks are done and no download attempt is queued or running.
        if (!objectDownloadQueue.length) {
          fail('Failed to download the object from any availablable storage provider')
        }
      })

      objectDownloadQueue.on('error', (err) => {
        this.logger.error('Download attempt from storage node failed after availability was confirmed:', { err })
      })

      objectDownloadQueue.on('end', () => {
        // Reached only after failed attempts: a successful attempt removes this listener first.
        if (availabilityQueue.length) {
          availabilityQueue.start()
        } else {
          fail('Failed to download the object from any availablable storage provider')
        }
      })

      objectDownloadQueue.on('success', (response: StorageNodeDownloadResponse) => {
        // A source was found: detach all listeners and end both queues.
        availabilityQueue.removeAllListeners().end()
        objectDownloadQueue.removeAllListeners().end()
        response.data.on('close', finish).on('error', finish).on('end', finish)
        sourceFound(response)
      })
    })
  }
  /**
   * Schedules a download of the given data object from a storage node.
   *
   * @param downloadData Object data and optional start offset of the download
   * @returns `null` when a pending download for this object already exists in the state
   *   cache; otherwise a promise that resolves with the storage node response once a
   *   source is chosen, or rejects when the download job reports an error
   */
  public downloadDataObject(downloadData: DownloadData): Promise<StorageNodeDownloadResponse> | null {
    const { objectId, size } = downloadData.objectData

    const alreadyPending = this.stateCache.getPendingDownload(objectId)
    if (alreadyPending) {
      // Already downloading
      return null
    }

    // Expose the promise's settle functions so the queued job can settle it later.
    let onSourceFound: (response: StorageNodeDownloadResponse) => void
    let onError: (err: Error) => void
    const downloadPromise = new Promise<StorageNodeDownloadResponse>((resolve, reject) => {
      onSourceFound = resolve
      onError = reject
    })

    // Queue the download
    const pendingDownload = this.stateCache.newPendingDownload(objectId, size, downloadPromise)
    const job = () => this.downloadJob(pendingDownload, downloadData, onSourceFound, onError)
    this.downloadQueue.push(job)

    return downloadPromise
  }
  /**
   * Fetches all data objects reachable through the distribution buckets this node
   * serves: buckets looked up by worker id when `config.buckets === 'all'`, otherwise
   * by the configured bucket ids.
   *
   * @returns Map from object id to its content hash, id and size
   */
  async fetchSupportedDataObjects(): Promise<Map<string, DataObjectData>> {
    const distributionBuckets = await (this.config.buckets === 'all'
      ? this.queryNodeApi.getDistributionBucketsWithObjectsByWorkerId(this.config.workerId)
      : this.queryNodeApi.getDistributionBucketsWithObjectsByIds(this.config.buckets.map((id) => id.toString())))

    const objectsData = new Map<string, DataObjectData>()
    for (const bucket of distributionBuckets) {
      for (const assignment of bucket.bagAssignments) {
        for (const { id, ipfsHash, size } of assignment.storageBag.objects) {
          objectsData.set(id, { contentHash: ipfsHash, objectId: id, size: parseInt(size) })
        }
      }
    }

    return objectsData
  }
  /**
   * Fetches the active storage bucket operators from the query node and queues a
   * response-time check for every operator with a valid endpoint.
   *
   * Operators without a node endpoint in their metadata are skipped, so that one
   * incomplete entry does not abort the check for all other operators.
   * Any error raised while preparing the checks is logged, not thrown.
   */
  async checkActiveStorageNodeEndpoints(): Promise<void> {
    try {
      const activeStorageOperators = await this.queryNodeApi.getActiveStorageBucketOperatorsData()

      const endpointsData: StorageNodeEndpointData[] = []
      for (const { id, operatorMetadata } of activeStorageOperators) {
        const rootEndpoint = operatorMetadata?.nodeEndpoint
        if (!rootEndpoint) {
          // No endpoint to test for this operator
          continue
        }
        endpointsData.push({ bucketId: id, endpoint: this.getApiEndpoint(rootEndpoint) })
      }

      const endpoints = this.filterStorageNodeEndpoints(endpointsData)
      this.logger.verbose('Checking nearby storage nodes...', { validEndpointsCount: endpoints.length })
      endpoints.forEach(({ endpoint }) =>
        this.testLatencyQueue.push(async () => {
          await this.checkResponseTime(endpoint)
        })
      )
    } catch (err) {
      this.logger.error("Couldn't check active storage node endpooints", { err })
    }
  }
  /**
   * Measures how long a storage node takes to answer a version request and stores
   * the result in the state cache. The request is sent with a `connection: close` header.
   *
   * Failures are logged and never thrown: axios errors as warnings (after parsing),
   * anything else as errors.
   *
   * @param endpoint Storage node API endpoint to test
   */
  async checkResponseTime(endpoint: string): Promise<void> {
    const requestStartedAt = Date.now()
    this.logger.debug(`Sending storage node response-time check request to: ${endpoint}`, { endpoint })

    try {
      const storageNodeApi = new StorageNodeApi(endpoint, this.logging)
      const reqConfig: AxiosRequestConfig = { headers: { connection: 'close' } }
      await storageNodeApi.stateApi.stateApiGetVersion(reqConfig)

      const responseTime = Date.now() - requestStartedAt
      this.logger.debug(`${endpoint} check request response time: ${responseTime}`, { endpoint, responseTime })
      this.stateCache.setStorageNodeEndpointResponseTime(endpoint, responseTime)
    } catch (err) {
      if (!axios.isAxiosError(err)) {
        const message = err instanceof Error ? err.message : 'Unknown'
        this.logger.error(`${endpoint} check unexpected error: ${message}`, { endpoint, err, '@pauseFor': 900 })
        return
      }

      const parsedErr = parseAxiosError(err)
      this.logger.warn(`${endpoint} check request error: ${parsedErr.message}`, {
        endpoint,
        err: parsedErr,
        '@pauseFor': 900,
      })
    }
  }
  313. }