Browse Source

backend: process and save block events

Joystream Stats 4 years ago
parent
commit
3f86d1b260
6 changed files with 167 additions and 120 deletions
  1. 12 0
      server/db/models/era.ts
  2. 14 0
      server/db/models/event.ts
  3. 18 1
      server/db/models/index.ts
  4. 1 2
      server/index.ts
  5. 96 117
      server/joystream/index.ts
  6. 26 0
      tsconfig.json

+ 12 - 0
server/db/models/era.ts

@@ -0,0 +1,12 @@
+import db from '../db'
+import { DataTypes } from 'sequelize'
+
+const Era = db.define('era', {
+  id: {
+    type: DataTypes.INTEGER,
+    primaryKey: true,
+  },
+  waiting: DataTypes.INTEGER,
+})
+
+export default Era

+ 14 - 0
server/db/models/event.ts

@@ -0,0 +1,14 @@
+import db from '../db'
+import { DataTypes } from 'sequelize'
+
+const Event = db.define('event', {
+  id: {
+    type: DataTypes.INTEGER,
+    primaryKey: true,
+  },
+  section: DataTypes.STRING,
+  method: DataTypes.STRING,
+  data: DataTypes.STRING,
+})
+
+export default Event

+ 18 - 1
server/db/models/index.ts

@@ -1,13 +1,19 @@
 import Block from './block'
 import Channel from './channel'
 import Council from './council'
+import Era from './era'
+import Event from './event'
 import Proposal from './proposal'
 import Member from './member'
 import Category from './category'
 import Thread from './thread'
 import Post from './post'
 
+Era.hasMany(Block)
+
+Block.belongsTo(Era)
 Block.belongsTo(Member, { as: 'author' })
+Block.hasMany(Event)
 
 Council.hasMany(Member, { as: 'seat' })
 
@@ -24,4 +30,15 @@ Thread.hasMany(Post)
 Post.belongsTo(Thread)
 Post.belongsTo(Member, { as: 'author' })
 
-export { Block, Channel, Council, Member, Proposal, Category, Thread, Post }
+export {
+  Block,
+  Channel,
+  Council,
+  Era,
+  Event,
+  Member,
+  Proposal,
+  Category,
+  Thread,
+  Post,
+}

+ 1 - 2
server/index.ts

@@ -4,14 +4,13 @@ import morgan from 'morgan'
 import socketio from 'socket.io'
 
 import db from './db'
-//const db = require('./db')
 const pg = require('pg')
 delete pg.native
 
 const PORT: number = process.env.PORT ? +process.env.PORT : 3500
 
 const app = express()
