Skip to content

Commit

Permalink
feat: update index json (#379)
Browse files Browse the repository at this point in the history
> 多同步源方案之后,原有 srouce_registry 配置仅初始化时消费, 更新 / 状态信息相关字段

1. 使用 `information_schema` 替换 id 计算,解决部分 db id 自增不连续的问题
2. 添加 upstream_registries 列表,返回对应 changesStreamTaskData 以及 registry 信息
3. ~~source_registry~~ , ~~changes_stream_registry~~,
~~sync_changes_steam~~ 标记为 Legacy 字段,暂不移除
4. 新增 rawQueryUtil 处理 getCount 类型查询逻辑
  • Loading branch information
elrrrrrrr committed Jan 28, 2023
1 parent 68edfb5 commit bce6e79
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 28 deletions.
1 change: 1 addition & 0 deletions app/common/constants.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export const BUG_VERSIONS = 'bug-versions';
export const LATEST_TAG = 'latest';
export const GLOBAL_WORKER = 'GLOBAL_WORKER';
37 changes: 24 additions & 13 deletions app/core/service/CacheService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,38 @@ import {
} from '@eggjs/tegg';
import { CacheAdapter } from '../../common/adapter/CacheAdapter';
import { AbstractService } from '../../common/AbstractService';
import { ChangesStreamTaskData } from '../entity/Task';

type PackageCacheAttribe = 'etag' | 'manifests';

type TotalData = {
export type UpstreamRegistryInfo = {
registry_name: string;
source_registry: string;
changes_stream_url: string;
} & ChangesStreamTaskData;

export type DownloadInfo = {
today: number;
yesterday: number;
samedayLastweek: number;
thisweek: number;
thismonth: number;
thisyear: number;
lastweek: number;
lastmonth: number;
lastyear: number;
};

export type TotalData = {
packageCount: number;
packageVersionCount: number;
lastPackage: string;
lastPackageVersion: string;
download: {
today: number;
yesterday: number;
samedayLastweek: number;
thisweek: number;
thismonth: number;
thisyear: number;
lastweek: number;
lastmonth: number;
lastyear: number;
};
changesStream: object,
download: DownloadInfo;
changesStream: ChangesStreamTaskData;
lastChangeId: number | bigint;
cacheTime: string;
upstreamRegistries: UpstreamRegistryInfo[];
};
const TOTAL_DATA_KEY = '__TOTAL_DATA__';

Expand Down Expand Up @@ -72,6 +82,7 @@ export class CacheService extends AbstractService {
lastyear: 0,
},
changesStream: {},
upstreamRegistries: [],
lastChangeId: 0,
cacheTime: '',
};
Expand Down
3 changes: 2 additions & 1 deletion app/core/service/ChangesStreamService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { E500 } from 'egg-errors';
import { Registry } from '../entity/Registry';
import { AbstractChangeStream } from '../../common/adapter/changesStream/AbstractChangesStream';
import { getScopeAndName } from '../../common/PackageUtil';
import { GLOBAL_WORKER } from '../../common/constants';
import { ScopeManagerService } from './ScopeManagerService';
import { PackageRepository } from '../../repository/PackageRepository';

Expand All @@ -44,7 +45,7 @@ export class ChangesStreamService extends AbstractService {
// GLOBAL_WORKER: 默认的同步源
// `{registryName}_WORKER`: 自定义 scope 的同步源
public async findExecuteTask(): Promise<ChangesStreamTask | null> {
const targetName = 'GLOBAL_WORKER';
const targetName = GLOBAL_WORKER;
const globalRegistryTask = await this.taskRepository.findTaskByTargetName(targetName, TaskType.ChangesStream);
// 如果没有配置默认同步源,先进行初始化
if (!globalRegistryTask) {
Expand Down
42 changes: 40 additions & 2 deletions app/port/controller/HomeController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,44 @@ import {
Inject,
} from '@eggjs/tegg';
import { AbstractController } from './AbstractController';
import { CacheService } from '../../core/service/CacheService';
import { CacheService, DownloadInfo, UpstreamRegistryInfo } from '../../core/service/CacheService';

const startTime = new Date();

// registry 站点信息数据 SiteTotalData
// SiteEnvInfo: 环境、运行时相关信息,实时查询
// UpstreamInfo: 上游信息,实时查询
// TotalInfo: 总数据信息,定时任务每分钟生成
// LegacyInfo: 旧版兼容信息
type SiteTotalData = LegacyInfo & SiteEnvInfo & TotalInfo;

type LegacyInfo = {
source_registry: string,
changes_stream_registry: string,
sync_changes_steam: any,
};

type SiteEnvInfo = {
sync_model: string;
sync_binary: string;
instance_start_time: Date;
node_version: string;
app_version: string;
engine: string;
cache_time: string;
};

type TotalInfo = {
last_package: string;
last_package_version: string;
doc_count: number | bigint;
doc_version_count: number | bigint;
update_seq: number | bigint;
download: DownloadInfo;
upstream_registries?: UpstreamRegistryInfo[];
};


@HTTPController()
export class HomeController extends AbstractController {
@Inject()
Expand All @@ -23,9 +57,12 @@ export class HomeController extends AbstractController {
path: '/',
method: HTTPMethodEnum.GET,
})
// 2023-1-20
// 原有 LegacyInfo 字段继续保留,由于 ChangesStream 信息通过 registry 表配置,可能会过期
// 新增 upstream_registries 字段,展示上游源站 registry 信息列表
async showTotal() {
const totalData = await this.cacheService.getTotalData();
const data = {
const data: SiteTotalData = {
last_package: totalData.lastPackage,
last_package_version: totalData.lastPackageVersion,
doc_count: totalData.packageCount,
Expand All @@ -42,6 +79,7 @@ export class HomeController extends AbstractController {
source_registry: this.config.cnpmcore.sourceRegistry,
changes_stream_registry: this.config.cnpmcore.changesStreamRegistry,
cache_time: totalData.cacheTime,
upstream_registries: totalData.upstreamRegistries,
};
return data;
}
Expand Down
48 changes: 40 additions & 8 deletions app/port/schedule/UpdateTotalData.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import { EggLogger } from 'egg';
import { IntervalParams, Schedule, ScheduleType } from '@eggjs/tegg/schedule';
import { Inject } from '@eggjs/tegg';
import { ChangesStreamTaskData } from '../../core/entity/Task';
import { RegistryManagerService } from '../../core/service/RegistryManagerService';
import { PackageVersionDownloadRepository } from '../../repository/PackageVersionDownloadRepository';
import { PackageRepository } from '../../repository/PackageRepository';
import { TaskRepository } from '../../repository/TaskRepository';
import { ChangeRepository } from '../../repository/ChangeRepository';
import { CacheService } from '../../core/service/CacheService';
import { CacheService, DownloadInfo, TotalData } from '../../core/service/CacheService';
import { TaskType } from '../../common/enum/Task';
import { GLOBAL_WORKER } from '../../common/constants';
import dayjs from '../../common/dayjs';


@Schedule<IntervalParams>({
type: ScheduleType.WORKER,
scheduleData: {
Expand Down Expand Up @@ -38,11 +40,12 @@ export class UpdateTotalData {
@Inject()
private readonly cacheService: CacheService;

async subscribe() {
const changesStreamTask = await this.taskRepository.findTaskByTargetName('GLOBAL_WORKER', TaskType.ChangesStream);
const packageTotal = await this.packageRepository.queryTotal();
@Inject()
private readonly registryManagerService: RegistryManagerService;

const download = {
// 计算下载量相关信息,不区分不同 changesStream
private async calculateDownloadInfo() {
const download: DownloadInfo = {
today: 0,
yesterday: 0,
samedayLastweek: 0,
Expand Down Expand Up @@ -92,15 +95,44 @@ export class UpdateTotalData {
}
}
}
return download;
}

async subscribe() {
const packageTotal = await this.packageRepository.queryTotal();
const download = await this.calculateDownloadInfo();

const lastChange = await this.changeRepository.getLastChange();
const totalData = {
const totalData: TotalData = {
...packageTotal,
download,
changesStream: changesStreamTask && changesStreamTask.data || {},
lastChangeId: lastChange && lastChange.id || 0,
cacheTime: new Date().toISOString(),
changesStream: {} as unknown as ChangesStreamTaskData,
upstreamRegistries: [],
};

const tasks = await this.taskRepository.findTasksByCondition({ type: TaskType.ChangesStream });
for (const task of tasks) {
// 全局 changesStream
const data = task.data as ChangesStreamTaskData;
// 补充录入 upstreamRegistries
const registry = await this.registryManagerService.findByRegistryId(data.registryId as string);
if (registry) {
totalData.upstreamRegistries.push({
...data,
source_registry: registry?.host,
changes_stream_url: registry?.changeStream,
registry_name: registry?.name,
});
}

// 兼容 LegacyInfo 字段
if (task.targetName === GLOBAL_WORKER) {
totalData.changesStream = data;
}
}

await this.cacheService.saveTotalData(totalData);
this.logger.info('[UpdateTotalData.subscribe] total data: %j', totalData);
}
Expand Down
35 changes: 31 additions & 4 deletions app/repository/PackageRepository.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { AccessLevel, SingletonProto, Inject } from '@eggjs/tegg';
import type { Package as PackageModel } from './model/Package';
import { Orm } from '@eggjs/tegg-orm-plugin/lib/SingletonORM';
import { Package as PackageModel } from './model/Package';
import { Package as PackageEntity } from '../core/entity/Package';
import { ModelConvertor } from './util/ModelConvertor';
import { PackageVersion as PackageVersionEntity } from '../core/entity/PackageVersion';
import type { PackageVersion as PackageVersionModel } from './model/PackageVersion';
import { PackageVersion as PackageVersionModel } from './model/PackageVersion';
import { PackageVersionManifest as PackageVersionManifestEntity } from '../core/entity/PackageVersionManifest';
import type { PackageVersionManifest as PackageVersionManifestModel } from './model/PackageVersionManifest';
import type { Dist as DistModel } from './model/Dist';
Expand All @@ -14,6 +15,8 @@ import type { Maintainer as MaintainerModel } from './model/Maintainer';
import type { User as UserModel } from './model/User';
import { User as UserEntity } from '../core/entity/User';
import { AbstractRepository } from './AbstractRepository';
import { EggAppConfig } from 'egg';
import { Bone } from 'leoric';

@SingletonProto({
accessLevel: AccessLevel.PUBLIC,
Expand All @@ -40,6 +43,12 @@ export class PackageRepository extends AbstractRepository {
@Inject()
private readonly User: typeof UserModel;

@Inject()
private readonly config: EggAppConfig;

@Inject()
private readonly orm: Orm;

async findPackage(scope: string, name: string): Promise<PackageEntity | null> {
const model = await this.Package.findOne({ scope, name });
if (!model) return null;
Expand Down Expand Up @@ -241,6 +250,20 @@ export class PackageRepository extends AbstractRepository {
return ModelConvertor.convertModelToEntity(model, this.PackageVersionManifest);
}

private getCountSql(model: typeof Bone):string {
const { database } = this.config.orm;
const sql = `
SELECT
TABLE_ROWS
FROM
information_schema.tables
WHERE
table_schema = '${database}'
AND table_name = '${model.table}'
`;
return sql;
}

public async queryTotal() {
const lastPkg = await this.Package.findOne().order('id', 'desc');
const lastVersion = await this.PackageVersion.findOne().order('id', 'desc');
Expand All @@ -252,7 +275,9 @@ export class PackageRepository extends AbstractRepository {
if (lastPkg) {
lastPackage = lastPkg.scope ? `${lastPkg.scope}/${lastPkg.name}` : lastPkg.name;
// FIXME: id will be out of range number
packageCount = Number(lastPkg.id);
// 可能存在 id 增长不连续的情况,通过 count 查询
const queryRes = await this.orm.client.query(this.getCountSql(PackageModel));
packageCount = queryRes.rows?.[0].TABLE_ROWS as number;
}

if (lastVersion) {
Expand All @@ -261,7 +286,8 @@ export class PackageRepository extends AbstractRepository {
const fullname = pkg.scope ? `${pkg.scope}/${pkg.name}` : pkg.name;
lastPackageVersion = `${fullname}@${lastVersion.version}`;
}
packageVersionCount = Number(lastVersion.id);
const queryRes = await this.orm.client.query(this.getCountSql(PackageVersionModel));
packageVersionCount = queryRes.rows?.[0].TABLE_ROWS as number;
}
return {
packageCount,
Expand Down Expand Up @@ -327,4 +353,5 @@ export class PackageRepository extends AbstractRepository {
}
return entities;
}

}
5 changes: 5 additions & 0 deletions app/repository/TaskRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ export class TaskRepository extends AbstractRepository {
return tasks.map(task => ModelConvertor.convertModelToEntity(task, TaskEntity));
}

async findTasksByCondition(where: { targetName?: string; state?: TaskState; type: TaskType }): Promise<Array<TaskEntity>> {
const tasks = await this.Task.find(where);
return tasks.map(task => ModelConvertor.convertModelToEntity(task, TaskEntity));
}

async findTaskByTargetName(targetName: string, type: TaskType, state?: TaskState) {
const where: any = { targetName, type };
if (state) {
Expand Down
Loading

0 comments on commit bce6e79

Please sign in to comment.