pubsub.ts 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. import { PubSubEngine, PubSub } from 'graphql-subscriptions'
  2. import createSubscriber from 'pg-listen'
  3. import { Logger } from './logger'
  // Single module-wide PubSub engine instance; other code obtains it via getPubSub().
  const pubSub: PubSubEngine = new PubSub()
  /**
   * Topic names used when publishing through the module's PubSub engine.
   */
  export enum TOPICS {
    /** Topic on which processor state updates are published. */
    processorState = 'PROCESSOR_STATE',
  }
  /**
   * Gives access to the module-level PubSub engine.
   *
   * @returns the same `pubSub` instance on every call
   */
  export function getPubSub(): PubSubEngine {
    const engine: PubSubEngine = pubSub
    return engine
  }
  /** Shape of the JSON payload expected on the `processed_events_log_update` channel. */
  interface ProcessedEventsLogUpdate {
    data: {
      event_id: string
      last_scanned_block: number
      indexer_head: number
      chain_head: number
    }
  }

  /**
   * Structural guard for incoming notification payloads.
   *
   * Only verifies that `payload` is an object carrying an object-valued `data`
   * property, which is what the handler needs in order not to throw. The types of
   * the individual fields are deliberately not checked, so payloads that were
   * forwarded before are still forwarded.
   */
  function isProcessedEventsLogUpdate(
    payload: unknown
  ): payload is ProcessedEventsLogUpdate {
    if (typeof payload !== 'object' || payload === null) {
      return false
    }
    const { data } = payload as { data?: unknown }
    return typeof data === 'object' && data !== null
  }

  /**
   * Connects a Postgres LISTEN subscriber and republishes every notification
   * received on the `processed_events_log_update` channel to the in-process
   * PubSub engine under `TOPICS.processorState`.
   *
   * Side effects: registers an `'exit'` listener on `process`, and terminates the
   * process with exit code 1 when the subscriber reports an error.
   *
   * @returns a promise that resolves once the subscriber is connected and
   *   listening on the channel; it rejects if connecting or listening fails.
   */
  export async function startPgSubsribers(): Promise<void> {
    // use PG_** env variables to connect
    const subscriber = createSubscriber()
    const channel = 'processed_events_log_update'

    subscriber.notifications.on(channel, (payload: unknown) => {
      // A notification without a payload (or with unexpected JSON) must not throw
      // inside the listener: an exception here would take the whole process down.
      if (!isProcessedEventsLogUpdate(payload)) {
        Logger.error(
          `Ignoring malformed notification on channel ${channel}:`,
          new TypeError('payload does not contain a "data" object')
        )
        return
      }

      const { data } = payload
      // Promise.resolve() tolerates engines whose publish() is synchronous, while
      // making sure a rejected publish is logged instead of going unhandled.
      Promise.resolve(
        pubSub.publish(TOPICS.processorState, {
          lastProcessedEvent: data.event_id,
          lastScannedBlock: data.last_scanned_block,
          chainHead: data.chain_head,
          indexerHead: data.indexer_head,
        })
      ).catch((error) => {
        Logger.error('Failed to publish processor state:', error)
      })
    })

    subscriber.events.on('error', (error) => {
      Logger.error('Fatal database connection error:', error)
      process.exit(1)
    })

    process.on('exit', () => {
      Logger.log(`Closing the subscriber`)
      // Best effort only: 'exit' listeners run synchronously, so any asynchronous
      // part of close() is not awaited before the process terminates.
      void subscriber.close()
    })

    await subscriber.connect()
    await subscriber.listenTo(channel)
  }