// tasks.ts (7.4 KB)
  import fs from 'fs'
  import path from 'path'
  import { pipeline } from 'stream'
  import { promisify } from 'util'
  import superagent from 'superagent'
  import urljoin from 'url-join'
  import { v4 as uuidv4 } from 'uuid'
  import logger from '../../services/logger'
  import _ from 'lodash'
  import { getRemoteDataObjects } from './remoteStorageData'
  import { TaskSink } from './workingProcess'
  import { isNewDataObject } from '../caching/newUploads'
  import { addDataObjectIdToCache, deleteDataObjectIdFromCache } from '../caching/localDataObjects'
  import { hashFile } from '../helpers/hashing'
  import { parseBagId } from '../helpers/bagTypes'
  import { hexToString } from '@polkadot/util'
  import { ApiPromise } from '@polkadot/api'
  const fsPromises = fs.promises
  /**
   * Defines the synchronization task abstraction.
   */
  export interface SyncTask {
    /**
     * Returns a human-friendly task description.
     */
    description(): string

    /**
     * Performs the task.
     *
     * @returns a promise that settles once the task has finished
     */
    execute(): Promise<void>
  }
  /**
   * Deletes a file from the local storage by its name and removes the matching
   * data object ID from the local cache.
   */
  export class DeleteLocalFileTask implements SyncTask {
    uploadsDirectory: string
    filename: string

    /**
     * @param uploadsDirectory directory that contains the local file
     * @param filename name of the file to delete; it is also used as the data object ID
     */
    constructor(uploadsDirectory: string, filename: string) {
      this.uploadsDirectory = uploadsDirectory
      this.filename = filename
    }

    /**
     * Returns a human-friendly task description.
     */
    description(): string {
      return `Sync - deleting local file: ${this.filename} ....`
    }

    /**
     * Removes the file and then evicts its ID from the cache.
     *
     * Nothing is deleted when `isNewDataObject` reports the ID as new. A file that
     * is already missing (ENOENT) is not treated as a failure: the cache entry is
     * still evicted, otherwise it would stay stale and the deletion would fail the
     * same way on every later attempt.
     *
     * @throws any `unlink` error other than ENOENT, and any cache eviction error
     */
    async execute(): Promise<void> {
      const dataObjectId = this.filename
      if (isNewDataObject(dataObjectId)) {
        logger.warn(`Sync - possible QueryNode update delay (new file) - deleting file canceled: ${this.filename}`)
        return
      }

      const fullPath = path.join(this.uploadsDirectory, this.filename)
      try {
        await fsPromises.unlink(fullPath)
      } catch (err) {
        // Only "file does not exist" is tolerated; every other failure is reported to the caller.
        if ((err as { code?: string } | undefined)?.code !== 'ENOENT') {
          throw err
        }
        logger.warn(`Sync - local file is already missing, removing only its cache entry: ${fullPath}`)
      }
      await deleteDataObjectIdFromCache(dataObjectId)
    }
  }
  /**
   * Downloads a file from a remote storage node to the local storage.
   */
  export class DownloadFileTask implements SyncTask {
    dataObjectId: string
    expectedHash?: string
    uploadsDirectory: string
    tempDirectory: string
    url: string
    downloadTimeout: number

    /**
     * @param baseUrl base URL of the remote storage node
     * @param dataObjectId ID of the data object; also used as the local file name
     * @param expectedHash hash the downloaded file must have; the check is skipped when empty
     * @param uploadsDirectory directory the verified file is moved into
     * @param tempDirectory subdirectory of `uploadsDirectory` that receives the in-progress download
     * @param downloadTimeout request timeout in minutes
     */
    constructor(
      baseUrl: string,
      dataObjectId: string,
      expectedHash: string | undefined,
      uploadsDirectory: string,
      tempDirectory: string,
      downloadTimeout: number
    ) {
      this.dataObjectId = dataObjectId
      this.expectedHash = expectedHash
      this.uploadsDirectory = uploadsDirectory
      this.tempDirectory = tempDirectory
      this.downloadTimeout = downloadTimeout
      this.url = urljoin(baseUrl, 'api/v1/files', dataObjectId)
    }

    /**
     * Returns a human-friendly task description.
     */
    description(): string {
      return `Sync - downloading file: ${this.url} to ${this.uploadsDirectory} ....`
    }

    /**
     * Streams the remote file into a temp file, verifies it, moves it into the
     * uploads directory and registers its ID in the cache.
     *
     * Failures are logged rather than thrown; the temp file is removed on failure.
     */
    async execute(): Promise<void> {
      const streamPipeline = promisify(pipeline)
      const filepath = path.join(this.uploadsDirectory, this.dataObjectId)
      // We create a temp file first to mitigate partial downloads on app (or remote node) crash.
      // These partial downloads will be cleaned up during the next sync iteration.
      const tempFilePath = path.join(this.uploadsDirectory, this.tempDirectory, uuidv4())
      try {
        const timeoutMs = this.downloadTimeout * 60 * 1000
        // Casting because of:
        // https://stackoverflow.com/questions/38478034/pipe-superagent-response-to-express-response
        const request = (superagent.get(this.url).timeout(timeoutMs) as unknown) as NodeJS.ReadableStream
        const fileStream = fs.createWriteStream(tempFilePath)

        // A non-OK response still streams its (error) body into the temp file, so the
        // status is remembered here and the download is rejected once the stream ends.
        const response: { failedStatus?: number } = {}
        request.on('response', (res) => {
          if (!res.ok) {
            logger.error(`Sync - unexpected status code(${res.statusCode}) for ${res?.request?.url}`)
            response.failedStatus = res.statusCode
          }
        })

        await streamPipeline(request, fileStream)
        if (response.failedStatus !== undefined) {
          // Without this an error page would be stored as the data object whenever
          // there is no expected hash to compare against.
          throw new Error(`Unexpected status code (${response.failedStatus}) for ${this.url}`)
        }

        await this.verifyDownloadedFile(tempFilePath)
        await fsPromises.rename(tempFilePath, filepath)
        await addDataObjectIdToCache(this.dataObjectId)
      } catch (err) {
        logger.error(`Sync - fetching data error for ${this.url}: ${err}`, { err })
        try {
          logger.warn(`Cleaning up file ${tempFilePath}`)
          await fsPromises.unlink(tempFilePath)
        } catch (cleanupErr) {
          logger.error(`Sync - cannot cleanup file ${tempFilePath}: ${cleanupErr}`, { err: cleanupErr })
        }
      }
    }

    /**
     * Compares the expected hash with the hash of the downloaded file.
     * Does nothing when no expected hash was provided.
     *
     * @param filePath downloaded file path
     * @throws Error when the hashes differ
     */
    async verifyDownloadedFile(filePath: string): Promise<void> {
      if (!_.isEmpty(this.expectedHash)) {
        const hash = await hashFile(filePath)
        if (hash !== this.expectedHash) {
          throw new Error(`Invalid file hash. Expected: ${this.expectedHash} - real: ${hash}`)
        }
      }
    }
  }
  /**
   * Resolves a remote storage node URL that holds the data object and creates
   * the file downloading task (DownloadFileTask) for it.
   */
  export class PrepareDownloadFileTask implements SyncTask {
    bagId: string
    dataObjectId: string
    operatorUrlCandidates: string[]
    taskSink: TaskSink
    uploadsDirectory: string
    tempDirectory: string
    api?: ApiPromise
    downloadTimeout: number

    /**
     * @param operatorUrlCandidates base URLs of the storage nodes to try
     * @param bagId ID of the bag that contains the data object
     * @param dataObjectId ID of the data object to download
     * @param uploadsDirectory passed through to the created DownloadFileTask
     * @param tempDirectory passed through to the created DownloadFileTask
     * @param taskSink receives the created DownloadFileTask
     * @param downloadTimeout passed through to the created DownloadFileTask
     * @param api optional chain API; when present it is used to look up the expected file hash
     */
    constructor(
      operatorUrlCandidates: string[],
      bagId: string,
      dataObjectId: string,
      uploadsDirectory: string,
      tempDirectory: string,
      taskSink: TaskSink,
      downloadTimeout: number,
      api?: ApiPromise
    ) {
      this.api = api
      this.bagId = bagId
      this.dataObjectId = dataObjectId
      this.taskSink = taskSink
      this.operatorUrlCandidates = operatorUrlCandidates
      this.uploadsDirectory = uploadsDirectory
      this.tempDirectory = tempDirectory
      this.downloadTimeout = downloadTimeout
    }

    /**
     * Returns a human-friendly task description.
     */
    description(): string {
      return `Sync - preparing for download of: ${this.dataObjectId} ....`
    }

    /**
     * Tries the candidate URLs in random order and adds a DownloadFileTask to the
     * sink for the first node that lists the data object. A candidate that fails
     * is logged and skipped. A warning is logged when no candidate qualifies.
     */
    async execute(): Promise<void> {
      if (_.isEmpty(this.bagId)) {
        logger.error(`Sync - invalid task - no bagId for ${this.dataObjectId}`)
        return
      }

      // `_.shuffle` returns a new array, so the candidates are visited in random
      // order without modifying the list owned by the caller.
      const shuffledUrls = _.shuffle(this.operatorUrlCandidates)

      // The expected hash does not depend on the chosen URL, so it is reused once
      // a lookup has succeeded instead of being queried again for every candidate.
      let knownHash: { value: string | undefined } | undefined

      for (const chosenBaseUrl of shuffledUrls) {
        logger.debug(`Sync - random storage node URL was chosen ${chosenBaseUrl}`)
        try {
          const [remoteOperatorIds, hash] = await Promise.all([
            getRemoteDataObjects(chosenBaseUrl),
            knownHash !== undefined ? knownHash.value : this.getExpectedHash(),
          ])
          knownHash = { value: hash }

          if (remoteOperatorIds.includes(this.dataObjectId)) {
            const newTask = new DownloadFileTask(
              chosenBaseUrl,
              this.dataObjectId,
              hash,
              this.uploadsDirectory,
              this.tempDirectory,
              this.downloadTimeout
            )
            return this.taskSink.add([newTask])
          }
        } catch (err) {
          logger.error(`Sync - fetching data error for ${this.dataObjectId}: ${err}`, { err })
        }
      }

      logger.warn(`Sync - cannot get operator URLs for ${this.dataObjectId}`)
    }

    /**
     * Looks up the data object through `api.query.storage.dataObjectsById` and
     * decodes its `ipfsContentId` from hex.
     *
     * @returns the decoded content ID, or `undefined` when no API was provided
     */
    async getExpectedHash(): Promise<string | undefined> {
      if (this.api !== undefined) {
        const convertedBagId = parseBagId(this.bagId)
        const dataObject = await this.api.query.storage.dataObjectsById(convertedBagId, this.dataObjectId)
        return hexToString(dataObject.ipfsContentId.toString())
      }
      return undefined
    }
  }