storageObligations.ts 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. import { MAX_RESULTS_PER_QUERY, QueryNodeApi } from '../queryNode/api'
  2. import logger from '../logger'
  3. import _ from 'lodash'
  4. import {
  5. StorageBagDetailsFragment,
  6. StorageBucketDetailsFragment,
  7. DataObjectDetailsFragment,
  8. } from '../queryNode/generated/queries'
  9. /**
  10. * Defines storage provider data obligations.
  11. */
  12. export type DataObligations = {
  13. /**
  14. * All storage buckets in the system.
  15. */
  16. storageBuckets: StorageBucket[]
  17. /**
  18. * Assigned bags for the storage provider.
  19. */
  20. bags: Bag[]
  21. /**
  22. * Assigned data objects for the storage provider.
  23. */
  24. dataObjects: DataObject[]
  25. }
  26. /**
  27. * Storage bucket abstraction.
  28. */
  29. type StorageBucket = {
  30. /**
  31. * Storage bucket ID
  32. */
  33. id: string
  34. /**
  35. * Storage operator URL
  36. */
  37. operatorUrl: string
  38. /**
  39. * Storage working group ID.
  40. */
  41. workerId: number
  42. }
  43. /**
  44. * Storage bag abstracton.
  45. */
  46. type Bag = {
  47. /**
  48. * Storage bag ID
  49. */
  50. id: string
  51. /**
  52. * Assigned storage bucket IDs.
  53. */
  54. buckets: string[]
  55. }
  56. /**
  57. * Data object abstraction.
  58. */
  59. type DataObject = {
  60. /**
  61. * Data object ID
  62. */
  63. id: string
  64. /**
  65. * Assigned bag ID
  66. */
  67. bagId: string
  68. }
  69. /**
  70. * Get storage provider obligations like (assigned data objects) from the
  71. * runtime (Query Node).
  72. *
  73. * @param queryNodeUrl - Query Node URL
  74. * @param workerId - worker ID
  75. * @returns promise for the DataObligations
  76. */
  77. export async function getStorageObligationsFromRuntime(
  78. queryNodeUrl: string,
  79. workerId: number
  80. ): Promise<DataObligations> {
  81. const api = new QueryNodeApi(queryNodeUrl)
  82. const allBuckets = await getAllBuckets(api)
  83. const bucketIds = allBuckets
  84. .filter((bucket) => bucket.operatorStatus?.workerId === workerId)
  85. .map((bucket) => bucket.id)
  86. const assignedBags = await getAllAssignedBags(api, bucketIds)
  87. const bagIds = assignedBags.map((bag) => bag.id)
  88. const assignedDataObjects = await getAllAssignedDataObjects(api, bagIds)
  89. const model: DataObligations = {
  90. storageBuckets: allBuckets.map((bucket) => ({
  91. id: bucket.id,
  92. operatorUrl: bucket.operatorMetadata?.nodeEndpoint ?? '',
  93. workerId: bucket.operatorStatus?.workerId,
  94. })),
  95. bags: assignedBags.map((bag) => ({
  96. id: bag.id,
  97. buckets: bag.storageBuckets.map((bucketInBag) => bucketInBag.id),
  98. })),
  99. dataObjects: assignedDataObjects.map((dataObject) => ({
  100. id: dataObject.id,
  101. bagId: dataObject.storageBagId,
  102. })),
  103. }
  104. return model
  105. }
  106. /**
  107. * Get storage bucket IDs assigned to the worker.
  108. *
  109. * @param queryNodeUrl - Query Node URL
  110. * @param workerId - worker ID
  111. * @returns storage bucket IDs
  112. */
  113. export async function getStorageBucketIdsByWorkerId(queryNodeUrl: string, workerId: number): Promise<string[]> {
  114. const api = new QueryNodeApi(queryNodeUrl)
  115. const idFragments = await api.getStorageBucketIdsByWorkerId(workerId.toString())
  116. const ids = idFragments.map((frag) => frag.id)
  117. return ids
  118. }
  119. /**
  120. * Get IDs of the data objects assigned to the bag ID.
  121. *
  122. * @param api - initialiazed QueryNodeApi instance
  123. * @param bagId - bag ID
  124. * @returns data object IDs
  125. */
  126. export async function getDataObjectIDsByBagId(queryNodeUrl: string, bagId: string): Promise<string[]> {
  127. const api = new QueryNodeApi(queryNodeUrl)
  128. const dataObjects = await getAllAssignedDataObjects(api, [bagId])
  129. return dataObjects.map((obj) => obj.id)
  130. }
  131. /**
  132. * Get all storage buckets registered in the runtime (Query Node).
  133. *
  134. * @param api - initialiazed QueryNodeApi instance
  135. * @returns storage buckets data
  136. */
  137. async function getAllBuckets(api: QueryNodeApi): Promise<StorageBucketDetailsFragment[]> {
  138. const idFragments = await api.getStorageBucketIds()
  139. const ids = idFragments.map((frag) => frag.id)
  140. return await getAllObjectsWithPaging(async (offset, limit) => {
  141. const idsPart = ids.slice(offset, offset + limit)
  142. if (!_.isEmpty(idsPart)) {
  143. logger.debug(`Sync - getting all storage buckets: offset = ${offset}, limit = ${limit}`)
  144. return await api.getStorageBucketDetails(idsPart, 0, limit)
  145. } else {
  146. return false
  147. }
  148. })
  149. }
  150. /**
  151. * Get all data objects assigned to storage provider.
  152. *
  153. * @param api - initialiazed QueryNodeApi instance
  154. * @param bagIds - assigned storage bags' IDs
  155. * @returns storage bag data
  156. */
  157. async function getAllAssignedDataObjects(api: QueryNodeApi, bagIds: string[]): Promise<DataObjectDetailsFragment[]> {
  158. return await api.getDataObjectDetails(bagIds)
  159. }
  160. /**
  161. * Get all bags assigned to storage provider.
  162. *
  163. * @param api - initialiazed QueryNodeApi instance
  164. * @param bucketIds - assigned storage provider buckets' IDs
  165. * @returns storage bag data
  166. */
  167. async function getAllAssignedBags(api: QueryNodeApi, bucketIds: string[]): Promise<StorageBagDetailsFragment[]> {
  168. return await api.getStorageBagsDetails(bucketIds)
  169. }
  170. /**
  171. * Abstract object acquiring function for the QueryNode. It uses paging for
  172. * queries and gets data using record offset and limit (hardcoded to 1000).
  173. *
  174. * @param objectName - object name(type) to get from the QueryNode
  175. * @param query - actual query function
  176. * @returns storage operator URL
  177. */
  178. async function getAllObjectsWithPaging<T>(
  179. query: (offset: number, limit: number) => Promise<T[] | false>
  180. ): Promise<T[]> {
  181. const result = []
  182. const limit = MAX_RESULTS_PER_QUERY
  183. let offset = 0
  184. let resultPart = []
  185. do {
  186. const queryResult = await query(offset, limit)
  187. if (queryResult === false) {
  188. return result
  189. } else {
  190. resultPart = queryResult
  191. }
  192. offset += limit
  193. result.push(...resultPart)
  194. } while (resultPart.length > 0)
  195. return result
  196. }