synchronizer.ts 8.5 KB


  1. import { getRuntimeModel, Model } from '../../services/sync/dataObjectsModel'
  2. import { getAvailableData } from '../../services/sync/remoteData'
  3. import logger from '../../services/logger'
  4. import _ from 'lodash'
  5. import fs from 'fs'
  6. import path from 'path'
  7. import { pipeline } from 'stream'
  8. import { promisify } from 'util'
  9. import superagent from 'superagent'
  10. import urljoin from 'url-join'
  11. import AwaitLock from 'await-lock'
  12. import sleep from 'sleep-promise'
  // Promise-based `fs` API; this module uses it to list directories and remove files.
  const { promises: fsPromises } = fs
  // TODO: use caching
  /**
   * Lists the data objects that are currently present in the upload directory.
   *
   * @param uploadDirectory - directory whose entries are listed
   * @returns the entry names found in `uploadDirectory`, exactly as returned by
   *   `getLocalFileNames`
   */
  export async function getLocalDataObjects(
    uploadDirectory: string
  ): Promise<string[]> {
    return getLocalFileNames(uploadDirectory)
  }
  /**
   * Runs one synchronization pass between the runtime model and the local
   * upload directory.
   *
   * The required CIDs are taken from the runtime model and compared with the
   * file names found in `uploadDirectory`:
   * - required CIDs that are missing locally become download tasks;
   * - local files that are not required become delete tasks.
   *
   * All tasks are placed on a shared working stack and processed by
   * `syncWorkersNumber` concurrent task processors.
   *
   * @param workerId - worker ID passed to `getRuntimeModel`
   * @param syncWorkersNumber - number of concurrent task processors to spawn
   * @param queryNodeUrl - query node URL passed to `getRuntimeModel`
   * @param uploadDirectory - local directory holding the data object files
   * @param operatorUrl - optional operator base URL. When provided, every
   *   missing object is downloaded directly from it. When omitted, the
   *   candidate operator URLs are resolved per object from the runtime model.
   * @returns a promise that resolves once all task processors have finished;
   *   it rejects if fetching the model, listing the directory or any task
   *   processor fails
   */
  export async function performSync(
    workerId: number,
    syncWorkersNumber: number,
    queryNodeUrl: string,
    uploadDirectory: string,
    operatorUrl?: string
  ): Promise<void> {
    logger.info('Started syncing...')
    const [model, files] = await Promise.all([
      getRuntimeModel(queryNodeUrl, workerId),
      getLocalFileNames(uploadDirectory),
    ])

    const requiredCids = model.dataObjects.map((obj) => obj.cid)

    const added = _.difference(requiredCids, files)
    const deleted = _.difference(files, requiredCids)

    logger.debug(`Sync - added objects: ${added.length}`)
    logger.debug(`Sync - deleted objects: ${deleted.length}`)

    const workingStack = new WorkingStack()
    const deletedTasks = deleted.map(
      (fileName) => new DeleteLocalFileTask(uploadDirectory, fileName)
    )

    let addedTasks: SyncTask[]
    // An omitted optional parameter is `undefined`, not `null`. Without an
    // explicit operator URL the candidates have to be looked up in the model;
    // with one, the files are fetched straight from that operator.
    if (operatorUrl === undefined || operatorUrl === null) {
      addedTasks = await getPrepareDownloadTasks(
        model,
        added,
        uploadDirectory,
        workingStack
      )
    } else {
      addedTasks = await getDownloadTasks(operatorUrl, added, uploadDirectory)
    }

    logger.debug(`Sync - started processing...`)

    const processSpawner = new TaskProcessorSpawner(
      workingStack,
      syncWorkersNumber
    )

    // Every initial task is queued before the processors start, because a
    // processor exits as soon as it finds the stack empty.
    await workingStack.add(addedTasks)
    await workingStack.add(deletedTasks)

    await processSpawner.process()
    logger.info('Sync ended.')
  }
  /**
   * Reads the entry names of a directory.
   *
   * @param directory - path of the directory to list
   * @returns the names returned by `fsPromises.readdir` for `directory`
   */
  async function getLocalFileNames(directory: string): Promise<string[]> {
    const entries = await fsPromises.readdir(directory)
    return entries
  }
  /**
   * A single unit of synchronization work that a task processor can run.
   */
  interface SyncTask {
    /**
     * Returns a human-readable description of the task (used for logging).
     */
    description(): string

    /**
     * Performs the task.
     */
    execute(): Promise<void>
  }
  /**
   * Sync task that removes one file from the uploads directory.
   */
  class DeleteLocalFileTask implements SyncTask {
    /**
     * @param uploadsDirectory - directory that contains the file
     * @param filename - name of the file inside `uploadsDirectory`
     */
    constructor(public uploadsDirectory: string, public filename: string) {}

    /** Returns the log line describing this deletion. */
    description(): string {
      return `Sync - deleting local file: ${this.filename} ....`
    }

    /** Unlinks the file; rejects if the unlink fails. */
    async execute(): Promise<void> {
      const target = path.join(this.uploadsDirectory, this.filename)
      await fsPromises.unlink(target)
    }
  }
  /**
   * Sync task that downloads one file from an operator and stores it in the
   * uploads directory.
   */
  class DownloadFileTask implements SyncTask {
    filepath: string
    url: string

    /**
     * @param baseUrl - operator base URL; the file is requested from
     *   `<baseUrl>/api/v1/files/<filename>`
     * @param filename - name of the remote file, also used as the local name
     * @param uploadsDirectory - local directory the file is written to
     */
    constructor(baseUrl: string, filename: string, uploadsDirectory: string) {
      this.filepath = path.join(uploadsDirectory, filename)
      this.url = urljoin(baseUrl, 'api/v1/files', filename)
    }

    /** Returns the log line describing this download. */
    description(): string {
      return `Sync - downloading file: ${this.url} as ${this.filepath} ....`
    }

    /**
     * Streams the remote file into `filepath`.
     *
     * Failures are logged, not rethrown. After a failure the (possibly
     * partial) local file is removed on a best-effort basis, and a failed
     * cleanup is logged as well.
     */
    async execute(): Promise<void> {
      const streamPipeline = promisify(pipeline)

      try {
        const timeoutMs = 30 * 60 * 1000 // 30 min for large files (~ 10 GB)
        // Casting because of:
        // https://stackoverflow.com/questions/38478034/pipe-superagent-response-to-express-response
        const request = superagent
          .get(this.url)
          .timeout(timeoutMs) as unknown as NodeJS.ReadableStream
        const fileStream = fs.createWriteStream(this.filepath)

        await streamPipeline(request, fileStream)
      } catch (err) {
        logger.error(`Sync - fetching data error for ${this.url}: ${err}`)
        try {
          logger.warn(`Cleaning up file ${this.filepath}`)
          // Asynchronous unlink: does not block the event loop while other
          // task processors are running.
          await fsPromises.unlink(this.filepath)
        } catch (cleanupErr) {
          logger.error(
            `Sync - cannot cleanup file ${this.filepath}: ${cleanupErr}`
          )
        }
      }
    }
  }
  /**
   * Something that accepts new tasks for later processing.
   */
  interface TaskSink {
    /**
     * Hands over a batch of tasks.
     *
     * @param tasks - tasks to be stored
     */
    add(tasks: SyncTask[]): Promise<void>
  }
  /**
   * Something that hands out tasks to be processed.
   */
  interface TaskSource {
    /**
     * Retrieves the next task.
     *
     * @returns the next task, or `null` when none is available
     */
    get(): Promise<SyncTask | null>
  }
  /**
   * LIFO collection of pending tasks shared between the task processors.
   * Every access to the underlying array is guarded by an `AwaitLock`.
   */
  class WorkingStack implements TaskSink, TaskSource {
    workingStack: SyncTask[]
    lock: AwaitLock

    constructor() {
      this.workingStack = []
      this.lock = new AwaitLock()
    }

    /**
     * Removes and returns the most recently added task.
     *
     * @returns the task, or `null` when the stack is empty
     */
    async get(): Promise<SyncTask | null> {
      await this.lock.acquireAsync()
      try {
        const task = this.workingStack.pop()
        return task !== undefined ? task : null
      } finally {
        // Always release, otherwise every later get()/add() would wait forever.
        this.lock.release()
      }
    }

    /**
     * Pushes the given tasks onto the stack, preserving their order.
     *
     * @param tasks - tasks to add; `null`/`undefined` is treated as "nothing
     *   to add"
     */
    async add(tasks: SyncTask[]): Promise<void> {
      await this.lock.acquireAsync()
      try {
        if (tasks !== null && tasks !== undefined) {
          // Pushed one by one instead of `push(...tasks)`: spreading a very
          // large array into call arguments can exceed the engine's argument
          // limit and throw a RangeError.
          for (const task of tasks) {
            this.workingStack.push(task)
          }
        }
      } finally {
        this.lock.release()
      }
    }
  }
  /**
   * Starts a fixed number of task processors over one task source and waits
   * for all of them.
   */
  class TaskProcessorSpawner {
    /**
     * @param taskSource - source every spawned processor takes its tasks from
     * @param processNumber - how many processors to run concurrently
     */
    constructor(
      public taskSource: TaskSource,
      public processNumber: number
    ) {}

    /**
     * Launches `processNumber` processors and resolves when all of them have
     * finished; rejects as soon as one of them rejects.
     */
    async process(): Promise<void> {
      const running = []

      let spawned = 0
      while (spawned < this.processNumber) {
        running.push(new TaskProcessor(this.taskSource).process())
        spawned++
      }

      await Promise.all(running)
    }
  }
  /**
   * Worker loop that repeatedly takes a task from a source and executes it.
   */
  class TaskProcessor {
    /**
     * @param taskSource - where tasks are taken from
     * @param exitOnCompletion - when `true` (default) the loop ends as soon as
     *   the source has no task; when `false` the processor keeps polling
     */
    constructor(
      public taskSource: TaskSource,
      public exitOnCompletion = true
    ) {}

    /**
     * Runs tasks one after another. A rejected task rejects this method.
     */
    async process(): Promise<void> {
      for (;;) {
        const nextTask = await this.taskSource.get()

        if (nextTask === null) {
          if (this.exitOnCompletion) {
            return
          }
          // Nothing to do right now: wait 3 seconds before polling again.
          await sleep(3000)
          continue
        }

        logger.debug(nextTask.description())
        await nextTask.execute()
      }
    }
  }
  /**
   * Builds one `PrepareDownloadFileTask` per missing CID, each carrying the
   * operator URLs that may serve that CID according to the model.
   *
   * The lookup chain is: CID -> bag ID -> bucket IDs of the bag -> operator
   * URL of each bucket. Buckets without a (truthy) operator URL are skipped,
   * so a CID may end up with an empty candidate list.
   *
   * @param model - runtime model providing `dataObjects`, `storageBuckets`
   *   and `bags`
   * @param addedCids - CIDs that have to be downloaded
   * @param uploadDirectory - local directory the files will be stored in
   * @param taskSink - sink handed to each created task
   * @returns the created tasks, in the order of `addedCids`
   */
  async function getPrepareDownloadTasks(
    model: Model,
    addedCids: string[],
    uploadDirectory: string,
    taskSink: TaskSink
  ): Promise<PrepareDownloadFileTask[]> {
    // CID -> ID of the bag the data object belongs to.
    const bagIdByCid = new Map()
    for (const dataObject of model.dataObjects) {
      bagIdByCid.set(dataObject.cid, dataObject.bagId)
    }

    // Bucket ID -> operator URL of that bucket.
    const operatorUrlByBucketId = new Map()
    for (const storageBucket of model.storageBuckets) {
      operatorUrlByBucketId.set(storageBucket.id, storageBucket.operatorUrl)
    }

    // Bag ID -> operator URLs of all buckets assigned to the bag.
    const operatorUrlsByBagId = new Map()
    for (const bag of model.bags) {
      const bagOperatorUrls = []
      for (const bucketId of bag.buckets) {
        // An unknown bucket yields `undefined` and is skipped like an empty URL.
        const bucketUrl = operatorUrlByBucketId.get(bucketId)
        if (bucketUrl) {
          bagOperatorUrls.push(bucketUrl)
        }
      }
      operatorUrlsByBagId.set(bag.id, bagOperatorUrls)
    }

    return addedCids.map((cid) => {
      const bagId = bagIdByCid.get(cid)
      const resolvable = bagIdByCid.has(cid) && operatorUrlsByBagId.has(bagId)
      // can be empty after look up
      const operatorUrls: string[] = resolvable
        ? operatorUrlsByBagId.get(bagId)
        : []

      return new PrepareDownloadFileTask(
        operatorUrls,
        cid,
        uploadDirectory,
        taskSink
      )
    })
  }
  /**
   * Builds one `DownloadFileTask` per missing CID, all targeting the same
   * operator.
   *
   * @param operatorUrl - base URL of the operator to download from
   * @param addedCids - CIDs (used as file names) that have to be downloaded
   * @param uploadDirectory - local directory the files will be stored in
   * @returns the created tasks, in the order of `addedCids`
   */
  async function getDownloadTasks(
    operatorUrl: string,
    addedCids: string[],
    uploadDirectory: string
  ): Promise<DownloadFileTask[]> {
    const downloads: DownloadFileTask[] = []
    for (const cid of addedCids) {
      downloads.push(new DownloadFileTask(operatorUrl, cid, uploadDirectory))
    }
    return downloads
  }
  /**
   * Sync task that finds an operator which actually holds a given CID and,
   * once found, queues a `DownloadFileTask` for it.
   */
  class PrepareDownloadFileTask implements SyncTask {
    cid: string
    operatorUrlCandidates: string[]
    taskSink: TaskSink
    uploadsDirectory: string

    /**
     * @param operatorUrlCandidates - operator base URLs that may hold the file;
     *   the array is copied, the caller's array is never modified
     * @param cid - CID of the data object to download
     * @param uploadsDirectory - local directory the file will be stored in
     * @param taskSink - sink that receives the resulting download task
     */
    constructor(
      operatorUrlCandidates: string[],
      cid: string,
      uploadsDirectory: string,
      taskSink: TaskSink
    ) {
      this.cid = cid
      this.taskSink = taskSink
      // Copying is critical here: the list is consumed by execute(), and the
      // caller may share one array between several tasks. A shallow copy is
      // sufficient because the elements are immutable strings.
      this.operatorUrlCandidates = [...operatorUrlCandidates]
      this.uploadsDirectory = uploadsDirectory
    }

    /** Returns the log line describing this preparation step. */
    description(): string {
      return `Sync - preparing for download of: ${this.cid} ....`
    }

    /**
     * Tries the candidate operators in random order. The first one whose
     * available data includes the CID gets a `DownloadFileTask` queued on the
     * task sink. Errors from a single operator are logged and the next
     * candidate is tried. If no candidate works, a warning is logged.
     */
    async execute(): Promise<void> {
      while (!_.isEmpty(this.operatorUrlCandidates)) {
        const randomUrl = _.sample(this.operatorUrlCandidates)
        if (!randomUrl) {
          break // cannot get random URL
        }

        // Remove random url from the original list.
        _.remove(this.operatorUrlCandidates, (url) => url === randomUrl)

        try {
          const chosenBaseUrl = randomUrl
          const remoteOperatorCids: string[] = await getAvailableData(
            chosenBaseUrl
          )

          if (remoteOperatorCids.includes(this.cid)) {
            const newTask = new DownloadFileTask(
              chosenBaseUrl,
              this.cid,
              this.uploadsDirectory
            )

            return this.taskSink.add([newTask])
          }
        } catch (err) {
          logger.error(`Sync - fetching data error for ${this.cid}: ${err}`)
        }
      }

      logger.warn(`Sync - cannot get operator URLs for ${this.cid}`)
    }
  }