
storage-node-v2: Add query-node data paging.

- add offset and limit variables to the query-node queries
- add full DB test data generation script
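
The paging itself is a plain offset/limit loop that keeps requesting pages from the query node until a short (or empty) page comes back. Below is a minimal sketch of that pattern, condensed from the getAllObjectsWithPaging helper this commit adds in dataObjectsModel.ts (the real helper also logs progress per page and uses a hard-coded page size of 1000, flagged as a TODO in the diff):

async function getAllWithPaging<T>(
  query: (offset: number, limit: number) => Promise<T[]>
): Promise<T[]> {
  const result: T[] = []
  const limit = 1000 // page size; hard-coded in the diff, noted there as a future parameter
  let offset = 0

  let page: T[] = []
  do {
    page = await query(offset, limit) // fetch one page from the query node
    offset += limit // advance to the next page
    result.push(...page)
  } while (page.length === limit) // a short or empty page means we drained everything

  return result
}

In the diff, getAllBuckets, getAllAssignedBags and getAllAssignedDataObjects wrap the paged QueryNodeApi calls with this helper, e.g. (offset, limit) => api.getStorageBucketDetails(offset, limit).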
Shamil Gadelshin 3 years ago
parent
commit
1ed2630d97

+ 124 - 18
storage-node-v2/scripts/generate-test-data.ts

@@ -3,47 +3,153 @@
 import fs from 'fs'
 import path from 'path'
 const fsPromises = fs.promises
-import { Client, ClientConfig } from 'pg'
+import { Client, ClientConfig,QueryResult } from 'pg'
 import { exit } from 'process'
 
 async function doJob(): Promise<void> {
   const uploadDirectory = '/Users/shamix/uploads2'
-  const objectNumber = 100
+  const fileSize = 100
 
-  const config : ClientConfig = {
-    user: 'postgres',
-    password: 'postgres',
-    database: 'query_node_processor'
+  const objectNumber = 100000
+  const bagNumber = 10
+  const bucketNumber = 150
+
+
+  const updateDb = false
+  const generateFiles = true
+
+  if (updateDb) {
+    const config : ClientConfig = {
+      user: 'postgres',
+      password: 'postgres',
+      database: 'query_node_processor'
+    }
+    const client = new Client(config)
+    await client.connect()
+
+    // Cleanup
+    await client.query('TRUNCATE storage_data_object')
+    await client.query('TRUNCATE storage_bucket CASCADE')
+    await client.query('TRUNCATE storage_bag CASCADE')
+    await client.query('TRUNCATE storage_bag_storage_bucket')
+  
+    // Generate objects
+    await createBags(client, bagNumber)
+    await createBuckets(client, bucketNumber)
+    await createBagBucketLinks(client)
+    await createBucketWorkerLinks(client)
+    const dbTasks = createDataObjects(client, objectNumber)
+    await Promise.all(dbTasks)
+
+    await client.end()
   }
-  const client = new Client(config)
-  await client.connect()
-  await client.query('TRUNCATE storage_data_object')
   
-  const data = new Uint8Array(100000000)
+  if (generateFiles) {
+    const fileTasks = createFiles(uploadDirectory, fileSize, objectNumber)
+    await Promise.all(fileTasks)
+  }
+}
+
+function createDataObjects(client: Client, objectNumber: number): Promise<QueryResult<any>>[] {
   const tasks: any[] = []
+
+  const bagId = '1'
   for(let i: number = 1; i <= objectNumber; i++){
     const name = i.toString()
 
     console.log(`Writing ${i} data object...`)
 
+    const dbTask = client.query(
+      `INSERT INTO storage_data_object(storage_bag_id, ipfs_hash, id, created_by_id, version, is_accepted, size) 
+       values(${bagId}, ${name}, ${name}, 'some', '1', true, 100)`
+    )
+
+    tasks.push(dbTask)
+  }
+
+  return tasks
+}
+
+function createFiles(uploadDirectory: string, fileSize: number, objectNumber: number): Promise<void>[] {
+  const data = new Uint8Array(fileSize)
+  const tasks: any[] = []
+  for(let i: number = 1; i <= objectNumber; i++){
+    const name = i.toString()
+
+    console.log(`Writing ${i} file...`)
+
     const fileTask = fsPromises.writeFile(
       path.join(uploadDirectory, name), 
       data
-      //Buffer.from(name, 'utf8')
     )
 
-    const dbTask = client.query(
-      `INSERT INTO storage_data_object(storage_bag_id, ipfs_hash, id, created_by_id, version, is_accepted, size) 
-       values('CO', ${name}, ${name}, 'some', '1', false, 100)`
+    tasks.push(fileTask)
+  }
+
+  return tasks
+}
+
+async function createBags(client: Client, bagNumber: number): Promise<void> {
+  for(let i: number = 1; i <= bagNumber; i++){
+    const name = i.toString()
+
+    console.log(`Writing ${i} bag...`)
+
+    await client.query(
+      `INSERT INTO storage_bag(id, created_by_id, version, owner) 
+       values(${name}, 'some', '1',  '{}')`
     )
+  }
+}
 
-    tasks.push(dbTask)
-    tasks.push(fileTask)
+async function createBuckets(client: Client, bucketNumber: number): Promise<void> {
+  const missingWorkerId = `{"isTypeOf": "StorageBucketOperatorStatusMissing"}`
+  for(let i: number = 1; i <= bucketNumber; i++){
+    const name = i.toString()
+
+    console.log(`Writing ${i} bucket...`)
+
+    await client.query(
+      `INSERT INTO storage_bucket(id, created_by_id, version, operator_status, accepting_new_bags, data_objects_size_limit,data_object_count_limit) 
+       values(${name}, 'some', '1',  '${missingWorkerId}', true, 100000000, 100000000)`
+    )
   }
+}
 
-  await Promise.all(tasks)
 
-  await client.end()
+async function createBagBucketLinks(client: Client): Promise<void> {
+    console.log(`Writing bag to bucket links...`)
+
+    // Bucket1 to Bag1
+    await client.query(
+      `INSERT INTO storage_bag_storage_bucket(storage_bag_id, storage_bucket_id) 
+       values('1', '1')`
+    )
+    // Bucket2 to Bag1
+    await client.query(
+      `INSERT INTO storage_bag_storage_bucket(storage_bag_id, storage_bucket_id) 
+       values('1', '2')`
+    )
+}
+
+async function createBucketWorkerLinks(client: Client): Promise<void> {
+    console.log(`Writing bucket worker links...`)
+
+    const assignedWorker0 = `{"isTypeOf": "StorageBucketOperatorStatusActive", "workerId": 0}`
+    const assignedWorker1 = `{"isTypeOf": "StorageBucketOperatorStatusActive", "workerId": 1}`
+
+    // Bucket1 to Worker0
+    await client.query(
+      `UPDATE storage_bucket
+       SET operator_status = '${assignedWorker0}'
+       WHERE id = '1'`
+    )
+    // Bucket2 to Worker1
+    await client.query(
+      `UPDATE storage_bucket
+       SET operator_status = '${assignedWorker1}'
+       WHERE id = '2'`
+    )
 }
 
 doJob().then(() => {

+ 66 - 35
storage-node-v2/src/services/queryNode/api.ts

@@ -10,9 +10,6 @@ import {
   GetStorageBucketDetails,
   GetStorageBucketDetailsQuery,
   GetStorageBucketDetailsQueryVariables,
-  GetAllStorageBucketDetails,
-  GetAllStorageBucketDetailsQuery,
-  GetAllStorageBucketDetailsQueryVariables,
   StorageBucketDetailsFragment,
   GetStorageBagDetailsQuery,
   GetStorageBagDetails,
@@ -51,10 +48,16 @@ export class QueryNodeApi {
     variables: VariablesT,
     resultKey: keyof QueryT
   ): Promise<Required<QueryT>[keyof QueryT] | null> {
-    return (
-      (await this.apolloClient.query<QueryT, VariablesT>({ query, variables }))
-        .data[resultKey] || null
-    )
+    const result = await this.apolloClient.query<QueryT, VariablesT>({
+      query,
+      variables,
+    })
+
+    if (result?.data === null) {
+      return null
+    }
+
+    return result.data[resultKey]
   }
 
   // Get entities by "non-unique" input and return first result
@@ -66,10 +69,15 @@ export class QueryNodeApi {
     variables: VariablesT,
     resultKey: keyof QueryT
   ): Promise<QueryT[keyof QueryT][number] | null> {
-    return (
-      (await this.apolloClient.query<QueryT, VariablesT>({ query, variables }))
-        .data[resultKey][0] || null
-    )
+    const result = await this.apolloClient.query<QueryT, VariablesT>({
+      query,
+      variables,
+    })
+
+    if (result?.data === null) {
+      return null
+    }
+    return result.data[resultKey][0]
   }
 
   // Query-node: get multiple entities
@@ -80,48 +88,71 @@ export class QueryNodeApi {
     query: DocumentNode,
     variables: VariablesT,
     resultKey: keyof QueryT
-  ): Promise<QueryT[keyof QueryT]> {
-    return (
-      await this.apolloClient.query<QueryT, VariablesT>({ query, variables })
-    ).data[resultKey]
+  ): Promise<QueryT[keyof QueryT] | null> {
+    const result = await this.apolloClient.query<QueryT, VariablesT>({
+      query,
+      variables,
+    })
+
+    if (result?.data === null) {
+      return null
+    }
+    return result.data[resultKey]
   }
 
-  public getStorageBucketDetails(
-    objectId: string
-  ): Promise<StorageBucketDetailsFragment | null> {
-    return this.uniqueEntityQuery<
+  public async getStorageBucketDetails(
+    offset: number,
+    limit: number
+  ): Promise<Array<StorageBucketDetailsFragment>> {
+    const result = await this.multipleEntitiesQuery<
       GetStorageBucketDetailsQuery,
       GetStorageBucketDetailsQueryVariables
-    >(GetStorageBucketDetails, { id: objectId }, 'storageBucketByUniqueInput')
-  }
+    >(GetStorageBucketDetails, { offset, limit }, 'storageBuckets')
+
+    if (result === null) {
+      return []
+    }
 
-  public getAllStorageBucketDetails(): Promise<
-    Array<StorageBucketDetailsFragment>
-  > {
-    return this.multipleEntitiesQuery<
-      GetAllStorageBucketDetailsQuery,
-      GetAllStorageBucketDetailsQueryVariables
-    >(GetAllStorageBucketDetails, {}, 'storageBuckets')
+    return result
   }
 
-  public getStorageBagsDetails(
-    bucketIds: string[]
+  public async getStorageBagsDetails(
+    bucketIds: string[],
+    offset: number,
+    limit: number
   ): Promise<Array<StorageBagDetailsFragment>> {
     const input: StorageBucketWhereInput = { id_in: bucketIds }
-    return this.multipleEntitiesQuery<
+    const result = await this.multipleEntitiesQuery<
       GetStorageBagDetailsQuery,
       GetStorageBagDetailsQueryVariables
-    >(GetStorageBagDetails, { bucketIds: input }, 'storageBags')
+    >(GetStorageBagDetails, { offset, limit, bucketIds: input }, 'storageBags')
+
+    if (result === null) {
+      return []
+    }
+
+    return result
   }
 
-  public getDataObjectDetails(
+  public async getDataObjectDetails(
     bagIds: string[],
+    offset: number,
     limit: number
   ): Promise<Array<DataObjectDetailsFragment>> {
     const input: StorageBagWhereInput = { id_in: bagIds }
-    return this.multipleEntitiesQuery<
+    const result = await this.multipleEntitiesQuery<
       GetDataObjectDetailsQuery,
       GetDataObjectDetailsQueryVariables
-    >(GetDataObjectDetails, { limit, bagIds: input }, 'storageDataObjects')
+    >(
+      GetDataObjectDetails,
+      { offset, limit, bagIds: input },
+      'storageDataObjects'
+    )
+
+    if (result === null) {
+      return []
+    }
+
+    return result
   }
 }

+ 27 - 23
storage-node-v2/src/services/queryNode/generated/queries.ts

@@ -8,18 +8,11 @@ export type StorageBucketDetailsFragment = {
 }
 
 export type GetStorageBucketDetailsQueryVariables = Types.Exact<{
-  id: Types.Scalars['ID']
+  offset?: Types.Maybe<Types.Scalars['Int']>
+  limit?: Types.Maybe<Types.Scalars['Int']>
 }>
 
 export type GetStorageBucketDetailsQuery = {
-  storageBucketByUniqueInput?: Types.Maybe<StorageBucketDetailsFragment>
-}
-
-export type GetAllStorageBucketDetailsQueryVariables = Types.Exact<{
-  [key: string]: never
-}>
-
-export type GetAllStorageBucketDetailsQuery = {
   storageBuckets: Array<StorageBucketDetailsFragment>
 }
 
@@ -30,6 +23,8 @@ export type StorageBagDetailsFragment = {
 
 export type GetStorageBagDetailsQueryVariables = Types.Exact<{
   bucketIds?: Types.Maybe<Types.StorageBucketWhereInput>
+  offset?: Types.Maybe<Types.Scalars['Int']>
+  limit?: Types.Maybe<Types.Scalars['Int']>
 }>
 
 export type GetStorageBagDetailsQuery = {
@@ -43,6 +38,7 @@ export type DataObjectDetailsFragment = {
 
 export type GetDataObjectDetailsQueryVariables = Types.Exact<{
   bagIds?: Types.Maybe<Types.StorageBagWhereInput>
+  offset?: Types.Maybe<Types.Scalars['Int']>
   limit?: Types.Maybe<Types.Scalars['Int']>
 }>
 
@@ -81,32 +77,40 @@ export const DataObjectDetails = gql`
   }
 `
 export const GetStorageBucketDetails = gql`
-  query getStorageBucketDetails($id: ID!) {
-    storageBucketByUniqueInput(where: { id: $id }) {
-      ...StorageBucketDetails
-    }
-  }
-  ${StorageBucketDetails}
-`
-export const GetAllStorageBucketDetails = gql`
-  query getAllStorageBucketDetails {
-    storageBuckets {
+  query getStorageBucketDetails($offset: Int, $limit: Int) {
+    storageBuckets(offset: $offset, limit: $limit) {
       ...StorageBucketDetails
     }
   }
   ${StorageBucketDetails}
 `
 export const GetStorageBagDetails = gql`
-  query getStorageBagDetails($bucketIds: StorageBucketWhereInput) {
-    storageBags(where: { storedBy_some: $bucketIds }) {
+  query getStorageBagDetails(
+    $bucketIds: StorageBucketWhereInput
+    $offset: Int
+    $limit: Int
+  ) {
+    storageBags(
+      offset: $offset
+      limit: $limit
+      where: { storedBy_some: $bucketIds }
+    ) {
       ...StorageBagDetails
     }
   }
   ${StorageBagDetails}
 `
 export const GetDataObjectDetails = gql`
-  query getDataObjectDetails($bagIds: StorageBagWhereInput, $limit: Int) {
-    storageDataObjects(limit: $limit, where: { storageBag: $bagIds }) {
+  query getDataObjectDetails(
+    $bagIds: StorageBagWhereInput
+    $offset: Int
+    $limit: Int
+  ) {
+    storageDataObjects(
+      offset: $offset
+      limit: $limit
+      where: { storageBag: $bagIds, isAccepted_eq: true }
+    ) {
       ...DataObjectDetails
     }
   }

+ 22 - 12
storage-node-v2/src/services/queryNode/queries/queries.graphql

@@ -11,14 +11,8 @@ fragment StorageBucketDetails on StorageBucket {
   }
 }
 
-query getStorageBucketDetails($id: ID!) {
-  storageBucketByUniqueInput(where: { id: $id }) {
-    ...StorageBucketDetails
-  }
-}
-
-query getAllStorageBucketDetails {
-  storageBuckets {
+query getStorageBucketDetails($offset: Int, $limit: Int) {
+  storageBuckets(offset: $offset, limit: $limit) {
     ...StorageBucketDetails
   }
 }
@@ -30,8 +24,16 @@ fragment StorageBagDetails on StorageBag {
   }
 }
 
-query getStorageBagDetails($bucketIds: StorageBucketWhereInput) {
-  storageBags(where: { storedBy_some: $bucketIds }) {
+query getStorageBagDetails(
+  $bucketIds: StorageBucketWhereInput
+  $offset: Int
+  $limit: Int
+) {
+  storageBags(
+    offset: $offset
+    limit: $limit
+    where: { storedBy_some: $bucketIds }
+  ) {
     ...StorageBagDetails
   }
 }
@@ -43,8 +45,16 @@ fragment DataObjectDetails on StorageDataObject {
   }
 }
 
-query getDataObjectDetails($bagIds: StorageBagWhereInput, $limit: Int) {
-  storageDataObjects(limit: $limit, where: { storageBag: $bagIds }) {
+query getDataObjectDetails(
+  $bagIds: StorageBagWhereInput
+  $offset: Int
+  $limit: Int
+) {
+  storageDataObjects(
+    offset: $offset
+    limit: $limit
+    where: { storageBag: $bagIds, isAccepted_eq: true }
+  ) {
     ...DataObjectDetails
   }
 }

+ 61 - 6
storage-node-v2/src/services/sync/dataObjectsModel.ts

@@ -1,5 +1,11 @@
 import { QueryNodeApi } from '../../services/queryNode/api'
+import logger from '../../services/logger'
 import { u8aToString, hexToU8a } from '@polkadot/util'
+import {
+  StorageBagDetailsFragment,
+  StorageBucketDetailsFragment,
+  DataObjectDetailsFragment,
+} from '../queryNode/generated/queries'
 
 type Model = {
   storageBuckets: StorageBucket[]
@@ -28,19 +34,16 @@ export async function getRuntimeModel(
   workerId: number
 ): Promise<Model> {
   const api = new QueryNodeApi(queryNodeUrl)
-  // TODO: graphql response entries limit
-  const limit = 1000
-  // TODO: get accepted data objects only
 
-  let allBuckets = await api.getAllStorageBucketDetails()
+  let allBuckets = await getAllBuckets(api)
 
   let bucketIds = allBuckets
     .filter((bucket) => bucket.operatorStatus?.workerId === workerId)
     .map((bucket) => bucket.id)
-  let assignedBags = await api.getStorageBagsDetails(bucketIds)
+  let assignedBags = await getAllAssignedBags(api, bucketIds)
 
   let bagIds = assignedBags.map((bag) => bag.id)
-  let assignedDataObjects = await api.getDataObjectDetails(bagIds, limit)
+  let assignedDataObjects = await getAllAssignedDataObjects(api, bagIds)
 
   const model: Model = {
     storageBuckets: allBuckets.map((bucket) => ({
@@ -60,3 +63,55 @@ export async function getRuntimeModel(
 
   return model
 }
+
+async function getAllBuckets(
+  api: QueryNodeApi
+): Promise<StorageBucketDetailsFragment[]> {
+  return await getAllObjectsWithPaging(
+    'all storage buckets',
+    async (offset, limit) => await api.getStorageBucketDetails(offset, limit)
+  )
+}
+
+async function getAllAssignedDataObjects(
+  api: QueryNodeApi,
+  bagIds: string[]
+): Promise<DataObjectDetailsFragment[]> {
+  return await getAllObjectsWithPaging(
+    'assigned data objects',
+    async (offset, limit) =>
+      await api.getDataObjectDetails(bagIds, offset, limit)
+  )
+}
+
+async function getAllAssignedBags(
+  api: QueryNodeApi,
+  bucketIds: string[]
+): Promise<StorageBagDetailsFragment[]> {
+  return await getAllObjectsWithPaging(
+    'assigned bags',
+    async (offset, limit) =>
+      await api.getStorageBagsDetails(bucketIds, offset, limit)
+  )
+}
+
+async function getAllObjectsWithPaging<T>(
+  objectName: string,
+  query: (offset: number, limit: number) => Promise<T[]>
+): Promise<T[]> {
+  let result = []
+  const limit = 1000 // TODO: make as parameter?
+  let offset = 0
+
+  let resultPart = []
+  do {
+    logger.debug(`Sync - getting ${objectName}: offset = ${offset}, limit = ${limit}`)
+    resultPart = await query(offset, limit)
+    offset += limit
+    result.push(...resultPart)
+
+    if (resultPart.length < limit) break
+  } while (resultPart.length > 0)
+
+  return result
+}

+ 12 - 12
storage-node-v2/src/services/sync/synchronizer.ts

@@ -1,4 +1,5 @@
 import { getRuntimeModel } from '../../services/sync/dataObjectsModel'
+import logger from '../../services/logger'
 import _ from 'lodash'
 import fs from 'fs'
 import path from 'path'
@@ -13,24 +14,25 @@ const fsPromises = fs.promises
 export async function performSync(): Promise<void> {
   const queryNodeUrl = 'http://localhost:8081/graphql'
   const workerId = 1
-  const processNumber = 3
+  const processNumber = 30
   const uploadDirectory = '/Users/shamix/uploads'
   const operatorUrl = 'http://localhost:3333/'
 
+  logger.info('Started syncing...')
   const [model, files] = await Promise.all([
     getRuntimeModel(queryNodeUrl, workerId),
     getLocalFileNames(uploadDirectory),
   ])
-  console.log(model)
-  console.log(files)
+  // console.log(model)
+  // console.log(files)
 
   const requiredCids = model.dataObjects.map((obj) => obj.cid)
 
   const added = _.difference(requiredCids, files)
   const deleted = _.difference(files, requiredCids)
 
-  console.log(`Added: ${added}`)
-  console.log(`Deleted: ${deleted}`)
+  logger.debug(`Sync - added objects: ${added.length}`)
+  logger.debug(`Sync - deleted objects: ${deleted.length}`)
 
   const deletedTasks = deleted.map(
     (fileName) => new DeleteLocalFileTask(uploadDirectory, fileName)
@@ -39,6 +41,7 @@ export async function performSync(): Promise<void> {
     (fileName) => new DownloadFileTask(operatorUrl, fileName, uploadDirectory)
   )
 
+  logger.debug(`Sync - started processing...`)
   const workingStack = new WorkingStack()
   const processSpawner = new TaskProcessorSpawner(workingStack, processNumber)
 
@@ -46,9 +49,7 @@ export async function performSync(): Promise<void> {
   workingStack.add(addedTasks)
 
   await processSpawner.process()
-  // const tasks: SyncTask[] = [...deletedTasks, ...addedTasks]
-
-  // await Promise.all(tasks.map((task) => task.execute()))
+  logger.info('Sync ended.')
 }
 
 async function getLocalFileNames(directory: string): Promise<string[]> {
@@ -70,7 +71,7 @@ class DeleteLocalFileTask implements SyncTask {
   }
 
   description(): string {
-    return `Deleting local file: ${this.filename} ....`
+    return `Sync - deleting local file: ${this.filename} ....`
   }
 
   async execute(): Promise<void> {
@@ -89,7 +90,7 @@ class DownloadFileTask implements SyncTask {
   }
 
   description(): string {
-    return `Downloading file: ${this.url} as ${this.filepath} ....`
+    return `Sync - downloading file: ${this.url} as ${this.filepath} ....`
   }
 
   async execute(): Promise<void> {
@@ -175,11 +176,10 @@ class TaskProcessor {
 
   async process(): Promise<void> {
     while (true) {
-      console.log('Processing....')
       const task = await this.taskSource.get()
 
       if (task !== null) {
-        console.log(task.description())
+        logger.debug(task.description())
         await task.execute()
       } else {
         if (this.exitOnCompletion) {

The diff for this file was suppressed because it is too large
+ 32 - 521
yarn.lock


Some files were not shown because too many files changed in this diff