Browse Source

Logging improvements, add winston-ES, fix latency test issues

Leszek Wiesner 3 years ago
parent
commit
b333356080

+ 3 - 0
distributor-node/config.yml

@@ -1,6 +1,8 @@
+id: test-node
 endpoints:
   queryNode: http://localhost:8081/graphql
   substrateNode: ws://localhost:9944
+  elasticSearch: http://localhost:9200
 directories:
   data: ./local/data
   cache: ./local/cache
@@ -8,6 +10,7 @@ directories:
 log:
   file: debug
   console: info
+  elastic: info
 storageLimit: 100G
 port: 3334
 keys: [//Alice]

+ 2 - 1
distributor-node/package.json

@@ -27,7 +27,8 @@
     "tslib": "^1",
     "yaml": "^1.10.2",
     "queue": "^6.0.2",
-    "express-http-proxy": "^1.6.2"
+    "express-http-proxy": "^1.6.2",
+    "winston-elasticsearch": "^0.15.8"
   },
   "devDependencies": {
     "@graphql-codegen/cli": "^1.21.4",

+ 4 - 5
distributor-node/src/app/index.ts

@@ -87,6 +87,7 @@ export class App {
     }
 
     this.logger.info('Graceful exit finished')
+    await this.logging.end()
   }
 
   private exitCritically(): void {
@@ -103,23 +104,21 @@ export class App {
     this.stateCache.clearInterval()
     if (signal) {
       // Async exit can be executed
-      // TODO: this.logging.end() currently doesn't seem to be enough to make sure all logs are flushed to a file
       this.exitGracefully()
         .then(() => {
-          this.logging.end()
           process.kill(process.pid, signal)
         })
         .catch((err) => {
           this.logger.error('Graceful exit error', { err })
-          this.logging.end()
-          process.kill(process.pid, signal)
+          this.logging.end().finally(() => {
+            process.kill(process.pid, signal)
+          })
         })
       nodeCleanup.uninstall()
       return false
     } else {
       // Only synchronous work can be done here
       this.exitCritically()
-      this.logging.end()
     }
   }
 }

+ 7 - 4
distributor-node/src/services/cache/StateCacheService.ts

@@ -18,7 +18,7 @@ export interface PendingDownloadData {
 }
 
 export interface StorageNodeEndpointData {
-  responseTimes: number[]
+  last10ResponseTimes: number[]
 }
 
 export interface CacheItemData {
@@ -190,8 +190,11 @@ export class StateCacheService {
   }
 
   public setStorageNodeEndpointResponseTime(endpoint: string, time: number): void {
-    const data = this.memoryState.storageNodeEndpointDataByEndpoint.get(endpoint) || { responseTimes: [] }
-    data.responseTimes.push(time)
+    // Records a response time sample for the endpoint, keeping only the 10 most recent samples.
+    const data = this.memoryState.storageNodeEndpointDataByEndpoint.get(endpoint) || { last10ResponseTimes: [] }
+    if (data.last10ResponseTimes.length === 10) {
+      data.last10ResponseTimes.shift()
+    }
+    data.last10ResponseTimes.push(time)
+    // For an endpoint seen for the first time `data` is a fresh fallback object that is not yet in the map,
+    // so it has to be stored explicitly - otherwise the sample is lost and the endpoint never gets any data.
+    this.memoryState.storageNodeEndpointDataByEndpoint.set(endpoint, data)
   }
 
   public getStorageNodeEndpointData(endpoint: string): StorageNodeEndpointData | undefined {
@@ -220,7 +223,7 @@ export class StateCacheService {
           this.logger.error('Cache file save error', { err })
           resolve(false)
         } else {
-          this.logger.info('Cache file updated')
+          this.logger.verbose('Cache file updated')
           resolve(true)
         }
       })

+ 16 - 13
distributor-node/src/services/content/ContentService.ts

