Quellcode durchsuchen

remove buckets, save events directly to collections (#33)

Klaudiusz Dembler vor 2 Jahren
Ursprung
Commit
13da370a43

+ 1 - 1
docker-compose.yml

@@ -6,7 +6,7 @@ services:
       - ORION_PORT=6116
       - ORION_MONGO_HOSTNAME=mongo
       - ORION_FEATURED_CONTENT_SECRET=change_me_please
-      - ORION_QUERY_NODE_URL=change_me_please
+      - ORION_QUERY_NODE_URL=https://hydra.joystream.org/graphql
     ports:
       - '127.0.0.1:6116:6116'
     restart: always

+ 7 - 22
src/aggregates/follows.ts

@@ -1,22 +1,13 @@
 import { GenericAggregate } from './shared'
-import {
-  ChannelEvent,
-  ChannelEventsBucketModel,
-  ChannelEventType,
-  UnsequencedChannelEvent,
-} from '../models/ChannelEvent'
+import { ChannelEvent, ChannelEventModel, ChannelEventType } from '../models/ChannelEvent'
 
 import { ChannelFollowsInfo } from '../entities/ChannelFollowsInfo'
 import { mapPeriods } from '../helpers'
 import { differenceInCalendarDays } from 'date-fns'
 
-type ChannelEventsAggregationResult = {
-  events?: ChannelEvent[]
-}[]
-
 type TimePeriodEventsData = {
-  sevenDays: Partial<UnsequencedChannelEvent>[]
-  thirtyDays: Partial<UnsequencedChannelEvent>[]
+  sevenDays: Partial<ChannelEvent>[]
+  thirtyDays: Partial<ChannelEvent>[]
 }
 
 type TimePeriodFollows = {
@@ -57,9 +48,9 @@ export class FollowsAggregate implements GenericAggregate<ChannelEvent> {
   }
 
   private addOrRemoveFollowEvent(
-    array: Partial<UnsequencedChannelEvent>[],
+    array: Partial<ChannelEvent>[],
     eventType: ChannelEventType,
-    { channelId, timestamp }: UnsequencedChannelEvent
+    { channelId, timestamp }: ChannelEvent
   ): void {
     if (eventType === ChannelEventType.FollowChannel) {
       array.push({ channelId, timestamp })
@@ -110,13 +101,7 @@ export class FollowsAggregate implements GenericAggregate<ChannelEvent> {
   }
 
   public static async Build(): Promise<FollowsAggregate> {
-    const aggregation: ChannelEventsAggregationResult = await ChannelEventsBucketModel.aggregate([
-      { $unwind: '$events' },
-      { $group: { _id: null, allEvents: { $push: '$events' } } },
-      { $project: { events: '$allEvents' } },
-    ])
-
-    const events = aggregation[0]?.events || []
+    const events = await ChannelEventModel.find({}).lean()
 
     const aggregate = new FollowsAggregate()
     events.forEach((event) => {
@@ -129,7 +114,7 @@ export class FollowsAggregate implements GenericAggregate<ChannelEvent> {
     return aggregate
   }
 
-  public applyEvent(event: UnsequencedChannelEvent) {
+  public applyEvent(event: ChannelEvent) {
     const { type, ...eventWithoutType } = event
     const { channelId } = eventWithoutType
     const currentChannelFollows = this.channelFollowsMap[channelId] || 0

+ 1 - 3
src/aggregates/shared.ts

@@ -1,5 +1,3 @@
-import { GenericEvent } from '../models/shared'
-
-export interface GenericAggregate<EventType = GenericEvent> {
+export interface GenericAggregate<EventType> {
   applyEvent: (event: EventType) => void
 }

+ 5 - 15
src/aggregates/views.ts

@@ -1,15 +1,11 @@
-import { UnsequencedVideoEvent, VideoEvent, VideoEventsBucketModel, VideoEventType } from '../models/VideoEvent'
+import { VideoEvent, VideoEventModel, VideoEventType } from '../models/VideoEvent'
 import { EntityViewsInfo } from '../entities/EntityViewsInfo'
 import { mapPeriods } from '../helpers'
 import { differenceInCalendarDays } from 'date-fns'
 
-type VideoEventsAggregationResult = {
-  events?: VideoEvent[]
-}[]
-
 type TimePeriodEventsData = {
-  sevenDays: Partial<UnsequencedVideoEvent>[]
-  thirtyDays: Partial<UnsequencedVideoEvent>[]
+  sevenDays: Partial<VideoEvent>[]
+  thirtyDays: Partial<VideoEvent>[]
 }
 
 type TimePeriodViews = {
@@ -133,13 +129,7 @@ export class ViewsAggregate {
   }
 
   public static async Build() {
-    const aggregation: VideoEventsAggregationResult = await VideoEventsBucketModel.aggregate([
-      { $unwind: '$events' },
-      { $group: { _id: null, allEvents: { $push: '$events' } } },
-      { $project: { events: '$allEvents' } },
-    ])
-
-    const events = aggregation[0]?.events || []
+    const events = await VideoEventModel.find({}).lean()
 
     const aggregate = new ViewsAggregate()
     events.forEach((event) => {
@@ -152,7 +142,7 @@ export class ViewsAggregate {
     return aggregate
   }
 
-  public applyEvent(event: UnsequencedVideoEvent) {
+  public applyEvent(event: VideoEvent) {
     const { type, ...eventWithoutType } = event
     const { videoId, channelId, categoryId } = eventWithoutType
     const currentVideoViews = videoId ? this.videoViewsMap[videoId] || 0 : 0

+ 0 - 8
src/config.ts

@@ -25,7 +25,6 @@ const loadEnvVar = (name: string, { defaultValue, devDefaultValue }: LoadEnvVarO
 
 export class Config {
   private _port: number
-  private _bucketSize: number
   private _mongoDBUri: string
   private _featuredContentSecret: string
   private _queryNodeUrl: string
@@ -35,10 +34,6 @@ export class Config {
     return this._port
   }
 
-  get bucketSize(): number {
-    return this._bucketSize
-  }
-
   get mongoDBUri(): string {
     return this._mongoDBUri
   }
@@ -61,9 +56,6 @@ export class Config {
     const rawPort = loadEnvVar('ORION_PORT', { defaultValue: '6116' })
     this._port = parseInt(rawPort)
 
-    const rawBucketSize = loadEnvVar('ORION_BUCKET_SIZE', { defaultValue: '50000' })
-    this._bucketSize = parseInt(rawBucketSize)
-
     const mongoHostname = loadEnvVar('ORION_MONGO_HOSTNAME', { devDefaultValue: 'localhost' })
     const rawMongoPort = loadEnvVar('ORION_MONGO_PORT', { defaultValue: '27017' })
     const mongoDatabase = loadEnvVar('ORION_MONGO_DATABASE', { defaultValue: 'orion' })

+ 10 - 12
src/models/ChannelEvent.ts

@@ -1,30 +1,28 @@
 import { getModelForClass, prop } from '@typegoose/typegoose'
-import { GenericBucket, GenericEvent, insertEventIntoBucket } from './shared'
 
 export enum ChannelEventType {
   FollowChannel = 'FOLLOW_CHANNEL',
   UnfollowChannel = 'UNFOLLOW_CHANNEL',
 }
 
-export class ChannelEvent extends GenericEvent {
+export class ChannelEvent {
   @prop({ required: true, index: true })
   channelId: string
 
-  @prop({ required: true, index: true, enum: ChannelEventType })
-  declare type: ChannelEventType
-}
+  @prop({ required: true })
+  timestamp: Date
 
-export type UnsequencedChannelEvent = Omit<ChannelEvent, '_id'>
+  @prop({ required: false, index: true })
+  actorId?: string
 
-class ChannelEventsBucket extends GenericBucket {
-  @prop({ required: true, type: () => [ChannelEvent] })
-  declare events: ChannelEvent[]
+  @prop({ required: true, index: true, enum: ChannelEventType })
+  type: ChannelEventType
 }
 
-export const ChannelEventsBucketModel = getModelForClass(ChannelEventsBucket, {
+export const ChannelEventModel = getModelForClass(ChannelEvent, {
   schemaOptions: { collection: 'channelEvents' },
 })
 
-export const saveChannelEvent = async (unsequencedChannelEvent: UnsequencedChannelEvent) => {
-  return await insertEventIntoBucket(unsequencedChannelEvent, ChannelEventsBucketModel)
+export const saveChannelEvent = (event: ChannelEvent) => {
+  return ChannelEventModel.create(event)
 }

+ 10 - 14
src/models/VideoEvent.ts

@@ -1,11 +1,10 @@
 import { getModelForClass, prop } from '@typegoose/typegoose'
-import { GenericBucket, GenericEvent, insertEventIntoBucket } from './shared'
 
 export enum VideoEventType {
   AddView = 'ADD_VIEW',
 }
 
-export class VideoEvent extends GenericEvent {
+export class VideoEvent {
   @prop({ required: true, index: true })
   videoId: string
 
@@ -15,21 +14,18 @@ export class VideoEvent extends GenericEvent {
   @prop({ required: false, index: true })
   categoryId?: string
 
-  @prop({ required: true, index: true, enum: VideoEventType })
-  declare type: VideoEventType
-}
+  @prop({ required: true })
+  timestamp: Date
 
-export type UnsequencedVideoEvent = Omit<VideoEvent, '_id'>
+  @prop({ required: false, index: true })
+  actorId?: string
 
-class VideoEventsBucket extends GenericBucket {
-  @prop({ required: true, type: () => [VideoEvent] })
-  declare events: VideoEvent[]
+  @prop({ required: true, index: true, enum: VideoEventType })
+  type: VideoEventType
 }
 
-export const VideoEventsBucketModel = getModelForClass(VideoEventsBucket, {
-  schemaOptions: { collection: 'videoEvents' },
-})
+export const VideoEventModel = getModelForClass(VideoEvent, { schemaOptions: { collection: 'videoEvents' } })
 
-export const saveVideoEvent = async (unsequencedVideoEvent: UnsequencedVideoEvent) => {
-  return await insertEventIntoBucket(unsequencedVideoEvent, VideoEventsBucketModel)
+export const saveVideoEvent = (event: VideoEvent) => {
+  return VideoEventModel.create(event)
 }

+ 0 - 71
src/models/shared.ts

@@ -1,71 +0,0 @@
-import { DocumentType, prop, ReturnModelType } from '@typegoose/typegoose'
-import config from '../config'
-
-export class GenericEvent {
-  @prop()
-  _id: number
-
-  @prop({ required: true })
-  timestamp: Date
-
-  @prop({ required: false, index: true })
-  actorId?: string
-
-  type: unknown
-}
-
-type UnsequencedGenericEvent = Omit<GenericEvent, '_id'>
-
-export class GenericBucket {
-  @prop()
-  _id: number
-
-  @prop({ required: true, index: true })
-  firstTimestamp: Date
-
-  @prop({ required: true, index: true })
-  lastTimestamp: Date
-
-  @prop({ required: true })
-  size: number
-
-  events: GenericEvent[]
-}
-
-export const insertEventIntoBucket = async (
-  unsequencedEvent: UnsequencedGenericEvent,
-  bucketModel: ReturnModelType<typeof GenericBucket>
-) => {
-  const lastBucket = await bucketModel.findOne().sort({ _id: -1 })
-  const lastEventId = lastBucket?.events[lastBucket?.events.length - 1]._id ?? -1
-  const event: GenericEvent = {
-    ...unsequencedEvent,
-    _id: lastEventId + 1,
-  }
-
-  if (!lastBucket || lastBucket.size >= config.bucketSize) {
-    return await createNewBucketFromEvent(lastBucket, event, bucketModel)
-  }
-
-  lastBucket.events.push(event)
-  lastBucket.size++
-  lastBucket.lastTimestamp = event.timestamp
-  await lastBucket.save()
-}
-
-const createNewBucketFromEvent = async (
-  lastBucket: DocumentType<GenericBucket> | null,
-  event: GenericEvent,
-  bucketModel: ReturnModelType<typeof GenericBucket>
-) => {
-  const newBucketId = lastBucket ? lastBucket._id + 1 : 0
-  const newBucket: GenericBucket = {
-    _id: newBucketId,
-    firstTimestamp: event.timestamp,
-    lastTimestamp: event.timestamp,
-    size: 1,
-    events: [event],
-  }
-
-  await bucketModel.create(newBucket)
-}

+ 3 - 3
src/resolvers/followsInfo.ts

@@ -1,6 +1,6 @@
 import { Args, ArgsType, Ctx, Field, ID, Mutation, Resolver } from 'type-graphql'
 import { ChannelFollowsInfo } from '../entities/ChannelFollowsInfo'
-import { UnsequencedChannelEvent, ChannelEventType, saveChannelEvent } from '../models/ChannelEvent'
+import { ChannelEvent, ChannelEventType, saveChannelEvent } from '../models/ChannelEvent'
 import { OrionContext } from '../types'
 
 @ArgsType()
@@ -19,7 +19,7 @@ class UnfollowChannelArgs extends ChannelFollowsArgs {}
 export class ChannelFollowsInfosResolver {
   @Mutation(() => ChannelFollowsInfo, { description: 'Add a single follow to the target channel' })
   async followChannel(@Args() { channelId }: FollowChannelArgs, @Ctx() ctx: OrionContext): Promise<ChannelFollowsInfo> {
-    const event: UnsequencedChannelEvent = {
+    const event: ChannelEvent = {
       channelId,
       type: ChannelEventType.FollowChannel,
       timestamp: new Date(),
@@ -37,7 +37,7 @@ export class ChannelFollowsInfosResolver {
     @Args() { channelId }: UnfollowChannelArgs,
     @Ctx() ctx: OrionContext
   ): Promise<ChannelFollowsInfo> {
-    const event: UnsequencedChannelEvent = {
+    const event: ChannelEvent = {
       channelId,
       type: ChannelEventType.UnfollowChannel,
       timestamp: new Date(),

+ 2 - 2
src/resolvers/viewsInfo.ts

@@ -2,7 +2,7 @@ import { Args, ArgsType, Ctx, Field, ID, Int, Mutation, Query, Resolver } from '
 import { Min, Max, IsIn } from 'class-validator'
 import { EntityViewsInfo } from '../entities/EntityViewsInfo'
 import { mapPeriods } from '../helpers'
-import { UnsequencedVideoEvent, VideoEventType, saveVideoEvent } from '../models/VideoEvent'
+import { VideoEventType, saveVideoEvent, VideoEvent } from '../models/VideoEvent'
 import { OrionContext } from '../types'
 
 @ArgsType()
@@ -66,7 +66,7 @@ export class VideoViewsInfosResolver {
     @Args() { videoId, channelId, categoryId }: AddVideoViewArgs,
     @Ctx() ctx: OrionContext
   ): Promise<EntityViewsInfo> {
-    const event: UnsequencedVideoEvent = {
+    const event: VideoEvent = {
       videoId,
       channelId,
       categoryId,

+ 4 - 26
tests/follows.test.ts

@@ -14,12 +14,11 @@ import {
   GetMostFollowedChannelsConnectionArgs,
 } from './queries/follows'
 import { ChannelFollowsInfo } from '../src/entities/ChannelFollowsInfo'
-import { ChannelEventsBucketModel } from '../src/models/ChannelEvent'
-import { TEST_BUCKET_SIZE } from './setup'
 import { createMutationFn, createQueryFn, MutationFn, QueryFn } from './helpers'
+import { ChannelEventModel } from '../src/models/ChannelEvent'
 
-const FIRST_CHANNEL_ID = '22'
-const SECOND_CHANNEL_ID = '23'
+const FIRST_CHANNEL_ID = '6'
+const SECOND_CHANNEL_ID = '7'
 
 describe('Channel follows resolver', () => {
   let server: ApolloServer
@@ -39,7 +38,7 @@ describe('Channel follows resolver', () => {
 
   afterEach(async () => {
     await server.stop()
-    await ChannelEventsBucketModel.deleteMany({})
+    await ChannelEventModel.deleteMany({})
     await mongoose.disconnect()
   })
 
@@ -213,25 +212,4 @@ describe('Channel follows resolver', () => {
 
     await checkFollows()
   })
-
-  it('should properly handle saving events across buckets', async () => {
-    const eventsCount = TEST_BUCKET_SIZE * 2 + 1
-
-    const expectedChannelFollows: ChannelFollowsInfo = {
-      id: FIRST_CHANNEL_ID,
-      follows: eventsCount,
-    }
-
-    for (let i = 0; i < eventsCount; i++) {
-      await followChannel(FIRST_CHANNEL_ID)
-    }
-    const expectedMostFollowedChannels = {
-      edges: [expectedChannelFollows].map((follow) => ({ node: follow })),
-    }
-
-    const mostFollowedChannels = await getMostFollowedChannels(30)
-    const mostFollowedChannelsAllTime = await getMostFollowedChannels(null)
-    expect(mostFollowedChannels).toEqual(expectedMostFollowedChannels)
-    expect(mostFollowedChannelsAllTime).toEqual(expectedMostFollowedChannels)
-  })
 })

+ 2 - 4
tests/setup.ts

@@ -1,5 +1,3 @@
-export const TEST_BUCKET_SIZE = 20
+process.env.ORION_QUERY_NODE_URL = 'https://atlas-dev.joystream.app/query-node/server/graphql'
 
-jest.mock('../src/config', () => ({ bucketSize: TEST_BUCKET_SIZE }))
-
-process.env.ORION_QUERY_NODE_URL = 'https://sumer-dev-2.joystream.app/query/server/graphql'
+export {}

+ 7 - 29
tests/views.test.ts

@@ -20,15 +20,14 @@ import {
   GetMostViewedCategoriesAllTime,
 } from './queries/views'
 import { EntityViewsInfo } from '../src/entities/EntityViewsInfo'
-import { VideoEventsBucketModel } from '../src/models/VideoEvent'
-import { TEST_BUCKET_SIZE } from './setup'
+import { VideoEventModel } from '../src/models/VideoEvent'
 import { createMutationFn, createQueryFn, MutationFn, QueryFn } from './helpers'
 
-const FIRST_VIDEO_ID = '12'
-const SECOND_VIDEO_ID = '13'
-const FIRST_CHANNEL_ID = '22'
-const SECOND_CHANNEL_ID = '23'
-const FIRST_CATEGORY_ID = '32'
+const FIRST_VIDEO_ID = '7'
+const SECOND_VIDEO_ID = '8'
+const FIRST_CHANNEL_ID = '6'
+const SECOND_CHANNEL_ID = '7'
+const FIRST_CATEGORY_ID = '1'
 
 describe('Video and channel views resolver', () => {
   let server: ApolloServer
@@ -48,7 +47,7 @@ describe('Video and channel views resolver', () => {
 
   afterEach(async () => {
     await server.stop()
-    await VideoEventsBucketModel.deleteMany({})
+    await VideoEventModel.deleteMany({})
     await mongoose.disconnect()
   })
 
@@ -324,25 +323,4 @@ describe('Video and channel views resolver', () => {
 
     await checkViews()
   })
-
-  it('should properly handle saving events across buckets', async () => {
-    const eventsCount = TEST_BUCKET_SIZE * 2 + 1
-    const expectedVideoViews: EntityViewsInfo = {
-      id: FIRST_VIDEO_ID,
-      views: eventsCount,
-    }
-
-    const expectedMostViewedVideos = {
-      edges: [expectedVideoViews].map((view) => ({ node: view })),
-    }
-
-    for (let i = 0; i < eventsCount; i++) {
-      await addVideoView(FIRST_VIDEO_ID, FIRST_CHANNEL_ID)
-    }
-
-    const mostViewedVideos = await getMostViewedVideos(30)
-    const mostViewedVideosAllTime = await getMostViewedVideos(null)
-    expect(mostViewedVideos).toEqual(expectedMostViewedVideos)
-    expect(mostViewedVideosAllTime).toEqual(expectedMostViewedVideos)
-  })
 })