Przeglądaj źródła

storage-node: runtime-api signAndSend better event handling

Mokhtar Naamani 4 lat temu
rodzic
commit
1e5d820c02

+ 1 - 1
storage-node/packages/colossus/lib/sync.js

@@ -66,7 +66,7 @@ async function syncCallback(api, storage) {
       // create relationship
       debug(`Creating new storage relationship for ${contentId.encode()}`)
       try {
-        relationshipId = await api.assets.createAndReturnStorageRelationship(roleAddress, providerId, contentId)
+        relationshipId = await api.assets.createStorageRelationship(roleAddress, providerId, contentId)
         await api.assets.toggleStorageRelationshipReady(roleAddress, providerId, relationshipId, true)
       } catch (err) {
         debug(`Error creating new storage relationship ${contentId.encode()}: ${err.stack}`)

+ 1 - 1
storage-node/packages/colossus/paths/asset/v0/{id}.js

@@ -157,7 +157,7 @@ module.exports = function (storage, runtime) {
 
             debug('creating storage relationship for newly uploaded content')
             // Create storage relationship and flip it to ready.
-            const dosrId = await runtime.assets.createAndReturnStorageRelationship(roleAddress, providerId, id)
+            const dosrId = await runtime.assets.createStorageRelationship(roleAddress, providerId, id)
 
             debug('toggling storage relationship for newly uploaded content')
             await runtime.assets.toggleStorageRelationshipReady(roleAddress, providerId, dosrId, true)

+ 9 - 25
storage-node/packages/runtime-api/assets.js

@@ -95,17 +95,6 @@ class AssetsApi {
     return this.base.signAndSend(providerAccountId, tx)
   }
 
-  /*
-   * Creates storage relationship for a data object and provider
-   */
-  async createStorageRelationship(providerAccountId, storageProviderId, contentId, callback) {
-    contentId = parseContentId(contentId)
-    const tx = this.base.api.tx.dataObjectStorageRegistry.addRelationship(storageProviderId, contentId)
-
-    const subscribed = [['dataObjectStorageRegistry', 'DataObjectStorageRelationshipAdded']]
-    return this.base.signAndSend(providerAccountId, tx, subscribed, callback)
-  }
-
   /*
    * Gets storage relationship for contentId for the given provider
    */
@@ -126,22 +115,17 @@ class AssetsApi {
   }
 
   /*
-   * Creates storage relationship for a data object and provider and returns the relationship id
+   * Creates storage relationship for a data object and provider and
+   * returns the relationship id
    */
-  async createAndReturnStorageRelationship(providerAccountId, storageProviderId, contentId) {
+  async createStorageRelationship(providerAccountId, storageProviderId, contentId) {
     contentId = parseContentId(contentId)
-    // TODO: rewrite this method to async-await style
-    // eslint-disable-next-line  no-async-promise-executor
-    return new Promise(async (resolve, reject) => {
-      try {
-        await this.createStorageRelationship(providerAccountId, storageProviderId, contentId, (events) => {
-          events.forEach((event) => {
-            resolve(event[1].DataObjectStorageRelationshipId)
-          })
-        })
-      } catch (err) {
-        reject(err)
-      }
+    const tx = this.base.api.tx.dataObjectStorageRegistry.addRelationship(storageProviderId, contentId)
+
+    return this.base.signAndSendThen(providerAccountId, tx, {
+      eventModule: 'dataObjectStorageRegistry',
+      eventName: 'DataObjectStorageRelationshipAdded',
+      eventProperty: 'DataObjectStorageRelationshipId', // index 0
     })
   }
 

+ 1 - 1
storage-node/packages/runtime-api/identities.js

@@ -188,7 +188,7 @@ class IdentitiesApi {
     return this.base.signAndSendThenGetEventResult(accountId, tx, {
       eventModule: 'members',
       eventName: 'MemberRegistered',
-      eventProperty: 'MemberId',
+      eventProperty: 'MemberId', // index 0
     })
   }
 

+ 129 - 97
storage-node/packages/runtime-api/index.js

@@ -30,7 +30,14 @@ const { AssetsApi } = require('@joystream/storage-runtime-api/assets')
 const { DiscoveryApi } = require('@joystream/storage-runtime-api/discovery')
 const { SystemApi } = require('@joystream/storage-runtime-api/system')
 const AsyncLock = require('async-lock')
-const { newExternallyControlledPromise } = require('@joystream/storage-utils/externalPromise')
+const Promise = require('bluebird')
+
+Promise.config({
+  cancellation: true,
+})
+
+// const ASYNC_LOCK_TIMEOUT = 30 * 1000
+const TX_TIMEOUT = 20 * 1000
 
 /*
  * Initialize runtime (substrate) API and keyring.
@@ -55,6 +62,7 @@ class RuntimeApi {
     // Create the API instrance
     this.api = await ApiPromise.create({ provider })
 
+    // this.asyncLock = new AsyncLock({ timeout: ASYNC_LOCK_TIMEOUT, maxPending: 100 })
     this.asyncLock = new AsyncLock()
 
     // Keep track locally of account nonces.
@@ -84,15 +92,9 @@ class RuntimeApi {
     return this.asyncLock.acquire(`${accountId}`, func)
   }
 
-  static matchingEvents(subscribed, events) {
-    if (!events.length) return []
-
+  static matchingEvents(subscribed = [], events = []) {
     const filtered = events.filter((record) => {
-      const { event /*phase*/ } = record
-
-      // Show what we are busy with
-      // debug(`\t${event.section}:${event.method}:: (phase=${phase.toString()})`)
-      // debug(`\t\t${event.meta.documentation.toString()}`)
+      const { event } = record
 
       // Skip events we're not interested in.
       const matching = subscribed.filter((value) => {
@@ -105,41 +107,26 @@ class RuntimeApi {
       const { event } = record
       const types = event.typeDef
 
-      // Loop through each of the parameters, displaying the type and data
+      // Loop through each of the event data items.
+      // FIX: we are loosing however some items if they have the same type
+      // only the first occurance is saved in the payload map, as the cost of convenience
+      // to get a value "by name" - why not just return the original EventRecord
+      // and let the calller use the index to get the value desired?
       const payload = {}
       event.data.forEach((data, index) => {
-        // debug(`\t${types[index].type}: ${data.toString()}`)
-        payload[types[index].type] = data
+        const type = types[index].type
+        payload[type] = payload[type] || data
       })
 
       const fullName = `${event.section}.${event.method}`
       return [fullName, payload]
     })
 
-    debug('Events', JSON.stringify(mapped))
+    mapped.length && debug('Mapped Events', JSON.stringify(mapped))
 
     return mapped
   }
 
-  // Returns a function that takes events from transaction lifecycle updates
-  // that look for matching events and makes a callback and absorbs any expections
-  // raised by the callback to ensure we continue to process the complete
-  // transaction lifecyle.
-  static makeEventsHandler(subscribed, callback) {
-    return function eventsHandler(events) {
-      try {
-        if (subscribed && callback) {
-          const matched = RuntimeApi.matchingEvents(subscribed, events)
-          if (matched.length) {
-            callback(matched)
-          }
-        }
-      } catch (err) {
-        debug(`Error handling events ${err.stack}`)
-      }
-    }
-  }
-
   /*
    * signAndSend() with nonce tracking, to enable concurrent sending of transacctions
    * so that they can be included in the same block. Allows you to use the accountId instead
@@ -148,88 +135,124 @@ class RuntimeApi {
    * If the subscribed events are given, and a callback as well, then the
    * callback is invoked with matching events.
    */
-  async signAndSend(accountId, tx, subscribed, callback) {
+  async signAndSend(accountId, tx, subscribed) {
+    // Accept both a string or AccountId as argument
     accountId = this.identities.keyring.encodeAddress(accountId)
 
-    // Key must be unlocked
+    // Throws if keyPair is not found
     const fromKey = this.identities.keyring.getPair(accountId)
 
+    // Key must be unlocked to use
     if (fromKey.isLocked) {
       throw new Error('Must unlock key before using it to sign!')
     }
 
-    // Promise that will be resolved when the submitted transaction is finalized
-    // it will be rejected if the transaction is rejected by the node.
-    const finalizedPromise = newExternallyControlledPromise()
+    // Functions to be called when the submitted transaction is finalized. They are initialized
+    // after the transaction is submitted to the resolve and reject function of the final promise
+    // returned by signAndSend
+    // on extrinsic success
+    let onFinalizedSuccess
+    // on extrinsic failure
+    let onFinalizedFailed
 
-    // function assigned when transaction is successfully submitted. Call
-    // it to unsubsribe from events.
+    // Function assigned when transaction is successfully submitted. Invoking it ubsubscribes from
+    // listening to tx status updates.
     let unsubscribe
 
-    const handleEvents = RuntimeApi.makeEventsHandler(subscribed, callback)
+    let lastTxUpdateResult
 
-    const handleTxUpdates = ({ events = [], status }) => {
-      // when handling tx life cycle we cannot detect api disconnect and could be waiting
-      // for events for ever!
-      handleEvents(events)
+    const handleTxUpdates = (result) => {
+      const { events = [], status } = result
+      lastTxUpdateResult = result
+      debug(status.type)
+
+      // Deal with statuses which will prevent
+      // extrinsic from finalizing.
+      if (status.isUsurped) {
+        debug(JSON.stringify(result))
+        onFinalizedFailed({ err: 'Usurped' })
+      }
+
+      if (status.isDropped) {
+        debug(JSON.stringify(result))
+        onFinalizedFailed({ err: 'Dropped' })
+      }
+
+      // My gutt says this comes before isReady and causes await send() to throw
+      // and therefore onFinalizedFailed isn't initialized.
+      // We don't need to do anything other than log it?
+      // This would be BadProof, bad encoding of the transaction.. etc?
+      if (status.isInvalid) {
+        debug(JSON.stringify(result))
+      }
 
       if (status.isFinalized) {
-        // transaction was included in block (finalized)
-        // resolve with the transaction hash
         unsubscribe()
-        finalizedPromise.resolve(status.asFinalized)
-      } else if (status.isFuture) {
-        // This can happen if the code is incorrect, but also in a scenario where a joystream-node
-        // lost connectivity, the storage node submitted a few transactions, and incremented the nonce.
-        // The joystream-node later was restarted and storage-node continues using cached nonce.
+        const mappedEvents = RuntimeApi.matchingEvents(subscribed, events)
 
-        // Can we detect api disconnect and reset nonce?
+        const failed = result.findRecord('system', 'ExtrinsicFailed')
+        const success = result.findRecord('system', 'ExtrinsicSuccess')
 
-        debug(`== Error: Submitted transaction with future nonce ==`)
-        delete this.nonces[accountId]
-        finalizedPromise.reject('Future Tx Nonce')
+        if (failed) {
+          onFinalizedFailed({ err: 'ExtrinsicFailed', result, tx: status.asFinalized })
+        } else if (success) {
+          // TODO: check if it was sudo call .. can we detect failed dispatch? Sudid true/false event?
+          onFinalizedSuccess({ mappedEvents, result, tx: status.asFinalized })
+        }
       }
     }
 
     // synchronize access to nonce
     await this.executeWithAccountLock(accountId, async () => {
-      // Try to get the next nonce to use
-      let nonce = this.nonces[accountId]
-      // Remember if we read a previously saved nonce
-      const nonceWasCached = nonce !== undefined
-      // If it wasn't cached read it from chain and save it
-      nonce = this.nonces[accountId] = nonce || (await this.api.query.system.accountNonce(accountId))
+      const nonce = this.nonces[accountId] || (await this.api.query.system.accountNonce(accountId))
 
       try {
         unsubscribe = await tx.sign(fromKey, { nonce }).send(handleTxUpdates)
-        // transaction submitted successfully, increment and save nonce,
-        // unless it was reset in handleTxCycle()
-        if (this.nonces[accountId] !== undefined) {
-          this.nonces[accountId] = nonce.addn(1)
-        }
+        debug('TransactionSubmitted')
+        // transaction submitted successfully, increment and save nonce.
+        this.nonces[accountId] = nonce.addn(1)
       } catch (err) {
-        debug('Transaction Rejected:', err.toString())
-        // Error here could be simply bad input to the transactions. It may also
-        // be due to bad nonce, resulting in attempt to replace transactions with same nonce
-        // either that were future transactions,
-        // or because of stale nonces (this will happen while a joystream-node is far behind in syncing because
-        // we will read the nonce from chain and by the time we submit the transaction, the node would have fetched a few more blocks
-        // where the nonce of the account might have changed to a higher value)
-        // Occasionally the storage node operator will use their role account from another application
-        // to send transactions to manage their role which will change the nonce, and due to a race condition
-        // between reading the nonce from chain, and signing a transaction, the selected nonce may become stale.
-
-        // All we can do is reset the nonce and re-read it from chain on next tx submit attempt.
-        // The storage node will eventually recover.
-        if (nonceWasCached) {
+        const errstr = err.toString()
+        debug('TransactionRejected:', errstr)
+        // This happens when nonce is already used in finalized transactions, ie. the selected nonce
+        // was less than current account nonce. A few scenarios where this happens (other than incorrect code)
+        // 1. When a past future tx got finalized because we submitted some transactions
+        // using up the nonces upto that point.
+        // 2. Can happen while storage-node is talkig to a joystream-node that is still not fully
+        // synced.
+        // 3. Storage role account holder sent a transaction just ahead of us via another app.
+        if (errstr.indexOf('ExtrinsicStatus:: 1010: Invalid Transaction: Stale') !== -1) {
+          // In case 1 or 3 a quick recovery could work by just incrementing, but since we
+          // cannot detect which case we are in just reset and force re-reading nonce. Even
+          // that may not be sufficient expect after a few more failures..
           delete this.nonces[accountId]
         }
 
-        finalizedPromise.reject(err)
+        // Technically it means a transaction in the mempool with same
+        // nonce and same fees being paid so we cannot replace it, either we didn't correctly
+        // increment the nonce or someone external to this application sent a transaction
+        // with same nonce ahead of us.
+        if (errstr.indexOf('ExtrinsicStatus:: 1014: Priority is too low') !== -1) {
+          delete this.nonces[accountId]
+        }
+
+        throw err
       }
     })
 
-    return finalizedPromise.promise
+    // We cannot get tx updates for a future tx so return now to avoid blocking caller
+    if (lastTxUpdateResult.status.isFuture) {
+      debug('Warning: Submitted extrinsic with future nonce')
+      return {}
+    }
+
+    // Return a promise that will resolve when the transaction finalizes.
+    // On timeout it will be rejected. Timeout is a workaround for dealing with the
+    // fact that if rpc connection is lost to node we have no way of detecting it or recovering.
+    return new Promise((resolve, reject) => {
+      onFinalizedSuccess = resolve
+      onFinalizedFailed = reject
+    }).timeout(TX_TIMEOUT)
   }
 
   /*
@@ -237,23 +260,32 @@ class RuntimeApi {
    * module and return eventProperty from the event.
    */
   async signAndSendThenGetEventResult(senderAccountId, tx, { eventModule, eventName, eventProperty }) {
+    if (!eventModule || !eventName || !eventProperty) {
+      throw new Error('MissingSubscribeEventDetails')
+    }
+
     // event from a module,
     const subscribed = [[eventModule, eventName]]
-    // TODO: rewrite this method to async-await style
-    // eslint-disable-next-line  no-async-promise-executor
-    return new Promise(async (resolve, reject) => {
-      try {
-        await this.signAndSend(senderAccountId, tx, subscribed, (events) => {
-          events.forEach((event) => {
-            // fix - we may not necessarily want the first event
-            // if there are multiple events emitted,
-            resolve(event[1][eventProperty])
-          })
-        })
-      } catch (err) {
-        reject(err)
-      }
-    })
+
+    const { mappedEvents } = await this.signAndSend(senderAccountId, tx, subscribed)
+
+    if (!mappedEvents) {
+      // The tx was a future so it was not possible and will not be possible to get events
+      throw new Error('NoEventsCanBeCaptured')
+    }
+
+    if (!mappedEvents.length) {
+      // our expected event was not emitted
+      throw new Error('ExpectedEventNotFound')
+    }
+
+    // fix - we may not necessarily want the first event
+    // if there are multiple instances of the same event
+    const firstEvent = mappedEvents[0]
+    const payload = firstEvent[1]
+    // Note if the event data contained more than one element of the same type
+    // we can only get the first occurance
+    return payload[eventProperty]
   }
 }
 

+ 3 - 3
storage-node/packages/runtime-api/workers.js

@@ -208,7 +208,7 @@ class WorkersApi {
     return this.base.signAndSendThenGetEventResult(senderAccount, tx, {
       eventModule: 'storageWorkingGroup',
       eventName: 'OpeningAdded',
-      eventProperty: 'OpeningId',
+      eventProperty: 'OpeningId', // index 0
     })
   }
 
@@ -228,7 +228,7 @@ class WorkersApi {
     return this.base.signAndSendThenGetEventResult(memberAccount, applyTx, {
       eventModule: 'storageWorkingGroup',
       eventName: 'AppliedOnOpening',
-      eventProperty: 'ApplicationId',
+      eventProperty: 'ApplicationId', // index 1
     })
   }
 
@@ -289,7 +289,7 @@ class WorkersApi {
     return this.base.signAndSendThenGetEventResult(senderAccount, tx, {
       eventModule: 'storageWorkingGroup',
       eventName: 'OpeningFilled',
-      eventProperty: 'ApplicationIdToWorkerIdMap',
+      eventProperty: 'ApplicationIdToWorkerIdMap', // index 1
     })
   }
 }