|
@@ -7,39 +7,92 @@ import {
|
|
|
} from '../models/ChannelEvent'
|
|
|
|
|
|
import { ChannelFollowsInfo } from '../entities/ChannelFollowsInfo'
|
|
|
+import { mapPeriods } from '../helpers'
|
|
|
+import { differenceInCalendarDays } from 'date-fns'
|
|
|
|
|
|
type ChannelEventsAggregationResult = {
|
|
|
events?: ChannelEvent[]
|
|
|
}[]
|
|
|
|
|
|
+type TimePeriodEventsData = {
|
|
|
+ sevenDays: Partial<UnsequencedChannelEvent>[]
|
|
|
+ thirtyDays: Partial<UnsequencedChannelEvent>[]
|
|
|
+}
|
|
|
+
|
|
|
+type TimePeriodFollows = {
|
|
|
+ sevenDays: ChannelFollowsInfo[]
|
|
|
+ thirtyDays: ChannelFollowsInfo[]
|
|
|
+}
|
|
|
+
|
|
|
export class FollowsAggregate implements GenericAggregate<ChannelEvent> {
|
|
|
private channelFollowsMap: Record<string, number> = {}
|
|
|
- private allChannelFollowEvents: Partial<UnsequencedChannelEvent>[] = []
|
|
|
private allChannelFollows: ChannelFollowsInfo[] = []
|
|
|
|
|
|
- private addOrUpdateFollows(id: string, eventType: ChannelEventType): void {
|
|
|
- const i = this.allChannelFollows.findIndex((element) => element.id === id)
|
|
|
- if (i > -1) {
|
|
|
- if (!this.allChannelFollows[i].follows && eventType === ChannelEventType.UnfollowChannel) return
|
|
|
- this.allChannelFollows[i].follows =
|
|
|
- eventType === ChannelEventType.FollowChannel
|
|
|
- ? this.allChannelFollows[i].follows + 1
|
|
|
- : this.allChannelFollows[i].follows - 1
|
|
|
- } else this.allChannelFollows.push({ id, follows: eventType === ChannelEventType.UnfollowChannel ? 0 : 1 })
|
|
|
+ private timePeriodEvents: TimePeriodEventsData = {
|
|
|
+ sevenDays: [],
|
|
|
+ thirtyDays: [],
|
|
|
+ }
|
|
|
+
|
|
|
+ private timePeriodChannelFollows: TimePeriodFollows = {
|
|
|
+ sevenDays: [],
|
|
|
+ thirtyDays: [],
|
|
|
+ }
|
|
|
+
|
|
|
+ private addOrUpdateFollows(array: ChannelFollowsInfo[], id: string, shouldAdd = true): void {
|
|
|
+ const followsObject = array.find((element) => element.id === id)
|
|
|
+
|
|
|
+ if (followsObject) {
|
|
|
+ if (!followsObject.follows && !shouldAdd) return
|
|
|
+
|
|
|
+ if (shouldAdd) {
|
|
|
+ followsObject.follows++
|
|
|
+ } else {
|
|
|
+ followsObject.follows--
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ array.push({ id, follows: shouldAdd ? 1 : 0 })
|
|
|
+ }
|
|
|
+
|
|
|
+ array.sort((a, b) => (a.follows > b.follows ? -1 : 1))
|
|
|
}
|
|
|
|
|
|
- private addOrRemoveFollowEvent(eventType: ChannelEventType, { channelId, timestamp }: UnsequencedChannelEvent): void {
|
|
|
+ private addOrRemoveFollowEvent(
|
|
|
+ array: Partial<UnsequencedChannelEvent>[],
|
|
|
+ eventType: ChannelEventType,
|
|
|
+ { channelId, timestamp }: UnsequencedChannelEvent
|
|
|
+ ): void {
|
|
|
if (eventType === ChannelEventType.FollowChannel) {
|
|
|
- this.allChannelFollowEvents = [...this.allChannelFollowEvents, { channelId, timestamp }]
|
|
|
+ array.push({ channelId, timestamp })
|
|
|
}
|
|
|
if (eventType === ChannelEventType.UnfollowChannel) {
|
|
|
- const index = this.allChannelFollowEvents.findIndex((item) => item.channelId === channelId)
|
|
|
+ const index = array.findIndex((item) => item.channelId === channelId)
|
|
|
if (index >= 0) {
|
|
|
- this.allChannelFollowEvents.splice(index, 1)
|
|
|
+ array.splice(index, 1)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public filterEventsByPeriod(timePeriodDays: 7 | 30) {
|
|
|
+ const mappedPeriod = mapPeriods(timePeriodDays)
|
|
|
+ const followEvents = this.timePeriodEvents[mappedPeriod]
|
|
|
+
|
|
|
+ // find index of first event that should be kept
|
|
|
+ const firstEventToIncludeIdx = followEvents.findIndex(
|
|
|
+ (follow) => follow.timestamp && differenceInCalendarDays(new Date(), follow.timestamp) <= timePeriodDays
|
|
|
+ )
|
|
|
+
|
|
|
+ for (let i = 0; i < firstEventToIncludeIdx; i++) {
|
|
|
+ const { channelId } = followEvents[i]
|
|
|
+
|
|
|
+ if (channelId) {
|
|
|
+ this.addOrUpdateFollows(this.timePeriodChannelFollows[mappedPeriod], channelId, false)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // remove older events
|
|
|
+ this.timePeriodEvents[mappedPeriod] = followEvents.slice(firstEventToIncludeIdx)
|
|
|
+ }
|
|
|
+
|
|
|
public channelFollows(channelId: string): number | null {
|
|
|
return this.channelFollowsMap[channelId] ?? null
|
|
|
}
|
|
@@ -48,14 +101,14 @@ export class FollowsAggregate implements GenericAggregate<ChannelEvent> {
|
|
|
return Object.freeze(this.channelFollowsMap)
|
|
|
}
|
|
|
|
|
|
- public getAllFollowEvents() {
|
|
|
- return this.allChannelFollowEvents
|
|
|
- }
|
|
|
-
|
|
|
public getAllChannelFollows() {
|
|
|
return this.allChannelFollows
|
|
|
}
|
|
|
|
|
|
+ public getTimePeriodChannelFollows() {
|
|
|
+ return this.timePeriodChannelFollows
|
|
|
+ }
|
|
|
+
|
|
|
public static async Build(): Promise<FollowsAggregate> {
|
|
|
const aggregation: ChannelEventsAggregationResult = await ChannelEventsBucketModel.aggregate([
|
|
|
{ $unwind: '$events' },
|
|
@@ -69,23 +122,34 @@ export class FollowsAggregate implements GenericAggregate<ChannelEvent> {
|
|
|
events.forEach((event) => {
|
|
|
aggregate.applyEvent(event)
|
|
|
})
|
|
|
+
|
|
|
+ aggregate.filterEventsByPeriod(7)
|
|
|
+ aggregate.filterEventsByPeriod(30)
|
|
|
+
|
|
|
return aggregate
|
|
|
}
|
|
|
|
|
|
public applyEvent(event: UnsequencedChannelEvent) {
|
|
|
- const { channelId, type } = event
|
|
|
+ const { type, ...eventWithoutType } = event
|
|
|
+ const { channelId } = eventWithoutType
|
|
|
const currentChannelFollows = this.channelFollowsMap[channelId] || 0
|
|
|
|
|
|
- switch (event.type) {
|
|
|
+ switch (type) {
|
|
|
case ChannelEventType.FollowChannel:
|
|
|
this.channelFollowsMap[channelId] = currentChannelFollows + 1
|
|
|
- this.addOrUpdateFollows(channelId, ChannelEventType.FollowChannel)
|
|
|
- this.addOrRemoveFollowEvent(ChannelEventType.FollowChannel, event)
|
|
|
+ this.addOrUpdateFollows(this.allChannelFollows, channelId)
|
|
|
+ this.addOrUpdateFollows(this.timePeriodChannelFollows.sevenDays, channelId)
|
|
|
+ this.addOrUpdateFollows(this.timePeriodChannelFollows.thirtyDays, channelId)
|
|
|
+ this.addOrRemoveFollowEvent(this.timePeriodEvents.sevenDays, ChannelEventType.FollowChannel, event)
|
|
|
+ this.addOrRemoveFollowEvent(this.timePeriodEvents.thirtyDays, ChannelEventType.FollowChannel, event)
|
|
|
break
|
|
|
case ChannelEventType.UnfollowChannel:
|
|
|
this.channelFollowsMap[channelId] = Math.max(currentChannelFollows - 1, 0)
|
|
|
- this.addOrUpdateFollows(channelId, ChannelEventType.UnfollowChannel)
|
|
|
- this.addOrRemoveFollowEvent(ChannelEventType.UnfollowChannel, event)
|
|
|
+ this.addOrUpdateFollows(this.allChannelFollows, channelId, false)
|
|
|
+ this.addOrUpdateFollows(this.timePeriodChannelFollows.sevenDays, channelId, false)
|
|
|
+ this.addOrUpdateFollows(this.timePeriodChannelFollows.thirtyDays, channelId, false)
|
|
|
+ this.addOrRemoveFollowEvent(this.timePeriodEvents.sevenDays, ChannelEventType.UnfollowChannel, event)
|
|
|
+ this.addOrRemoveFollowEvent(this.timePeriodEvents.thirtyDays, ChannelEventType.UnfollowChannel, event)
|
|
|
break
|
|
|
default:
|
|
|
console.error(`Parsing unknown channel event: ${type}`)
|