Browse Source

Extract WS logic from socket.ts and start the iteration of blocks

Ricardo Maltez 4 years ago
parent
commit
71df2010aa
5 changed files with 104 additions and 36 deletions
  1. 3 0
      server/db/models/era.ts
  2. 22 3
      server/index.ts
  3. 42 5
      server/joystream/index.ts
  4. 24 0
      server/joystream/ws.ts
  5. 13 28
      server/socket.ts

+ 3 - 0
server/db/models/era.ts

@@ -7,6 +7,9 @@ const Era = db.define('era', {
     primaryKey: true,
   },
   waiting: DataTypes.INTEGER,
+  actives: DataTypes.INTEGER,
+  maxSlots: DataTypes.INTEGER,
+  timestamp: DataTypes.DATE,
 })
 
 export default Era

+ 22 - 3
server/index.ts

@@ -7,14 +7,33 @@ import db from './db'
 const pg = require('pg')
 delete pg.native
 
+const {connectUpstream} = require('./joystream/ws')
+const { addBlockRange } = require('./joystream')
+
+const {setupSocket} = require('./socket')
 const PORT: number = process.env.PORT ? +process.env.PORT : 3500
 
 const app = express()
 const server = app.listen(PORT, () => {
   console.log(`[Express] Listening on port ${PORT}`)
-})
-const io: any = socketio(server)
-require('./socket')(io)
+});
+
+(async () =>{
+
+  const api = await connectUpstream()
+
+  const io: any = socketio(server)
+  setupSocket(io, api)
+  const args = process.argv.slice(2);
+  if (args.length === 2){
+    const startBlock = args[0];
+    const endBlock = args[1];
+    console.log(`[Joystream] syncing blocks from ${startBlock} to ${endBlock}`);
+    addBlockRange(api, startBlock, endBlock);
+  }
+
+})()
+
 
 //const cors = require("cors");
 //const passport = require('passport')

+ 42 - 5
server/joystream/index.ts

@@ -43,6 +43,10 @@ import {
   Status,
 } from '../types'
 
+import {AccountId, Moment, ActiveEraInfo} from "@polkadot/types/interfaces";
+import Option from "@polkadot/types/codec/Option";
+import {Vec} from "@polkadot/types";
+
 // queuing
 let lastUpdate = 0
 const queue: any[] = []
@@ -95,16 +99,48 @@ const addBlock = async (
 
   const currentEra = Number(await api.query.staking.currentEra())
   const era = await save('era', { id: currentEra })
-  era.addBlock(block)
+  await era.addBlock(block)
 
   const handle = member ? member.handle : author
   const queued = `(queued: ${queue.length})`
   console.log(`[Joystream] block ${block.id} ${handle} ${queued}`)
 
-  processEvents(api, id)
+  await processEvents(api, id)
   return updateEra(api, io, status, currentEra)
 }
 
+const addBlockRange = async (api: Api, startBlock: number, endBlock: number) => {
+
+  const previousHash = await api.rpc.chain.getBlockHash(startBlock - 1);
+  let previousEra = await api.query.staking.activeEra.at(previousHash) as Option<ActiveEraInfo>;
+
+  for (let i = startBlock; i < endBlock; i++) {
+    const hash = await api.rpc.chain.getBlockHash(i);
+    const blockEra = await api.query.staking.activeEra.at(hash) as Option<ActiveEraInfo>;
+    if (blockEra.unwrap().index.toNumber() === previousEra.unwrap().index.toNumber()){
+      continue;
+    }
+
+    let totalValidators = await api.query.staking.snapshotValidators.at(hash) as Option<Vec<AccountId>>;
+    if (totalValidators.isEmpty) {
+      continue;
+    }
+    console.log(`found validators: ${totalValidators.unwrap().length}`)
+
+    const totalNrValidators = totalValidators.unwrap().length;
+    const maxSlots = Number((await api.query.staking.validatorCount.at(hash)).toString());
+    const actives = Math.min(maxSlots, totalNrValidators);
+    const waiting = totalNrValidators > maxSlots ? totalNrValidators - maxSlots : 0;
+
+    const timestamp = (await api.query.timestamp.now.at(hash)) as Moment;
+    const date = new Date(timestamp.toNumber());
+    await save('era', { id: blockEra.unwrap().index.toNumber(),  waiting: waiting, actives: actives, maxSlots: maxSlots, timestamp: date})
+
+    previousEra = blockEra;
+  }
+
+}
+
 const getBlockHash = (api: Api, blockId: number) =>
   api.rpc.chain.getBlockHash(blockId)
 
