index.ts 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525
  1. /*
  2. eslint-disable @typescript-eslint/naming-convention
  3. */
  4. import { EventContext, StoreContext } from '@joystream/hydra-common'
  5. import { Storage } from '../generated/types/storage'
  6. import {
  7. DistributionBucket,
  8. DistributionBucketFamily,
  9. DistributionBucketOperator,
  10. DistributionBucketOperatorMetadata,
  11. DistributionBucketOperatorStatus,
  12. NodeLocationMetadata,
  13. StorageBag,
  14. StorageBucket,
  15. StorageBucketOperatorStatusActive,
  16. StorageBucketOperatorStatusInvited,
  17. StorageBucketOperatorStatusMissing,
  18. StorageDataObject,
  19. StorageSystemParameters,
  20. GeoCoordinates,
  21. } from 'query-node/dist/model'
  22. import BN from 'bn.js'
  23. import { getById, inconsistentState } from '../common'
  24. import {
  25. processDistributionBucketFamilyMetadata,
  26. processDistributionOperatorMetadata,
  27. processStorageOperatorMetadata,
  28. } from './metadata'
  29. import {
  30. createDataObjects,
  31. getStorageSystem,
  32. removeDataObject,
  33. getStorageBucketWithOperatorMetadata,
  34. getBag,
  35. getDynamicBagId,
  36. getDynamicBagOwner,
  37. getDataObjectsInBag,
  38. getDynamicBag,
  39. getDistributionBucketFamilyWithMetadata,
  40. getDistributionBucketOperatorWithMetadata,
  41. distributionBucketId,
  42. distributionOperatorId,
  43. distributionBucketIdByFamilyAndIndex,
  44. } from './utils'
  45. // STORAGE BUCKETS
  46. export async function storage_StorageBucketCreated({ event, store }: EventContext & StoreContext): Promise<void> {
  47. const [
  48. bucketId,
  49. invitedWorkerId,
  50. acceptingNewBags,
  51. dataObjectSizeLimit,
  52. dataObjectCountLimit,
  53. ] = new Storage.StorageBucketCreatedEvent(event).params
  54. const storageBucket = new StorageBucket({
  55. id: bucketId.toString(),
  56. acceptingNewBags: acceptingNewBags.isTrue,
  57. dataObjectCountLimit: new BN(dataObjectCountLimit.toString()),
  58. dataObjectsSizeLimit: new BN(dataObjectSizeLimit.toString()),
  59. dataObjectsCount: new BN(0),
  60. dataObjectsSize: new BN(0),
  61. })
  62. if (invitedWorkerId.isSome) {
  63. const operatorStatus = new StorageBucketOperatorStatusInvited()
  64. operatorStatus.workerId = invitedWorkerId.unwrap().toNumber()
  65. storageBucket.operatorStatus = operatorStatus
  66. } else {
  67. storageBucket.operatorStatus = new StorageBucketOperatorStatusMissing()
  68. }
  69. await store.save<StorageBucket>(storageBucket)
  70. }
  71. export async function storage_StorageOperatorMetadataSet({ event, store }: EventContext & StoreContext): Promise<void> {
  72. const [bucketId, , metadataBytes] = new Storage.StorageOperatorMetadataSetEvent(event).params
  73. const storageBucket = await getStorageBucketWithOperatorMetadata(store, bucketId.toString())
  74. storageBucket.operatorMetadata = await processStorageOperatorMetadata(
  75. store,
  76. storageBucket.operatorMetadata,
  77. metadataBytes
  78. )
  79. await store.save<StorageBucket>(storageBucket)
  80. }
  81. export async function storage_StorageBucketStatusUpdated({ event, store }: EventContext & StoreContext): Promise<void> {
  82. const [bucketId, acceptingNewBags] = new Storage.StorageBucketStatusUpdatedEvent(event).params
  83. const storageBucket = await getById(store, StorageBucket, bucketId.toString())
  84. storageBucket.acceptingNewBags = acceptingNewBags.isTrue
  85. await store.save<StorageBucket>(storageBucket)
  86. }
  87. export async function storage_StorageBucketInvitationAccepted({
  88. event,
  89. store,
  90. }: EventContext & StoreContext): Promise<void> {
  91. const [bucketId, workerId] = new Storage.StorageBucketInvitationAcceptedEvent(event).params
  92. const storageBucket = await getById(store, StorageBucket, bucketId.toString())
  93. const operatorStatus = new StorageBucketOperatorStatusActive()
  94. operatorStatus.workerId = workerId.toNumber()
  95. storageBucket.operatorStatus = operatorStatus
  96. await store.save<StorageBucket>(storageBucket)
  97. }
  98. export async function storage_StorageBucketInvitationCancelled({
  99. event,
  100. store,
  101. }: EventContext & StoreContext): Promise<void> {
  102. const [bucketId] = new Storage.StorageBucketInvitationCancelledEvent(event).params
  103. const storageBucket = await getById(store, StorageBucket, bucketId.toString())
  104. const operatorStatus = new StorageBucketOperatorStatusMissing()
  105. storageBucket.operatorStatus = operatorStatus
  106. await store.save<StorageBucket>(storageBucket)
  107. }
  108. export async function storage_StorageBucketOperatorInvited({
  109. event,
  110. store,
  111. }: EventContext & StoreContext): Promise<void> {
  112. const [bucketId, workerId] = new Storage.StorageBucketOperatorInvitedEvent(event).params
  113. const storageBucket = await getById(store, StorageBucket, bucketId.toString())
  114. const operatorStatus = new StorageBucketOperatorStatusInvited()
  115. operatorStatus.workerId = workerId.toNumber()
  116. storageBucket.operatorStatus = operatorStatus
  117. await store.save<StorageBucket>(storageBucket)
  118. }
  119. export async function storage_StorageBucketOperatorRemoved({
  120. event,
  121. store,
  122. }: EventContext & StoreContext): Promise<void> {
  123. const [bucketId] = new Storage.StorageBucketInvitationCancelledEvent(event).params
  124. const storageBucket = await getById(store, StorageBucket, bucketId.toString())
  125. const operatorStatus = new StorageBucketOperatorStatusMissing()
  126. storageBucket.operatorStatus = operatorStatus
  127. await store.save<StorageBucket>(storageBucket)
  128. }
  129. export async function storage_StorageBucketsUpdatedForBag({
  130. event,
  131. store,
  132. }: EventContext & StoreContext): Promise<void> {
  133. const [bagId, addedBucketsSet, removedBucketsSet] = new Storage.StorageBucketsUpdatedForBagEvent(event).params
  134. // Get or create bag
  135. const storageBag = await getBag(store, bagId, ['storageBuckets'])
  136. const removedBucketsIds = Array.from(removedBucketsSet).map((id) => id.toString())
  137. const addedBucketsIds = Array.from(addedBucketsSet).map((id) => id.toString())
  138. storageBag.storageBuckets = (storageBag.storageBuckets || [])
  139. .filter((bucket) => !removedBucketsIds.includes(bucket.id))
  140. .concat(addedBucketsIds.map((id) => new StorageBucket({ id })))
  141. await store.save<StorageBag>(storageBag)
  142. }
  143. export async function storage_VoucherChanged({ event, store }: EventContext & StoreContext): Promise<void> {
  144. const [bucketId, voucher] = new Storage.VoucherChangedEvent(event).params
  145. const bucket = await getById(store, StorageBucket, bucketId.toString())
  146. bucket.dataObjectCountLimit = voucher.objectsLimit
  147. bucket.dataObjectsSizeLimit = voucher.sizeLimit
  148. bucket.dataObjectsCount = voucher.objectsUsed
  149. bucket.dataObjectsSize = voucher.sizeUsed
  150. await store.save<StorageBucket>(bucket)
  151. }
  152. export async function storage_StorageBucketVoucherLimitsSet({
  153. event,
  154. store,
  155. }: EventContext & StoreContext): Promise<void> {
  156. const [bucketId, sizeLimit, countLimit] = new Storage.StorageBucketVoucherLimitsSetEvent(event).params
  157. const bucket = await getById(store, StorageBucket, bucketId.toString())
  158. bucket.dataObjectsSizeLimit = sizeLimit
  159. bucket.dataObjectCountLimit = countLimit
  160. await store.save<StorageBucket>(bucket)
  161. }
  162. export async function storage_StorageBucketDeleted({ event, store }: EventContext & StoreContext): Promise<void> {
  163. const [bucketId] = new Storage.StorageBucketDeletedEvent(event).params
  164. // TODO: Cascade remove on db level (would require changes in Hydra / comitting autogenerated files)
  165. const storageBucket = await store.get(StorageBucket, {
  166. where: { id: bucketId.toString() },
  167. relations: ['bags', 'bags.storageBuckets'],
  168. })
  169. if (!storageBucket) {
  170. inconsistentState(`Storage bucket by id ${bucketId.toString()} not found!`)
  171. }
  172. // Remove relations
  173. await Promise.all(
  174. (storageBucket.bags || []).map((bag) => {
  175. bag.storageBuckets = (bag.storageBuckets || []).filter((bucket) => bucket.id !== bucketId.toString())
  176. return store.save<StorageBag>(bag)
  177. })
  178. )
  179. await store.remove<StorageBucket>(storageBucket)
  180. }
  181. // DYNAMIC BAGS
  182. export async function storage_DynamicBagCreated({ event, store }: EventContext & StoreContext): Promise<void> {
  183. const [bagId, , storageBucketIdsSet, distributionBucketIdsSet] = new Storage.DynamicBagCreatedEvent(event).params
  184. const storageBag = new StorageBag({
  185. id: getDynamicBagId(bagId),
  186. owner: getDynamicBagOwner(bagId),
  187. storageBuckets: Array.from(storageBucketIdsSet).map((id) => new StorageBucket({ id: id.toString() })),
  188. distributionBuckets: Array.from(distributionBucketIdsSet).map(
  189. (id) => new DistributionBucket({ id: distributionBucketId(id) })
  190. ),
  191. })
  192. await store.save<StorageBag>(storageBag)
  193. }
  194. export async function storage_DynamicBagDeleted({ event, store }: EventContext & StoreContext): Promise<void> {
  195. const [, bagId] = new Storage.DynamicBagDeletedEvent(event).params
  196. const storageBag = await getDynamicBag(store, bagId, ['objects'])
  197. await store.remove<StorageBag>(storageBag)
  198. }
  199. // DATA OBJECTS
  200. // Note: "Uploaded" here actually means "created" (the real upload happens later)
  201. export async function storage_DataObjectsUploaded({ event, store }: EventContext & StoreContext): Promise<void> {
  202. const [dataObjectIds, uploadParams, deletionPrize] = new Storage.DataObjectsUploadedEvent(event).params
  203. await createDataObjects(store, uploadParams, deletionPrize, dataObjectIds)
  204. }
  205. export async function storage_PendingDataObjectsAccepted({ event, store }: EventContext & StoreContext): Promise<void> {
  206. const [, , bagId, dataObjectIds] = new Storage.PendingDataObjectsAcceptedEvent(event).params
  207. const dataObjects = await getDataObjectsInBag(store, bagId, dataObjectIds)
  208. await Promise.all(
  209. dataObjects.map(async (dataObject) => {
  210. dataObject.isAccepted = true
  211. await store.save<StorageDataObject>(dataObject)
  212. })
  213. )
  214. }
  215. export async function storage_DataObjectsMoved({ event, store }: EventContext & StoreContext): Promise<void> {
  216. const [srcBagId, destBagId, dataObjectIds] = new Storage.DataObjectsMovedEvent(event).params
  217. const dataObjects = await getDataObjectsInBag(store, srcBagId, dataObjectIds)
  218. const destBag = await getBag(store, destBagId)
  219. await Promise.all(
  220. dataObjects.map(async (dataObject) => {
  221. dataObject.storageBag = destBag
  222. await store.save<StorageDataObject>(dataObject)
  223. })
  224. )
  225. }
  226. export async function storage_DataObjectsDeleted({ event, store }: EventContext & StoreContext): Promise<void> {
  227. const [, bagId, dataObjectIds] = new Storage.DataObjectsDeletedEvent(event).params
  228. const dataObjects = await getDataObjectsInBag(store, bagId, dataObjectIds)
  229. await Promise.all(dataObjects.map((o) => removeDataObject(store, o)))
  230. }
  231. // DISTRIBUTION FAMILY
  232. export async function storage_DistributionBucketFamilyCreated({
  233. event,
  234. store,
  235. }: EventContext & StoreContext): Promise<void> {
  236. const [familyId] = new Storage.DistributionBucketFamilyCreatedEvent(event).params
  237. const family = new DistributionBucketFamily({
  238. id: familyId.toString(),
  239. })
  240. await store.save<DistributionBucketFamily>(family)
  241. }
  242. export async function storage_DistributionBucketFamilyMetadataSet({
  243. event,
  244. store,
  245. }: EventContext & StoreContext): Promise<void> {
  246. const [familyId, metadataBytes] = new Storage.DistributionBucketFamilyMetadataSetEvent(event).params
  247. const family = await getDistributionBucketFamilyWithMetadata(store, familyId.toString())
  248. family.metadata = await processDistributionBucketFamilyMetadata(store, family.metadata, metadataBytes)
  249. await store.save<DistributionBucketFamily>(family)
  250. }
  251. export async function storage_DistributionBucketFamilyDeleted({
  252. event,
  253. store,
  254. }: EventContext & StoreContext): Promise<void> {
  255. const [familyId] = new Storage.DistributionBucketFamilyDeletedEvent(event).params
  256. const family = await getById(store, DistributionBucketFamily, familyId.toString())
  257. await store.remove(family)
  258. }
  259. // DISTRIBUTION BUCKET
  260. export async function storage_DistributionBucketCreated({ event, store }: EventContext & StoreContext): Promise<void> {
  261. const [familyId, acceptingNewBags, bucketId] = new Storage.DistributionBucketCreatedEvent(event).params
  262. const family = await getById(store, DistributionBucketFamily, familyId.toString())
  263. const bucket = new DistributionBucket({
  264. id: distributionBucketId(bucketId),
  265. bucketIndex: bucketId.distribution_bucket_index.toNumber(),
  266. acceptingNewBags: acceptingNewBags.valueOf(),
  267. distributing: true, // Runtime default
  268. family,
  269. })
  270. await store.save<DistributionBucket>(bucket)
  271. }
  272. export async function storage_DistributionBucketStatusUpdated({
  273. event,
  274. store,
  275. }: EventContext & StoreContext): Promise<void> {
  276. const [bucketId, acceptingNewBags] = new Storage.DistributionBucketStatusUpdatedEvent(event).params
  277. const bucket = await getById(store, DistributionBucket, distributionBucketId(bucketId))
  278. bucket.acceptingNewBags = acceptingNewBags.valueOf()
  279. await store.save<DistributionBucket>(bucket)
  280. }
  281. export async function storage_DistributionBucketDeleted({ event, store }: EventContext & StoreContext): Promise<void> {
  282. const [bucketId] = new Storage.DistributionBucketDeletedEvent(event).params
  283. // TODO: Cascade remove on db level (would require changes in Hydra / comitting autogenerated files)
  284. const distributionBucket = await store.get(DistributionBucket, {
  285. where: { id: distributionBucketId(bucketId) },
  286. relations: ['bags', 'bags.distributionBuckets'],
  287. })
  288. if (!distributionBucket) {
  289. inconsistentState(`Distribution bucket by id ${distributionBucketId(bucketId)} not found!`)
  290. }
  291. // Remove relations
  292. await Promise.all(
  293. (distributionBucket.bags || []).map((bag) => {
  294. bag.distributionBuckets = (bag.distributionBuckets || []).filter(
  295. (bucket) => bucket.id !== distributionBucketId(bucketId)
  296. )
  297. return store.save<StorageBag>(bag)
  298. })
  299. )
  300. await store.remove<DistributionBucket>(distributionBucket)
  301. }
  302. export async function storage_DistributionBucketsUpdatedForBag({
  303. event,
  304. store,
  305. }: EventContext & StoreContext): Promise<void> {
  306. const [
  307. bagId,
  308. familyId,
  309. addedBucketsIndices,
  310. removedBucketsIndices,
  311. ] = new Storage.DistributionBucketsUpdatedForBagEvent(event).params
  312. // Get or create bag
  313. const storageBag = await getBag(store, bagId, ['distributionBuckets'])
  314. const removedBucketsIds = Array.from(removedBucketsIndices).map((bucketIndex) =>
  315. distributionBucketIdByFamilyAndIndex(familyId, bucketIndex)
  316. )
  317. const addedBucketsIds = Array.from(addedBucketsIndices).map((bucketIndex) =>
  318. distributionBucketIdByFamilyAndIndex(familyId, bucketIndex)
  319. )
  320. storageBag.distributionBuckets = (storageBag.distributionBuckets || [])
  321. .filter((bucket) => !removedBucketsIds.includes(bucket.id))
  322. .concat(addedBucketsIds.map((id) => new DistributionBucket({ id })))
  323. await store.save<StorageBag>(storageBag)
  324. }
  325. export async function storage_DistributionBucketModeUpdated({
  326. event,
  327. store,
  328. }: EventContext & StoreContext): Promise<void> {
  329. const [bucketId, distributing] = new Storage.DistributionBucketModeUpdatedEvent(event).params
  330. const bucket = await getById(store, DistributionBucket, distributionBucketId(bucketId))
  331. bucket.distributing = distributing.valueOf()
  332. await store.save<DistributionBucket>(bucket)
  333. }
  334. export async function storage_DistributionBucketOperatorInvited({
  335. event,
  336. store,
  337. }: EventContext & StoreContext): Promise<void> {
  338. const [bucketId, workerId] = new Storage.DistributionBucketOperatorInvitedEvent(event).params
  339. const bucket = await getById(store, DistributionBucket, distributionBucketId(bucketId))
  340. const invitedOperator = new DistributionBucketOperator({
  341. id: distributionOperatorId(bucketId, workerId),
  342. distributionBucket: bucket,
  343. status: DistributionBucketOperatorStatus.INVITED,
  344. workerId: workerId.toNumber(),
  345. })
  346. await store.save<DistributionBucketOperator>(invitedOperator)
  347. }
  348. export async function storage_DistributionBucketInvitationCancelled({
  349. event,
  350. store,
  351. }: EventContext & StoreContext): Promise<void> {
  352. const [bucketId, workerId] = new Storage.DistributionBucketOperatorInvitedEvent(event).params
  353. const invitedOperator = await getById(store, DistributionBucketOperator, distributionOperatorId(bucketId, workerId))
  354. await store.remove<DistributionBucketOperator>(invitedOperator)
  355. }
  356. export async function storage_DistributionBucketInvitationAccepted({
  357. event,
  358. store,
  359. }: EventContext & StoreContext): Promise<void> {
  360. const [workerId, bucketId] = new Storage.DistributionBucketInvitationAcceptedEvent(event).params
  361. const invitedOperator = await getById(store, DistributionBucketOperator, distributionOperatorId(bucketId, workerId))
  362. invitedOperator.status = DistributionBucketOperatorStatus.ACTIVE
  363. await store.save<DistributionBucketOperator>(invitedOperator)
  364. }
  365. export async function storage_DistributionBucketMetadataSet({
  366. event,
  367. store,
  368. }: EventContext & StoreContext): Promise<void> {
  369. const [workerId, bucketId, metadataBytes] = new Storage.DistributionBucketMetadataSetEvent(event).params
  370. const operator = await getDistributionBucketOperatorWithMetadata(store, distributionOperatorId(bucketId, workerId))
  371. operator.metadata = await processDistributionOperatorMetadata(store, operator.metadata, metadataBytes)
  372. await store.save<DistributionBucketOperator>(operator)
  373. }
  374. export async function storage_DistributionBucketOperatorRemoved({
  375. event,
  376. store,
  377. }: EventContext & StoreContext): Promise<void> {
  378. const [bucketId, workerId] = new Storage.DistributionBucketOperatorRemovedEvent(event).params
  379. // TODO: Cascade remove on db level (would require changes in Hydra / comitting autogenerated files)
  380. const operator = await getDistributionBucketOperatorWithMetadata(store, distributionOperatorId(bucketId, workerId))
  381. await store.remove<DistributionBucketOperator>(operator)
  382. if (operator.metadata) {
  383. await store.remove<DistributionBucketOperatorMetadata>(operator.metadata)
  384. if (operator.metadata.nodeLocation) {
  385. await store.remove<NodeLocationMetadata>(operator.metadata.nodeLocation)
  386. if (operator.metadata.nodeLocation.coordinates) {
  387. await store.remove<GeoCoordinates>(operator.metadata.nodeLocation.coordinates)
  388. }
  389. }
  390. }
  391. }
  392. // STORAGE SYSTEM GLOBAL PARAMS
  393. export async function storage_UpdateBlacklist({ event, store }: EventContext & StoreContext): Promise<void> {
  394. const [removedContentIds, addedContentIds] = new Storage.UpdateBlacklistEvent(event).params
  395. const storageSystem = await getStorageSystem(store)
  396. storageSystem.blacklist = storageSystem.blacklist
  397. .filter((cid) => !Array.from(removedContentIds).some((id) => id.eq(cid)))
  398. .concat(Array.from(addedContentIds).map((id) => id.toString()))
  399. await store.save<StorageSystemParameters>(storageSystem)
  400. }
  401. export async function storage_DistributionBucketsPerBagLimitUpdated({
  402. event,
  403. store,
  404. }: EventContext & StoreContext): Promise<void> {
  405. const [newLimit] = new Storage.DistributionBucketsPerBagLimitUpdatedEvent(event).params
  406. const storageSystem = await getStorageSystem(store)
  407. storageSystem.distributionBucketsPerBagLimit = newLimit.toNumber()
  408. await store.save<StorageSystemParameters>(storageSystem)
  409. }
  410. export async function storage_StorageBucketsPerBagLimitUpdated({
  411. event,
  412. store,
  413. }: EventContext & StoreContext): Promise<void> {
  414. const [newLimit] = new Storage.StorageBucketsPerBagLimitUpdatedEvent(event).params
  415. const storageSystem = await getStorageSystem(store)
  416. storageSystem.storageBucketsPerBagLimit = newLimit.toNumber()
  417. await store.save<StorageSystemParameters>(storageSystem)
  418. }
  419. export async function storage_StorageBucketsVoucherMaxLimitsUpdated({
  420. event,
  421. store,
  422. }: EventContext & StoreContext): Promise<void> {
  423. const [sizeLimit, countLimit] = new Storage.StorageBucketsVoucherMaxLimitsUpdatedEvent(event).params
  424. const storageSystem = await getStorageSystem(store)
  425. storageSystem.storageBucketMaxObjectsSizeLimit = sizeLimit
  426. storageSystem.storageBucketMaxObjectsCountLimit = countLimit
  427. await store.save<StorageSystemParameters>(storageSystem)
  428. }
  429. export async function storage_UploadingBlockStatusUpdated({
  430. event,
  431. store,
  432. }: EventContext & StoreContext): Promise<void> {
  433. const [isBlocked] = new Storage.UploadingBlockStatusUpdatedEvent(event).params
  434. const storageSystem = await getStorageSystem(store)
  435. storageSystem.uploadingBlocked = isBlocked.isTrue
  436. await store.save<StorageSystemParameters>(storageSystem)
  437. }
  438. export async function storage_DataObjectPerMegabyteFeeUpdated({
  439. event,
  440. store,
  441. }: EventContext & StoreContext): Promise<void> {
  442. const [newFee] = new Storage.DataObjectPerMegabyteFeeUpdatedEvent(event).params
  443. const storageSystem = await getStorageSystem(store)
  444. storageSystem.dataObjectFeePerMb = newFee
  445. await store.save<StorageSystemParameters>(storageSystem)
  446. }