workingGroup.ts 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. import { EventContext, StoreContext, DatabaseManager, SubstrateEvent } from '@joystream/hydra-common'
  2. import { bytesToString, inconsistentState, logger } from './common'
  3. import { Worker, WorkerType } from 'query-node/dist/model'
  4. import { StorageWorkingGroup } from './generated/types'
  5. import { WorkerId } from '@joystream/types/augment'
  6. export function workerEntityId(type: WorkerType, workerId: string | WorkerId): string {
  7. return `${type}-${workerId.toString()}`
  8. }
  9. export async function workingGroup_OpeningFilled({ event, store }: EventContext & StoreContext): Promise<void> {
  10. const workerType = getWorkerType(event)
  11. if (!workerType) {
  12. return
  13. }
  14. const [, applicationIdToWorkerIdMap] = new StorageWorkingGroup.OpeningFilledEvent(event).params
  15. const workerIds = [...applicationIdToWorkerIdMap.values()]
  16. for (const workerId of workerIds) {
  17. await createWorker(store, workerId, workerType, event)
  18. }
  19. // emit log event
  20. logger.info('Workers have been created', { ids: workerIds.map((item) => item.toString()), workerType })
  21. }
  22. export async function workingGroup_WorkerStorageUpdated({ event, store }: EventContext & StoreContext): Promise<void> {
  23. const workerType = getWorkerType(event)
  24. if (!workerType) {
  25. return
  26. }
  27. const [workerId, newMetadata] = new StorageWorkingGroup.WorkerStorageUpdatedEvent(event).params
  28. // load worker
  29. const worker = await store.get(Worker, {
  30. where: {
  31. workerId: workerId.toString(),
  32. type: workerType,
  33. },
  34. })
  35. // ensure worker exists
  36. if (!worker) {
  37. return inconsistentState('Non-existing worker update requested', workerId)
  38. }
  39. worker.metadata = bytesToString(newMetadata)
  40. await store.save<Worker>(worker)
  41. // emit log event
  42. logger.info('Worker has been updated', { workerId, workerType })
  43. }
  44. export async function workingGroup_TerminatedWorker({ event, store }: EventContext & StoreContext): Promise<void> {
  45. const workerType = getWorkerType(event)
  46. if (!workerType) {
  47. return
  48. }
  49. const [workerId] = new StorageWorkingGroup.TerminatedWorkerEvent(event).params
  50. // do removal logic
  51. await deactivateWorker(store, event, workerType, workerId)
  52. // emit log event
  53. logger.info('Worker has been removed (worker terminated)', { workerId, workerType })
  54. }
  55. export async function workingGroup_WorkerExited({ event, store }: EventContext & StoreContext): Promise<void> {
  56. const workerType = getWorkerType(event)
  57. if (!workerType) {
  58. return
  59. }
  60. const [workerId] = new StorageWorkingGroup.WorkerExitedEvent(event).params
  61. // do removal logic
  62. await deactivateWorker(store, event, workerType, workerId)
  63. // emit log event
  64. logger.info('Worker has been removed (worker exited)', { workerId, workerType })
  65. }
  66. export async function workingGroup_TerminatedLeader({ event, store }: EventContext & StoreContext): Promise<void> {
  67. const workerType = getWorkerType(event)
  68. if (!workerType) {
  69. return
  70. }
  71. const [workerId] = new StorageWorkingGroup.WorkerExitedEvent(event).params
  72. // do removal logic
  73. await deactivateWorker(store, event, workerType, workerId)
  74. // emit log event
  75. logger.info('Working group leader has been removed (worker exited)', { workerId, workerType })
  76. }
  77. /// ///////////////// Helpers ////////////////////////////////////////////////////
  78. function getWorkerType(event: SubstrateEvent): WorkerType | null {
  79. // Note: event.section is not available!
  80. const [eventSection] = event.name.split('.')
  81. if (eventSection === 'storageWorkingGroup') {
  82. return WorkerType.STORAGE
  83. }
  84. if (eventSection === 'gatewayWorkingGroup') {
  85. return WorkerType.GATEWAY
  86. }
  87. return null
  88. }
  89. async function createWorker(
  90. db: DatabaseManager,
  91. workerId: WorkerId,
  92. workerType: WorkerType,
  93. event: SubstrateEvent
  94. ): Promise<void> {
  95. // create entity
  96. const newWorker = new Worker({
  97. id: workerEntityId(workerType, workerId),
  98. workerId: workerId.toString(),
  99. type: workerType,
  100. isActive: true,
  101. createdAt: new Date(event.blockTimestamp),
  102. updatedAt: new Date(event.blockTimestamp),
  103. })
  104. // save worker
  105. await db.save<Worker>(newWorker)
  106. }
  107. async function deactivateWorker(
  108. db: DatabaseManager,
  109. event: SubstrateEvent,
  110. workerType: WorkerType,
  111. workerId: WorkerId
  112. ) {
  113. // load worker
  114. const worker = await db.get(Worker, {
  115. where: {
  116. workerId: workerId.toString(),
  117. type: workerType,
  118. },
  119. })
  120. // ensure worker exists
  121. if (!worker) {
  122. return inconsistentState('Non-existing worker deletion requested', workerId)
  123. }
  124. // update worker
  125. worker.isActive = false
  126. // set last update time
  127. worker.updatedAt = new Date(event.blockTimestamp)
  128. // save worker
  129. await db.save<Worker>(worker)
  130. }