@@ -147,7 +183,8 @@ const updateEra = async (api: Api, io: any, status: any, era: number) => {
   console.debug(`fetching stakes`)
 
   if (!stashes) return
-  stashes.forEach(async (validator: string) => {
+
+  for (let validator of stashes){
     try {
       const prefs = await api.query.staking.erasValidatorPrefs(era, validator)
       const commission = Number(prefs.commission) / 10000000
@@ -157,7 +194,7 @@ const updateEra = async (api: Api, io: any, status: any, era: number) => {
     } catch (e) {
       console.warn(`Failed to fetch stakes for ${validator} in era ${era}`, e)
     }
-  })
+  }
 
   return {
     members: (await api.query.members.nextMemberId()) - 1,
@@ -485,4 +522,4 @@ const fetchGithubDir = async (url: string) => {
   )
 }
 
-module.exports = { addBlock }
+module.exports = { addBlock, addBlockRange }

+ 24 - 0
server/joystream/ws.ts

@@ -0,0 +1,24 @@
+// TODO allow alternative backends
+
+import { ApiPromise, WsProvider } from '@polkadot/api'
+import { types } from '@joystream/types'
+
+const wsLocation =
+    'ws://localhost:9944'
+    // 'wss://rome-rpc-endpoint.joystream.org:9944'
+
+const connectUpstream = async (): Promise<ApiPromise> => {
+    try {
+        console.debug(`[Joystream] Connecting to ${wsLocation}`)
+        const provider = new WsProvider(wsLocation)
+        const api = await ApiPromise.create({ provider, types })
+        await api.isReady
+        console.debug(`[Joystream] Connected.`)
+        return api
+    } catch (e) {
+        console.error(`[Joystream] upstream connection failed`, e)
+        throw new Error()
+    }
+}
+
+module.exports = { connectUpstream }

+ 13 - 28
server/socket.ts

@@ -6,21 +6,16 @@ const { ApiPromise, WsProvider } = require('@polkadot/api')
 const { addBlock } = require('./joystream')
 const chalk = require('chalk')
 
-// TODO allow alternative backends
-const wsLocation = 'ws://localhost:9944'
-// 'wss://rome-rpc-endpoint.joystream.org:9944'
-
-module.exports = (io: any) => {
-  const handleUpstream = async (api: Api) => {
-    let status: Status = {}
-    let lastHeader: Header
-    api.derive.chain.subscribeNewHeads(async (header: Header) => {
-      if (lastHeader && lastHeader.number === header.number)
-        return console.debug(`skipping seen block`)
-      lastHeader = header
-      status = await addBlock(api, io, header, status)
-    })
-  }
+
+const setupSocket = async (io: any, api: Api) => {
+  let status: Status = {}
+  let lastHeader: Header
+  api.derive.chain.subscribeNewHeads(async (header: Header) => {
+    if (lastHeader && lastHeader.number === header.number)
+      return console.debug(`skipping seen block`)
+    lastHeader = header
+    status = await addBlock(api, io, header, status)
+  })
 
   io.on('connection', async (socket: any) => {
     console.log(chalk.green(`[socket.io] Connection: ${socket.id}`))
@@ -35,17 +30,7 @@ module.exports = (io: any) => {
     })
   })
 
-  const connectUpstream = async () => {
-    try {
-      console.debug(`[Joystream] Connecting to ${wsLocation}`)
-      const provider = new WsProvider(wsLocation)
-      const api = await ApiPromise.create({ provider, types })
-      await api.isReady
-      console.debug(`[Joystream] Connected.`)
-      handleUpstream(api)
-    } catch (e) {
-      //console.log(`[Joystream] upstream connection failed`, e)
-    }
-  }
-  connectUpstream()
 }
+
+
+module.exports = { setupSocket }