Browse Source

backend: process all blocks

Joystream Stats 4 years ago
parent
commit
e2945952aa

+ 2 - 1
package.json

@@ -30,7 +30,8 @@
     "socket.io": "^2.2.0"
   },
   "scripts": {
-    "start": "concurrently \"PORT=3050 HOST=127.0.0.1 react-scripts start\" \"nodemon -i src server/index.ts\"",
+     "start": "concurrently \"PORT=3050 HOST=127.0.0.1 react-scripts start\" \"nodemon -i src server/index.ts\"",
+    "backend":  "nodemon -i src server/index.ts",
     "build": "react-scripts build",
     "prod": "node server",
     "test": "react-scripts test --env=jsdom",

+ 6 - 1
server/api/members.ts

@@ -12,6 +12,11 @@ import {
 const findMember = (handle: number | string) =>
   handle > 0 ? Member.findByPk(handle) : Member.findOne({ where: { handle } })
 
+const findMemberWithIncludes = (handle: number | string) =>
+  handle > 0
+    ? Member.findByIdWithIncludes(handle)
+    : Member.findWithIncludes({ where: { handle } })
+
 router.get('/', async (req: any, res: any, next: any) => {
   try {
     Member.findAll().then((m: any) => res.json(m))
@@ -22,7 +27,7 @@ router.get('/', async (req: any, res: any, next: any) => {
 
 router.get('/:id', async (req: any, res: any, next: any) => {
   try {
-    res.json(await findMember(req.params.id))
+    res.json(await findMemberWithIncludes(req.params.id))
   } catch (err) {
     next(err)
   }

+ 21 - 3
server/db/models/account.ts

@@ -2,21 +2,39 @@ import db from '../db'
 import { DataTypes } from 'sequelize'
 
 const Account = db.define('account', {
-  address: DataTypes.STRING,
+  key: {
+    type: DataTypes.STRING,
+    primaryKey: true,
+  },
   format: DataTypes.STRING,
   about: DataTypes.TEXT,
 })
 
 Account.findAllWithIncludes = function () {
   return this.findAll({
-    include: [{ model: db.models.member }, { association: 'vote' }],
+    include: [
+      { association: 'validated', attributes: ['id', 'timestamp'] },
+      { model: db.models.member, attributes: ['handle'] },
+    ],
+  })
+}
+
+Account.findByIdWithIncludes = function (id: number) {
+  return this.findByPk(id, {
+    include: [
+      { association: 'validated', attributes: ['id', 'timestamp'] },
+      { model: db.models.member, attributes: ['handle'] },
+    ],
   })
 }
 
 Account.findWithIncludes = function (args: { where: any }) {
   return this.findAll({
     ...args,
-    include: [{ model: db.models.member }, { association: 'vote' }],
+    include: [
+      { association: 'validated', attributes: ['id', 'timestamp'] },
+      { model: db.models.member, attributes: ['handle'] },
+    ],
   })
 }
 

+ 1 - 0
server/db/models/block.ts

@@ -6,6 +6,7 @@ const Block = db.define('block', {
     type: DataTypes.INTEGER,
     primaryKey: true,
   },
+  hash: DataTypes.STRING,
   timestamp: DataTypes.DATE,
   blocktime: DataTypes.INTEGER,
 })

+ 15 - 9
server/db/models/category.ts

@@ -21,11 +21,13 @@ Category.findAllWithIncludes = function () {
         model: db.models.thread,
         include: [
           { model: db.models.post, include: [{ association: 'author' }] },
-          { association: 'author' },
-          { association: 'moderator' },
+          { association: 'creator' },
         ],
       },
-      { association: 'moderator' },
+      {
+        model: db.models.moderation,
+        include: [{ association: 'moderator', attributes: ['handle'] }],
+      },
     ],
   })
 }
@@ -38,11 +40,13 @@ Category.findByIdWithIncludes = function (id: number, args?: { where: any }) {
         model: db.models.thread,
         include: [
           { model: db.models.post, include: [{ association: 'author' }] },
-          { association: 'author' },
-          { association: 'moderator' },
+          { association: 'creator' },
         ],
       },
-      { association: 'moderator' },
+      {
+        model: db.models.moderation,
+        include: [{ association: 'moderator', attributes: ['handle'] }],
+      },
     ],
   })
 }
@@ -55,11 +59,13 @@ Category.findWithIncludes = function (args?: { where: any }) {
         model: db.models.thread,
         include: [
           { model: db.models.post, include: [{ association: 'author' }] },
-          { association: 'author' },
-          { association: 'moderator' },
+          { association: 'creator' },
         ],
       },
-      { association: 'moderator' },
+      {
+        model: db.models.moderation,
+        include: [{ association: 'moderator', attributes: ['handle'] }],
+      },
     ],
   })
 }

+ 35 - 5
server/db/models/era.ts

@@ -7,8 +7,8 @@ const Era = db.define('era', {
     primaryKey: true,
   },
   waiting: DataTypes.INTEGER,
-  actives: DataTypes.INTEGER,
-  maxSlots: DataTypes.INTEGER,
+  active: DataTypes.INTEGER,
+  slots: DataTypes.INTEGER,
   timestamp: DataTypes.DATE,
 })
 
@@ -17,7 +17,17 @@ Era.findAllWithIncludes = function () {
     include: [
       {
         model: db.models.block,
-        include: [{ association: 'author' }, { model: db.models.event }],
+        attributes: ['id', 'blocktime', 'timestamp'],
+        include: [
+          {
+            association: 'validator',
+            attributes: ['key'],
+            include: [
+              { model: db.models.member, attributes: ['id', 'handle'] },
+            ],
+          },
+          { model: db.models.event },
+        ],
       },
     ],
   })
@@ -28,7 +38,17 @@ Era.findByIdWithIncludes = function (id: number) {
     include: [
       {
         model: db.models.block,
-        include: [{ association: 'author' }, { model: db.models.event }],
+        attributes: ['id', 'blocktime', 'timestamp'],
+        include: [
+          {
+            association: 'validator',
+            attributes: ['key'],
+            include: [
+              { model: db.models.member, attributes: ['id', 'handle'] },
+            ],
+          },
+          { model: db.models.event },
+        ],
       },
     ],
   })
