|
@@ -22,7 +22,6 @@ const debug = require('debug')('joystream:runtime:base')
|
|
|
|
|
|
const { registerJoystreamTypes } = require('@joystream/types')
|
|
|
const { ApiPromise, WsProvider } = require('@polkadot/api')
|
|
|
-
|
|
|
const { IdentitiesApi } = require('@joystream/storage-runtime-api/identities')
|
|
|
const { BalancesApi } = require('@joystream/storage-runtime-api/balances')
|
|
|
const { WorkersApi } = require('@joystream/storage-runtime-api/workers')
|
|
@@ -30,7 +29,13 @@ const { AssetsApi } = require('@joystream/storage-runtime-api/assets')
|
|
|
const { DiscoveryApi } = require('@joystream/storage-runtime-api/discovery')
|
|
|
const { SystemApi } = require('@joystream/storage-runtime-api/system')
|
|
|
const AsyncLock = require('async-lock')
|
|
|
-const { newExternallyControlledPromise } = require('@joystream/storage-utils/externalPromise')
|
|
|
+const Promise = require('bluebird')
|
|
|
+
|
|
|
+Promise.config({
|
|
|
+ cancellation: true,
|
|
|
+})
|
|
|
+
|
|
|
+const TX_TIMEOUT = 20 * 1000
|
|
|
|
|
|
/*
|
|
|
* Initialize runtime (substrate) API and keyring.
|
|
@@ -84,225 +89,257 @@ class RuntimeApi {
|
|
|
return this.asyncLock.acquire(`${accountId}`, func)
|
|
|
}
|
|
|
|
|
|
- /*
|
|
|
- * Wait for an event. Filters out any events that don't match the module and
|
|
|
- * event name.
|
|
|
- *
|
|
|
- * The result of the Promise is an array containing first the full event
|
|
|
- * name, and then the event fields as an object.
|
|
|
- */
|
|
|
- async waitForEvent(module, name) {
|
|
|
- return this.waitForEvents([[module, name]])
|
|
|
- }
|
|
|
-
|
|
|
- static matchingEvents(subscribed, events) {
|
|
|
- debug(`Number of events: ${events.length} subscribed to ${subscribed}`)
|
|
|
-
|
|
|
+ static matchingEvents(subscribed = [], events = []) {
|
|
|
const filtered = events.filter((record) => {
|
|
|
- const { event, phase } = record
|
|
|
-
|
|
|
- // Show what we are busy with
|
|
|
- debug(`\t${event.section}:${event.method}:: (phase=${phase.toString()})`)
|
|
|
- debug(`\t\t${event.meta.documentation.toString()}`)
|
|
|
+ const { event } = record
|
|
|
|
|
|
// Skip events we're not interested in.
|
|
|
const matching = subscribed.filter((value) => {
|
|
|
- return event.section === value[0] && event.method === value[1]
|
|
|
+ if (value[0] === '*' && value[1] === '*') {
|
|
|
+ return true
|
|
|
+ } else if (value[0] === '*') {
|
|
|
+ return event.method === value[1]
|
|
|
+ } else if (value[1] === '*') {
|
|
|
+ return event.section === value[0]
|
|
|
+ } else {
|
|
|
+ return event.section === value[0] && event.method === value[1]
|
|
|
+ }
|
|
|
})
|
|
|
return matching.length > 0
|
|
|
})
|
|
|
- debug(`Filtered: ${filtered.length}`)
|
|
|
|
|
|
- const mapped = filtered.map((record) => {
|
|
|
+ return filtered.map((record) => {
|
|
|
const { event } = record
|
|
|
const types = event.typeDef
|
|
|
+ const payload = new Map()
|
|
|
|
|
|
- // Loop through each of the parameters, displaying the type and data
|
|
|
- const payload = {}
|
|
|
- event.data.forEach((data, index) => {
|
|
|
- debug(`\t\t\t${types[index].type}: ${data.toString()}`)
|
|
|
- payload[types[index].type] = data
|
|
|
- })
|
|
|
-
|
|
|
+ // this check may be un-necessary but doing it just incase
|
|
|
+ if (event.data) {
|
|
|
+ event.data.forEach((data, index) => {
|
|
|
+ const type = types[index].type
|
|
|
+ payload.set(index, { type, data })
|
|
|
+ })
|
|
|
+ }
|
|
|
const fullName = `${event.section}.${event.method}`
|
|
|
+ debug(`matched event: ${fullName} =>`, event.data && event.data.join(', '))
|
|
|
return [fullName, payload]
|
|
|
})
|
|
|
- debug('Mapped', mapped)
|
|
|
+ }
|
|
|
|
|
|
- return mapped
|
|
|
+ // Get cached nonce and use unless system nonce is greater, to avoid stale nonce if
|
|
|
+ // there was a long gap in time between calls to signAndSend during which an external app
|
|
|
+ // submitted a transaction.
|
|
|
+ async selectBestNonce(accountId) {
|
|
|
+ const cachedNonce = this.nonces[accountId]
|
|
|
+ // In future use this rpc method to take the pending tx pool into account when fetching the nonce
|
|
|
+ // const nonce = await this.api.rpc.system.accountNextIndex(accountId)
|
|
|
+ const systemNonce = await this.api.query.system.accountNonce(accountId)
|
|
|
+
|
|
|
+ if (cachedNonce) {
|
|
|
+ // we have it cached.. but lets do a look ahead to see if we need to adjust
|
|
|
+ if (systemNonce.gt(cachedNonce)) {
|
|
|
+ return systemNonce
|
|
|
+ } else {
|
|
|
+ return cachedNonce
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ return systemNonce
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- /*
|
|
|
- * Same as waitForEvent, but filter on multiple events. The parameter is an
|
|
|
- * array of arrays containing module and name. Calling waitForEvent is
|
|
|
- * identical to calling this with [[module, name]].
|
|
|
- *
|
|
|
- * Returns the first matched event *only*.
|
|
|
- */
|
|
|
- async waitForEvents(subscribed) {
|
|
|
- return new Promise((resolve) => {
|
|
|
- this.api.query.system.events((events) => {
|
|
|
- const matches = RuntimeApi.matchingEvents(subscribed, events)
|
|
|
- if (matches && matches.length) {
|
|
|
- resolve(matches)
|
|
|
- }
|
|
|
- })
|
|
|
- })
|
|
|
+ incrementAndSaveNonce(accountId, nonce) {
|
|
|
+ this.nonces[accountId] = nonce.addn(1)
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
- * Nonce-aware signAndSend(). Also allows you to use the accountId instead
|
|
|
- * of the key, making calls a little simpler. Will lock to prevent concurrent
|
|
|
- * calls so correct nonce is used.
|
|
|
+ * signAndSend() with nonce tracking, to enable concurrent sending of transacctions
|
|
|
+ * so that they can be included in the same block. Allows you to use the accountId instead
|
|
|
+ * of the key, without requiring an external Signer configured on the underlying ApiPromie
|
|
|
*
|
|
|
* If the subscribed events are given, and a callback as well, then the
|
|
|
* callback is invoked with matching events.
|
|
|
*/
|
|
|
- async signAndSend(accountId, tx, attempts, subscribed, callback) {
|
|
|
+ async signAndSend(accountId, tx, subscribed) {
|
|
|
+ // Accept both a string or AccountId as argument
|
|
|
accountId = this.identities.keyring.encodeAddress(accountId)
|
|
|
|
|
|
- // Key must be unlocked
|
|
|
+ // Throws if keyPair is not found
|
|
|
const fromKey = this.identities.keyring.getPair(accountId)
|
|
|
+
|
|
|
+ // Key must be unlocked to use
|
|
|
if (fromKey.isLocked) {
|
|
|
throw new Error('Must unlock key before using it to sign!')
|
|
|
}
|
|
|
|
|
|
- const finalizedPromise = newExternallyControlledPromise()
|
|
|
+ // Functions to be called when the submitted transaction is finalized. They are initialized
|
|
|
+ // after the transaction is submitted to the resolve and reject function of the final promise
|
|
|
+ // returned by signAndSend
|
|
|
+ // on extrinsic success
|
|
|
+ let onFinalizedSuccess
|
|
|
+ // on extrinsic failure
|
|
|
+ let onFinalizedFailed
|
|
|
|
|
|
- await this.executeWithAccountLock(accountId, async () => {
|
|
|
- // Try to get the next nonce to use
|
|
|
- let nonce = this.nonces[accountId]
|
|
|
+ // Function assigned when transaction is successfully submitted. Invoking it ubsubscribes from
|
|
|
+ // listening to tx status updates.
|
|
|
+ let unsubscribe
|
|
|
|
|
|
- let incrementNonce = () => {
|
|
|
- // only increment once
|
|
|
- incrementNonce = () => {
|
|
|
- /* turn it into a no-op */
|
|
|
- }
|
|
|
- nonce = nonce.addn(1)
|
|
|
- this.nonces[accountId] = nonce
|
|
|
- }
|
|
|
+ let lastTxUpdateResult
|
|
|
|
|
|
- // If the nonce isn't available, get it from chain.
|
|
|
- if (!nonce) {
|
|
|
- // current nonce
|
|
|
- // TODO: possible race condition here found by the linter
|
|
|
- // eslint-disable-next-line require-atomic-updates
|
|
|
- nonce = await this.api.query.system.accountNonce(accountId)
|
|
|
- debug(`Got nonce for ${accountId} from chain: ${nonce}`)
|
|
|
+ const handleTxUpdates = (result) => {
|
|
|
+ const { events = [], status } = result
|
|
|
+
|
|
|
+ if (!result || !status) {
|
|
|
+ return
|
|
|
}
|
|
|
|
|
|
- return new Promise((resolve, reject) => {
|
|
|
- debug('Signing and sending tx')
|
|
|
- // send(statusUpdates) returns a function for unsubscribing from status updates
|
|
|
- const unsubscribe = tx
|
|
|
- .sign(fromKey, { nonce })
|
|
|
- .send(({ events = [], status }) => {
|
|
|
- debug(`TX status: ${status.type}`)
|
|
|
-
|
|
|
- // Whatever events we get, process them if there's someone interested.
|
|
|
- // It is critical that this event handling doesn't prevent
|
|
|
- try {
|
|
|
- if (subscribed && callback) {
|
|
|
- const matched = RuntimeApi.matchingEvents(subscribed, events)
|
|
|
- debug('Matching events:', matched)
|
|
|
- if (matched.length) {
|
|
|
- callback(matched)
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (err) {
|
|
|
- debug(`Error handling events ${err.stack}`)
|
|
|
+ lastTxUpdateResult = result
|
|
|
+
|
|
|
+ if (result.isError) {
|
|
|
+ unsubscribe()
|
|
|
+ debug('Tx Error', status.type)
|
|
|
+ onFinalizedFailed &&
|
|
|
+ onFinalizedFailed({ err: status.type, result, tx: status.isUsurped ? status.asUsurped : undefined })
|
|
|
+ } else if (result.isFinalized) {
|
|
|
+ unsubscribe()
|
|
|
+ const mappedEvents = RuntimeApi.matchingEvents(subscribed, events)
|
|
|
+ const failed = result.findRecord('system', 'ExtrinsicFailed')
|
|
|
+ const success = result.findRecord('system', 'ExtrinsicSuccess')
|
|
|
+ const sudid = result.findRecord('sudo', 'Sudid')
|
|
|
+ const sudoAsDone = result.findRecord('sudo', 'SudoAsDone')
|
|
|
+
|
|
|
+ if (failed) {
|
|
|
+ const {
|
|
|
+ event: { data },
|
|
|
+ } = failed
|
|
|
+ const dispatchError = data[0]
|
|
|
+ onFinalizedFailed({
|
|
|
+ err: 'ExtrinsicFailed',
|
|
|
+ mappedEvents,
|
|
|
+ result,
|
|
|
+ block: status.asFinalized,
|
|
|
+ dispatchError, // we get module number/id and index into the Error enum
|
|
|
+ })
|
|
|
+ } else if (success) {
|
|
|
+ // Note: For root origin calls, the dispatch error is logged to the joystream-node
|
|
|
+ // console, we cannot get it in the events
|
|
|
+ if (sudid) {
|
|
|
+ const dispatchSuccess = sudid.event.data[0]
|
|
|
+ if (dispatchSuccess.isTrue) {
|
|
|
+ onFinalizedSuccess({ mappedEvents, result, block: status.asFinalized })
|
|
|
+ } else {
|
|
|
+ onFinalizedFailed({ err: 'SudoFailed', mappedEvents, result, block: status.asFinalized })
|
|
|
}
|
|
|
-
|
|
|
- // We want to release lock as early as possible, sometimes Ready status
|
|
|
- // doesn't occur, so we do it on Broadcast instead
|
|
|
- if (status.isReady) {
|
|
|
- debug('TX Ready.')
|
|
|
- incrementNonce()
|
|
|
- resolve(unsubscribe) // releases lock
|
|
|
- } else if (status.isBroadcast) {
|
|
|
- debug('TX Broadcast.')
|
|
|
- incrementNonce()
|
|
|
- resolve(unsubscribe) // releases lock
|
|
|
- } else if (status.isFinalized) {
|
|
|
- debug('TX Finalized.')
|
|
|
- finalizedPromise.resolve(status)
|
|
|
- } else if (status.isFuture) {
|
|
|
- // comes before ready.
|
|
|
- // does that mean it will remain in mempool or in api internal queue?
|
|
|
- // nonce was set in the future. Treating it as an error for now.
|
|
|
- debug('TX Future!')
|
|
|
- // nonce is likely out of sync, delete it so we reload it from chain on next attempt
|
|
|
- delete this.nonces[accountId]
|
|
|
- const err = new Error('transaction nonce set in future')
|
|
|
- finalizedPromise.reject(err)
|
|
|
- reject(err)
|
|
|
+ } else if (sudoAsDone) {
|
|
|
+ const dispatchSuccess = sudoAsDone.event.data[0]
|
|
|
+ if (dispatchSuccess.isTrue) {
|
|
|
+ onFinalizedSuccess({ mappedEvents, result, block: status.asFinalized })
|
|
|
+ } else {
|
|
|
+ onFinalizedFailed({ err: 'SudoAsFailed', mappedEvents, result, block: status.asFinalized })
|
|
|
}
|
|
|
+ } else {
|
|
|
+ onFinalizedSuccess({ mappedEvents, result, block: status.asFinalized })
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- /* why don't we see these status updates on local devchain (single node)
|
|
|
- isUsurped
|
|
|
- isBroadcast
|
|
|
- isDropped
|
|
|
- isInvalid
|
|
|
- */
|
|
|
- })
|
|
|
- .catch((err) => {
|
|
|
- // 1014 error: Most likely you are sending transaction with the same nonce,
|
|
|
- // so it assumes you want to replace existing one, but the priority is too low to replace it (priority = fee = len(encoded_transaction) currently)
|
|
|
- // Remember this can also happen if in the past we sent a tx with a future nonce, and the current nonce
|
|
|
- // now matches it.
|
|
|
- if (err) {
|
|
|
- const errstr = err.toString()
|
|
|
- // not the best way to check error code.
|
|
|
- // https://github.com/polkadot-js/api/blob/master/packages/rpc-provider/src/coder/index.ts#L52
|
|
|
- if (
|
|
|
- errstr.indexOf('Error: 1014:') < 0 && // low priority
|
|
|
- errstr.indexOf('Error: 1010:') < 0
|
|
|
- ) {
|
|
|
- // bad transaction
|
|
|
- // Error but not nonce related. (bad arguments maybe)
|
|
|
- debug('TX error', err)
|
|
|
- } else {
|
|
|
- // nonce is likely out of sync, delete it so we reload it from chain on next attempt
|
|
|
- delete this.nonces[accountId]
|
|
|
- }
|
|
|
- }
|
|
|
+ // synchronize access to nonce
|
|
|
+ await this.executeWithAccountLock(accountId, async () => {
|
|
|
+ const nonce = await this.selectBestNonce(accountId)
|
|
|
|
|
|
- finalizedPromise.reject(err)
|
|
|
- // releases lock
|
|
|
- reject(err)
|
|
|
- })
|
|
|
- })
|
|
|
+ try {
|
|
|
+ unsubscribe = await tx.sign(fromKey, { nonce }).send(handleTxUpdates)
|
|
|
+ debug('TransactionSubmitted')
|
|
|
+ // transaction submitted successfully, increment and save nonce.
|
|
|
+ this.incrementAndSaveNonce(accountId, nonce)
|
|
|
+ } catch (err) {
|
|
|
+ const errstr = err.toString()
|
|
|
+ debug('TransactionRejected:', errstr)
|
|
|
+ // This happens when nonce is already used in finalized transactions, ie. the selected nonce
|
|
|
+ // was less than current account nonce. A few scenarios where this happens (other than incorrect code)
|
|
|
+ // 1. When a past future tx got finalized because we submitted some transactions
|
|
|
+ // using up the nonces upto that point.
|
|
|
+ // 2. Can happen while storage-node is talkig to a joystream-node that is still not fully
|
|
|
+ // synced.
|
|
|
+ // 3. Storage role account holder sent a transaction just ahead of us via another app.
|
|
|
+ if (errstr.indexOf('ExtrinsicStatus:: 1010: Invalid Transaction: Stale') !== -1) {
|
|
|
+ // In case 1 or 3 a quick recovery could work by just incrementing, but since we
|
|
|
+ // cannot detect which case we are in just reset and force re-reading nonce. Even
|
|
|
+ // that may not be sufficient expect after a few more failures..
|
|
|
+ delete this.nonces[accountId]
|
|
|
+ }
|
|
|
+
|
|
|
+ // Technically it means a transaction in the mempool with same
|
|
|
+ // nonce and same fees being paid so we cannot replace it, either we didn't correctly
|
|
|
+ // increment the nonce or someone external to this application sent a transaction
|
|
|
+ // with same nonce ahead of us.
|
|
|
+ if (errstr.indexOf('ExtrinsicStatus:: 1014: Priority is too low') !== -1) {
|
|
|
+ delete this.nonces[accountId]
|
|
|
+ }
|
|
|
+
|
|
|
+ throw err
|
|
|
+ }
|
|
|
})
|
|
|
|
|
|
- // when does it make sense to manyally unsubscribe?
|
|
|
- // at this point unsubscribe.then and unsubscribe.catch have been deleted
|
|
|
- // unsubscribe() // don't unsubscribe if we want to wait for additional status
|
|
|
- // updates to know when the tx has been finalized
|
|
|
- return finalizedPromise.promise
|
|
|
+ // We cannot get tx updates for a future tx so return now to avoid blocking caller
|
|
|
+ if (lastTxUpdateResult.status.isFuture) {
|
|
|
+ debug('Warning: Submitted extrinsic with future nonce')
|
|
|
+ return {}
|
|
|
+ }
|
|
|
+
|
|
|
+ // Return a promise that will resolve when the transaction finalizes.
|
|
|
+ // On timeout it will be rejected. Timeout is a workaround for dealing with the
|
|
|
+ // fact that if rpc connection is lost to node we have no way of detecting it or recovering.
|
|
|
+ // Timeout can also occur if a transaction that was part of batch of transactions submitted
|
|
|
+ // gets usurped.
|
|
|
+ return new Promise((resolve, reject) => {
|
|
|
+ onFinalizedSuccess = resolve
|
|
|
+ onFinalizedFailed = reject
|
|
|
+ }).timeout(TX_TIMEOUT)
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
* Sign and send a transaction expect event from
|
|
|
* module and return eventProperty from the event.
|
|
|
*/
|
|
|
- async signAndSendThenGetEventResult(senderAccountId, tx, { eventModule, eventName, eventProperty }) {
|
|
|
- // event from a module,
|
|
|
- const subscribed = [[eventModule, eventName]]
|
|
|
- // TODO: rewrite this method to async-await style
|
|
|
- // eslint-disable-next-line no-async-promise-executor
|
|
|
- return new Promise(async (resolve, reject) => {
|
|
|
- try {
|
|
|
- await this.signAndSend(senderAccountId, tx, 1, subscribed, (events) => {
|
|
|
- events.forEach((event) => {
|
|
|
- // fix - we may not necessarily want the first event
|
|
|
- // if there are multiple events emitted,
|
|
|
- resolve(event[1][eventProperty])
|
|
|
- })
|
|
|
- })
|
|
|
- } catch (err) {
|
|
|
- reject(err)
|
|
|
- }
|
|
|
- })
|
|
|
+ async signAndSendThenGetEventResult(senderAccountId, tx, { module, event, index, type }) {
|
|
|
+ if (!module || !event || index === undefined || !type) {
|
|
|
+ throw new Error('MissingSubscribeEventDetails')
|
|
|
+ }
|
|
|
+
|
|
|
+ const subscribed = [[module, event]]
|
|
|
+
|
|
|
+ const { mappedEvents } = await this.signAndSend(senderAccountId, tx, subscribed)
|
|
|
+
|
|
|
+ if (!mappedEvents) {
|
|
|
+ // The tx was a future so it was not possible and will not be possible to get events
|
|
|
+ throw new Error('NoEventsWereCaptured')
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!mappedEvents.length) {
|
|
|
+ // our expected event was not emitted
|
|
|
+ throw new Error('ExpectedEventNotFound')
|
|
|
+ }
|
|
|
+
|
|
|
+ // fix - we may not necessarily want the first event
|
|
|
+ // when there are multiple instances of the same event
|
|
|
+ const firstEvent = mappedEvents[0]
|
|
|
+
|
|
|
+ if (firstEvent[0] !== `${module}.${event}`) {
|
|
|
+ throw new Error('WrongEventCaptured')
|
|
|
+ }
|
|
|
+
|
|
|
+ const payload = firstEvent[1]
|
|
|
+ if (!payload.has(index)) {
|
|
|
+ throw new Error('DataIndexOutOfRange')
|
|
|
+ }
|
|
|
+
|
|
|
+ const value = payload.get(index)
|
|
|
+ if (value.type !== type) {
|
|
|
+ throw new Error('DataTypeNotExpectedType')
|
|
|
+ }
|
|
|
+
|
|
|
+ return value.data
|
|
|
}
|
|
|
}
|
|
|
|