Przeglądaj źródła

Improve error handling (mostly query-node-related)

Leszek Wiesner 3 lat temu
rodzic
commit
1b6ac27d41

+ 15 - 0
distributor-node/.eslintrc.js

@@ -0,0 +1,15 @@
+module.exports = {
+  env: {
+    mocha: true,
+  },
+  parserOptions: {
+    project: './tsconfig.json',
+  },
+  extends: ['@joystream/eslint-config'],
+  rules: {
+    'no-unused-vars': 'off', // Required by the typescript rule below
+    '@typescript-eslint/no-unused-vars': ['error'],
+    '@typescript-eslint/no-floating-promises': 'error',
+    'no-void': 'off',
+  },
+}

+ 1 - 0
distributor-node/.prettierignore

@@ -1,3 +1,4 @@
 /**/generated
 /**/mock.graphql
 lib
+local

+ 18 - 16
distributor-node/src/app/index.ts

@@ -52,11 +52,16 @@ export class App {
 
   public async start(): Promise<void> {
     this.logger.info('Starting the app')
-    this.checkConfigDirectories()
-    this.stateCache.load()
-    const dataObjects = await this.networking.fetchSupportedDataObjects()
-    await this.content.startupInit(dataObjects)
-    this.server.start()
+    try {
+      this.checkConfigDirectories()
+      this.stateCache.load()
+      const dataObjects = await this.networking.fetchSupportedDataObjects()
+      await this.content.startupInit(dataObjects)
+      this.server.start()
+    } catch (err) {
+      this.logger.error('Node initialization failed!', { err })
+      process.exit(-1)
+    }
     nodeCleanup(this.exitHandler.bind(this))
   }
 
@@ -65,11 +70,6 @@ export class App {
     // We can try to wait until some pending downloads are finished here etc.
     this.logger.info('Graceful exit initialized')
 
-    // Stop accepting any new requests and save cache
-    this.server.stop()
-    this.stateCache.clearInterval()
-    this.stateCache.saveSync()
-
     // Try to process remaining downloads
     const MAX_RETRY_ATTEMPTS = 3
     let retryCounter = 0
@@ -95,17 +95,19 @@ export class App {
   }
 
   private exitCritically(): void {
-    this.logger.info('Critical exit initialized')
-    // Handling exits due to an error - only some critical, synchronous work can be done here
-    this.server.stop()
-    this.stateCache.clearInterval()
-    this.stateCache.saveSync()
+    // Some additional synchronous work if required...
     this.logger.info('Critical exit finished')
   }
 
   private exitHandler(exitCode: number | null, signal: string | null): boolean | undefined {
-    this.logger.info('Exiting')
+    this.logger.info('Exiting...')
+    // Clear intervals
     this.stateCache.clearInterval()
+    this.networking.clearIntervals()
+    // Stop the server
+    this.server.stop()
+    // Save cache
+    this.stateCache.saveSync()
     if (signal) {
       // Async exit can be executed
       this.exitGracefully()

+ 1 - 1
distributor-node/src/commands/start.ts

@@ -10,7 +10,7 @@ export default class StartNode extends DefaultCommandBase {
 
   async run(): Promise<void> {
     const app = new App(this.appConfig)
-    app.start()
+    await app.start()
   }
 
   async finally(): Promise<void> {

+ 22 - 19
distributor-node/src/services/networking/NetworkingService.ts

@@ -49,13 +49,12 @@ export class NetworkingService {
     this.logging = logging
     this.stateCache = stateCache
     this.logger = logging.createLogger('NetworkingManager')
-    this.queryNodeApi = new QueryNodeApi(config.endpoints.queryNode)
+    this.queryNodeApi = new QueryNodeApi(config.endpoints.queryNode, this.logging)
     // this.runtimeApi = new RuntimeApi(config.endpoints.substrateNode)
-    this.checkActiveStorageNodeEndpoints()
-    this.storageNodeEndpointsCheckInterval = setInterval(
-      this.checkActiveStorageNodeEndpoints.bind(this),
-      STORAGE_NODE_ENDPOINTS_CHECK_INTERVAL_MS
-    )
+    void this.checkActiveStorageNodeEndpoints()
+    this.storageNodeEndpointsCheckInterval = setInterval(async () => {
+      await this.checkActiveStorageNodeEndpoints()
+    }, STORAGE_NODE_ENDPOINTS_CHECK_INTERVAL_MS)
     // Queues
     this.testLatencyQueue = queue({ concurrency: MAX_CONCURRENT_RESPONSE_TIME_CHECKS, autostart: true }).on(
       'end',
@@ -307,20 +306,24 @@ export class NetworkingService {
   }
 
   async checkActiveStorageNodeEndpoints(): Promise<void> {
-    const activeStorageOperators = await this.queryNodeApi.getActiveStorageBucketOperatorsData()
-    const endpoints = this.filterStorageNodeEndpoints(
-      activeStorageOperators.map(({ id, operatorMetadata }) => ({
-        bucketId: id,
-        endpoint: operatorMetadata!.nodeEndpoint!,
-      }))
-    )
-    this.logger.verbose('Checking nearby storage nodes...', { validEndpointsCount: endpoints.length })
+    try {
+      const activeStorageOperators = await this.queryNodeApi.getActiveStorageBucketOperatorsData()
+      const endpoints = this.filterStorageNodeEndpoints(
+        activeStorageOperators.map(({ id, operatorMetadata }) => ({
+          bucketId: id,
+          endpoint: operatorMetadata!.nodeEndpoint!,
+        }))
+      )
+      this.logger.verbose('Checking nearby storage nodes...', { validEndpointsCount: endpoints.length })
 
-    endpoints.forEach(({ endpoint }) =>
-      this.testLatencyQueue.push(async () => {
-        await this.checkResponseTime(endpoint)
-      })
-    )
+      endpoints.forEach(({ endpoint }) =>
+        this.testLatencyQueue.push(async () => {
+          await this.checkResponseTime(endpoint)
+        })
+      )
+    } catch (err) {
+      this.logger.error("Couldn't check active storage node endpooints", { err })
+    }
   }
 
   async checkResponseTime(endpoint: string): Promise<void> {

+ 13 - 3
distributor-node/src/services/networking/query-node/api.ts

@@ -1,5 +1,8 @@
-import { ApolloClient, NormalizedCacheObject, HttpLink, InMemoryCache, DocumentNode } from '@apollo/client/core'
+import { ApolloClient, NormalizedCacheObject, HttpLink, InMemoryCache, DocumentNode, from } from '@apollo/client/core'
+import { onError } from '@apollo/client/link/error'
 import fetch from 'cross-fetch'
+import { Logger } from 'winston'
+import { LoggingService } from '../../logging'
 import {
   DataObjectDetailsFragment,
   GetDataObjectDetails,
@@ -21,10 +24,17 @@ import { Maybe } from './generated/schema'
 
 export class QueryNodeApi {
   private apolloClient: ApolloClient<NormalizedCacheObject>
+  private logger: Logger
 
-  public constructor(endpoint: string) {
+  public constructor(endpoint: string, logging: LoggingService, exitOnError = false) {
+    this.logger = logging.createLogger('QueryNodeApi')
+    const errorLink = onError(({ graphQLErrors, networkError }) => {
+      const message = networkError?.message || 'Graphql syntax errors found'
+      this.logger.error('Error when trying to execute a query!', { err: { message, graphQLErrors, networkError } })
+      exitOnError && process.exit(-1)
+    })
     this.apolloClient = new ApolloClient({
-      link: new HttpLink({ uri: endpoint, fetch }),
+      link: from([errorLink, new HttpLink({ uri: endpoint, fetch })]),
       cache: new InMemoryCache(),
       defaultOptions: { query: { fetchPolicy: 'no-cache', errorPolicy: 'all' } },
     })

+ 3 - 3
distributor-node/src/services/validation/generateTypes.ts

@@ -7,7 +7,7 @@ import { schemas } from './schemas'
 const prettierConfig = require('@joystream/prettier-config')
 
 Object.entries(schemas).forEach(([schemaKey, schema]) => {
-  compile(schema, `${schemaKey}Json`, { style: prettierConfig }).then((output) =>
-    fs.writeFileSync(path.resolve(__dirname, `../../types/generated/${schemaKey}Json.d.ts`), output)
-  )
+  compile(schema, `${schemaKey}Json`, { style: prettierConfig })
+    .then((output) => fs.writeFileSync(path.resolve(__dirname, `../../types/generated/${schemaKey}Json.d.ts`), output))
+    .catch(console.error)
 })