123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291 |
- /*
- * This file is part of the storage node for the Joystream project.
- * Copyright (C) 2019 Joystream Contributors
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <https://www.gnu.org/licenses/>.
- */
- 'use strict';
- const debug = require('debug')('joystream:runtime:base');
- const { registerJoystreamTypes } = require('@joystream/types');
- const { ApiPromise, WsProvider } = require('@polkadot/api');
- const { IdentitiesApi } = require('@joystream/runtime-api/identities');
- const { BalancesApi } = require('@joystream/runtime-api/balances');
- const { RolesApi } = require('@joystream/runtime-api/roles');
- const { AssetsApi } = require('@joystream/runtime-api/assets');
- const { DiscoveryApi } = require('@joystream/runtime-api/discovery');
- const AsyncLock = require('async-lock');
- /*
- * Initialize runtime (substrate) API and keyring.
- */
- class RuntimeApi
- {
- static async create(options)
- {
- const runtime_api = new RuntimeApi();
- await runtime_api.init(options || {});
- return runtime_api;
- }
- async init(options)
- {
- debug('Init');
- options = options || {};
- // Register joystream types
- registerJoystreamTypes();
- const provider = new WsProvider(options.provider_url || 'ws://localhost:9944');
- // Create the API instrance
- this.api = await ApiPromise.create({ provider });
- this.asyncLock = new AsyncLock();
- // Keep track locally of account nonces.
- this.nonces = {};
- // Ok, create individual APIs
- this.identities = await IdentitiesApi.create(this, {
- account_file: options.account_file,
- passphrase: options.passphrase,
- canPromptForPassphrase: options.canPromptForPassphrase
- });
- this.balances = await BalancesApi.create(this);
- this.roles = await RolesApi.create(this);
- this.assets = await AssetsApi.create(this);
- this.discovery = await DiscoveryApi.create(this);
- }
- disconnect()
- {
- this.api.disconnect();
- }
- executeWithAccountLock(account_id, func) {
- return this.asyncLock.acquire(`${account_id}`, func);
- }
- /*
- * Wait for an event. Filters out any events that don't match the module and
- * event name.
- *
- * The result of the Promise is an array containing first the full event
- * name, and then the event fields as an object.
- */
- async waitForEvent(module, name)
- {
- return this.waitForEvents([[module, name]]);
- }
- _matchingEvents(subscribed, events)
- {
- debug(`Number of events: ${events.length}; subscribed to ${subscribed}`);
- const filtered = events.filter((record) => {
- const { event, phase } = record;
- // Show what we are busy with
- debug(`\t${event.section}:${event.method}:: (phase=${phase.toString()})`);
- debug(`\t\t${event.meta.documentation.toString()}`);
- // Skip events we're not interested in.
- const matching = subscribed.filter((value) => {
- return event.section == value[0] && event.method == value[1];
- });
- return matching.length > 0;
- });
- debug(`Filtered: ${filtered.length}`);
- const mapped = filtered.map((record) => {
- const { event } = record;
- const types = event.typeDef;
- // Loop through each of the parameters, displaying the type and data
- const payload = {};
- event.data.forEach((data, index) => {
- debug(`\t\t\t${types[index].type}: ${data.toString()}`);
- payload[types[index].type] = data;
- });
- const full_name = `${event.section}.${event.method}`;
- return [full_name, payload];
- });
- debug('Mapped', mapped);
- return mapped;
- }
- /*
- * Same as waitForEvent, but filter on multiple events. The parameter is an
- * array of arrays containing module and name. Calling waitForEvent is
- * identical to calling this with [[module, name]].
- *
- * Returns the first matched event *only*.
- */
- async waitForEvents(subscribed)
- {
- return new Promise((resolve, reject) => {
- this.api.query.system.events((events) => {
- const matches = this._matchingEvents(subscribed, events);
- if (matches && matches.length) {
- resolve(matches);
- }
- });
- });
- }
- /*
- * Nonce-aware signAndSend(). Also allows you to use the accountId instead
- * of the key, making calls a little simpler. Will lock to prevent concurrent
- * calls so correct nonce is used.
- *
- * If the subscribed events are given, and a callback as well, then the
- * callback is invoked with matching events.
- */
- async signAndSend(accountId, tx, attempts, subscribed, callback)
- {
- // Prepare key
- const from_key = this.identities.keyring.getPair(accountId);
- if (from_key.isLocked) {
- throw new Error('Must unlock key before using it to sign!');
- }
- const finalizedPromise = newExternallyControlledPromise();
- let unsubscribe = await this.executeWithAccountLock(accountId, async () => {
- // Try to get the next nonce to use
- let nonce = this.nonces[accountId];
- let incrementNonce = () => {
- // only increment once
- incrementNonce = () => {}; // turn it into a no-op
- nonce = nonce.addn(1);
- this.nonces[accountId] = nonce;
- }
- // If the nonce isn't available, get it from chain.
- if (!nonce) {
- // current nonce
- nonce = await this.api.query.system.accountNonce(accountId);
- debug(`Got nonce for ${accountId} from chain: ${nonce}`);
- }
- return new Promise((resolve, reject) => {
- debug('Signing and sending tx');
- // send(statusUpdates) returns a function for unsubscribing from status updates
- let unsubscribe = tx.sign(from_key, { nonce })
- .send(({events = [], status}) => {
- debug(`TX status: ${status.type}`);
- // Whatever events we get, process them if there's someone interested.
- // It is critical that this event handling doesn't prevent
- try {
- if (subscribed && callback) {
- const matched = this._matchingEvents(subscribed, events);
- debug('Matching events:', matched);
- if (matched.length) {
- callback(matched);
- }
- }
- } catch(err) {
- debug(`Error handling events ${err.stack}`)
- }
- // We want to release lock as early as possible, sometimes Ready status
- // doesn't occur, so we do it on Broadcast instead
- if (status.isReady) {
- debug('TX Ready.');
- incrementNonce();
- resolve(unsubscribe); //releases lock
- } else if (status.isBroadcast) {
- debug('TX Broadcast.');
- incrementNonce();
- resolve(unsubscribe); //releases lock
- } else if (status.isFinalized) {
- debug('TX Finalized.');
- finalizedPromise.resolve(status)
- } else if (status.isFuture) {
- // comes before ready.
- // does that mean it will remain in mempool or in api internal queue?
- // nonce was set in the future. Treating it as an error for now.
- debug('TX Future!')
- // nonce is likely out of sync, delete it so we reload it from chain on next attempt
- delete this.nonces[accountId];
- const err = new Error('transaction nonce set in future');
- finalizedPromise.reject(err);
- reject(err);
- }
- /* why don't we see these status updates on local devchain (single node)
- isUsurped
- isBroadcast
- isDropped
- isInvalid
- */
- })
- .catch((err) => {
- // 1014 error: Most likely you are sending transaction with the same nonce,
- // so it assumes you want to replace existing one, but the priority is too low to replace it (priority = fee = len(encoded_transaction) currently)
- // Remember this can also happen if in the past we sent a tx with a future nonce, and the current nonce
- // now matches it.
- if (err) {
- const errstr = err.toString();
- // not the best way to check error code.
- // https://github.com/polkadot-js/api/blob/master/packages/rpc-provider/src/coder/index.ts#L52
- if (errstr.indexOf('Error: 1014:') < 0 && // low priority
- errstr.indexOf('Error: 1010:') < 0) // bad transaction
- {
- // Error but not nonce related. (bad arguments maybe)
- debug('TX error', err);
- } else {
- // nonce is likely out of sync, delete it so we reload it from chain on next attempt
- delete this.nonces[accountId];
- }
- }
- finalizedPromise.reject(err);
- // releases lock
- reject(err);
- });
- });
- })
- // when does it make sense to manyally unsubscribe?
- // at this point unsubscribe.then and unsubscribe.catch have been deleted
- // unsubscribe(); // don't unsubscribe if we want to wait for additional status
- // updates to know when the tx has been finalized
- return finalizedPromise.promise;
- }
- }
- module.exports = {
- RuntimeApi: RuntimeApi,
- }
- function newExternallyControlledPromise () {
- // externally controller promise
- let resolve, reject;
- const promise = new Promise((res, rej) => {
- resolve = res;
- reject = rej;
- });
- return ({resolve, reject, promise});
- }
|