@@ -40,7 +60,17 @@ Era.findWithIncludes = function (args: { where: any }) {
     include: [
       {
         model: db.models.block,
-        include: [{ association: 'author' }, { model: db.models.event }],
+        attributes: ['id', 'blocktime', 'timestamp'],
+        include: [
+          {
+            association: 'validator',
+            attributes: ['key'],
+            include: [
+              { model: db.models.member, attributes: ['id', 'handle'] },
+            ],
+          },
+          { model: db.models.event },
+        ],
       },
     ],
   })

+ 26 - 8
server/db/models/index.ts

@@ -8,23 +8,28 @@ import ConsulStake from './councilstake'
 import Era from './era'
 import Event from './event'
 import Proposal from './proposal'
-import ProposalVote from './proposalVote'
+import ProposalPost from './proposalpost'
+import ProposalVote from './proposalvote'
 import Member from './member'
 import Category from './category'
 import Thread from './thread'
 import Post from './post'
+import Moderation from './moderation'
 
 Member.hasMany(Account)
+Member.belongsTo(Account, { as: 'root', constraints: false })
+Member.belongsTo(Account, { as: 'controller', constraints: false })
 Member.hasMany(Consul, { as: 'terms' })
 Member.hasMany(ConsulStake, { as: 'votes' })
-Member.hasMany(Category, { as: 'categories' })
-Member.hasMany(Thread, { as: 'threads' })
-Member.hasMany(Post, { as: 'posts' })
-Member.hasMany(Proposal, { as: 'proposals' })
+Member.hasMany(Category)
+Member.hasMany(Thread)
+Member.hasMany(Post)
+Member.hasMany(Proposal)
 
 Account.belongsTo(Member)
 Account.hasMany(Balance)
-Account.hasMany(Block, { foreignKey: 'validatorId' })
+Account.hasMany(Block, { as: 'validated', foreignKey: 'validatorKey' })
+Account.hasMany(Moderation)
 
 Balance.belongsTo(Account)
 Balance.belongsTo(Era)
@@ -49,11 +54,17 @@ ConsulStake.belongsTo(Member)
 Channel.belongsTo(Member, { as: 'owner' })
 
 Category.hasMany(Thread)
-Category.belongsTo(Member, { as: 'moderator' })
+
+Category.belongsTo(Moderation)
+Thread.belongsTo(Moderation)
+Post.belongsTo(Moderation)
+Moderation.hasMany(Category)
+Moderation.hasMany(Thread)
+Moderation.hasMany(Post)
+Moderation.belongsTo(Account, { as: 'moderator' })
 
 Thread.belongsTo(Category)
 Thread.belongsTo(Member, { as: 'creator' })
-Thread.belongsTo(Member, { as: 'moderator' })
 Thread.hasMany(Post)
 
 Post.belongsTo(Thread)
@@ -61,7 +72,12 @@ Post.belongsTo(Member, { as: 'author' })
 Post.belongsTo(Member, { as: 'moderator' })
 
 Proposal.belongsTo(Member, { as: 'author' })
+Proposal.hasMany(ProposalPost, { as: 'posts' })
 Proposal.hasMany(ProposalVote, { as: 'votes' })
+
+ProposalPost.belongsTo(Proposal)
+ProposalPost.belongsTo(Member, { as: 'author' })
+
 ProposalVote.belongsTo(Proposal)
 ProposalVote.belongsTo(Consul)
 ProposalVote.belongsTo(Member)
@@ -78,8 +94,10 @@ export {
   Event,
   Member,
   Proposal,
+  ProposalPost,
   ProposalVote,
   Category,
   Thread,
   Post,
+  Moderation,
 }

+ 54 - 8
server/db/models/member.ts

@@ -7,7 +7,6 @@ const Member = db.define('member', {
     primaryKey: true,
   },
   createdAt: DataTypes.INTEGER,
-  account: DataTypes.STRING,
   handle: DataTypes.STRING,
   about: DataTypes.TEXT,
 })
