12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152 |
- import { PubSubEngine, PubSub } from 'graphql-subscriptions'
- import createSubscriber from 'pg-listen'
- import { Logger } from './logger'
- const pubSub: PubSubEngine = new PubSub()
- export enum TOPICS {
- processorState = 'PROCESSOR_STATE',
- }
- export function getPubSub(): PubSubEngine {
- return pubSub
- }
- export async function startPgSubsribers() {
- // use PG_** env variables to connect
- const subscriber = createSubscriber()
- const channel = 'processed_events_log_update'
- subscriber.notifications.on(channel, (payload: unknown) => {
- const { data } = payload as {
- data: {
- event_id: string
- last_scanned_block: number
- indexer_head: number
- chain_head: number
- }
- }
- // Payload as passed to subscriber.notify() (see below)
- pubSub.publish(TOPICS.processorState, {
- lastProcessedEvent: data.event_id,
- lastScannedBlock: data.last_scanned_block,
- chainHead: data.chain_head,
- indexerHead: data.indexer_head,
- })
- })
- subscriber.events.on('error', (error) => {
- Logger.error('Fatal database connection error:', error)
- process.exit(1)
- })
- process.on('exit', () => {
- Logger.log(`Closing the subscriber`)
- subscriber.close()
- })
- await subscriber.connect()
- await subscriber.listenTo(channel)
- }
|