@@ -36,12 +36,18 @@ export class ContentService {
   public async startupInit(supportedObjects: DataObjectData[]): Promise<void> {
     const dataObjectsByHash = _.groupBy(supportedObjects, (o) => o.contentHash)
     const dataDirFiles = fs.readdirSync(this.dataDir)
+    const filesCountOnStartup = dataDirFiles.length
+    const cachedContentHashes = this.stateCache.getCachedContentHashes()
+    const cacheItemsOnStartup = cachedContentHashes.length
 
-    let filesCountOnStartup = 0
+    this.logger.info('ContentService initializing...', {
+      supportedObjects: supportedObjects.length,
+      filesCountOnStartup,
+      cacheItemsOnStartup,
+    })
     let filesDropped = 0
     for (const contentHash of dataDirFiles) {
-      ++filesCountOnStartup
-      this.logger.verbose('Checking content file', { contentHash })
+      this.logger.debug('Checking content file', { contentHash })
       // Add fileSize to contentSizeSum for each file. If the file ends up dropped - contentSizeSum will be reduced by this.drop().
       const fileSize = this.fileSize(contentHash)
       this.contentSizeSum += fileSize
@@ -74,8 +80,6 @@ export class ContentService {
       })
     }
 
-    const cachedContentHashes = this.stateCache.getCachedContentHashes()
-    const cacheItemsOnStartup = cachedContentHashes.length
     let cacheItemsDropped = 0
     for (const contentHash of cachedContentHashes) {
       if (!this.exists(contentHash)) {
@@ -86,9 +90,7 @@ export class ContentService {
     }
 
     this.logger.info('ContentService initialized', {
-      filesCountOnStartup,
       filesDropped,
-      cacheItemsOnStartup,
       cacheItemsDropped,
       contentSizeSum: this.contentSizeSum,
     })
@@ -99,9 +101,9 @@ export class ContentService {
       const size = this.fileSize(contentHash)
       fs.unlinkSync(this.path(contentHash))
       this.contentSizeSum -= size
-      this.logger.verbose('Dropping content', { contentHash, reason, size, contentSizeSum: this.contentSizeSum })
+      this.logger.debug('Dropping content', { contentHash, reason, size, contentSizeSum: this.contentSizeSum })
     } else {
-      this.logger.verbose('Trying to drop content that no loger exists', { contentHash, reason })
+      this.logger.warn('Trying to drop content that no loger exists', { contentHash, reason })
     }
     this.stateCache.dropByHash(contentHash)
   }
@@ -138,17 +140,18 @@ export class ContentService {
     return guessResult?.mime || DEFAULT_CONTENT_TYPE
   }
 
-  private async dropCacheItemsUntilFreeSpaceReached(expectedFreeSpace: number): Promise<void> {
-    this.logger.verbose(`Cache eviction free space target: ${expectedFreeSpace}`)
-    while (this.freeSpace < expectedFreeSpace) {
+  private async dropCacheItemsUntilFreeSpaceReached(targetFreeSpace: number): Promise<void> { // Evicts cache items one by one until this.freeSpace >= targetFreeSpace
+    this.logger.verbose('Cache eviction initialized.', { targetFreeSpace })
+    while (this.freeSpace < targetFreeSpace) { // re-evaluated after every drop / wait
       const evictCandidateHash = this.stateCache.getCacheEvictCandidateHash()
       if (evictCandidateHash) {
         this.drop(evictCandidateHash, 'Cache eviction')
       } else {
-        this.logger.verbose('Nothing to drop from cache, waiting...', { freeSpace: this.freeSpace, expectedFreeSpace })
+        this.logger.verbose('Nothing to drop from cache, waiting...', { freeSpace: this.freeSpace, targetFreeSpace })
         await new Promise((resolve) => setTimeout(resolve, 1000)) // no candidate available: retry in 1s; the loop has no upper bound on the number of retries
       }
     }
+    this.logger.verbose('Cache eviction finalized.', { freeSpace: this.freeSpace })
   }
 
   public handleNewContent(contentHash: string, expectedSize: number, dataStream: Readable): Promise<void> {

+ 64 - 19
distributor-node/src/services/logging/LoggingService.ts

@@ -1,6 +1,10 @@
 import winston, { Logger, LoggerOptions } from 'winston'
 import escFormat from '@elastic/ecs-winston-format'
+import { ElasticsearchTransport } from 'winston-elasticsearch'
 import { ReadonlyConfig } from '../../types'
+import { blake2AsHex } from '@polkadot/util-crypto'
+import { Format } from 'logform'
+import NodeCache from 'node-cache'
 
 const cliColors = {
   error: 'red',
@@ -12,6 +16,26 @@ const cliColors = {
 
 winston.addColors(cliColors)
 
+const pausedLogs = new NodeCache({ // keys: hashes of log messages that are currently being suppressed (written and checked by pauseFormat)
+  deleteOnExpire: true,
+})
+
+// Winston format that lets a log entry carrying an '@pauseFor' meta field through once and then drops identical entries (same format id, level and message) until the cache entry created for it expires
+const pauseFormat: (opts: { id: string }) => Format = winston.format((info, opts: { id: string }) => {
+  if (info['@pauseFor']) { // entries without '@pauseFor' (or with a falsy value) are passed through untouched
+    const messageHash = blake2AsHex(`${opts.id}:${info.level}:${info.message}`) // opts.id is part of the key, so formats created with different ids keep independent pause state
+    if (!pausedLogs.has(messageHash)) {
+      pausedLogs.set(messageHash, null, info['@pauseFor']) // third argument is the cache TTL - presumably seconds, matching the "s" in the notice below; confirm against node-cache docs
+      info.message += ` (this log message will be skipped for the next ${info['@pauseFor']}s)`
+      delete info['@pauseFor'] // control field only - removed so that it is not emitted as log metadata
+      return info
+    }
+    return false // same entry is still paused: a falsy return is presumably how a logform format filters an entry out - confirm against logform docs
+  }
+
+  return info
+})
+
 const cliFormat = winston.format.combine(
   winston.format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss:ms' }),
   winston.format.metadata({ fillExcept: ['label', 'level', 'timestamp', 'message'] }),
@@ -25,30 +49,47 @@ const cliFormat = winston.format.combine(
 
 export class LoggingService {
   private rootLogger: Logger
+  private esTransport: ElasticsearchTransport | undefined
 
-  private constructor(options: LoggerOptions) {
+  private constructor(options: LoggerOptions, esTransport?: ElasticsearchTransport) { // private: instances are created through the static with*Config factories
+    this.esTransport = esTransport // kept separately from `options` so that end() can call flush() on it
     this.rootLogger = winston.createLogger(options)
   }
 
   public static withAppConfig(config: ReadonlyConfig): LoggingService {
-    const transports: winston.LoggerOptions['transports'] = [
-      new winston.transports.File({
-        filename: `${config.directories.logs}/logs.json`,
-        level: config.log?.file || 'debug',
-        format: escFormat(),
-      }),
-    ]
+    const esTransport = new ElasticsearchTransport({ // created unconditionally: the node config is expected to provide endpoints.elasticSearch
+      level: config.log?.elastic || 'warn', // default level when log.elastic is not configured
+      format: winston.format.combine(pauseFormat({ id: 'es' }), escFormat()),
+      flushInterval: 5000, // presumably milliseconds - confirm against winston-elasticsearch docs
+      source: config.id,
+      clientOpts: {
+        node: {
+          url: new URL(config.endpoints.elasticSearch), // new URL() throws if the configured endpoint is not a valid URL
+        },
+      },
+    })
+
+    const fileTransport = new winston.transports.File({
+      filename: `${config.directories.logs}/logs.json`,
+      level: config.log?.file || 'debug', // default level when log.file is not configured
+      format: winston.format.combine(pauseFormat({ id: 'file' }), escFormat()),
+    })
+
+    const transports: winston.LoggerOptions['transports'] = [esTransport, fileTransport]
+
     if (config.log?.console) { // console transport is optional: added only when a console log level is configured
-      transports.push(
-        new winston.transports.Console({
-          level: config.log.console,
-          format: cliFormat,
-        })
-      )
+      const consoleTransport = new winston.transports.Console({
+        level: config.log.console,
+        format: winston.format.combine(pauseFormat({ id: 'cli' }), cliFormat),
+      })
+      transports.push(consoleTransport)
     }
-    return new LoggingService({
-      transports,
-    })
+    return new LoggingService(
+      {
+        transports,
+      },
+      esTransport // also handed over separately so that end() can flush it
+    )
   }
 
   public static withCLIConfig(): LoggingService {
@@ -56,7 +97,7 @@ export class LoggingService {
       transports: new winston.transports.Console({
         // Log everything to stderr, only the command output value will be written to stdout
         stderrLevels: Object.keys(winston.config.npm.levels),
-        format: cliFormat,
+        format: winston.format.combine(pauseFormat({ id: 'cli' }), cliFormat),
       }),
     })
   }
@@ -65,7 +106,11 @@ export class LoggingService {
     return this.rootLogger.child({ label, ...meta })
   }
 
-  public end(): void {
+  public async end(): Promise<void> { // Flushes the Elasticsearch transport (if any), ends the root logger and resolves once every transport has emitted 'finish'
+    if (this.esTransport) { // only set when an Elasticsearch transport was passed to the constructor
+      await this.esTransport.flush()
+    }
     this.rootLogger.end()
+    await Promise.all(this.rootLogger.transports.map((t) => new Promise((resolve) => t.on('finish', resolve)))) // NOTE(review): listeners are attached after end() and the promise never settles if any transport fails to emit 'finish' - confirm all transports do
   }
 }

+ 14 - 6
distributor-node/src/services/networking/NetworkingService.ts

@@ -24,6 +24,7 @@ const MAX_CONCURRENT_AVAILABILITY_CHECKS_PER_DOWNLOAD = 10 // 10 pending downloa
 
 const STORAGE_NODE_ENDPOINTS_CHECK_INTERVAL_MS = 60000
 const STORAGE_NODE_ENDPOINT_CHECK_TIMEOUT = 5000
+const GLOBAL_AXIOS_TIMEOUT = 10000
 
 export class NetworkingService {
   private config: ReadonlyConfig
@@ -38,6 +39,7 @@ export class NetworkingService {
   private downloadQueue = queue({ concurrency: MAX_CONCURRENT_DOWNLOADS, autostart: true })
 
   constructor(config: ReadonlyConfig, stateCache: StateCacheService, logging: LoggingService) {
+    axios.defaults.timeout = GLOBAL_AXIOS_TIMEOUT
     this.config = config
     this.logging = logging
     this.stateCache = stateCache
@@ -68,10 +70,11 @@ export class NetworkingService {
         this.validateNodeEndpoint(b.endpoint)
         return true
       } catch (err) {
-        this.logger.warn('Invalid storage endpoint detected!', {
+        this.logger.warn(`Invalid storage node endpoint: ${b.endpoint} for bucket ${b.bucketId}`, {
           bucketId: b.bucketId,
           endpoint: b.endpoint,
           err,
+          '@pauseFor': 900,
         })
         return false
       }
@@ -123,8 +126,8 @@ export class NetworkingService {
       const dataA = this.stateCache.getStorageNodeEndpointData(a)
       const dataB = this.stateCache.getStorageNodeEndpointData(b)
       return (
-        _.mean(dataA?.responseTimes || [STORAGE_NODE_ENDPOINT_CHECK_TIMEOUT]) -
-        _.mean(dataB?.responseTimes || [STORAGE_NODE_ENDPOINT_CHECK_TIMEOUT])
+        _.mean(dataA?.last10ResponseTimes || [STORAGE_NODE_ENDPOINT_CHECK_TIMEOUT]) -
+        _.mean(dataB?.last10ResponseTimes || [STORAGE_NODE_ENDPOINT_CHECK_TIMEOUT])
       )
     })
   }
@@ -149,7 +152,7 @@ export class NetworkingService {
         reject(new Error(message))
       }
       const success = (response: StorageNodeDownloadResponse) => {
-        this.logger.info('Download source found', { contentHash, source: response.config.url })
+        this.logger.verbose('Download source found', { contentHash, source: response.config.url })
         pendingDownload.status = 'Downloading'
         onSuccess(response)
         resolve(response)
@@ -274,7 +277,11 @@ export class NetworkingService {
     )
     this.logger.verbose('Checking nearby storage nodes...', { validEndpointsCount: endpoints.length })
 
-    endpoints.forEach(({ endpoint }) => this.testLatencyQueue.push(() => this.checkResponseTime(endpoint)))
+    endpoints.forEach(({ endpoint }) =>
+      this.testLatencyQueue.push(async () => {
+        await this.checkResponseTime(endpoint)
+      })
+    )
   }
 
   async checkResponseTime(endpoint: string): Promise<void> {
@@ -283,6 +290,7 @@ export class NetworkingService {
     try {
       // TODO: Use a status endpoint once available?
       await axios.get(endpoint, { timeout: STORAGE_NODE_ENDPOINT_CHECK_TIMEOUT })
+      throw new Error('Unexpected status 200')
     } catch (err) {
       if (axios.isAxiosError(err) && err.response?.status === 404) {
         // This is the expected outcome currently
@@ -290,7 +298,7 @@ export class NetworkingService {
         this.logger.debug(`${endpoint} check request response time: ${responseTime}`, { endpoint, responseTime })
         this.stateCache.setStorageNodeEndpointResponseTime(endpoint, responseTime)
       } else {
-        this.logger.warn('Storage node giving unexpected reponse on root endpoint!', { err })
+        this.logger.warn(`${endpoint} check request unexpected response`, { endpoint, err, '@pauseFor': 900 })
       }
     }
   }

+ 4 - 10
distributor-node/src/services/networking/storage-node/api.ts

@@ -5,20 +5,14 @@ import { LoggingService } from '../../logging'
 import { Logger } from 'winston'
 import { StorageNodeDownloadResponse } from '../../../types'
 
-const AXIOS_TIMEOUT = 10000
-
 export class StorageNodeApi {
   private logger: Logger
   private publicApi: PublicApi
   private endpoint: string
 
   public constructor(endpoint: string, logging: LoggingService) {
-    const axiosConfig: AxiosRequestConfig = {
-      timeout: AXIOS_TIMEOUT,
-    }
     const config = new Configuration({
       basePath: endpoint,
-      baseOptions: axiosConfig,
     })
     this.publicApi = new PublicApi(config)
     this.endpoint = new URL(endpoint).toString()
@@ -31,14 +25,14 @@ export class StorageNodeApi {
         Range: 'bytes=0-0',
       },
     }
-    this.logger.info('Checking object availibility', { contentHash })
+    this.logger.debug('Checking object availibility', { contentHash })
     try {
       await this.publicApi.publicApiFiles(contentHash, options)
-      this.logger.info('Data object available', { contentHash })
+      this.logger.debug('Data object available', { contentHash })
       return true
     } catch (err) {
       if (axios.isAxiosError(err)) {
-        this.logger.info('Data object not available', { err })
+        this.logger.debug('Data object not available', { err })
         return false
       }
       this.logger.error('Unexpected error while requesting data object', { err })
@@ -47,7 +41,7 @@ export class StorageNodeApi {
   }
 
   public async downloadObject(contentHash: string, startAt?: number): Promise<StorageNodeDownloadResponse> {
-    this.logger.info('Sending download request', { contentHash, startAt })
+    this.logger.verbose('Sending download request', { contentHash, startAt })
     const options: AxiosRequestConfig = {
       responseType: 'stream',
     }

+ 5 - 5
distributor-node/src/services/server/controllers/public.ts

@@ -107,7 +107,7 @@ export class PublicApiController {
     }
 
     // Range doesn't start from the beginning of the content or the file was not found - froward request to source storage node
-    this.logger.info(`Forwarding request to ${source.href}`, { source: source.href })
+    this.logger.verbose(`Forwarding request to ${source.href}`, { source: source.href })
     res.setHeader('x-data-source', 'external')
     return proxy(source.origin, { proxyReqPathResolver: () => source.pathname })(req, res, next)
   }
@@ -121,7 +121,7 @@ export class PublicApiController {
     rangeEnd?: number
   ) {
     const isRange = rangeEnd !== undefined
-    this.logger.info(`Serving pending download asset from file`, { contentHash, isRange, objectSize, rangeEnd })
+    this.logger.verbose(`Serving pending download asset from file`, { contentHash, isRange, objectSize, rangeEnd })
     const stream = this.content.createContinousReadStream(contentHash, {
       end: isRange ? rangeEnd || 0 : objectSize - 1,
     })
@@ -197,14 +197,14 @@ export class PublicApiController {
     })
 
     if (contentHash && !pendingDownload && this.content.exists(contentHash)) {
-      this.logger.info('Requested file found in filesystem', { path: this.content.path(contentHash) })
+      this.logger.verbose('Requested file found in filesystem', { path: this.content.path(contentHash) })
       return this.serveAssetFromFilesystem(req, res, next, contentHash)
     } else if (contentHash && pendingDownload) {
-      this.logger.info('Requested file is in pending download state', { path: this.content.path(contentHash) })
+      this.logger.verbose('Requested file is in pending download state', { path: this.content.path(contentHash) })
       res.setHeader('x-cache', 'pending')
       return this.servePendingDownloadAsset(req, res, next, contentHash)
     } else {
-      this.logger.info('Requested file not found in filesystem')
+      this.logger.verbose('Requested file not found in filesystem')
       const objectInfo = await this.networking.dataObjectInfo(objectId)
       if (!objectInfo.exists) {
         const errorRes: ErrorResponse = {

+ 4 - 1
distributor-node/src/services/validation/schemas/configSchema.ts

@@ -7,12 +7,14 @@ export const bytesizeRegex = new RegExp(`^[0-9]+(${bytesizeUnits.join('|')})$`)
 
 export const configSchema: JSONSchema4 = {
   type: 'object',
-  required: ['endpoints', 'directories', 'buckets', 'keys', 'port', 'storageLimit'],
+  required: ['id', 'endpoints', 'directories', 'buckets', 'keys', 'port', 'storageLimit'],
   additionalProperties: false,
   properties: {
+    id: { type: 'string' },
     endpoints: strictObject({
       queryNode: { type: 'string' },
       substrateNode: { type: 'string' },
+      elasticSearch: { type: 'string' },
     }),
     directories: strictObject({
       data: { type: 'string' },
@@ -25,6 +27,7 @@ export const configSchema: JSONSchema4 = {
       properties: {
         file: { type: 'string', enum: Object.keys(winston.config.npm.levels) },
         console: { type: 'string', enum: Object.keys(winston.config.npm.levels) },
+        elastic: { type: 'string', enum: Object.keys(winston.config.npm.levels) },
       },
     },
     storageLimit: { type: 'string', pattern: bytesizeRegex.source },

+ 3 - 0
distributor-node/src/types/generated/ConfigJson.d.ts

@@ -6,9 +6,11 @@
  */
 
 export interface ConfigJson {
+  id: string
   endpoints: {
     queryNode: string
     substrateNode: string
+    elasticSearch: string
   }
   directories: {
     data: string
@@ -18,6 +20,7 @@ export interface ConfigJson {
   log?: {
     file?: 'error' | 'warn' | 'info' | 'http' | 'verbose' | 'debug' | 'silly'
     console?: 'error' | 'warn' | 'info' | 'http' | 'verbose' | 'debug' | 'silly'
+    elastic?: 'error' | 'warn' | 'info' | 'http' | 'verbose' | 'debug' | 'silly'
   }
   storageLimit: string
   port: number

File diff suppressed because it is too large
+ 544 - 16
yarn.lock


Some files were not shown because too many files changed in this diff