@@ -15,14 +14,16 @@ const Member = db.define('member', {
 Member.findAllWithIncludes = function () {
   return this.findAll({
     include: [
-      {
-        model: db.models.post,
-        required: false,
-        include: [{ model: db.models.thread }],
-      },
+      { model: db.models.post, include: [{ model: db.models.thread }] },
       { model: db.models.thread, include: [{ model: db.models.category }] },
       { model: db.models.proposal, include: [{ association: 'votes' }] },
-      { model: db.models.account },
+      {
+        model: db.models.account,
+        include: [
+          { association: 'validated', attributes: ['id', 'timestamp'] },
+        ],
+      },
+
       {
         association: 'terms',
         include: [
@@ -54,6 +55,40 @@ Member.findAllWithIncludes = function () {
 Member.findByIdWithIncludes = function (id: number, args?: { where: any }) {
   return this.findByPk(id, {
     ...args,
+    include: [
+      { model: db.models.post, include: [{ model: db.models.thread }] },
+      { model: db.models.proposal, include: [{ association: 'votes' }] },
+      {
+        model: db.models.account,
+        include: [
+          { association: 'validated', attributes: ['id', 'timestamp'] },
+        ],
+      },
+      {
+        association: 'terms',
+        include: [
+          {
+            association: 'votes',
+            include: [
+              {
+                model: db.models.proposal,
+                include: [{ association: 'author' }],
+              },
+            ],
+          },
+          { association: 'voters', include: [{ model: db.models.member }] },
+        ],
+      },
+      {
+        association: 'votes',
+        include: [
+          {
+            model: db.models.consul,
+            include: [{ model: db.models.member }],
+          },
+        ],
+      },
+    ],
   })
 }
 
@@ -63,7 +98,12 @@ Member.findWithIncludes = function (args: { where: any }) {
     include: [
       { model: db.models.post, include: [{ model: db.models.thread }] },
       { model: db.models.proposal, include: [{ association: 'votes' }] },
-      { model: db.models.account },
+      {
+        model: db.models.account,
+        include: [
+          { association: 'validated', attributes: ['id', 'timestamp'] },
+        ],
+      },
       {
         association: 'terms',
         include: [
@@ -88,6 +128,12 @@ Member.findWithIncludes = function (args: { where: any }) {
           },
         ],
       },
+      {
+        model: db.models.account,
+        include: [
+          { association: 'validated', attributes: ['id', 'timestamp'] },
+        ],
+      },
     ],
   })
 }

+ 45 - 0
server/db/models/moderation.ts

@@ -0,0 +1,45 @@
+import db from '../db'
+import { DataTypes } from 'sequelize'
+
+const Moderation = db.define('moderation', {
+  created: DataTypes.INTEGER,
+  createdAt: DataTypes.DATE,
+  rationale: DataTypes.TEXT,
+})
+
+Moderation.findAllWithIncludes = function () {
+  return this.findAll({
+    include: [
+      { model: db.models.category },
+      { model: db.models.post, include: [{ association: 'author' }] },
+      { association: 'creator' },
+      { association: 'moderator' },
+    ],
+  })
+}
+
+Moderation.findByIdWithIncludes = function (id: number, args?: { where: any }) {
+  return this.findByPk(id, {
+    ...args,
+    include: [
+      { model: db.models.category },
+      { model: db.models.post, include: [{ association: 'author' }] },
+      { association: 'creator' },
+      { association: 'moderator' },
+    ],
+  })
+}
+
+Moderation.findWithIncludes = function (args?: { where: any }) {
+  return this.findAll({
+    ...args,
+    include: [
+      { model: db.models.category },
+      { model: db.models.post, include: [{ association: 'author' }] },
+      { association: 'creator' },
+      { association: 'moderator' },
+    ],
+  })
+}
+
+export default Moderation

+ 13 - 1
server/db/models/proposal.ts

@@ -21,6 +21,10 @@ Proposal.findAllWithIncludes = function () {
   return this.findAll({
     include: [
       { association: 'author', attributes: ['handle'] },
+      {
+        association: 'posts',
+        include: [{ association: 'author', attributes: ['handle'] }],
+      },
       {
         association: 'votes',
         attributes: ['id', 'vote'],
@@ -35,6 +39,10 @@ Proposal.findByIdWithIncludes = function (id: number, args?: { where: any }) {
     ...args,
     include: [
       { association: 'author', attributes: ['handle'] },
+      {
+        association: 'posts',
+        include: [{ association: 'author', attributes: ['handle'] }],
+      },
       {
         association: 'votes',
         attributes: ['id', 'vote'],
@@ -49,10 +57,14 @@ Proposal.findWithIncludes = function (args: { where: any }) {
     ...args,
     include: [
       { association: 'author', attributes: ['handle'] },
+      {
+        association: 'posts',
+        include: [{ association: 'author', attributes: ['handle'] }],
+      },
       {
         association: 'votes',
         attributes: ['id', 'vote'],
-        include: [{ model: db.models.member, attributes: ['id', 'handle'] }],
+        include: [{ association: 'author', attributes: ['id', 'handle'] }],
       },
     ],
   })

+ 38 - 0
server/db/models/proposalpost.ts

@@ -0,0 +1,38 @@
+import db from '../db'
+import { DataTypes } from 'sequelize'
+
+const ProposalPost = db.define('proposalpost', {
+  id: {
+    type: DataTypes.INTEGER,
+    primaryKey: true,
+  },
+  created: DataTypes.INTEGER,
+  updated: DataTypes.INTEGER,
+  edition: DataTypes.INTEGER,
+  text: DataTypes.TEXT,
+})
+
+ProposalPost.findAllWithIncludes = function () {
+  return this.findAll({
+    include: [{ association: 'author', attributes: ['handle'] }],
+  })
+}
+
+ProposalPost.findByIdWithIncludes = function (
+  id: number,
+  args?: { where: any }
+) {
+  return this.findByPk(id, {
+    ...args,
+    include: [{ association: 'author', attributes: ['handle'] }],
+  })
+}
+
+ProposalPost.findWithIncludes = function (args: { where: any }) {
+  return this.findAll({
+    ...args,
+    include: [{ association: 'author', attributes: ['handle'] }],
+  })
+}
+
+export default ProposalPost

+ 43 - 0
server/db/models/proposalvote.ts

@@ -0,0 +1,43 @@
+import db from '../db'
+import { DataTypes } from 'sequelize'
+
+const ProposalVote = db.define('proposalvote', {
+  vote: DataTypes.STRING,
+})
+
+ProposalVote.findAllWithIncludes = function () {
+  return this.findAll({
+    include: [
+      { model: db.models.member, attributes: ['handle'] },
+      { model: db.models.consul },
+      { model: db.models.proposal, attributes: ['title'] },
+    ],
+  })
+}
+
+ProposalVote.findByIdWithIncludes = function (
+  id: number,
+  args?: { where: any }
+) {
+  return this.findByPk(id, {
+    ...args,
+    include: [
+      { model: db.models.member, attributes: ['handle'] },
+      { model: db.models.consul },
+      { model: db.models.proposal, attributes: ['title'] },
+    ],
+  })
+}
+
+ProposalVote.findWithIncludes = function (args: { where: any }) {
+  return this.findAll({
+    ...args,
+    include: [
+      { model: db.models.member, attributes: ['handle'] },
+      { model: db.models.consul },
+      { model: db.models.proposal, attributes: ['title'] },
+    ],
+  })
+}
+
+export default ProposalVote

+ 12 - 3
server/db/models/thread.ts

@@ -17,7 +17,10 @@ Thread.findAllWithIncludes = function () {
       { model: db.models.category },
       { model: db.models.post, include: [{ association: 'author' }] },
       { association: 'creator' },
-      { association: 'moderator' },
+      {
+        model: db.models.moderation,
+        include: [{ association: 'moderator', attributes: ['handle'] }],
+      },
     ],
   })
 }
@@ -29,7 +32,10 @@ Thread.findByIdWithIncludes = function (id: number, args?: { where: any }) {
       { model: db.models.category },
       { model: db.models.post, include: [{ association: 'author' }] },
       { association: 'creator' },
-      { association: 'moderator' },
+      {
+        model: db.models.moderation,
+        include: [{ association: 'moderator', attributes: ['handle'] }],
+      },
     ],
   })
 }
@@ -41,7 +47,10 @@ Thread.findWithIncludes = function (args?: { where: any }) {
       { model: db.models.category },
       { model: db.models.post, include: [{ association: 'author' }] },
       { association: 'creator' },
-      { association: 'moderator' },
+      {
+        model: db.models.moderation,
+        include: [{ association: 'moderator', attributes: ['handle'] }],
+      },
     ],
   })
 }

+ 276 - 256
server/joystream/index.ts

@@ -13,8 +13,10 @@ import {
   Member,
   Post,
   Proposal,
+  ProposalPost,
   ProposalVote,
   Thread,
+  Moderation,
 } from '../db/models'
 
 import * as get from './lib/getters'
@@ -58,30 +60,49 @@ const CYCLE = VOTINGDURATION + TERMDURATION
 const DELAY = 0 // ms
 let lastUpdate = 0
 let queuedAll = false
-let processing = false
 let queue: any[] = []
-let fetching = ''
+let processing = ''
+let busy = false
+
+const processNext = async () => {
+  if (busy) return
+  const task = queue.shift()
+  if (!task) return
+  const result = await task()
+  busy = false
+  setTimeout(() => processNext(), DELAY)
+}
 
 const getBlockHash = (api: Api, blockId: number) =>
-  api.rpc.chain.getBlockHash(blockId)
+  api.rpc.chain.getBlockHash(blockId).then((array: any) => array.toHuman())
+
+const getEraAtHash = (api: Api, hash: string) =>
+  api.query.staking.activeEra
+    .at(hash)
+    .then((era: Option<ActiveEraInfo>) => era.unwrap().index.toNumber())
 
-const getEraAtBlock = (api: Api, hash: string) =>
-  api.query.staking.activeEra.at(hash)
+const getEraAtBlock = async (api: Api, block: number) =>
+  getEraAtHash(api, await getBlockHash(api, block))
 
 const getTimestamp = async (api: Api, hash?: string) =>
-  moment
-    .utc(
-      hash
-        ? await api.query.timestamp.now.at(hash)
-        : await api.query.timestamp.now(),
-    )
-    .valueOf()
+  hash
+    ? moment.utc(await api.query.timestamp.now.at(hash)).valueOf()
+    : moment.utc(await api.query.timestamp.now()).valueOf()
+
+const findCouncilAtBlock = (api: Api, block: number) =>
+  Council.findOne({
+    where: {
+      start: { [Op.lte]: block },
+      end: { [Op.gte]: block - VOTINGDURATION },
+    },
+  })
 
 const addBlock = async (
   api: Api,
   io: any,
   header: { number: number; author: string },
   status: Status = {
+    block: 0,
     era: 0,
     round: 0,
     members: 0,
@@ -91,80 +112,57 @@ const addBlock = async (
     posts: 0,
     proposals: 0,
     proposalPosts: 0,
-  },
+  }
 ): Promise<Status> => {
   const id = +header.number
-  const last = await Block.findByPk(id - 1)
   const exists = await Block.findByPk(id)
   if (exists) {
     console.error(`TODO handle fork`, String(header.author))
     return status
   }
-  const timestamp = await getTimestamp(api)
-  const blocktime = last ? timestamp - last.timestamp : 6000
-  const address = header.author?.toString()
-  const account = await Account.findOrCreate({ where: { address } })
-  const block = await Block.create({ id, timestamp, blocktime })
-  block.setValidator(account.id)
-
-  const currentEra = Number(await api.query.staking.currentEra())
-  await Era.findOrCreate({ where: { id: currentEra } })
-  await block.setEra(currentEra)
-  io.emit('block', await Block.findByIdWithIncludes(block.id))
+
+  const block = await processBlock(api, id)
+  const key = header.author?.toString()
+  const [account] = await Account.findOrCreate({ where: { key } })
+  await block.setValidator(account.key)
+  //account.addBlock(block.id) // TODO needed?
+  io.emit('block', await Block.findByIdWithIncludes(id))
 
   // logging
-  const member = await fetchMemberByAccount(api, address)
-  const handle = member ? member.handle : address
-  const f = fetching !== '' ? `, fetching ${fetching}` : ''
-  const q = queue.length ? ` (${queue.length} queued${f})` : ''
-  console.log(`[Joystream] block ${block.id} ${handle}${q}`)
-
-  await processEvents(api, id)
-  if (await isEraInfoAvailable(api, id)) {
-    await updateEra(api, currentEra, id)
-  }
+  const handle = await getHandleOrKey(api, key)
+  const q = queue.length ? chalk.green(` [${queue.length}:${processing}]`) : ''
+  console.log(`[Joystream] block ${id} ${handle} ${q}`)
 
-  //updateBalances(api, id)
-  return updateStatus(api, status, currentEra)
+  return updateStatus(api, status, id)
+}
+
+const processBlock = async (api: Api, id: number) => {
+  processing = `block ${id}`
+  const last = await Block.findByPk(id - 1)
+  const [block] = await Block.findOrCreate({ where: { id } })
+  block.hash = await getBlockHash(api, id)
+  block.timestamp = await getTimestamp(api, block.hash)
+  block.blocktime = last ? block.timestamp - last.timestamp : 6000
+  block.save()
+
+  processEvents(api, id, block.hash)
+  await importEraAtBlock(api, id, block.hash)
+  return block
 }
 
 const addBlockRange = async (
   api: Api,
   startBlock: number,
-  endBlock: number,
+  endBlock: number
 ) => {
-  const previousHash = await getBlockHash(api, startBlock - 1)
-  let previousEra = (await api.query.staking.activeEra.at(
-    previousHash,
-  )) as Option<ActiveEraInfo>
-
-  for (let i = startBlock; i < endBlock; i++) {
-    console.log(`[Joystream] Processing block ${i}`)
-
-    const hash = await getBlockHash(api, i)
-    const blockEra = (await api.query.staking.activeEra.at(
-      hash,
-    )) as Option<ActiveEraInfo>
-    const currentEra = blockEra.unwrap().index.toNumber();
-
-    if (
-      currentEra ===
-      previousEra.unwrap().index.toNumber()
-    ) {
-      continue
-    }
-
-    if (await isEraInfoAvailable(api, i)) {
-      await Era.findOrCreate({ where: { id: currentEra } })
-      await updateEra(api, currentEra, i)
-      previousEra = blockEra
-    }
-  }
+  for (let block = startBlock; block <= endBlock; block++)
+    queue.push(() => processBlock(api, block))
 }
 
-const updateStatus = async (api: Api, old: Status, era: number) => {
+const updateStatus = async (api: Api, old: Status, block: number) => {
   const status = {
-    era,
+    block,
+    era: await getEraAtBlock(api, block),
     round: Number(await api.query.councilElection.round()),
     members: (await api.query.members.nextMemberId()) - 1,
     channels: await get.currentChannelId(api),
@@ -174,7 +172,7 @@ const updateStatus = async (api: Api, old: Status, era: number) => {
     posts: await get.currentPostId(api),
 
     proposals: await get.proposalCount(api),
-    proposalPosts: await api.query.proposalsDiscussion.postCount(),
+    proposalPosts: (await api.query.proposalsDiscussion.postCount()).toHuman(),
   }
   if (!queuedAll) fetchAll(api, status)
   else {
@@ -185,13 +183,13 @@ const updateStatus = async (api: Api, old: Status, era: number) => {
     status.channels > old.channels && fetchChannel(api, status.channels)
     status.categories > old.categories && fetchCategory(api, status.categories)
     status.proposalPosts > old.proposalPosts &&
-    fetchProposalPosts(api, status.proposalPosts)
+      fetchProposalPosts(api, status.proposalPosts)
   }
   return status
 }
 
 const fetchAll = async (api: Api, status: Status) => {
-  // trying to avoid SequelizeUniqueConstraintError
+  queue.push(() => fetchAccounts(api, status.block))
 
   for (let id = status.members; id > 0; id--) {
     queue.push(() => fetchMember(api, id))
@@ -203,7 +201,6 @@ const fetchAll = async (api: Api, status: Status) => {
   for (let id = status.proposals; id > 0; id--) {
     queue.push(() => fetchProposal(api, id))
   }
-  // queue.push(() => fetchProposalPosts(api, status.proposalPosts))
 
   for (let id = status.channels; id > 0; id--) {
     queue.push(() => fetchChannel(api, id))
@@ -218,67 +215,60 @@ const fetchAll = async (api: Api, status: Status) => {
     queue.push(() => fetchPost(api, id))
   }
 
+  queue.push(() => fetchProposalPosts(api, status.proposalPosts))
+  queue.push(() => addBlockRange(api, 1, status.block))
   queuedAll = true
   processNext()
 }
 
-const processNext = async () => {
-  if (processing) return
-  processing = true
-  const task = queue.shift()
-  if (!task) return
-  const result = await task()
-  processing = false
-  setTimeout(() => processNext(), DELAY)
-}
-
-const processEvents = async (api: Api, blockId: number) => {
-  const blockHash = await getBlockHash(api, blockId)
-  const blockEvents = await api.query.system.events.at(blockHash)
-  blockEvents.forEach(({ event }: EventRecord) => {
-    let { section, method, data } = event
-    Event.create({ blockId, section, method, data: JSON.stringify(data) })
-  })
+const processEvents = async (api: Api, blockId: number, hash: string) => {
+  processing = `events block ${blockId}`
+  try {
+    const blockEvents = await api.query.system.events.at(hash)
+    blockEvents.forEach(({ event }: EventRecord) => {
+      let { section, method, data } = event
+      Event.create({ blockId, section, method, data: JSON.stringify(data) })
+    })
+  } catch (e) {
+    console.log(`failed to fetch events for block  ${blockId} ${hash}`)
+  }
   // TODO catch votes, posts, proposals?
 }
 
-const isEraInfoAvailable = async (api: Api, blockId: number) => {
-  const hash = await getBlockHash(api, blockId)
-  let totalValidators = (await api.query.staking.snapshotValidators.at(
-    hash,
-  )) as Option<Vec<AccountId>>
-  return !totalValidators.isEmpty
-}
+const fetchValidators = async (api: Api, hash: string) =>
+  api.query.staking.snapshotValidators.at(hash) as Option<Vec<AccountId>>
 
-const updateEra = async (api: Api, eraId: number, blockId: number) => {
+const importEraAtBlock = async (api: Api, blockId: number, hash: string) => {
+  const id = await getEraAtHash(api, hash)
+  const [era] = await Era.findOrCreate({ where: { id } })
+  if (era.active) return
+  era.addBlock(blockId)
 
-  let dbEra = await Era.findByPk(eraId)
-  if (dbEra.actives !== null) {
-    console.log(`[Joystream] Era ${eraId} contains info, skipping update...`)
-    return
+  processing = `era ${id}`
+  try {
+    fetchValidators(api, hash).then(
+      async (snapshot: Option<Vec<AccountId>>) => {
+        if (snapshot.isEmpty) return
+        console.log(`[Joystream] Found validator info for era ${id}`)
+        const validatorCount = snapshot.unwrap().length
+        era.slots = (await api.query.staking.validatorCount.at(hash)).toNumber()
+        era.active = Math.min(era.slots, validatorCount)
+        era.waiting =
+          validatorCount > era.slots ? validatorCount - era.slots : 0
+        era.stake = await api.query.staking.erasTotalStake.at(hash, id)
+        const chainTimestamp = (await api.query.timestamp.now.at(
+          hash
+        )) as Moment
+        era.timestamp = moment(chainTimestamp.toNumber())
+        // era.update({ slots, active, waiting, stake, timestamp })
+        era.blockId = id
+        era.save()
+        updateBalances(api, hash)
+      }
+    )
+  } catch (e) {
+    console.error(`import era ${blockId} ${hash}`, e)
   }
-
-  const hash = await getBlockHash(api, blockId)
-  let totalValidators = (await api.query.staking.snapshotValidators.at(
-    hash,
-  )) as Option<Vec<AccountId>>
-
-  console.log(`[Joystream] Processing era ${eraId}`)
-
-  const totalNrValidators = totalValidators.unwrap().length
-  dbEra.maxSlots = Number(
-    (await api.query.staking.validatorCount.at(hash)).toString(),
-  )
-  dbEra.actives = Math.min(dbEra.maxSlots, totalNrValidators)
-  dbEra.waiting =
-    totalNrValidators > dbEra.maxSlots ? totalNrValidators - dbEra.maxSlots : 0
-
-  const chainTimestamp = (await api.query.timestamp.now.at(hash)) as Moment
-  dbEra.timestamp = new Date(chainTimestamp.toNumber())
-
-  dbEra.stake = await api.query.staking.erasTotalStake.at(hash, eraId)
-
-  await dbEra.save();
 }
 
 const validatorStatus = async (api: Api, blockId: number) => {
@@ -296,39 +286,29 @@ const validatorStatus = async (api: Api, blockId: number) => {
   return { blockId, actives, waiting, maxSlots, date }
 }
 
-const getAccountAtBlock = (
-  api: Api,
-  hash: string,
-  account: string,
-): Promise<AccountInfo> => api.query.system.account.at(hash, account)
-
-// TODO when to cal
-const fetchAccounts = async (api: Api, blockId: number) => {
-  api.query.system.account.entries().then((account: any) => {
-    const address = account[0].toHuman()[0]
-    Account.create({ address })
-  })
-}
-
-const updateBalances = async (api: Api, blockId: number) => {
-  const blockHash = await getBlockHash(api, blockId)
+const updateBalances = async (api: Api, blockHash: string) => {
   const currentEra: number = await api.query.staking.currentEra.at(blockHash)
   const era = await Era.findOrCreate({ where: { id: currentEra } })
-
-  Account.findAll().then(async (account: any) => {
-    const { id, address } = account
-    if (!address) return
-    console.log(`updating balance of`, id, address)
-
-    const { data } = await getAccountAtBlock(api, blockHash, address)
-    const { free, reserved, miscFrozen, feeFrozen } = data
-    const balance = { available: free, reserved, frozen: miscFrozen }
-    Balance.create(balance).then((balance: any) => {
-      balance.setAccount(id)
-      balance.setEra(era.id)
-      console.log(`balance`, era.id, address, balance.available)
+  try {
+    processing = `balances ${era}`
+    Account.findAll().then(async (account: any) => {
+      const { key } = account
+      if (!key) return
+      console.log(`updating balance of`, key, key)
+
+      const { data } = await getAccountAtBlock(api, blockHash, key)
+      const { free, reserved, miscFrozen, feeFrozen } = data
+      const balance = { available: free, reserved, frozen: miscFrozen }
+      console.log(`balance ${era}`, balance)
+      Balance.create(balance).then((balance: any) => {
+        balance.setAccount(key)
+        balance.setEra(era.id)
+        console.log(`balance`, era.id, key, balance.available)
+      })
     })
-  })
+  } catch (e) {
+    console.error(`balances era ${era}`)
+  }
 }
 
 const fetchTokenomics = async () => {
@@ -343,10 +323,10 @@ const fetchChannel = async (api: Api, id: number) => {
   const exists = await Channel.findByPk(id)
   if (exists) return exists
 
-  fetching = `channel ${id}`
+  processing = `channel ${id}`
   const data = await api.query.contentWorkingGroup.channelById(id)
   const { handle, title, description, avatar, banner, content, created } = data
-  //const accountId = String(data.role_account)
+  // TODO const accountId = String(data.role_account)
   const channel = {
     id,
     handle: String(handle),
@@ -371,15 +351,14 @@ const fetchCategory = async (api: Api, id: number) => {
   const exists = await Category.findByPk(+id)
   if (exists) return exists
 
-  fetching = `category ${id}`
+  processing = `category ${id}`
   const data = await api.query.forum.categoryById(id)
-  const { title, description, deleted, archived } = data
-  const threadId = +data.thread_id // TODO needed?
-  const moderator: string = String(data.moderator_id) // account
 
-  const cat = {
+  const { title, description, deleted, archived } = data
+  const category = await Category.create({
     id,
     title,
+    threadId: +data.thread_id, // TODO needed?
     description,
     createdAt: +data.created_at.block,
     deleted,
@@ -388,10 +367,8 @@ const fetchCategory = async (api: Api, id: number) => {
     moderatedThreads: Number(data.num_direct_moderated_threads),
     unmoderatedThreads: Number(data.num_direct_unmoderated_threads),
     //position:+data.position_in_parent_category // TODO sometimes NaN,
-  }
-  const category = await Category.create(cat)
-  const mod = await fetchMemberByAccount(api, moderator)
-  if (mod) category.setModerator(mod.id)
+  })
+  createModeration(api, { categoryId: id }, String(data.moderator_id), category)
   return category
 }
 
@@ -400,37 +377,44 @@ const fetchPost = async (api: Api, id: number) => {
   const exists = await Post.findByPk(id)
   if (exists) return exists
 
-  fetching = `post ${id}`
+  processing = `post ${id}`
   const data = await api.query.forum.postById(id)
 
-  const threadId = Number(data.thread_id)
-  const text = data.current_text
-  const moderation = data.moderation
-  //const history = data.text_change_history;
-  const createdAt = data.created_at.block
   const author: string = String(data.author_id)
+  const member = await fetchMemberByAccount(api, author)
+  const authorId = member ? member.id : null
 
-  const post = await Post.create({ id, text, createdAt })
+  const threadId = Number(data.thread_id)
   const thread = await fetchThread(api, threadId)
-  if (thread) post.setThread(thread.id)
-  const member = await fetchMemberByAccount(api, author)
-  if (member) {
-    post.setAuthor(member.id)
-    member.addPost(post.id)
-  }
-  if (moderation) {
-    const mod = await fetchMemberByAccount(api, moderation)
-    post.setModerator(mod)
-  }
+
+  const text = data.current_text
+  const history = data.text_change_history // TODO needed?
+  const createdAt = data.created_at.block
+  const post = await Post.create({ id, authorId, text, createdAt, threadId })
+  if (data.moderation)
+    createModeration(api, { postId: id }, data.moderation, post)
   return post
 }
 
+const createModeration = async (
+  api: Api,
+  where: {},
+  key: string,
+  object: { setModeration: (id: number) => {} }
+) => {
+  if (key === '') return
+  await Account.findOrCreate({ where: { key } })
+  const moderation = await Moderation.create({ moderatorKey: key })
+  object.setModeration(moderation.id)
+  return moderation
+}
+
 const fetchThread = async (api: Api, id: number) => {
   if (id <= 0) return
   const exists = await Thread.findByPk(id)
   if (exists) return exists
 
-  fetching = `thread ${id}`
+  processing = `thread ${id}`
   const data = await api.query.forum.threadById(id)
   const { title, moderation, nr_in_category } = data
   const account = String(data.author_id)
@@ -446,22 +430,15 @@ const fetchThread = async (api: Api, id: number) => {
   const author = await fetchMemberByAccount(api, account)
   if (author) thread.setCreator(author.id)
   if (moderation) {
-    /* TODO
-  Error: Invalid value ModerationAction(3) [Map] {
-[1]   'moderated_at' => BlockAndTime(2) [Map] {
-[1]     'block' => <BN: 4f4ff>,
-[1]     'time' => <BN: 17526e65a40>,
-[1]     registry: TypeRegistry {},
-[1]     block: [Getter],
-[1]     time: [Getter],
-[1]     typeDefs: { block: [Function: U32], time: [Function: U64] }
-[1]   },
-[1]   'moderator_id'
-[1]   'rationale' => [String (Text): 'Irrelevant as posted in another thread.'] {
-*/
-    //console.log(`thread mod`, moderation
-    //const mod = await fetchMemberByAccount(api, moderation)
-    //if (mod) thread.setModeration(mod.id)
+    const { moderated_at, moderator_id, rationale } = moderation
+    const created = moderated_at.block
+    const createdAt = moment.utc(moderated_at.time)
+    createModeration(
+      api,
+      { created, createdAt, rationale },
+      moderator_id.toHuman(),
+      thread
+    )
   }
   return thread
 }
@@ -472,7 +449,7 @@ const fetchCouncil = async (api: Api, round: number) => {
   const exists = await Council.findByPk(round)
   if (exists) return exists
 
-  fetching = `council ${round}`
+  processing = `council ${round}`
   const start = 57601 + (round - 1) * CYCLE
   const end = start + TERMDURATION
   let council = { round, start, end, startDate: 0, endDate: 0 }
@@ -495,11 +472,11 @@ const fetchCouncil = async (api: Api, round: number) => {
   try {
     Council.create(council).then(({ round }: any) =>
       seats.map(({ member, stake, backers }) =>
-        fetchMemberByAccount(api, member.toHuman()).then((m: any) =>
+        fetchMemberByAccount(api, member.toHuman()).then(({ id }: any) =>
           Consul.create({
             stake: Number(stake),
             councilRound: round,
-            memberId: m.id,
+            memberId: id,
           }).then((consul: any) =>
             backers.map(async ({ member, stake }) =>
               fetchMemberByAccount(api, member.toHuman()).then(({ id }: any) =>
@@ -507,12 +484,12 @@ const fetchCouncil = async (api: Api, round: number) => {
                   stake: Number(stake),
                   consulId: consul.id,
                   memberId: id,
-                }),
-              ),
-            ),
-          ),
-        ),
-      ),
+                })
+              )
+            )
+          )
+        )
+      )
     )
   } catch (e) {
     console.error(`Failed to save council ${round}`, e)
@@ -527,58 +504,75 @@ const fetchProposal = async (api: Api, id: number) => {
     return exists
   }
 
-  fetching = `proposal ${id}`
+  processing = `proposal ${id}`
   const proposal = await get.proposalDetail(api, id)
+  await fetchMember(api, proposal.authorId)
   fetchProposalVotes(api, proposal)
   return Proposal.create(proposal)
 }
 
-const fetchProposalPosts = async (api: Api, max: number) => {
-  console.log(`posts`, max)
-  let postId = 1
-  for (let threadId = 1; postId <= max; threadId++) {
-    fetching = `proposal posts ${threadId} ${postId}`
-    const post = await api.query.proposalsDiscussion.postThreadIdByPostId(
-      threadId,
-      postId,
-    )
-    if (post.text.length) {
-      console.log(postId, threadId, post.text.toHuman())
-      postId++
+const fetchProposalPost = (api: Api, threadId: number, postId: number) =>
+  api.query.proposalsDiscussion.postThreadIdByPostId(threadId, postId)
+
+const fetchProposalPosts = async (api: Api, posts: number) => {
+  const threads = (await api.query.proposalsDiscussion.threadCount()).toNumber()
+  let proposalId = 1
+
+  for (let id = 1; id <= posts && proposalId <= threads; ) {
+    const exists = await ProposalPost.findByPk(id)
+    if (exists) {
+      id++
+      proposalId = 1
+      continue
     }
-  }
-}
 
-const findCouncilAtBlock = (api: Api, block: number) => {
-  if (!block) {
-    console.error(`[findCouncilAtBlock] empty block`)
-    return
+    processing = `proposal post ${id}/${posts} ${proposalId}/${threads}`
+    const post = await fetchProposalPost(api, proposalId, id)
+
+    if (!post.text.length) {
+      proposalId++
+      continue
+    }
+
+    const proposal = await Proposal.findByPk(proposalId)
+    if (!proposal) {
+      console.warn(`[fetchProposalPosts] proposal ${proposalId} not found.`)
+      id++
+      continue
+    }
+    ProposalPost.create({
+      id,
+      text: post.text.toHuman(),
+      created: Number(post.created_at),
+      updated: Number(post.updated_at),
+      edition: Number(post.edition_number),
+      authorId: Number(post.author_id),
+    }).then((p: any) => proposal.addPost(p))
+
+    id++
+    proposalId = 1
   }
-  return Council.findOne({
-    where: {
-      start: { [Op.lte]: block },
-      end: { [Op.gte]: block - VOTINGDURATION },
-    },
-  })
 }
 
 const fetchProposalVotes = async (api: Api, proposal: ProposalDetail) => {
   if (!proposal) return console.error(`[fetchProposalVotes] empty proposal`)
-  fetching = `votes proposal ${proposal.id}`
-  const { createdAt } = proposal
-  if (!createdAt) return console.error(`empty start block`, proposal)
+  processing = `votes proposal ${proposal.id}`
+  const { createdAt, finalizedAt } = proposal
   try {
-    const start = await findCouncilAtBlock(api, createdAt)
+    const start = createdAt ? await findCouncilAtBlock(api, createdAt) : null
     if (start) start.addProposal(proposal.id)
-    else return console.error(`no council found for proposal ${proposal.id}`)
+    else
+      return console.error(
+        `[fetchProposalVotes] no council found for proposal ${proposal.id}`
+      )
     // some proposals make it into a second term
-    const end = await findCouncilAtBlock(api, proposal.finalizedAt)
-    const councils = [start.round, end && end.round]
+    const end = finalizedAt ? await findCouncilAtBlock(api, finalizedAt) : null
+    const councils = [start && start.round, end && end.round]
     const consuls = await Consul.findAll({
       where: { councilRound: { [Op.or]: councils } },
     })
     consuls.map(({ id, memberId }: any) =>
-      fetchProposalVoteByConsul(api, proposal.id, id, memberId),
+      fetchProposalVoteByConsul(api, proposal.id, id, memberId)
     )
   } catch (e) {
     console.log(`failed to fetch votes of proposal ${proposal.id}`, e)
@@ -589,9 +583,9 @@ const fetchProposalVoteByConsul = async (
   api: Api,
   proposalId: number,
   consulId: number,
-  memberId: number,
+  memberId: number
 ): Promise<any> => {
-  fetching = `vote by ${consulId} for proposal ${proposalId}`
+  processing = `vote by ${consulId} for proposal ${proposalId}`
   const exists = await ProposalVote.findOne({
     where: { proposalId, memberId, consulId },
   })
@@ -604,39 +598,65 @@ const fetchProposalVoteByConsul = async (
   if (!hasVoted.toNumber()) return
 
   const vote = (await query.voteExistsByProposalByVoter(...args)).toHuman()
+  await fetchMember(api, memberId) // TODO needed?
   return ProposalVote.create({ vote: vote, proposalId, consulId, memberId })
 }
 
 // accounts
-const fetchMemberByAccount = async (
+const getHandleOrKey = async (api: Api, key: string) => {
+  const member = await fetchMemberByAccount(api, key)
+  return member ? member.handle : key //abbrKey(key)
+}
+
+const abbrKey = (key: string) =>
+  `${key.slice(0, 5)}..${key.slice(key.length - 5)}`
+
+const getAccountAtBlock = (
   api: Api,
-  account: string,
-): Promise<MemberType | undefined> => {
-  if (!account) {
-    console.error(`fetchMemberByAccount called without account`)
-    return undefined
-  }
-  const exists = await Member.findOne({ where: { account } })
-  if (exists) return exists
-  const id: number = Number(await get.memberIdByAccount(api, account))
-  return id ? fetchMember(api, id) : undefined
+  hash: string,
+  account: string
+): Promise<AccountInfo> => api.query.system.account.at(hash, account)
+
+const fetchAccounts = async (api: Api, blockId: number) => {
+  processing = `accounts`
+  api.query.system.account
+    .entries()
+    .then((account: any) =>
+      Account.findOrCreate({ where: { key: account[0][0].toHuman()[0] } })
+    )
+}
+
+const fetchMemberByAccount = async (api: Api, rootKey: string) => {
+  const member = await Member.findOne({ where: { rootKey } })
+  if (member) return member
+  const id = Number(await get.memberIdByAccount(api, rootKey))
+  if (id) return fetchMember(api, id)
+  else Account.findOrCreate({ where: { key: rootKey } })
 }
 
 const fetchMember = async (
   api: Api,
-  id: number,
+  id: number
 ): Promise<MemberType | undefined> => {
   if (id <= 0) return
   const exists = await Member.findByPk(+id)
   if (exists) return exists
 
-  fetching = `member ${id}`
+  processing = `member ${id}`
   const membership = await get.membership(api, id)
   const about = String(membership.about)
-  const account = String(membership.root_account)
   const handle = String(membership.handle)
   const createdAt = +membership.registered_at_block
-  return Member.create({ id, about, account, createdAt, handle })
+  const rootKey = String(membership.root_account)
+
+  return Member.create({ id, about, createdAt, handle, rootKey }).then(
+    (member: any) => {
+      Account.findOrCreate({ where: { key: rootKey } }).then(([account]: any) =>
+        account.setMember(id)
+      )
+      return member
+    }
+  )
 }
 
 module.exports = { addBlock, addBlockRange }

+ 1 - 0
server/types.ts

@@ -266,6 +266,7 @@ export interface CalendarGroup {
 }
 
 export interface Status {
+  block: number
   era: number
   round: number
   members: number