浏览代码

add channel follow functionality

Klaudiusz Dembler 4 年之前
父节点
当前提交
f11c9790b2

+ 2 - 1
package.json

@@ -17,8 +17,9 @@
     "url": "https://github.com/Joystream/joystream/issues"
   },
   "scripts": {
+    "clean": "rm -rf dist",
     "dev": "NODE_ENV=development ts-node-dev --respawn src/server.ts",
-    "build": "tsc",
+    "build": "yarn clean && yarn tsc",
     "start": "node dist/server.js",
     "lint": "eslint . --ext .ts"
   },

+ 17 - 0
schema.graphql

@@ -3,6 +3,11 @@
 # !!!   DO NOT MODIFY THIS FILE BY YOURSELF   !!!
 # -----------------------------------------------
 
+type ChannelFollowsInfo {
+  follows: Int!
+  id: ID!
+}
+
 type EntityViewsInfo {
   id: ID!
   views: Int!
@@ -11,15 +16,27 @@ type EntityViewsInfo {
 type Mutation {
   """Add a single view to the target video's count"""
   addVideoView(channelId: ID!, videoId: ID!): EntityViewsInfo!
+
+  """Add a single follow to the target channel"""
+  followChannel(channelId: ID!): ChannelFollowsInfo!
+
+  """Remove a single follow from the target channel"""
+  unfollowChannel(channelId: ID!): ChannelFollowsInfo!
 }
 
 type Query {
+  """Get follows counts for a list of channels"""
+  batchedChannelFollows(channelIdList: [ID!]!): [ChannelFollowsInfo]!
+
   """Get views counts for a list of channels"""
   batchedChannelsViews(channelIdList: [ID!]!): [EntityViewsInfo]!
 
   """Get views counts for a list of videos"""
   batchedVideoViews(videoIdList: [ID!]!): [EntityViewsInfo]!
 
+  """Get follows count for a single channel"""
+  channelFollows(channelId: ID!): ChannelFollowsInfo
+
   """Get views count for a single channel"""
   channelViews(channelId: ID!): EntityViewsInfo
 

+ 50 - 0
src/aggregates/follows.ts

@@ -0,0 +1,50 @@
+import { GenericAggregate } from './shared'
+import {
+  ChannelEvent,
+  ChannelEventsBucketModel,
+  ChannelEventType,
+  UnsequencedChannelEvent,
+} from '../models/ChannelEvent'
+
+type ChannelEventsAggregationResult = {
+  events?: ChannelEvent[]
+}[]
+
+export class FollowsAggregate extends GenericAggregate<ChannelEvent> {
+  private channelFollowsMap: Record<string, number> = {}
+
+  public channelFollows(channelId: string): number | null {
+    return this.channelFollowsMap[channelId] ?? null
+  }
+
+  public async rebuild() {
+    const aggregation: ChannelEventsAggregationResult = await ChannelEventsBucketModel.aggregate([
+      { $unwind: '$events' },
+      { $group: { _id: null, allEvents: { $push: '$events' } } },
+      { $project: { events: '$allEvents' } },
+    ])
+
+    const events = aggregation[0]?.events || []
+
+    events.forEach((event) => {
+      this.applyEvent(event)
+    })
+  }
+
+  public applyEvent(event: UnsequencedChannelEvent) {
+    const currentChannelFollows = this.channelFollowsMap[event.channelId] || 0
+
+    switch (event.type) {
+      case ChannelEventType.FollowChannel:
+        this.channelFollowsMap[event.channelId] = currentChannelFollows + 1
+        break
+      case ChannelEventType.UnfollowChannel:
+        this.channelFollowsMap[event.channelId] = Math.max(currentChannelFollows - 1, 0)
+        break
+      default:
+        console.error(`Parsing unknown channel event: ${event.type}`)
+    }
+  }
+}
+
+export const followsAggregate = new FollowsAggregate()

+ 4 - 0
src/aggregates/index.ts

@@ -0,0 +1,4 @@
+import { followsAggregate } from './follows'
+import { viewsAggregate } from './views'
+
+export { followsAggregate, viewsAggregate }

+ 6 - 0
src/aggregates/shared.ts

@@ -0,0 +1,6 @@
+import { GenericEvent } from '../models/shared'
+
+export abstract class GenericAggregate<EventType = GenericEvent> {
+  public abstract rebuild(): Promise<void>
+  public abstract applyEvent(event: EventType): void
+}

+ 11 - 15
src/aggregates/views.ts

@@ -8,12 +8,12 @@ export class ViewsAggregate {
   private videoViewsMap: Record<string, number> = {}
   private channelViewsMap: Record<string, number> = {}
 
-  public videoViews(videoId: string): number {
-    return this.videoViewsMap[videoId] || 0
+  public videoViews(videoId: string): number | null {
+    return this.videoViewsMap[videoId] ?? null
   }
 
-  public channelViews(channelId: string): number {
-    return this.channelViewsMap[channelId] || 0
+  public channelViews(channelId: string): number | null {
+    return this.channelViewsMap[channelId] ?? null
   }
 
   public async rebuild() {
@@ -31,22 +31,18 @@ export class ViewsAggregate {
   }
 
   public applyEvent(event: UnsequencedVideoEvent) {
-    switch (event.eventType) {
+    const currentVideoViews = this.videoViewsMap[event.videoId] || 0
+    const currentChannelViews = this.channelViewsMap[event.channelId] || 0
+
+    switch (event.type) {
       case VideoEventType.AddView:
-        this.applyAddViewEvent(event)
+        this.videoViewsMap[event.videoId] = currentVideoViews + 1
+        this.channelViewsMap[event.channelId] = currentChannelViews + 1
         break
       default:
-        console.error(`Parsing unknown video event: ${event.eventType}`)
+        console.error(`Parsing unknown video event: ${event.type}`)
     }
   }
-
-  private applyAddViewEvent(event: UnsequencedVideoEvent) {
-    const currentVideoViews = this.videoViewsMap[event.videoId] || 0
-    const currentChannelViews = this.channelViewsMap[event.channelId] || 0
-
-    this.videoViewsMap[event.videoId] = currentVideoViews + 1
-    this.channelViewsMap[event.channelId] = currentChannelViews + 1
-  }
 }
 
 export const viewsAggregate = new ViewsAggregate()

+ 10 - 0
src/entities/ChannelFollowsInfo.ts

@@ -0,0 +1,10 @@
+import { Field, ID, Int, ObjectType } from 'type-graphql'
+
+@ObjectType()
+export class ChannelFollowsInfo {
+  @Field(() => ID)
+  id: string
+
+  @Field(() => Int)
+  follows: number
+}

+ 1 - 1
src/entities/EntityViewsInfo.ts

@@ -2,7 +2,7 @@ import { Field, ID, Int, ObjectType } from 'type-graphql'
 
 @ObjectType()
 export class EntityViewsInfo {
-  @Field(() => ID, { name: 'id' })
+  @Field(() => ID)
   id: string
 
   @Field(() => Int)

+ 30 - 0
src/models/ChannelEvent.ts

@@ -0,0 +1,30 @@
+import { getModelForClass, prop } from '@typegoose/typegoose'
+import { GenericBucket, GenericEvent, insertEventIntoBucket } from './shared'
+
+export enum ChannelEventType {
+  FollowChannel = 'FOLLOW_CHANNEL',
+  UnfollowChannel = 'UNFOLLOW_CHANNEL',
+}
+
+export class ChannelEvent extends GenericEvent {
+  @prop({ required: true, index: true })
+  channelId: string
+
+  @prop({ required: true, index: true, enum: ChannelEventType })
+  type: ChannelEventType
+}
+
+export type UnsequencedChannelEvent = Omit<ChannelEvent, '_id'>
+
+class ChannelEventsBucket extends GenericBucket {
+  @prop({ required: true, type: () => [ChannelEvent] })
+  events: ChannelEvent[]
+}
+
+export const ChannelEventsBucketModel = getModelForClass(ChannelEventsBucket, {
+  schemaOptions: { collection: 'channelEvents' },
+})
+
+export const saveChannelEvent = async (unsequencedChannelEvent: UnsequencedChannelEvent) => {
+  return await insertEventIntoBucket(unsequencedChannelEvent, ChannelEventsBucketModel)
+}

+ 8 - 55
src/models/VideoEvent.ts

@@ -1,45 +1,24 @@
-import { getModelForClass, plugin, prop } from '@typegoose/typegoose'
-import { AutoIncrementID } from '@typegoose/auto-increment'
-
-const MAX_BUCKET_SIZE = 50000
+import { getModelForClass, prop } from '@typegoose/typegoose'
+import { GenericBucket, GenericEvent, insertEventIntoBucket } from './shared'
 
 export enum VideoEventType {
   AddView = 'ADD_VIEW',
 }
 
-export class VideoEvent {
-  @prop({ required: true, index: true, unique: true })
-  eventId: number
-
+export class VideoEvent extends GenericEvent {
   @prop({ required: true, index: true })
   videoId: string
 
   @prop({ required: true, index: true })
   channelId: string
 
-  @prop({ required: true })
-  timestamp: Date
-
   @prop({ required: true, index: true, enum: VideoEventType })
-  eventType: VideoEventType
+  type: VideoEventType
 }
 
-export type UnsequencedVideoEvent = Omit<VideoEvent, 'eventId'>
-
-@plugin(AutoIncrementID, { field: 'bucketId' })
-class VideoEventsBucket {
-  @prop({ required: true, index: true, unique: true })
-  bucketId?: number
-
-  @prop({ required: true, index: true })
-  firstTimestamp: Date
-
-  @prop({ required: true, index: true })
-  lastTimestamp: Date
-
-  @prop({ required: true })
-  bucketSize: number
+export type UnsequencedVideoEvent = Omit<VideoEvent, '_id'>
 
+class VideoEventsBucket extends GenericBucket {
   @prop({ required: true, type: () => [VideoEvent] })
   events: VideoEvent[]
 }
@@ -48,32 +27,6 @@ export const VideoEventsBucketModel = getModelForClass(VideoEventsBucket, {
   schemaOptions: { collection: 'videoEvents' },
 })
 
-export const insertVideoEventIntoBucket = async (unsequencedVideoEvent: UnsequencedVideoEvent) => {
-  // TODO: possibly cache the last bucket
-  const lastBucket = await VideoEventsBucketModel.findOne().sort({ bucketId: -1 })
-  const lastEventId = lastBucket ? lastBucket.events[lastBucket.events.length - 1].eventId || 0 : 0
-  const event: VideoEvent = {
-    ...unsequencedVideoEvent,
-    eventId: lastEventId + 1,
-  }
-
-  if (!lastBucket || lastBucket.bucketSize >= MAX_BUCKET_SIZE) {
-    return await createNewBucketFromEvent(event)
-  }
-
-  lastBucket.events.push(event)
-  lastBucket.bucketSize++
-  lastBucket.lastTimestamp = event.timestamp
-  await lastBucket.save()
-}
-
-const createNewBucketFromEvent = async (event: VideoEvent) => {
-  const newBucket: VideoEventsBucket = {
-    firstTimestamp: event.timestamp,
-    lastTimestamp: event.timestamp,
-    bucketSize: 1,
-    events: [event],
-  }
-
-  await VideoEventsBucketModel.create(newBucket)
+export const saveVideoEvent = async (unsequencedVideoEvent: UnsequencedVideoEvent) => {
+  return await insertEventIntoBucket(unsequencedVideoEvent, VideoEventsBucketModel)
 }

+ 69 - 0
src/models/shared.ts

@@ -0,0 +1,69 @@
+import { DocumentType, prop, ReturnModelType } from '@typegoose/typegoose'
+
+const MAX_BUCKET_SIZE = 50000
+
+export class GenericEvent {
+  @prop()
+  _id: number
+
+  @prop({ required: true })
+  timestamp: Date
+
+  type: unknown
+}
+
+type UnsequencedGenericEvent = Omit<GenericEvent, '_id'>
+
+export class GenericBucket {
+  @prop()
+  _id: number
+
+  @prop({ required: true, index: true })
+  firstTimestamp: Date
+
+  @prop({ required: true, index: true })
+  lastTimestamp: Date
+
+  @prop({ required: true })
+  size: number
+
+  events: GenericEvent[]
+}
+
+export const insertEventIntoBucket = async (
+  unsequencedEvent: UnsequencedGenericEvent,
+  bucketModel: ReturnModelType<typeof GenericBucket>
+) => {
+  const lastBucket = await bucketModel.findOne().sort({ _id: -1 })
+  const lastEventId = lastBucket?.events[lastBucket?.events.length - 1]._id ?? -1
+  const event: GenericEvent = {
+    ...unsequencedEvent,
+    _id: lastEventId + 1,
+  }
+
+  if (!lastBucket || lastBucket.size >= MAX_BUCKET_SIZE) {
+    return await createNewBucketFromEvent(lastBucket, event, bucketModel)
+  }
+
+  lastBucket.events.push(event)
+  lastBucket.size++
+  lastBucket.lastTimestamp = event.timestamp
+  await lastBucket.save()
+}
+
+const createNewBucketFromEvent = async (
+  lastBucket: DocumentType<GenericBucket> | null,
+  event: GenericEvent,
+  bucketModel: ReturnModelType<typeof GenericBucket>
+) => {
+  const newBucketId = lastBucket ? lastBucket._id + 1 : 0
+  const newBucket: GenericBucket = {
+    _id: newBucketId,
+    firstTimestamp: event.timestamp,
+    lastTimestamp: event.timestamp,
+    size: 1,
+    events: [event],
+  }
+
+  await bucketModel.create(newBucket)
+}

+ 76 - 0
src/resolvers/followsInfo.ts

@@ -0,0 +1,76 @@
+import { Args, ArgsType, Field, ID, Mutation, Query, Resolver } from 'type-graphql'
+import { followsAggregate } from '../aggregates'
+import { ChannelFollowsInfo } from '../entities/ChannelFollowsInfo'
+import { ChannelEventType, saveChannelEvent, UnsequencedChannelEvent } from '../models/ChannelEvent'
+
+@ArgsType()
+class ChannelFollowsArgs {
+  @Field(() => ID)
+  channelId: string
+}
+
+@ArgsType()
+class BatchedChannelFollowsArgs {
+  @Field(() => [ID])
+  channelIdList: string[]
+}
+
+@ArgsType()
+class FollowChannelArgs extends ChannelFollowsArgs {}
+
+@ArgsType()
+class UnfollowChannelArgs extends ChannelFollowsArgs {}
+
+@Resolver()
+export class ChannelFollowsInfosResolver {
+  @Query(() => ChannelFollowsInfo, { nullable: true, description: 'Get follows count for a single channel' })
+  async channelFollows(@Args() { channelId }: ChannelFollowsArgs): Promise<ChannelFollowsInfo | null> {
+    return getFollowsInfo(channelId)
+  }
+
+  @Query(() => [ChannelFollowsInfo], { description: 'Get follows counts for a list of channels', nullable: 'items' })
+  async batchedChannelFollows(
+    @Args() { channelIdList }: BatchedChannelFollowsArgs
+  ): Promise<(ChannelFollowsInfo | null)[]> {
+    return channelIdList.map((channelId) => getFollowsInfo(channelId))
+  }
+
+  @Mutation(() => ChannelFollowsInfo, { description: 'Add a single follow to the target channel' })
+  async followChannel(@Args() { channelId }: FollowChannelArgs): Promise<ChannelFollowsInfo> {
+    const event: UnsequencedChannelEvent = {
+      channelId,
+      type: ChannelEventType.FollowChannel,
+      timestamp: new Date(),
+    }
+
+    await saveChannelEvent(event)
+    followsAggregate.applyEvent(event)
+
+    return getFollowsInfo(channelId)!
+  }
+
+  @Mutation(() => ChannelFollowsInfo, { description: 'Remove a single follow from the target channel' })
+  async unfollowChannel(@Args() { channelId }: UnfollowChannelArgs): Promise<ChannelFollowsInfo> {
+    const event: UnsequencedChannelEvent = {
+      channelId,
+      type: ChannelEventType.UnfollowChannel,
+      timestamp: new Date(),
+    }
+
+    await saveChannelEvent(event)
+    followsAggregate.applyEvent(event)
+
+    return getFollowsInfo(channelId)!
+  }
+}
+
+const getFollowsInfo = (channelId: string): ChannelFollowsInfo | null => {
+  const follows = followsAggregate.channelFollows(channelId)
+  if (follows != null) {
+    return {
+      id: channelId,
+      follows,
+    }
+  }
+  return null
+}

+ 4 - 0
src/resolvers/index.ts

@@ -0,0 +1,4 @@
+import { ChannelFollowsInfosResolver } from './followsInfo'
+import { VideoViewsInfosResolver } from './viewsInfo'
+
+export { ChannelFollowsInfosResolver, VideoViewsInfosResolver }

+ 26 - 44
src/resolvers/viewsInfo.ts

@@ -1,7 +1,7 @@
 import { Args, ArgsType, Field, ID, Mutation, Query, Resolver } from 'type-graphql'
 import { EntityViewsInfo } from '../entities/EntityViewsInfo'
-import { viewsAggregate } from '../aggregates/views'
-import { insertVideoEventIntoBucket, VideoEventType, UnsequencedVideoEvent } from '../models/VideoEvent'
+import { viewsAggregate } from '../aggregates'
+import { saveVideoEvent, VideoEventType, UnsequencedVideoEvent } from '../models/VideoEvent'
 
 @ArgsType()
 class VideoViewsArgs {
@@ -40,56 +40,22 @@ class AddVideoViewArgs {
 export class VideoViewsInfosResolver {
   @Query(() => EntityViewsInfo, { nullable: true, description: 'Get views count for a single video' })
   async videoViews(@Args() { videoId }: VideoViewsArgs): Promise<EntityViewsInfo | null> {
-    const views = viewsAggregate.videoViews(videoId)
-    if (views) {
-      return {
-        id: videoId,
-        views,
-      }
-    }
-    return null
+    return getVideoViewsInfo(videoId)
   }
 
   @Query(() => [EntityViewsInfo], { description: 'Get views counts for a list of videos', nullable: 'items' })
   async batchedVideoViews(@Args() { videoIdList }: BatchedVideoViewsArgs): Promise<(EntityViewsInfo | null)[]> {
-    return videoIdList.map((videoId) => {
-      const videoViews = viewsAggregate.videoViews(videoId)
-      if (videoViews) {
-        const videoViewsInfo: EntityViewsInfo = {
-          id: videoId,
-          views: videoViews,
-        }
-        return videoViewsInfo
-      }
-      return null
-    })
+    return videoIdList.map((videoId) => getVideoViewsInfo(videoId))
   }
 
   @Query(() => EntityViewsInfo, { nullable: true, description: 'Get views count for a single channel' })
   async channelViews(@Args() { channelId }: ChannelViewsArgs): Promise<EntityViewsInfo | null> {
-    const views = viewsAggregate.channelViews(channelId)
-    if (views) {
-      return {
-        id: channelId,
-        views,
-      }
-    }
-    return null
+    return getChannelViewsInfo(channelId)
   }
 
   @Query(() => [EntityViewsInfo], { description: 'Get views counts for a list of channels', nullable: 'items' })
   async batchedChannelsViews(@Args() { channelIdList }: BatchedChannelViewsArgs): Promise<(EntityViewsInfo | null)[]> {
-    return channelIdList.map((channelId) => {
-      const channelViews = viewsAggregate.channelViews(channelId)
-      if (channelViews) {
-        const channelViewsInfo: EntityViewsInfo = {
-          id: channelId,
-          views: channelViews,
-        }
-        return channelViewsInfo
-      }
-      return null
-    })
+    return channelIdList.map((channelId) => getChannelViewsInfo(channelId))
   }
 
   @Mutation(() => EntityViewsInfo, { description: "Add a single view to the target video's count" })
@@ -97,17 +63,33 @@ export class VideoViewsInfosResolver {
     const event: UnsequencedVideoEvent = {
       videoId,
       channelId,
-      eventType: VideoEventType.AddView,
+      type: VideoEventType.AddView,
       timestamp: new Date(),
     }
 
-    await insertVideoEventIntoBucket(event)
+    await saveVideoEvent(event)
     viewsAggregate.applyEvent(event)
 
-    const views = viewsAggregate.videoViews(videoId)
+    return getVideoViewsInfo(videoId)!
+  }
+}
+
+const buildViewsObject = (id: string, views: number | null): EntityViewsInfo | null => {
+  if (views != null) {
     return {
-      id: videoId,
+      id,
       views,
     }
   }
+  return null
+}
+
+const getVideoViewsInfo = (videoId: string): EntityViewsInfo | null => {
+  const views = viewsAggregate.videoViews(videoId)
+  return buildViewsObject(videoId, views)
+}
+
+const getChannelViewsInfo = (channelId: string): EntityViewsInfo | null => {
+  const views = viewsAggregate.channelViews(channelId)
+  return buildViewsObject(channelId, views)
 }

+ 21 - 13
src/server.ts

@@ -3,16 +3,17 @@ import { ApolloServer } from 'apollo-server-express'
 import Express from 'express'
 import { connect } from 'mongoose'
 import { buildSchema } from 'type-graphql'
-import { VideoViewsInfosResolver } from './resolvers/viewsInfo'
+
 import config from './config'
-import { viewsAggregate } from './aggregates/views'
+import { followsAggregate, viewsAggregate } from './aggregates'
+import { ChannelFollowsInfosResolver, VideoViewsInfosResolver } from './resolvers'
 
 const main = async () => {
   await getMongooseConnection()
   await rebuildAggregates()
 
   const schema = await buildSchema({
-    resolvers: [VideoViewsInfosResolver],
+    resolvers: [VideoViewsInfosResolver, ChannelFollowsInfosResolver],
     emitSchemaFile: 'schema.graphql',
     validate: false,
   })
@@ -26,27 +27,34 @@ const main = async () => {
 }
 
 const getMongooseConnection = async () => {
-  process.stdout.write(`Connecting to MongoDB at "${config.mongoDBUri}"...`)
-  try {
+  const task = async () => {
     const mongoose = await connect(config.mongoDBUri, {
       useUnifiedTopology: true,
       useNewUrlParser: true,
       useCreateIndex: true,
     })
     await mongoose.connection
-  } catch (error) {
-    process.stdout.write(' Failed!\n')
-    console.error(error)
-    process.exit()
-    return
   }
-  process.stdout.write(' Done.\n')
+  await wrapTask(`Connecting to MongoDB at "${config.mongoDBUri}"`, task)
 }
 
 const rebuildAggregates = async () => {
-  process.stdout.write('Rebuilding video events aggregate...')
-  try {
+  const viewEventsTask = async () => {
     await viewsAggregate.rebuild()
+  }
+
+  const followEventsTask = async () => {
+    await followsAggregate.rebuild()
+  }
+
+  await wrapTask('Rebuiliding view events aggregate', viewEventsTask)
+  await wrapTask('Rebuiliding follow events aggregate', followEventsTask)
+}
+
+const wrapTask = async (message: string, task: () => Promise<void>) => {
+  process.stdout.write(`${message}...`)
+  try {
+    await task()
   } catch (error) {
     process.stdout.write(' Failed!\n')
     console.error(error)