-const server = app.listen((PORT: number) => {
+const server = app.listen(() => {
   console.log(`[Express] Listening on port ${PORT}`)
 })
 const io: any = socketio(server)

+ 96 - 117
server/joystream/index.ts

@@ -1,12 +1,14 @@
 import {
   Block,
+  Category,
   Channel,
   Council,
-  Proposal,
-  Category,
+  Era,
+  Event,
+  Member,
   Post,
+  Proposal,
   Thread,
-  Member,
 } from '../db/models'
 const models: { [key: string]: any } = {
   channel: Channel,
@@ -17,6 +19,7 @@ const models: { [key: string]: any } = {
   block: Block,
   council: Council,
   member: Member,
+  era: Era,
 }
 
 import * as get from './lib/getters'
@@ -80,65 +83,111 @@ const addBlock = async (
 ): Promise<Status> => {
   const id = +header.number
   const last = await Block.findByPk(id - 1)
-  const author = header.author?.toString()
-  const member = await fetchMemberByAccount(api, author)
   const timestamp = moment.utc(await api.query.timestamp.now()).valueOf()
   const blocktime = last ? timestamp - last.timestamp : 6000
   const block = await save('block', { id, timestamp, blocktime })
-  if (member && member.id) block.setAuthor(member)
   io.emit('block', block)
 
+  const author = header.author?.toString()
+  const member = await fetchMemberByAccount(api, author)
+  if (member && member.id) block.setAuthor(member)
+
+  const currentEra = Number(await api.query.staking.currentEra())
+  const era = await save('era', { id: currentEra })
+  era.addBlock(block)
+
   const handle = member ? member.handle : author
   const queued = `(queued: ${queue.length})`
   console.log(`[Joystream] block ${block.id} ${handle} ${queued}`)
-  return updateStatus(api, io, status)
-}
 
-const processEvents = (api: Api, blockHash: string) => {
-  const blockEvents = api.query.system.events.at(blockHash)
-  // TODO as Vec<EventRecord>
-  let transfers = blockEvents.filter((event: any) => {
-    return event.section == 'balances' && event.method == 'Transfer'
-  })
-  let validatorRewards = blockEvents.filter((event: any) => {
-    return event.section == 'staking' && event.method == 'Reward'
-  })
+  processEvents(api, id)
+  return updateEra(api, io, status, currentEra)
 }
 
-const updateStatus = async (api: Api, io: any, status: any) => {
-  const era = Number(await api.query.staking.currentEra())
-  if (era > status.era) {
-    fetchStakes(api, status.era, status.validators)
-    fetchLastReward(api, status.era - 1)
-  } else if (status.lastReward === 0) fetchLastReward(api, era)
-  fetchEraRewardPoints(api, Number(era))
+const getBlockHash = (api: Api, blockId: number) =>
+  api.rpc.chain.getBlockHash(blockId)
+
+interface BlockEvent {
+  section: string
+  method: string
+  data: string
+}
 
-  //const postCount = await api.query.proposalsDiscussion.postCount()
-  // TODO save proposalComments: Number(postCount)
+const processEvents = async (api: Api, blockId: number) => {
+  const blockHash = await getBlockHash(api, blockId)
+  const blockEvents = await api.query.system.events.at(blockHash)
+  blockEvents.forEach(({ section, method, data }: BlockEvent) => {
+    console.log(`event`, section, method, data)
+    Event.create({ blockId, section, method, data: JSON.stringify(data) })
+  })
+}
 
+const updateEra = async (api: Api, io: any, status: any, era: number) => {
   const now: number = moment().valueOf()
   if (lastUpdate + 60000 > now) return status
   //console.log(`updating status`, lastUpdate)
   lastUpdate = now
-  processNext()
+
+  // session.disabledValidators: Vec<u32>
+  // check online: imOnline.keys
+  // imOnline.authoredBlocks: 2
+  // session.currentIndex: 17,081
+
+  const lastReward = Number(await api.query.staking.erasValidatorReward(era))
+  console.debug(`last reward`, era, lastReward)
+  if (lastReward > 0) {
+  } // TODO save lastReward
+
+  const nominatorEntries = await api.query.staking.nominators.entries()
+  const nominators = nominatorEntries.map((n: any) => String(n[0].toHuman()))
+
+  const rewardPoints = await api.query.staking.erasRewardPoints(era)
+  const validatorEntries = await api.query.session.validators()
+  const validators = validatorEntries.map((v: any) => String(v))
+
+  // TODO staking.bondedEras: Vec<(EraIndex,SessionIndex)>
+  const stashes = await api.derive.staking.stashes()
+  console.debug(`fetching stakes`)
+
+  if (!stashes) return
+  stashes.forEach(async (validator: string) => {
+    try {
+      const prefs = await api.query.staking.erasValidatorPrefs(era, validator)
+      const commission = Number(prefs.commission) / 10000000
+
+      const data = await api.query.staking.erasStakers(era, validator)
+      let { total, own, others } = data.toJSON()
+    } catch (e) {
+      console.warn(`Failed to fetch stakes for ${validator} in era ${era}`, e)
+    }
+  })
+
   return {
-    members: await fetchMembers(api),
-    categories: await fetchCategories(api),
-    threads: await fetchThreads(api),
-    proposals: await fetchProposals(api),
-    channels: await fetchChannels(api),
-    posts: await fetchPosts(api),
+    members: (await api.query.members.nextMemberId()) - 1,
+    categories: await get.currentCategoryId(api),
+    threads: await get.currentThreadId(api),
+    proposals: await get.proposalCount(api),
+    channels: await get.currentChannelId(api),
+    posts: await get.currentPostId(api),
+    proposalPosts: await api.query.proposalsDiscussion.postCount(),
     queued: queue.length,
     era,
   }
 }
 
-const fetchLastReward = async (api: Api, era: number) => {
-  const lastReward = Number(await api.query.staking.erasValidatorReward(era))
-  console.debug(`last reward`, era, lastReward)
-  if (lastReward > 0) {
-  } // TODO save lastReward
-  else fetchLastReward(api, era - 1)
+const validatorStatus = async (api: Api, blockId: number) => {
+  const hash = await getBlockHash(api, blockId)
+  let totalValidators = await api.query.staking.snapshotValidators.at(hash)
+  if (totalValidators.isEmpty) return
+
+  let totalNrValidators = totalValidators.unwrap().length
+  const maxSlots = Number(await api.query.staking.validatorCount.at(hash))
+  const actives = Math.min(maxSlots, totalNrValidators)
+  const waiting =
+    totalNrValidators > maxSlots ? totalNrValidators - maxSlots : 0
+  let timestamp = await api.query.timestamp.now.at(hash)
+  const date = moment(timestamp.toNumber()).valueOf()
+  return { blockId, actives, waiting, maxSlots, date }
 }
 
 const fetchTokenomics = async () => {
@@ -148,11 +197,6 @@ const fetchTokenomics = async () => {
   // TODO save 'tokenomics', data
 }
 
-const fetchChannels = async (api: Api) => {
-  const lastId = await get.currentChannelId(api)
-  for (let id = lastId; id > 0; id--) enqueue(() => fetchChannel(api, id))
-  return lastId
-}
 const fetchChannel = async (api: Api, id: number) => {
   const exists = await Channel.findByPk(id)
   if (exists) return exists
@@ -189,13 +233,10 @@ const fetchChannel = async (api: Api, id: number) => {
   const chan = await save('channel', channel)
   const owner = await fetchMember(api, ownerId)
   chan.setOwner(owner)
+  if (id > 1) fetchChannel(api, id - 1)
+  return chan
 }
 
-const fetchCategories = async (api: Api) => {
-  const lastId = await get.currentCategoryId(api)
-  for (let id = lastId; id > 0; id--) enqueue(() => fetchCategory(api, id))
-  return lastId
-}
 const fetchCategory = async (api: Api, id: number) => {
   const exists = await Category.findByPk(id)
   if (exists) return exists
@@ -230,14 +271,10 @@ const fetchCategory = async (api: Api, id: number) => {
   const category = await save('category', cat)
   const mod = await fetchMemberByAccount(api, moderator)
   if (mod) category.setModerator(mod)
+  if (id > 1) fetchCategory(api, id - 1)
   return category
 }
 
-const fetchPosts = async (api: Api) => {
-  const lastId = await get.currentPostId(api)
-  for (let id = lastId; id > 0; id--) enqueue(() => fetchPost(api, id))
-  return lastId
-}
 const fetchPost = async (api: Api, id: number) => {
   const exists = await Post.findByPk(id)
   if (exists) return exists
@@ -258,14 +295,10 @@ const fetchPost = async (api: Api, id: number) => {
   const member = await fetchMemberByAccount(api, author)
   if (member) post.setAuthor(member)
   const mod = await fetchMemberByAccount(api, moderation)
+  if (id > 1) fetchPost(api, id - 1)
   return post
 }
 
-const fetchThreads = async (api: Api) => {
-  const lastId = await get.currentThreadId(api)
-  for (let id = lastId; id > 0; id--) enqueue(() => fetchThread(api, id))
-  return lastId
-}
 const fetchThread = async (api: Api, id: number) => {
   const exists = await Thread.findByPk(id)
   if (exists) return exists
@@ -302,6 +335,7 @@ const fetchThread = async (api: Api, id: number) => {
     //const mod = await fetchMemberByAccount(api, moderation)
     //if (mod) thread.setModeration(mod)
   }
+  if (id > 1) fetchThread(api, id - 1)
   return thread
 }
 
@@ -326,10 +360,6 @@ const fetchCouncil = async (api: Api, block: number) => {
   return save('council', council)
 }
 
-const fetchProposals = async (api: Api) => {
-  const lastId = await get.proposalCount(api)
-  for (let i = lastId; i > 0; i--) enqueue(() => fetchProposal(api, i))
-}
 const fetchProposal = async (api: Api, id: number) => {
   const exists = await Proposal.findByPk(id)
   if (exists) return exists
@@ -340,7 +370,8 @@ const fetchProposal = async (api: Api, id: number) => {
 
   console.debug(`Fetching proposal ${id}`)
   const proposal = await get.proposalDetail(api, id)
-  save('proposal', proposal)
+  if (id > 1) fetchProposal(api, id - 1)
+  return save('proposal', proposal)
   //TODO fetchVotesPerProposal(api, proposal)
 }
 
@@ -390,60 +421,7 @@ const fetchVoteByProposalByVoter = async (
   return hasVoted ? String(vote) : ''
 }
 
-// nominators, validators
-
-const fetchNominators = async (api: Api) => {
-  const nominatorEntries = await api.query.staking.nominators.entries()
-  const nominators = nominatorEntries.map((n: any) => String(n[0].toHuman()))
-  // TODO save nominators
-}
-
-const fetchValidators = async (api: Api) => {
-  // session.disabledValidators: Vec<u32>
-  // TODO check online: imOnline.keys
-  //  imOnline.authoredBlocks: 2
-  // TODO session.currentIndex: 17,081
-  const stashes = await api.derive.staking.stashes()
-  // TODO save stashes
-
-  const validatorEntries = await api.query.session.validators()
-  const validators = await validatorEntries.map((v: any) => String(v))
-  // TODO save validators
-}
-
-const fetchStakes = async (api: Api, era: number, validators: string[]) => {
-  // TODO staking.bondedEras: Vec<(EraIndex,SessionIndex)>
-  console.debug(`fetching stakes`)
-  const stashes: any[] = [] // TODO await Stash.findAll()
-  if (!stashes) return
-  stashes.forEach(async (validator: string) => {
-    try {
-      const prefs = await api.query.staking.erasValidatorPrefs(era, validator)
-      const commission = Number(prefs.commission) / 10000000
-
-      const data = await api.query.staking.erasStakers(era, validator)
-      let { total, own, others } = data.toJSON()
-      //let { stakes = {} } = [] // TODO fetchStakes()
-      // { total, own, others, commission }
-      // TODO save stakes
-    } catch (e) {
-      console.warn(`Failed to fetch stakes for ${validator} in era ${era}`, e)
-    }
-  })
-}
-
-const fetchEraRewardPoints = async (api: Api, era: number) => {
-  const data = await api.query.staking.erasRewardPoints(era)
-  // TODO save rewardPoints
-}
-
 // accounts
-const fetchMembers = async (api: Api) => {
-  const lastId = await api.query.members.nextMemberId()
-  for (let id = lastId - 1; id > 0; id--) enqueue(() => fetchMember(api, id))
-  return lastId
-}
-
 const fetchMemberByAccount = async (
   api: Api,
   account: string
@@ -467,6 +445,7 @@ const fetchMember = async (api: Api, id: number): Promise<MemberType> => {
   const account = String(membership.root_account)
   const about = String(membership.about)
   const createdAt = +membership.registered_at_block
+  if (id > 1) fetchMember(api, id - 1)
   return save('member', { id, handle, createdAt, about })
 }
 

+ 26 - 0
tsconfig.json

@@ -0,0 +1,26 @@
+{
+  "compilerOptions": {
+    "target": "es6",
+    "lib": [
+      "dom",
+      "dom.iterable",
+      "esnext"
+    ],
+    "allowJs": true,
+    "skipLibCheck": true,
+    "esModuleInterop": true,
+    "allowSyntheticDefaultImports": true,
+    "strict": true,
+    "forceConsistentCasingInFileNames": true,
+    "noFallthroughCasesInSwitch": true,
+    "module": "commonjs",
+    "moduleResolution": "node",
+    "resolveJsonModule": true,
+    "isolatedModules": true,
+    "noEmit": true,
+    "jsx": "react-jsx"
+  },
+  "include": [
+      "src", "server"
+  ]
+}