Преглед изворни кода

storage-node: refactor to make ipfs host configurable on commandline

Mokhtar Naamani пре 4 година
родитељ
комит
bce0ba650e

+ 0 - 3
docker-compose-with-storage.yml

@@ -26,9 +26,6 @@ services:
       - chain-data:/data
     command: --dev --ws-external --base-path /data
 
-  # colossus exits immediately if it cannot connect to joystream node
-  # some workarounds... https://docs.docker.com/compose/startup-order/
-  # Better option is to have colossus retry when starting up
   colossus:
     image: joystream/apps
     restart: on-failure

+ 29 - 20
storage-node/packages/colossus/bin/cli.js

@@ -116,14 +116,14 @@ function startExpressApp(app, port) {
 }
 
 // Start app
-function startAllServices({ store, api, port }) {
-  const app = require('../lib/app')(PROJECT_ROOT, store, api)
+function startAllServices({ store, api, port, discoveryClient, ipfsHttpGatewayUrl }) {
+  const app = require('../lib/app')(PROJECT_ROOT, store, api, discoveryClient, ipfsHttpGatewayUrl)
   return startExpressApp(app, port)
 }
 
 // Start discovery service app only
-function startDiscoveryService({ api, port }) {
-  const app = require('../lib/discovery')(PROJECT_ROOT, api)
+function startDiscoveryService({ port, discoveryClient }) {
+  const app = require('../lib/discovery')(PROJECT_ROOT, discoveryClient)
   return startExpressApp(app, port)
 }
 
@@ -218,10 +218,10 @@ function getServiceInformation(publicUrl) {
 
 // TODO: instead of recursion use while/async-await and use promise/setTimout based sleep
 // or cleaner code with generators?
-async function announcePublicUrl(api, publicUrl) {
+async function announcePublicUrl(api, publicUrl, publisherClient) {
   // re-announce in future
   const reannounce = function (timeoutMs) {
-    setTimeout(announcePublicUrl, timeoutMs, api, publicUrl)
+    setTimeout(announcePublicUrl, timeoutMs, api, publicUrl, publisherClient)
   }
 
   const chainIsSyncing = await api.chainIsSyncing()
@@ -243,12 +243,11 @@ async function announcePublicUrl(api, publicUrl) {
   }
 
   debug('announcing public url')
-  const { publish } = require('@joystream/service-discovery')
 
   try {
     const serviceInformation = getServiceInformation(publicUrl)
 
-    const keyId = await publish.publish(serviceInformation)
+    const keyId = await publisherClient.publish(serviceInformation)
 
     await api.discovery.setAccountInfo(keyId)
 
@@ -275,18 +274,9 @@ if (!command) {
   command = 'server'
 }
 
-async function startColossus({ api, publicUrl, port }) {
-  // TODO: check valid url, and valid port number
-  const store = getStorage(api, cli.flags)
-  banner()
-  const { startSyncing } = require('../lib/sync')
-  startSyncing(api, { syncPeriod: SYNC_PERIOD_MS }, store)
-  announcePublicUrl(api, publicUrl)
-  return startAllServices({ store, api, port })
-}
-
 const commands = {
   server: async () => {
+    banner()
     let publicUrl, port, api
 
     if (cli.flags.dev) {
@@ -300,7 +290,22 @@ const commands = {
       port = cli.flags.port
     }
 
-    return startColossus({ api, publicUrl, port })
+    // TODO: check valid url, and valid port number
+    const store = getStorage(api, cli.flags)
+
+    const ipfsHost = cli.flags.ipfsHost
+    const ipfs = require('ipfs-http-client')(ipfsHost, '5001', { protocol: 'http' })
+    const { PublisherClient, DiscoveryClient } = require('@joystream/service-discovery')
+    const publisherClient = new PublisherClient(ipfs)
+    const discoveryClient = new DiscoveryClient(ipfs, api)
+    const ipfsHttpGatewayUrl = `http://${ipfsHost}:8080/`
+
+    const { startSyncing } = require('../lib/sync')
+    startSyncing(api, { syncPeriod: SYNC_PERIOD_MS }, store)
+
+    announcePublicUrl(api, publicUrl, publisherClient)
+
+    return startAllServices({ store, api, port, discoveryClient, ipfsHttpGatewayUrl })
   },
   discovery: async () => {
     banner()
@@ -309,8 +314,12 @@ const commands = {
     const wsProvider = cli.flags.wsProvider
     const api = await RuntimeApi.create({ provider_url: wsProvider })
     const port = cli.flags.port
+    const ipfsHost = cli.flags.ipfsHost
+    const ipfs = require('ipfs-http-client')(ipfsHost, '5001', { protocol: 'http' })
+    const { DiscoveryClient } = require('@joystream/service-discovery')
+    const discoveryClient = new DiscoveryClient(ipfs, api)
     await api.untilChainIsSynced()
-    await startDiscoveryService({ api, port })
+    await startDiscoveryService({ api, port, discoveryClient })
   },
 }
 

+ 3 - 1
storage-node/packages/colossus/lib/app.js

@@ -35,7 +35,7 @@ const fileUploads = require('./middleware/file_uploads')
 const pagination = require('@joystream/storage-utils/pagination')
 
 // Configure app
-function createApp(projectRoot, storage, runtime) {
+function createApp(projectRoot, storage, runtime, discoveryClient, ipfsHttpGatewayUrl) {
   const app = express()
   app.use(cors())
   app.use(bodyParser.json())
@@ -59,6 +59,8 @@ function createApp(projectRoot, storage, runtime) {
     dependencies: {
       storage,
       runtime,
+      discoveryClient,
+      ipfsHttpGatewayUrl,
     },
   })
 

+ 2 - 2
storage-node/packages/colossus/lib/discovery.js

@@ -33,7 +33,7 @@ const path = require('path')
 const validateResponses = require('./middleware/validate_responses')
 
 // Configure app
-function createApp(projectRoot, runtime) {
+function createApp(projectRoot, discoveryClient) {
   const app = express()
   app.use(cors())
   app.use(bodyParser.json())
@@ -54,7 +54,7 @@ function createApp(projectRoot, runtime) {
     },
     docsPath: '/swagger.json',
     dependencies: {
-      runtime,
+      discoveryClient,
     },
   })
 

+ 2 - 2
storage-node/packages/colossus/lib/middleware/ipfs_proxy.js

@@ -41,12 +41,12 @@ const createResolver = (storage) => {
   return async (id) => await storage.resolveContentIdWithTimeout(5000, id)
 }
 
-const createProxy = (storage) => {
+const createProxy = (storage, ipfsHttpGatewayUrl) => {
   const pathRewrite = createPathRewriter(createResolver(storage))
 
   return createProxyMiddleware(pathFilter, {
     // Default path to local IPFS HTTP GATEWAY
-    target: 'http://localhost:8080/',
+    target: ipfsHttpGatewayUrl || 'http://localhost:8080/',
     pathRewrite,
     onProxyRes: function (proxRes, req, res) {
       /*

+ 2 - 2
storage-node/packages/colossus/paths/asset/v0/{id}.js

@@ -27,9 +27,9 @@ function errorHandler(response, err, code) {
   response.status(err.code || code || 500).send({ message: err.toString() })
 }
 
-module.exports = function (storage, runtime) {
+module.exports = function (storage, runtime, ipfsHttpGatewayUrl) {
   // Creat the IPFS HTTP Gateway proxy middleware
-  const proxy = ipfsProxy.createProxy(storage)
+  const proxy = ipfsProxy.createProxy(storage, ipfsHttpGatewayUrl)
 
   const doc = {
     // parameters for all operations in this path

+ 2 - 3
storage-node/packages/colossus/paths/discover/v0/{id}.js

@@ -1,10 +1,9 @@
-const { discover } = require('@joystream/service-discovery')
 const debug = require('debug')('joystream:colossus:api:discovery')
 
 const MAX_CACHE_AGE = 30 * 60 * 1000
 const USE_CACHE = true
 
-module.exports = function (runtime) {
+module.exports = function (discoveryClient) {
   const doc = {
     // parameters for all operations in this path
     parameters: [
@@ -45,7 +44,7 @@ module.exports = function (runtime) {
 
       try {
         debug(`resolving ${id}`)
-        const info = await discover.discover(id, runtime, USE_CACHE, cacheMaxAge)
+        const info = await discoveryClient.discover(id, USE_CACHE, cacheMaxAge)
         if (info === null) {
           debug('info not found')
           res.status(404).end()

+ 209 - 208
storage-node/packages/discovery/discover.js

@@ -2,7 +2,6 @@ const axios = require('axios')
 const debug = require('debug')('joystream:discovery:discover')
 const stripEndingSlash = require('@joystream/storage-utils/stripEndingSlash')
 
-const ipfs = require('ipfs-http-client')('localhost', '5001', { protocol: 'http' })
 const BN = require('bn.js')
 const { newExternallyControlledPromise } = require('@joystream/storage-utils/externalPromise')
 
@@ -14,259 +13,261 @@ function inBrowser() {
   return typeof window !== 'undefined'
 }
 
-/**
- * Map storage-provider id to a Promise of a discovery result. The purpose
- * is to avoid concurrent active discoveries for the same provider.
- */
-const activeDiscoveries = {}
-
-/**
- * Map of storage provider id to string
- * Cache of past discovery lookup results
- */
-const accountInfoCache = {}
-
 /**
  * After what period of time a cached record is considered stale, and would
  * trigger a re-discovery, but only if a query is made for the same provider.
  */
 const CACHE_TTL = 60 * 60 * 1000
 
-/**
- * Queries the ipns id (service key) of the storage provider from the blockchain.
- * If the storage provider is not registered it will return null.
- * @param {number | BN | u64} storageProviderId - the provider id to lookup
- * @param { RuntimeApi } runtimeApi - api instance to query the chain
- * @returns { Promise<string | null> } - ipns multiformat address
- */
-async function getIpnsIdentity(storageProviderId, runtimeApi) {
-  storageProviderId = new BN(storageProviderId)
-  // lookup ipns identity from chain corresponding to storageProviderId
-  const info = await runtimeApi.discovery.getAccountInfo(storageProviderId)
-
-  if (info === null) {
-    // no identity found on chain for account
-    return null
+class DiscoveryClient {
+  /**
+   * Map storage-provider id to a Promise of a discovery result. The purpose
+   * is to avoid concurrent active discoveries for the same provider.
+   */
+  activeDiscoveries = {}
+
+  /**
+   * Map of storage provider id to string
+   * Cache of past discovery lookup results
+   */
+  accountInfoCache = {}
+
+  /*
+   * @param {IpfsClient} ipfs - instance of an ipfs-http-client
+   * @param {RuntimeApi} runtimeApi - api instance to query the chain
+   */
+  constructor(ipfs, runtimeApi) {
+    this.ipfs = ipfs || require('ipfs-http-client')('localhost', '5001', { protocol: 'http' })
+    this.runtimeApi = runtimeApi
   }
-  return info.identity.toString()
-}
-
-/**
- * Resolves provider id to its service information.
- * Will use an IPFS HTTP gateway. If caller doesn't provide a url the default gateway on
- * the local ipfs node will be used.
- * If the storage provider is not registered it will throw an error
- * @param {number | BN | u64} storageProviderId - the provider id to lookup
- * @param {RuntimeApi} runtimeApi - api instance to query the chain
- * @param {string} gateway - optional ipfs http gateway url to perform ipfs queries
- * @returns { Promise<object> } - the published service information
- */
-async function discoverOverIpfsHttpGateway(storageProviderId, runtimeApi, gateway = 'http://localhost:8080') {
-  storageProviderId = new BN(storageProviderId)
-  const isProvider = await runtimeApi.workers.isStorageProvider(storageProviderId)
 
-  if (!isProvider) {
-    throw new Error('Cannot discover non storage providers')
+  /**
+   * Queries the ipns id (service key) of the storage provider from the blockchain.
+   * If the storage provider is not registered it will return null.
+   * @param {number | BN | u64} storageProviderId - the provider id to lookup
+   * @returns { Promise<string | null> } - ipns multiformat address
+   */
+  async getIpnsIdentity(storageProviderId) {
+    storageProviderId = new BN(storageProviderId)
+    // lookup ipns identity from chain corresponding to storageProviderId
+    const info = await this.runtimeApi.discovery.getAccountInfo(storageProviderId)
+
+    if (info === null) {
+      // no identity found on chain for account
+      return null
+    }
+    return info.identity.toString()
   }
 
-  const identity = await getIpnsIdentity(storageProviderId, runtimeApi)
-
-  if (identity === null) {
-    // dont waste time trying to resolve if no identity was found
-    throw new Error('no identity to resolve')
-  }
+  /**
+   * Resolves provider id to its service information.
+   * Will use an IPFS HTTP gateway. If caller doesn't provide a url the default gateway on
+   * the local ipfs node will be used.
+   * If the storage provider is not registered it will throw an error
+   * @param {number | BN | u64} storageProviderId - the provider id to lookup
+   * @param {string} gateway - optional ipfs http gateway url to perform ipfs queries
+   * @returns { Promise<object> } - the published service information
+   */
+  async discoverOverIpfsHttpGateway(storageProviderId, gateway = 'http://localhost:8080') {
+    storageProviderId = new BN(storageProviderId)
+    const isProvider = await this.runtimeApi.workers.isStorageProvider(storageProviderId)
+
+    if (!isProvider) {
+      throw new Error('Cannot discover non storage providers')
+    }
 
-  gateway = stripEndingSlash(gateway)
+    const identity = await this.getIpnsIdentity(storageProviderId)
 
-  const url = `${gateway}/ipns/${identity}`
+    if (identity === null) {
+      // dont waste time trying to resolve if no identity was found
+      throw new Error('no identity to resolve')
+    }
 
-  const response = await axios.get(url)
+    gateway = stripEndingSlash(gateway)
 
-  return response.data
-}
+    const url = `${gateway}/ipns/${identity}`
 
-/**
- * Resolves id of provider to its service information.
- * Will use the provided colossus discovery api endpoint. If no api endpoint
- * is provided it attempts to use the configured endpoints from the chain.
- * If the storage provider is not registered it will throw an error
- * @param {number | BN | u64 } storageProviderId - provider id to lookup
- * @param {RuntimeApi} runtimeApi - api instance to query the chain
- * @param {string} discoverApiEndpoint - url for a colossus discovery api endpoint
- * @returns { Promise<object> } - the published service information
- */
-async function discoverOverJoystreamDiscoveryService(storageProviderId, runtimeApi, discoverApiEndpoint) {
-  storageProviderId = new BN(storageProviderId)
-  const isProvider = await runtimeApi.workers.isStorageProvider(storageProviderId)
+    const response = await axios.get(url)
 
-  if (!isProvider) {
-    throw new Error('Cannot discover non storage providers')
+    return response.data
   }
 
-  const identity = await getIpnsIdentity(storageProviderId, runtimeApi)
-
-  // dont waste time trying to resolve if no identity was found
-  if (identity === null) {
-    throw new Error('no identity to resolve')
-  }
+  /**
+   * Resolves id of provider to its service information.
+   * Will use the provided colossus discovery api endpoint. If no api endpoint
+   * is provided it attempts to use the configured endpoints from the chain.
+   * If the storage provider is not registered it will throw an error
+   * @param {number | BN | u64 } storageProviderId - provider id to lookup
+   * @param {string} discoverApiEndpoint - url for a colossus discovery api endpoint
+   * @returns { Promise<object> } - the published service information
+   */
+  async discoverOverJoystreamDiscoveryService(storageProviderId, discoverApiEndpoint) {
+    storageProviderId = new BN(storageProviderId)
+    const isProvider = await this.runtimeApi.workers.isStorageProvider(storageProviderId)
+
+    if (!isProvider) {
+      throw new Error('Cannot discover non storage providers')
+    }
 
-  if (!discoverApiEndpoint) {
-    // Use bootstrap nodes
-    const discoveryBootstrapNodes = await runtimeApi.discovery.getBootstrapEndpoints()
+    const identity = await this.getIpnsIdentity(storageProviderId)
 
-    if (discoveryBootstrapNodes.length) {
-      discoverApiEndpoint = stripEndingSlash(discoveryBootstrapNodes[0].toString())
-    } else {
-      throw new Error('No known discovery bootstrap nodes found on network')
+    // dont waste time trying to resolve if no identity was found
+    if (identity === null) {
+      throw new Error('no identity to resolve')
     }
-  }
 
-  const url = `${discoverApiEndpoint}/discover/v0/${storageProviderId.toNumber()}`
+    if (!discoverApiEndpoint) {
+      // Use bootstrap nodes
+      const discoveryBootstrapNodes = await this.runtimeApi.discovery.getBootstrapEndpoints()
 
-  // should have parsed if data was json?
-  const response = await axios.get(url)
+      if (discoveryBootstrapNodes.length) {
+        discoverApiEndpoint = stripEndingSlash(discoveryBootstrapNodes[0].toString())
+      } else {
+        throw new Error('No known discovery bootstrap nodes found on network')
+      }
+    }
 
-  return response.data
-}
+    const url = `${discoverApiEndpoint}/discover/v0/${storageProviderId.toNumber()}`
 
-/**
- * Resolves id of provider to its service information.
- * Will use the local IPFS node over RPC interface.
- * If the storage provider is not registered it will throw an error.
- * @param {number | BN | u64 } storageProviderId - provider id to lookup
- * @param {RuntimeApi} runtimeApi - api instance to query the chain
- * @returns { Promise<object> } - the published service information
- */
-async function discoverOverLocalIpfsNode(storageProviderId, runtimeApi) {
-  storageProviderId = new BN(storageProviderId)
-  const isProvider = await runtimeApi.workers.isStorageProvider(storageProviderId)
+    // should have parsed if data was json?
+    const response = await axios.get(url)
 
-  if (!isProvider) {
-    throw new Error('Cannot discover non storage providers')
+    return response.data
   }
 
-  const identity = await getIpnsIdentity(storageProviderId, runtimeApi)
+  /**
+   * Resolves id of provider to its service information.
+   * Will use the local IPFS node over RPC interface.
+   * If the storage provider is not registered it will throw an error.
+   * @param {number | BN | u64 } storageProviderId - provider id to lookup
+   * @returns { Promise<object> } - the published service information
+   */
+  async discoverOverLocalIpfsNode(storageProviderId) {
+    storageProviderId = new BN(storageProviderId)
+    const isProvider = await this.runtimeApi.workers.isStorageProvider(storageProviderId)
+
+    if (!isProvider) {
+      throw new Error('Cannot discover non storage providers')
+    }
 
-  if (identity === null) {
-    // dont waste time trying to resolve if no identity was found
-    throw new Error('no identity to resolve')
-  }
+    const identity = await this.getIpnsIdentity(storageProviderId)
 
-  const ipnsAddress = `/ipns/${identity}/`
+    if (identity === null) {
+      // dont waste time trying to resolve if no identity was found
+      throw new Error('no identity to resolve')
+    }
 
-  debug('resolved ipns to ipfs object')
-  // Can this call hang forever!? can/should we set a timeout?
-  const ipfsName = await ipfs.name.resolve(ipnsAddress, {
-    // don't recurse, there should only be one indirection to the service info file
-    recursive: false,
-    nocache: false,
-  })
+    const ipnsAddress = `/ipns/${identity}/`
 
-  debug('getting ipfs object', ipfsName)
-  const data = await ipfs.get(ipfsName) // this can sometimes hang forever!?! can we set a timeout?
+    debug('resolved ipns to ipfs object')
+    // Can this call hang forever!? can/should we set a timeout?
+    const ipfsName = await this.ipfs.name.resolve(ipnsAddress, {
+      // don't recurse, there should only be one indirection to the service info file
+      recursive: false,
+      nocache: false,
+    })
 
-  // there should only be one file published under the resolved path
-  const content = data[0].content
+    debug('getting ipfs object', ipfsName)
+    const data = await this.ipfs.get(ipfsName) // this can sometimes hang forever!?! can we set a timeout?
 
-  return JSON.parse(content)
-}
+    // there should only be one file published under the resolved path
+    const content = data[0].content
 
-/**
- * Internal method that handles concurrent discoveries and caching of results. Will
- * select the appropriate discovery protocol based on whether we are in a browser environment or not.
- * If not in a browser it expects a local ipfs node to be running.
- * @param {number | BN | u64} storageProviderId - ID of the storage provider
- * @param {RuntimeApi} runtimeApi - api instance for querying the chain
- * @returns { Promise<object | null> } - the published service information
- */
-async function _discover(storageProviderId, runtimeApi) {
-  storageProviderId = new BN(storageProviderId)
-  const id = storageProviderId.toNumber()
-
-  const discoveryResult = activeDiscoveries[id]
-  if (discoveryResult) {
-    debug('discovery in progress waiting for result for', id)
-    return discoveryResult
+    return JSON.parse(content)
   }
 
-  debug('starting new discovery for', id)
-  const deferredDiscovery = newExternallyControlledPromise()
-  activeDiscoveries[id] = deferredDiscovery.promise
-
-  let result
-  try {
-    if (inBrowser()) {
-      result = await discoverOverJoystreamDiscoveryService(storageProviderId, runtimeApi)
-    } else {
-      result = await discoverOverLocalIpfsNode(storageProviderId, runtimeApi)
+  /**
+   * Internal method that handles concurrent discoveries and caching of results. Will
+   * select the appropriate discovery protocol based on whether we are in a browser environment or not.
+   * If not in a browser it expects a local ipfs node to be running.
+   * @param {number | BN | u64} storageProviderId - ID of the storage provider
+   * @returns { Promise<object | null> } - the published service information
+   */
+  async _discover(storageProviderId) {
+    storageProviderId = new BN(storageProviderId)
+    const id = storageProviderId.toNumber()
+
+    const discoveryResult = this.activeDiscoveries[id]
+    if (discoveryResult) {
+      debug('discovery in progress waiting for result for', id)
+      return discoveryResult
     }
 
-    debug(result)
-    result = JSON.stringify(result)
-    accountInfoCache[id] = {
-      value: result,
-      updated: Date.now(),
-    }
+    debug('starting new discovery for', id)
+    const deferredDiscovery = newExternallyControlledPromise()
+    this.activeDiscoveries[id] = deferredDiscovery.promise
 
-    deferredDiscovery.resolve(result)
-    delete activeDiscoveries[id]
-    return result
-  } catch (err) {
-    // we catch the error so we can update all callers
-    // and throw again to inform the first caller.
-    debug(err.message)
-    delete activeDiscoveries[id]
-    // deferredDiscovery.reject(err)
-    deferredDiscovery.resolve(null) // resolve to null until we figure out the issue below
-    // throw err // <-- throwing but this isn't being
-    // caught correctly in express server! Is it because there is an uncaught promise somewhere
-    // in the prior .reject() call ?
-    // I've only seen this behaviour when error is from ipfs-client
-    // ... is this unique to errors thrown from ipfs-client?
-    // Problem is its crashing the node so just return null for now
-    return null
-  }
-}
+    let result
+    try {
+      if (inBrowser()) {
+        result = await this.discoverOverJoystreamDiscoveryService(storageProviderId)
+      } else {
+        result = await this.discoverOverLocalIpfsNode(storageProviderId)
+      }
 
-/**
- * Cached discovery of storage provider service information. If useCachedValue is
- * set to true, will always return the cached result if found. New discovery will be triggered
- * if record is found to be stale. If a stale record is not desired (CACHE_TTL old) pass a non zero
- * value for maxCacheAge, which will force a new discovery and return the new resolved value.
- * This method in turn calls _discovery which handles concurrent discoveries and selects the appropriate
- * protocol to perform the query.
- * If the storage provider is not registered it will resolve to null
- * @param {number | BN | u64} storageProviderId - provider to discover
- * @param {RuntimeApi} runtimeApi - api instance to query the chain
- * @param {bool} useCachedValue - optionaly use chached queries
- * @param {number} maxCacheAge - maximum age of a cached query that triggers automatic re-discovery
- * @returns { Promise<object | null> } - the published service information
- */
-async function discover(storageProviderId, runtimeApi, useCachedValue = false, maxCacheAge = 0) {
-  storageProviderId = new BN(storageProviderId)
-  const id = storageProviderId.toNumber()
-  const cached = accountInfoCache[id]
-
-  if (cached && useCachedValue) {
-    if (maxCacheAge > 0) {
-      // get latest value
-      if (Date.now() > cached.updated + maxCacheAge) {
-        return _discover(storageProviderId, runtimeApi)
+      debug(result)
+      result = JSON.stringify(result)
+      this.accountInfoCache[id] = {
+        value: result,
+        updated: Date.now(),
       }
+
+      deferredDiscovery.resolve(result)
+      delete this.activeDiscoveries[id]
+      return result
+    } catch (err) {
+      // we catch the error so we can update all callers
+      // and throw again to inform the first caller.
+      debug(err.message)
+      delete this.activeDiscoveries[id]
+      // deferredDiscovery.reject(err)
+      deferredDiscovery.resolve(null) // resolve to null until we figure out the issue below
+      // throw err // <-- throwing but this isn't being
+      // caught correctly in express server! Is it because there is an uncaught promise somewhere
+      // in the prior .reject() call ?
+      // I've only seen this behaviour when error is from ipfs-client
+      // ... is this unique to errors thrown from ipfs-client?
+      // Problem is its crashing the node so just return null for now
+      return null
     }
-    // refresh if cache if stale, new value returned on next cached query
-    if (Date.now() > cached.updated + CACHE_TTL) {
-      _discover(storageProviderId, runtimeApi)
+  }
+
+  /**
+   * Cached discovery of storage provider service information. If useCachedValue is
+   * set to true, will always return the cached result if found. New discovery will be triggered
+   * if record is found to be stale. If a stale record is not desired (CACHE_TTL old) pass a non zero
+   * value for maxCacheAge, which will force a new discovery and return the new resolved value.
+   * This method in turn calls _discovery which handles concurrent discoveries and selects the appropriate
+   * protocol to perform the query.
+   * If the storage provider is not registered it will resolve to null
+   * @param {number | BN | u64} storageProviderId - provider to discover
+   * @param {bool} useCachedValue - optionaly use chached queries
+   * @param {number} maxCacheAge - maximum age of a cached query that triggers automatic re-discovery
+   * @returns { Promise<object | null> } - the published service information
+   */
+  async discover(storageProviderId, useCachedValue = false, maxCacheAge = 0) {
+    storageProviderId = new BN(storageProviderId)
+    const id = storageProviderId.toNumber()
+    const cached = this.accountInfoCache[id]
+
+    if (cached && useCachedValue) {
+      if (maxCacheAge > 0) {
+        // get latest value
+        if (Date.now() > cached.updated + maxCacheAge) {
+          return this._discover(storageProviderId)
+        }
+      }
+      // refresh if cache if stale, new value returned on next cached query
+      if (Date.now() > cached.updated + CACHE_TTL) {
+        this._discover(storageProviderId)
+      }
+      // return best known value
+      return cached.value
     }
-    // return best known value
-    return cached.value
+    return this._discover(storageProviderId)
   }
-  return _discover(storageProviderId, runtimeApi)
 }
 
 module.exports = {
-  discover,
-  discoverOverJoystreamDiscoveryService,
-  discoverOverIpfsHttpGateway,
-  discoverOverLocalIpfsNode,
+  DiscoveryClient,
 }

+ 0 - 37
storage-node/packages/discovery/example.js

@@ -1,37 +0,0 @@
-const { RuntimeApi } = require('@joystream/storage-runtime-api')
-
-const { discover, publish } = require('./')
-
-async function main() {
-  // The assigned storage-provider id
-  const providerId = 0
-
-  const runtimeApi = await RuntimeApi.create({
-    // Path to the role account key file of the provider
-    account_file: '/path/to/role_account_key_file.json',
-    storageProviderId: providerId,
-  })
-
-  const ipnsId = await publish.publish(
-    {
-      asset: {
-        version: 1,
-        endpoint: 'http://endpoint.com',
-      },
-    },
-    runtimeApi
-  )
-
-  console.log(ipnsId)
-
-  // register ipnsId on chain
-  await runtimeApi.setAccountInfo(ipnsId)
-
-  const serviceInfo = await discover.discover(providerId, runtimeApi)
-
-  console.log(serviceInfo)
-
-  runtimeApi.api.disconnect()
-}
-
-main()

+ 5 - 2
storage-node/packages/discovery/index.js

@@ -1,4 +1,7 @@
+const { PublisherClient } = require('./publish')
+const { DiscoveryClient } = require('./discover')
+
 module.exports = {
-  discover: require('./discover'),
-  publish: require('./publish'),
+  PublisherClient,
+  DiscoveryClient,
 }

+ 51 - 46
storage-node/packages/discovery/publish.js

@@ -1,6 +1,5 @@
-const ipfsClient = require('ipfs-http-client')
-
-const ipfs = ipfsClient('localhost', '5001', { protocol: 'http' })
+// const ipfsClient = require('ipfs-http-client')
+// const ipfs = ipfsClient('localhost', '5001', { protocol: 'http' })
 
 const debug = require('debug')('joystream:discovery:publish')
 
@@ -33,56 +32,62 @@ function encodeServiceInfo(info) {
   })
 }
 
-/**
- * Publishes the service information, encoded using the standard defined in encodeServiceInfo()
- * to ipfs, using the local ipfs node's PUBLISH_KEY, and returns the key id used to publish.
- * What we refer to as the ipns id.
- * @param {object} serviceInfo - the service information to publish
- * @returns {string} - the ipns id
- */
-async function publish(serviceInfo) {
-  const keys = await ipfs.key.list()
-  let servicesKey = keys.find((key) => key.name === PUBLISH_KEY)
-
-  // An ipfs node will always have the self key.
-  // If the publish key is specified as anything else and it doesn't exist
-  // we create it.
-  if (PUBLISH_KEY !== 'self' && !servicesKey) {
-    debug('generating ipns services key')
-    servicesKey = await ipfs.key.gen(PUBLISH_KEY, {
-      type: 'rsa',
-      size: 2048,
-    })
+class PublisherClient {
+  constructor(ipfs) {
+    this.ipfs = ipfs || require('ipfs-http-client')('localhost', '5001', { protocol: 'http' })
   }
 
-  if (!servicesKey) {
-    throw new Error('No IPFS publishing key available!')
-  }
+  /**
+   * Publishes the service information, encoded using the standard defined in encodeServiceInfo()
+   * to ipfs, using the local ipfs node's PUBLISH_KEY, and returns the key id used to publish.
+   * What we refer to as the ipns id.
+   * @param {object} serviceInfo - the service information to publish
+   * @returns {string} - the ipns id
+   */
+  async publish(serviceInfo) {
+    const keys = await this.ipfs.key.list()
+    let servicesKey = keys.find((key) => key.name === PUBLISH_KEY)
 
-  debug('adding service info file to node')
-  const files = await ipfs.add(encodeServiceInfo(serviceInfo))
+    // An ipfs node will always have the self key.
+    // If the publish key is specified as anything else and it doesn't exist
+    // we create it.
+    if (PUBLISH_KEY !== 'self' && !servicesKey) {
+      debug('generating ipns services key')
+      servicesKey = await this.ipfs.key.gen(PUBLISH_KEY, {
+        type: 'rsa',
+        size: 2048,
+      })
+    }
 
-  debug('publishing...')
-  const published = await ipfs.name.publish(files[0].hash, {
-    key: PUBLISH_KEY,
-    resolve: false,
-    // lifetime: // string - Time duration of the record. Default: 24h
-    // ttl:      // string - Time duration this record should be cached
-  })
+    if (!servicesKey) {
+      throw new Error('No IPFS publishing key available!')
+    }
+
+    debug('adding service info file to node')
+    const files = await this.ipfs.add(encodeServiceInfo(serviceInfo))
 
-  // The name and ipfs hash of the published service information file, eg.
-  // {
-  //   name: 'QmUNQCkaU1TRnc1WGixqEP3Q3fazM8guSdFRsdnSJTN36A',
-  //   value: '/ipfs/QmcSjtVMfDSSNYCxNAb9PxNpEigCw7h1UZ77gip3ghfbnA'
-  // }
-  // .. The name is equivalent to the key id that was used.
-  debug(published)
+    debug('publishing...')
+    const published = await this.ipfs.name.publish(files[0].hash, {
+      key: PUBLISH_KEY,
+      resolve: false,
+      // lifetime: // string - Time duration of the record. Default: 24h
+      // ttl:      // string - Time duration this record should be cached
+    })
 
-  // Return the key id under which the content was published. Which is used
-  // to lookup the actual ipfs content id of the published service information
-  return servicesKey.id
+    // The name and ipfs hash of the published service information file, eg.
+    // {
+    //   name: 'QmUNQCkaU1TRnc1WGixqEP3Q3fazM8guSdFRsdnSJTN36A',
+    //   value: '/ipfs/QmcSjtVMfDSSNYCxNAb9PxNpEigCw7h1UZ77gip3ghfbnA'
+    // }
+    // .. The name is equivalent to the key id that was used.
+    debug(published)
+
+    // Return the key id under which the content was published. Which is used
+    // to lookup the actual ipfs content id of the published service information
+    return servicesKey.id
+  }
 }
 
 module.exports = {
-  publish,
+  PublisherClient,
 }