index.ts 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662
  1. import { Op } from 'sequelize'
  2. import {
  3. Account,
  4. Balance,
  5. Block,
  6. Category,
  7. Channel,
  8. Council,
  9. Consul,
  10. ConsulStake,
  11. Era,
  12. Event,
  13. Member,
  14. Post,
  15. Proposal,
  16. ProposalPost,
  17. ProposalVote,
  18. Thread,
  19. Moderation,
  20. } from '../db/models'
  21. import * as get from './lib/getters'
  22. //import {fetchReports} from './lib/github'
  23. import axios from 'axios'
  24. import moment from 'moment'
  25. import chalk from 'chalk'
  26. import { VoteKind } from '@joystream/types/proposals'
  27. import { Seats } from '@joystream/types/council'
  28. import { AccountInfo } from '@polkadot/types/interfaces/system'
  29. import {
  30. Api,
  31. Handles,
  32. IState,
  33. MemberType,
  34. CategoryType,
  35. ChannelType,
  36. PostType,
  37. Seat,
  38. ThreadType,
  39. CouncilType,
  40. ProposalDetail,
  41. Status,
  42. } from '../types'
  43. import {
  44. AccountId,
  45. Moment,
  46. ActiveEraInfo,
  47. EventRecord,
  48. } from '@polkadot/types/interfaces'
  49. import Option from '@polkadot/types/codec/Option'
  50. import { Vec } from '@polkadot/types'
  // TODO fetch consts from db/chain
  /** Blocks between a council's start and end block (see fetchCouncil). */
  const TERMDURATION = 144000
  /** Blocks preceding each council term; also the offset of the first term. */
  const VOTINGDURATION = 57601
  /** Distance in blocks between the start blocks of two consecutive councils. */
  const CYCLE = VOTINGDURATION + TERMDURATION
  /** Pause between two queued tasks. */
  const DELAY = 0 // ms

  let lastUpdate = 0
  /** Set once fetchAll has queued the full historic import. */
  let queuedAll = false
  /** FIFO of pending import tasks; each entry is a function returning a promise. */
  const queue: any[] = []
  /** Human readable label of the item currently being imported (logging only). */
  let processing = ''
  /** True while processNext is awaiting a task. */
  let busy = false

  /**
   * Runs the next queued task and schedules itself again afterwards.
   *
   * Only one task runs at a time: `busy` is raised before the task is awaited
   * and always lowered afterwards, so overlapping calls return immediately.
   * A failing task is logged and does not stop the remaining queue.
   */
  const processNext = async () => {
    if (busy) return
    const task = queue.shift()
    if (!task) return
    busy = true
    try {
      await task()
    } catch (e) {
      console.error(`[Joystream] queued task failed (${processing})`, e)
    } finally {
      busy = false
    }
    setTimeout(() => processNext(), DELAY)
  }
  /** Resolves the hash of block `blockId` in its human readable form. */
  const getBlockHash = async (api: Api, blockId: number) => {
    const hash: any = await api.rpc.chain.getBlockHash(blockId)
    return hash.toHuman()
  }
  /** Resolves the index of the era that was active at block `hash`. */
  const getEraAtHash = async (api: Api, hash: string) => {
    const activeEra: Option<ActiveEraInfo> = await api.query.staking.activeEra.at(
      hash
    )
    return activeEra.unwrap().index.toNumber()
  }
  /** Resolves the active era index at block number `block`. */
  const getEraAtBlock = async (api: Api, block: number) => {
    const hash = await getBlockHash(api, block)
    return getEraAtHash(api, hash)
  }
  /**
   * Reads the chain timestamp as epoch milliseconds.
   * With `hash` the value stored at that block is used, otherwise the latest.
   */
  const getTimestamp = async (api: Api, hash?: string) => {
    const { timestamp } = api.query
    const chainTime = hash ? await timestamp.now.at(hash) : await timestamp.now()
    return moment.utc(chainTime).valueOf()
  }
  /**
   * Looks up the stored council whose range covers `block`: it started at or
   * before `block` and its end is not more than VOTINGDURATION blocks behind.
   * `api` is accepted for signature parity with the other helpers.
   */
  const findCouncilAtBlock = (api: Api, block: number) => {
    const startedBy = { [Op.lte]: block }
    const notEndedBefore = { [Op.gte]: block - VOTINGDURATION }
    return Council.findOne({ where: { start: startedBy, end: notEndedBefore } })
  }
  /**
   * Imports a newly announced block, links it to its author and broadcasts it.
   *
   * @param api chain API handle
   * @param io emitter that receives a `block` event with the stored block
   * @param header number and author of the new block
   * @param status counters from the previous call (defaults to all zero)
   * @returns the refreshed counters, or `status` unchanged when the block is
   *          already stored
   */
  const addBlock = async (
    api: Api,
    io: any,
    header: { number: number; author: string },
    status: Status = {
      block: 0,
      era: 0,
      round: 0,
      members: 0,
      channels: 0,
      categories: 0,
      threads: 0,
      posts: 0,
      proposals: 0,
      proposalPosts: 0,
    }
  ): Promise<Status> => {
    const id = +header.number
    const exists = await Block.findByPk(id)
    if (exists) {
      console.error(`TODO handle fork`, String(header.author))
      return status
    }

    const block = await processBlock(api, id)

    // The author is read with `?.`, so it may be absent. Only link a validator
    // when there is a key: an undefined value must not reach a `where` clause.
    const key = header.author?.toString()
    if (key) {
      const [account] = await Account.findOrCreate({ where: { key } })
      await block.setValidator(account.key)
      //account.addBlock(block.id) // TODO needed?
    } else {
      console.warn(`[Joystream] block ${id} has no author`)
    }

    io.emit('block', await Block.findByIdWithIncludes(id))

    // logging
    const handle = key ? await getHandleOrKey(api, key) : 'unknown'
    const q = queue.length ? chalk.green(` [${queue.length}:${processing}]`) : ''
    console.log(`[Joystream] block ${id} ${handle} ${q}`)

    return updateStatus(api, status, id)
  }
  /**
   * Stores hash, timestamp and blocktime for block `id`, then imports its
   * events and era.
   *
   * Blocktime is the difference to the previous stored block's timestamp, or
   * 6000 when that block is not stored.
   */
  const processBlock = async (api: Api, id: number) => {
    processing = `block ${id}`
    const last = await Block.findByPk(id - 1)
    const [block] = await Block.findOrCreate({ where: { id } })
    block.hash = await getBlockHash(api, id)
    block.timestamp = await getTimestamp(api, block.hash)
    block.blocktime = last ? block.timestamp - last.timestamp : 6000
    // Wait for the write so a failure rejects here instead of going unhandled.
    await block.save()
    // Events are imported in the background; processEvents logs its own errors.
    void processEvents(api, id, block.hash)
    await importEraAtBlock(api, id, block.hash)
    return block
  }
  /**
   * Queues one processBlock task for every block number from `startBlock` to
   * `endBlock` (both inclusive), in ascending order.
   */
  const addBlockRange = async (
    api: Api,
    startBlock: number,
    endBlock: number
  ) => {
    let next = startBlock
    while (next <= endBlock) {
      // Copy the number so each queued task keeps its own block id.
      const id = next++
      queue.push(() => processBlock(api, id))
    }
  }
  /**
   * Reads the current chain counters and imports whatever is new.
   *
   * On the first call the full historic import is queued through fetchAll.
   * Afterwards only the newest item of each kind whose counter grew since
   * `old` is fetched, in the background.
   *
   * @returns the fresh counters for the next call
   */
  const updateStatus = async (api: Api, old: Status, block: number) => {
    const status = {
      block,
      era: await getEraAtBlock(api, block),
      round: Number(await api.query.councilElection.round()),
      members: Number(await api.query.members.nextMemberId()) - 1,
      channels: await get.currentChannelId(api),
      categories: await get.currentCategoryId(api),
      threads: await get.currentThreadId(api),
      posts: await get.currentPostId(api),
      proposals: await get.proposalCount(api),
      // Number(), not toHuman(): the human form is a formatted string and
      // would break the numeric comparisons below and in fetchProposalPosts.
      proposalPosts: Number(await api.query.proposalsDiscussion.postCount()),
    }

    if (!queuedAll) {
      fetchAll(api, status)
      return status
    }

    // Background fetch that reports a failure instead of leaving an unhandled
    // rejection behind.
    const inBackground = (label: string, task: Promise<unknown>) =>
      task.catch((e) => console.error(`[Joystream] failed to fetch ${label}`, e))

    // TODO catch if more than one are added
    if (status.members > old.members)
      inBackground(`member ${status.members}`, fetchMember(api, status.members))
    if (status.posts > old.posts)
      inBackground(`post ${status.posts}`, fetchPost(api, status.posts))
    if (status.proposals > old.proposals)
      inBackground(
        `proposal ${status.proposals}`,
        fetchProposal(api, status.proposals)
      )
    if (status.channels > old.channels)
      inBackground(`channel ${status.channels}`, fetchChannel(api, status.channels))
    if (status.categories > old.categories)
      inBackground(
        `category ${status.categories}`,
        fetchCategory(api, status.categories)
      )
    if (status.proposalPosts > old.proposalPosts)
      inBackground(
        `proposal posts`,
        fetchProposalPosts(api, status.proposalPosts)
      )

    return status
  }
  /**
   * Queues the complete historic import and starts the queue.
   *
   * Order: accounts, members, councils, proposals, channels, categories,
   * threads, posts (each from newest id down to 1), then proposal posts and
   * finally every block from 1 to `status.block`.
   */
  const fetchAll = async (api: Api, status: Status) => {
    // Queue one task per id, counting down from `latest` to 1.
    const enqueueDescending = (
      latest: number,
      fetch: (api: Api, id: number) => Promise<unknown>
    ) => {
      for (let id = latest; id > 0; id--) queue.push(() => fetch(api, id))
    }

    queue.push(() => fetchAccounts(api, status.block))
    enqueueDescending(status.members, fetchMember)
    enqueueDescending(status.round, fetchCouncil)
    enqueueDescending(status.proposals, fetchProposal)
    enqueueDescending(status.channels, fetchChannel)
    enqueueDescending(status.categories, fetchCategory)
    enqueueDescending(status.threads, fetchThread)
    enqueueDescending(status.posts, fetchPost)
    queue.push(() => fetchProposalPosts(api, status.proposalPosts))
    queue.push(() => addBlockRange(api, 1, status.block))

    queuedAll = true
    processNext()
  }
  /**
   * Stores every event emitted in block `blockId`.
   *
   * All inserts are awaited, so a failed insert reaches the catch below
   * instead of becoming an unhandled rejection. Errors are logged, not thrown.
   */
  const processEvents = async (api: Api, blockId: number, hash: string) => {
    processing = `events block ${blockId}`
    try {
      const blockEvents = await api.query.system.events.at(hash)
      const inserts: Promise<unknown>[] = []
      blockEvents.forEach(({ event }: EventRecord) => {
        const { section, method, data } = event
        inserts.push(
          Event.create({ blockId, section, method, data: JSON.stringify(data) })
        )
      })
      await Promise.all(inserts)
    } catch (e) {
      console.log(`failed to fetch events for block ${blockId} ${hash}`, e)
    }
    // TODO catch votes, posts, proposals?
  }
  /** Reads the staking validator snapshot stored at block `hash`. */
  const fetchValidators = async (api: Api, hash: string) => {
    const snapshot = api.query.staking.snapshotValidators.at(hash)
    return snapshot as Option<Vec<AccountId>>
  }
  /**
   * Attaches block `blockId` to its era and, when the block carries a
   * validator snapshot, fills in the era's validator statistics.
   *
   * Eras that already have `active` set are left untouched. Every step is
   * awaited inside the try block so failures reach the catch and are logged.
   */
  const importEraAtBlock = async (api: Api, blockId: number, hash: string) => {
    const id = await getEraAtHash(api, hash)
    const [era] = await Era.findOrCreate({ where: { id } })
    if (era.active) return

    processing = `era ${id}`
    try {
      await era.addBlock(blockId)

      const snapshot = await fetchValidators(api, hash)
      if (snapshot.isEmpty) return
      console.log(`[Joystream] Found validator info for era ${id}`)

      const validatorCount = snapshot.unwrap().length
      era.slots = (await api.query.staking.validatorCount.at(hash)).toNumber()
      era.active = Math.min(era.slots, validatorCount)
      era.waiting = validatorCount > era.slots ? validatorCount - era.slots : 0
      era.stake = await api.query.staking.erasTotalStake.at(hash, id)
      const chainTimestamp = (await api.query.timestamp.now.at(hash)) as Moment
      era.timestamp = moment(chainTimestamp.toNumber())
      // Reference the block that carried the snapshot. The era index was
      // stored here before, which points at an unrelated block.
      era.blockId = blockId
      await era.save()

      // Balances are refreshed in the background; report failures here.
      updateBalances(api, hash).catch((e: unknown) =>
        console.error(`balances for era ${id} at ${hash}`, e)
      )
    } catch (e) {
      console.error(`import era ${blockId} ${hash}`, e)
    }
  }
  /**
   * Summarises the validator set at block `blockId`.
   *
   * @returns `undefined` when the block has no validator snapshot, otherwise
   *          active and waiting counts, the slot limit and the block time in
   *          epoch milliseconds
   */
  const validatorStatus = async (api: Api, blockId: number) => {
    const hash = await getBlockHash(api, blockId)
    const snapshot = await api.query.staking.snapshotValidators.at(hash)
    if (snapshot.isEmpty) return

    const candidates = snapshot.unwrap().length
    const maxSlots = Number(await api.query.staking.validatorCount.at(hash))
    const now = await api.query.timestamp.now.at(hash)

    return {
      blockId,
      actives: Math.min(maxSlots, candidates),
      waiting: candidates > maxSlots ? candidates - maxSlots : 0,
      maxSlots,
      date: moment(now.toNumber()).valueOf(),
    }
  }
  /**
   * Stores one balance record per known account for the era current at
   * `blockHash`.
   *
   * Accounts are handled one after another. A failure for one account is
   * logged and does not stop the remaining ones.
   */
  const updateBalances = async (api: Api, blockHash: string) => {
    const currentEra = Number(await api.query.staking.currentEra.at(blockHash))
    if (Number.isNaN(currentEra))
      return console.error(`balances: no current era at ${blockHash}`)

    // findOrCreate resolves to [instance, created]; only the instance is used.
    const [era] = await Era.findOrCreate({ where: { id: currentEra } })
    processing = `balances ${era.id}`

    // findAll resolves to an array, so every account is visited in turn.
    const accounts = await Account.findAll()
    for (const { key } of accounts) {
      if (!key) continue
      try {
        console.log(`updating balance of`, key)
        const { data } = await getAccountAtBlock(api, blockHash, key)
        const { free, reserved, miscFrozen } = data
        const balance = await Balance.create({
          available: free,
          reserved,
          frozen: miscFrozen,
        })
        await balance.setAccount(key)
        await balance.setEra(era.id)
        console.log(`balance`, era.id, key, balance.available)
      } catch (e) {
        console.error(`balances era ${era.id}`, key, e)
      }
    }
  }
  /** Downloads the public status document; the payload is not stored yet. */
  const fetchTokenomics = async () => {
    console.debug(`Updating tokenomics`)
    const response = await axios.get('https://status.joystream.org/status')
    if (!response.data) return
    // TODO save 'tokenomics', data
  }
  /**
   * Returns the stored channel `id`, importing it from the chain when missing.
   *
   * @returns `undefined` for ids <= 0, otherwise the channel record
   */
  const fetchChannel = async (api: Api, id: number) => {
    if (id <= 0) return
    const exists = await Channel.findByPk(id)
    if (exists) return exists

    processing = `channel ${id}`
    const data = await api.query.contentWorkingGroup.channelById(id)
    const { handle, title, description, avatar, banner, content, created } = data
    // TODO const accountId = String(data.role_account)

    const channel = {
      id,
      handle: String(handle),
      title: String(title),
      description: String(description),
      avatar: String(avatar),
      banner: String(banner),
      content: String(content),
      // Compare the text form, like curation_status below. `===` between the
      // chain value itself and a string literal can never be true.
      publicationStatus: String(data.publication_status) === 'Public',
      curation: String(data.curation_status),
      createdAt: +created,
      principal: Number(data.principal_id),
    }
    const chan = await Channel.create(channel)

    const owner = await fetchMember(api, Number(data.owner))
    if (owner) await chan.setOwner(owner)
    else console.warn(`[fetchChannel] owner of channel ${id} not found`)
    return chan
  }
  /**
   * Returns the stored forum category `id`, importing it from the chain when
   * missing, together with a moderation entry for its moderator.
   *
   * @returns `undefined` for ids <= 0, otherwise the category record
   */
  const fetchCategory = async (api: Api, id: number) => {
    if (id <= 0) return
    const exists = await Category.findByPk(+id)
    if (exists) return exists

    processing = `category ${id}`
    const data = await api.query.forum.categoryById(id)
    const { title, description, deleted, archived } = data
    const category = await Category.create({
      id,
      title,
      threadId: +data.thread_id, // TODO needed?
      description,
      createdAt: +data.created_at.block,
      deleted,
      archived,
      subcategories: Number(data.num_direct_subcategories),
      moderatedThreads: Number(data.num_direct_moderated_threads),
      unmoderatedThreads: Number(data.num_direct_unmoderated_threads),
      //position:+data.position_in_parent_category // TODO sometimes NaN,
    })

    // Awaited so a failure rejects this call instead of going unhandled.
    await createModeration(
      api,
      { categoryId: id },
      String(data.moderator_id),
      category
    )
    return category
  }
  /**
   * Returns the stored forum post `id`, importing it when missing. The post's
   * author and thread are looked up (and imported if needed) first.
   *
   * @returns `undefined` for ids <= 0, otherwise the post record
   */
  const fetchPost = async (api: Api, id: number) => {
    if (id <= 0) return
    const known = await Post.findByPk(id)
    if (known) return known

    processing = `post ${id}`
    const raw = await api.query.forum.postById(id)

    const member = await fetchMemberByAccount(api, String(raw.author_id))
    const threadId = Number(raw.thread_id)
    // Called for its side effect: the thread is stored before the post.
    await fetchThread(api, threadId)

    const post = await Post.create({
      id,
      authorId: member ? member.id : null,
      text: raw.current_text,
      createdAt: raw.created_at.block,
      threadId,
    })

    if (raw.moderation)
      createModeration(api, { postId: id }, raw.moderation, post)
    return post
  }
  /**
   * Creates a moderation record for moderator `key` and attaches it to
   * `object` through its `setModeration` method.
   *
   * Nothing happens for an empty key. The moderator's account row is created
   * when missing.
   *
   * NOTE(review): `api` and `where` are accepted but not used. Callers pass
   * ids, dates and a rationale in `where` that are currently not stored.
   * Confirm whether they should be.
   *
   * @returns the moderation record, or `undefined` for an empty key
   */
  const createModeration = async (
    api: Api,
    where: {},
    key: string,
    object: { setModeration: (id: number) => {} }
  ) => {
    if (key === '') return
    await Account.findOrCreate({ where: { key } })
    const moderation = await Moderation.create({ moderatorKey: key })
    // Wait for the link so a failure rejects the returned promise.
    await object.setModeration(moderation.id)
    return moderation
  }
  /**
   * Returns the stored forum thread `id`, importing it when missing and
   * linking it to its category, creator and moderation where available.
   *
   * @returns `undefined` for ids <= 0, otherwise the thread record
   */
  const fetchThread = async (api: Api, id: number) => {
    if (id <= 0) return
    const known = await Thread.findByPk(id)
    if (known) return known

    processing = `thread ${id}`
    const raw = await api.query.forum.threadById(id)
    const authorKey = String(raw.author_id)

    const thread = await Thread.create({
      id,
      title: raw.title,
      nrInCategory: +raw.nr_in_category,
      createdAt: +raw.created_at.block,
    })

    const category = await fetchCategory(api, +raw.category_id)
    if (category) thread.setCategory(category.id)

    const author = await fetchMemberByAccount(api, authorKey)
    if (author) thread.setCreator(author.id)

    const { moderation } = raw
    if (!moderation) return thread

    const { moderated_at, moderator_id, rationale } = moderation
    const details = {
      created: moderated_at.block,
      createdAt: moment.utc(moderated_at.time),
      rationale,
    }
    createModeration(api, details, moderator_id.toHuman(), thread)
    return thread
  }
  /**
   * Returns the stored council of `round`, importing it when missing together
   * with its consuls and their backers' stakes.
   *
   * Start and end block are derived from the round number. A round whose
   * start block has no hash yet is reported and skipped.
   *
   * @returns the existing council when already stored, otherwise `undefined`
   */
  const fetchCouncil = async (api: Api, round: number) => {
    if (round <= 0) return console.log(chalk.red(`[fetchCouncil] round:${round}`))
    const exists = await Council.findByPk(round)
    if (exists) return exists

    processing = `council ${round}`
    // The first term starts after one voting period; each later term one full
    // cycle after the previous one.
    const start = VOTINGDURATION + (round - 1) * CYCLE
    const end = start + TERMDURATION
    const council = { round, start, end, startDate: 0, endDate: 0 }
    let seats: Seats
    try {
      const startHash = await getBlockHash(api, start)
      council.startDate = await getTimestamp(api, startHash)
      seats = await api.query.council.activeCouncil.at(startHash)
    } catch (e) {
      return console.log(`council term ${round} lies in the future ${start}`)
    }

    try {
      const endHash = await getBlockHash(api, end)
      council.endDate = await getTimestamp(api, endHash)
    } catch (e) {
      console.warn(`end of council term ${round} lies in the future ${end}`)
    }

    try {
      // Everything is awaited so a failure reaches the catch below.
      const saved = await Council.create(council)
      for (const { member, stake, backers } of seats) {
        const consulMember = await fetchMemberByAccount(api, member.toHuman())
        if (!consulMember) {
          console.warn(`[fetchCouncil] no member for consul in round ${round}`)
          continue
        }
        const consul = await Consul.create({
          stake: Number(stake),
          councilRound: saved.round,
          memberId: consulMember.id,
        })
        for (const backer of backers) {
          const backerMember = await fetchMemberByAccount(
            api,
            backer.member.toHuman()
          )
          if (!backerMember) {
            console.warn(`[fetchCouncil] no member for backer in round ${round}`)
            continue
          }
          await ConsulStake.create({
            stake: Number(backer.stake),
            consulId: consul.id,
            memberId: backerMember.id,
          })
        }
      }
    } catch (e) {
      console.error(`Failed to save council ${round}`, e)
    }
  }
  /**
   * Returns the stored proposal `id`, importing it when missing, and
   * refreshes its votes in both cases.
   *
   * A new proposal is stored before its votes are fetched, so vote and
   * council links always point at an existing row.
   *
   * @returns `undefined` for ids <= 0, otherwise the proposal record
   */
  const fetchProposal = async (api: Api, id: number) => {
    if (id <= 0) return
    const exists = await Proposal.findByPk(+id)
    if (exists) {
      await fetchProposalVotes(api, exists)
      return exists
    }

    processing = `proposal ${id}`
    const proposal = await get.proposalDetail(api, id)
    await fetchMember(api, proposal.authorId)
    const created = await Proposal.create(proposal)
    await fetchProposalVotes(api, proposal)
    return created
  }
  /** Reads discussion post `postId` of discussion thread `threadId`. */
  const fetchProposalPost = (api: Api, threadId: number, postId: number) => {
    const discussion = api.query.proposalsDiscussion
    return discussion.postThreadIdByPostId(threadId, postId)
  }
  /**
   * Imports all proposal discussion posts with ids 1..`posts` that are not
   * stored yet.
   *
   * The chain is queried by (thread, post), so for every missing post the
   * threads are probed from 1 upwards until one returns a non-empty text.
   * The search restarts at thread 1 for every post. A post that cannot be
   * found or saved is skipped and the remaining posts are still processed.
   *
   * @param posts highest post id; a formatted string such as "1,234" is
   *              tolerated
   */
  const fetchProposalPosts = async (api: Api, posts: number) => {
    const total = Number(String(posts).replace(/,/g, ''))
    const threads = (await api.query.proposalsDiscussion.threadCount()).toNumber()

    for (let id = 1; id <= total; id++) {
      if (await ProposalPost.findByPk(id)) continue

      for (let proposalId = 1; proposalId <= threads; proposalId++) {
        processing = `proposal post ${id}/${total} ${proposalId}/${threads}`
        const post = await fetchProposalPost(api, proposalId, id)
        if (!post.text.length) continue // not in this thread, try the next

        const proposal = await Proposal.findByPk(proposalId)
        if (!proposal) {
          console.warn(`[fetchProposalPosts] proposal ${proposalId} not found.`)
          break
        }

        try {
          const saved = await ProposalPost.create({
            id,
            text: post.text.toHuman(),
            created: Number(post.created_at),
            updated: Number(post.updated_at),
            edition: Number(post.edition_number),
            authorId: Number(post.author_id),
          })
          await proposal.addPost(saved)
        } catch (e) {
          console.error(`[fetchProposalPosts] failed to save post ${id}`, e)
        }
        break
      }
    }
  }
  /**
   * Links `proposal` to the council found for its creation block and imports
   * the votes of every consul of the involved council(s).
   *
   * A proposal can be finalized in a later term than it was created in, so
   * the council at `finalizedAt` is included when it differs. Errors are
   * logged, not thrown.
   */
  const fetchProposalVotes = async (api: Api, proposal: ProposalDetail) => {
    if (!proposal) return console.error(`[fetchProposalVotes] empty proposal`)
    processing = `votes proposal ${proposal.id}`
    const { createdAt, finalizedAt } = proposal
    try {
      const start = createdAt ? await findCouncilAtBlock(api, createdAt) : null
      if (!start)
        return console.error(
          `[fetchProposalVotes] no council found for proposal ${proposal.id}`
        )
      await start.addProposal(proposal.id)

      // some proposals make it into a second term
      const end = finalizedAt ? await findCouncilAtBlock(api, finalizedAt) : null
      // Only existing, distinct rounds go into the filter.
      const rounds =
        end && end.round !== start.round
          ? [start.round, end.round]
          : [start.round]

      const consuls = await Consul.findAll({
        where: { councilRound: { [Op.in]: rounds } },
      })
      // Awaited so a failing vote import reaches the catch below.
      await Promise.all(
        consuls.map(({ id, memberId }: any) =>
          fetchProposalVoteByConsul(api, proposal.id, id, memberId)
        )
      )
    } catch (e) {
      console.log(`failed to fetch votes of proposal ${proposal.id}`, e)
    }
  }
  /**
   * Returns the stored vote of consul `consulId` (member `memberId`) on
   * proposal `proposalId`, importing it from the chain when missing.
   *
   * @returns `undefined` when the storage entry for that vote has size 0
   */
  const fetchProposalVoteByConsul = async (
    api: Api,
    proposalId: number,
    consulId: number,
    memberId: number
  ): Promise<any> => {
    processing = `vote by ${consulId} for proposal ${proposalId}`

    const known = await ProposalVote.findOne({
      where: { proposalId, memberId, consulId },
    })
    if (known) return known

    const engine = api.query.proposalsEngine
    const stored = await engine.voteExistsByProposalByVoter.size(
      proposalId,
      memberId
    )
    if (!stored.toNumber()) return

    const entry = await engine.voteExistsByProposalByVoter(proposalId, memberId)
    const vote = entry.toHuman()
    await fetchMember(api, memberId) // TODO needed?
    return ProposalVote.create({ vote, proposalId, consulId, memberId })
  }
  // accounts

  /** Resolves the member handle for account `key`, or `key` if none is found. */
  const getHandleOrKey = async (api: Api, key: string) => {
    const member = await fetchMemberByAccount(api, key)
    if (member) return member.handle
    return key //abbrKey(key)
  }
  /** Shortens a key to its first and last five characters joined by `..`. */
  const abbrKey = (key: string) => {
    const head = key.slice(0, 5)
    const tail = key.slice(key.length - 5)
    return head + '..' + tail
  }
  /** Reads the system account info of `account` as stored at block `hash`. */
  const getAccountAtBlock = (
    api: Api,
    hash: string,
    account: string
  ): Promise<AccountInfo> => {
    const accountStorage = api.query.system.account
    return accountStorage.at(hash, account)
  }
  /**
   * Makes sure every account present in chain storage has an account row.
   *
   * All storage entries are visited, and the work is awaited so the queue
   * only moves on once the import is finished. Errors are logged, not thrown.
   *
   * @param blockId currently unused
   */
  const fetchAccounts = async (api: Api, blockId: number) => {
    processing = `accounts`
    try {
      const entries = await api.query.system.account.entries()
      for (const [storageKey] of entries) {
        // The human form of the storage key is its argument list; the first
        // argument is used as the account key.
        const [key] = storageKey.toHuman()
        if (!key) continue
        await Account.findOrCreate({ where: { key } })
      }
    } catch (e) {
      console.error(`[fetchAccounts] failed to import accounts`, e)
    }
  }
  /**
   * Resolves the member whose root account is `rootKey`.
   *
   * The database is consulted first, then the chain. When the key belongs to
   * no member, an account row is ensured instead.
   *
   * @returns the member, or `undefined` when the key belongs to no member
   */
  const fetchMemberByAccount = async (api: Api, rootKey: string) => {
    const member = await Member.findOne({ where: { rootKey } })
    if (member) return member

    const id = Number(await get.memberIdByAccount(api, rootKey))
    if (id) return fetchMember(api, id)

    // Awaited so a failed insert rejects here instead of going unhandled.
    await Account.findOrCreate({ where: { key: rootKey } })
  }
  /**
   * Returns the stored member `id`, importing it from the chain when missing
   * and linking the member's root account to it.
   *
   * @returns `undefined` for ids <= 0, otherwise the member record
   */
  const fetchMember = async (
    api: Api,
    id: number
  ): Promise<MemberType | undefined> => {
    if (id <= 0) return
    const exists = await Member.findByPk(+id)
    if (exists) return exists

    processing = `member ${id}`
    const membership = await get.membership(api, id)
    const about = String(membership.about)
    const handle = String(membership.handle)
    const createdAt = +membership.registered_at_block
    const rootKey = String(membership.root_account)

    const member = await Member.create({ id, about, createdAt, handle, rootKey })
    // Awaited so a failed account link rejects here instead of going unhandled.
    const [account] = await Account.findOrCreate({ where: { key: rootKey } })
    await account.setMember(id)
    return member
  }
  // Public interface of this module: live block import and range backfill.
  module.exports = {
    addBlock,
    addBlockRange,
  }