Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support npm search command like npmio #513

Merged
merged 11 commits into from
Sep 1, 2023
16 changes: 15 additions & 1 deletion app/common/PackageUtil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import * as ssri from 'ssri';
import tar from 'tar';
import { PackageJSONType } from '../repository/PackageRepository';
import { AuthorType, PackageJSONType } from '../repository/PackageRepository';


// /@cnpm%2ffoo
// /@cnpm%2Ffoo
Expand Down Expand Up @@ -104,6 +105,19 @@ export async function hasShrinkWrapInTgz(contentOrFile: Uint8Array | string): Pr
}
}

/**
 * Normalize the manifest `author` field before it is written to ES.
 *
 * @param author - the raw author value: a plain string, an author object, or undefined
 * @returns `{ name: author }` for a string input; any other input is returned as-is
 */
export function formatAuthor(author: string | AuthorType | undefined): AuthorType | undefined {
  // A bare string is taken to be the author's name; objects and undefined pass through.
  return typeof author === 'string' ? { name: author } : author;
}
Beace marked this conversation as resolved.
Show resolved Hide resolved

export async function extractPackageJSON(tarballBytes: Buffer): Promise<PackageJSONType> {
return new Promise((resolve, reject) => {
Readable.from(tarballBytes)
Expand Down
7 changes: 7 additions & 0 deletions app/common/typing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { CnpmcoreConfig } from '../port/config';
import { Readable } from 'stream';
import { IncomingHttpHeaders } from 'http';
import { EggContext } from '@eggjs/tegg';
import { estypes } from '@elastic/elasticsearch';

export interface UploadResult {
key: string;
Expand Down Expand Up @@ -50,6 +51,12 @@ export interface QueueAdapter {
length(key: string): Promise<number>;
}

/** Contract for a search backend: run a query, upsert a document by id, delete a document by id. */
export interface SearchAdapter {
// Runs `query` and resolves to the hits metadata, with hits typed by the document type `T`.
search<T>(query: any): Promise<estypes.SearchHitsMetadata<T>>;
// Stores `document` under `id`; resolves to a string (presumably the stored id — confirm against implementations).
upsert<T>(id: string, document: T): Promise<string>;
// Removes the entry stored under `id`; resolves to a string (presumably the removed id — confirm against implementations).
delete(id: string): Promise<string>;
}

export interface AuthUrlResult {
loginUrl: string;
doneUrl: string;
Expand Down
94 changes: 94 additions & 0 deletions app/core/event/SyncESPackage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// TODO sync event
/* eslint-disable @typescript-eslint/no-unused-vars */
import { EggAppConfig } from 'egg';
import { Event, Inject } from '@eggjs/tegg';
import {
PACKAGE_UNPUBLISHED,
PACKAGE_VERSION_ADDED,
PACKAGE_VERSION_REMOVED,
PACKAGE_TAG_ADDED,
PACKAGE_TAG_CHANGED,
PACKAGE_TAG_REMOVED,
PACKAGE_MAINTAINER_CHANGED,
PACKAGE_MAINTAINER_REMOVED,
PACKAGE_META_CHANGED,
} from './index';

import { PackageSearchService } from '../service/PackageSearchService';

/**
 * Base class for the package event handlers in this file.
 * Holds the injected search service and app config, plus the shared re-sync helper.
 */
class SyncESPackage {
  @Inject()
  protected readonly packageSearchService: PackageSearchService;

  @Inject()
  protected readonly config: EggAppConfig;

  /**
   * Re-sync `fullname` through the search service.
   * Does nothing unless `config.cnpmcore.enableElasticsearch` is truthy.
   */
  protected async syncPackage(fullname: string) {
    const { enableElasticsearch } = this.config.cnpmcore;
    if (enableElasticsearch) {
      await this.packageSearchService.syncPackage(fullname, true);
    }
  }
}

/**
 * PACKAGE_UNPUBLISHED handler: removes the package through the search service
 * when `config.cnpmcore.enableElasticsearch` is truthy.
 */
@Event(PACKAGE_UNPUBLISHED)
export class PackageUnpublished extends SyncESPackage {
  async handle(fullname: string) {
    const searchEnabled = this.config.cnpmcore.enableElasticsearch;
    if (searchEnabled) {
      await this.packageSearchService.removePackage(fullname);
    }
  }
}

/** PACKAGE_VERSION_ADDED handler: re-syncs the package via the inherited `syncPackage`. */
@Event(PACKAGE_VERSION_ADDED)
export class PackageVersionAdded extends SyncESPackage {
  async handle(fullname: string): Promise<void> {
    const pending = this.syncPackage(fullname);
    await pending;
  }
}

/** PACKAGE_VERSION_REMOVED handler: re-syncs the package via the inherited `syncPackage`. */
@Event(PACKAGE_VERSION_REMOVED)
export class PackageVersionRemoved extends SyncESPackage {
  async handle(fullname: string): Promise<void> {
    const pending = this.syncPackage(fullname);
    await pending;
  }
}

/** PACKAGE_TAG_ADDED handler: re-syncs the package via the inherited `syncPackage`. */
@Event(PACKAGE_TAG_ADDED)
export class PackageTagAdded extends SyncESPackage {
  async handle(fullname: string): Promise<void> {
    const pending = this.syncPackage(fullname);
    await pending;
  }
}

/** PACKAGE_TAG_CHANGED handler: re-syncs the package via the inherited `syncPackage`. */
@Event(PACKAGE_TAG_CHANGED)
export class PackageTagChanged extends SyncESPackage {
  async handle(fullname: string): Promise<void> {
    const pending = this.syncPackage(fullname);
    await pending;
  }
}

/** PACKAGE_TAG_REMOVED handler: re-syncs the package via the inherited `syncPackage`. */
@Event(PACKAGE_TAG_REMOVED)
export class PackageTagRemoved extends SyncESPackage {
  async handle(fullname: string): Promise<void> {
    const pending = this.syncPackage(fullname);
    await pending;
  }
}

/** PACKAGE_MAINTAINER_CHANGED handler: re-syncs the package via the inherited `syncPackage`. */
@Event(PACKAGE_MAINTAINER_CHANGED)
export class PackageMaintainerChanged extends SyncESPackage {
  async handle(fullname: string): Promise<void> {
    const pending = this.syncPackage(fullname);
    await pending;
  }
}

/** PACKAGE_MAINTAINER_REMOVED handler: re-syncs the package via the inherited `syncPackage`. */
@Event(PACKAGE_MAINTAINER_REMOVED)
export class PackageMaintainerRemoved extends SyncESPackage {
  async handle(fullname: string): Promise<void> {
    const pending = this.syncPackage(fullname);
    await pending;
  }
}

/** PACKAGE_META_CHANGED handler: re-syncs the package via the inherited `syncPackage`. */
@Event(PACKAGE_META_CHANGED)
export class PackageMetaChanged extends SyncESPackage {
  async handle(fullname: string): Promise<void> {
    const pending = this.syncPackage(fullname);
    await pending;
  }
}
216 changes: 216 additions & 0 deletions app/core/service/PackageSearchService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
import { AccessLevel, Inject, SingletonProto } from '@eggjs/tegg';
import type { estypes } from '@elastic/elasticsearch';
import dayjs from 'dayjs';

import { AbstractService } from '../../common/AbstractService';
import { formatAuthor, getScopeAndName } from '../../common/PackageUtil';
import { PackageManagerService } from './PackageManagerService';
import { SearchManifestType, SearchMappingType, SearchRepository } from '../../repository/SearchRepository';
import { PackageVersionDownloadRepository } from '../../repository/PackageVersionDownloadRepository';
import { PackageRepository } from '../../repository/PackageRepository';


/**
 * Service that builds package search documents, writes them through the
 * search repository, and runs text searches against it.
 * Registered as a singleton prototype with public access level.
 */
@SingletonProto({
accessLevel: AccessLevel.PUBLIC,
})
export class PackageSearchService extends AbstractService {
// Used to load the full manifests of a package.
@Inject()
private readonly packageManagerService: PackageManagerService;
// Used to upsert, search and remove package documents.
@Inject()
private readonly searchRepository: SearchRepository;
// Used to query download rows of a package for a date range.
@Inject()
private packageVersionDownloadRepository: PackageVersionDownloadRepository;
// Used to look up the package record by scope and name.
@Inject()
protected packageRepository: PackageRepository;

/**
 * Build the search document for `fullname` and upsert it through the search repository.
 *
 * Returns early, after logging a warning, when the full manifests carry no `data`
 * or when the package record cannot be found.
 *
 * @param fullname - package name; split into scope and name by getScopeAndName
 * @param isSync - forwarded as the third argument of listPackageFullManifests
 */
async syncPackage(fullname: string, isSync = true) {
const [ scope, name ] = getScopeAndName(fullname);
const fullManifests = await this.packageManagerService.listPackageFullManifests(scope, name, isSync);

if (!fullManifests.data) {
this.logger.warn('[PackageSearchService.syncPackage] save package:%s not found', fullname);
return;
}

const pkg = await this.packageRepository.findPackage(scope, name);
if (!pkg) {
this.logger.warn('[PackageSearchService.syncPackage] findPackage:%s not found', fullname);
return;
}

// get last year download data
const startDate = dayjs().subtract(1, 'year');
const endDate = dayjs();

// Download rows for this package between startDate and endDate.
const entities = await this.packageVersionDownloadRepository.query(pkg.packageId, startDate.toDate(), endDate.toDate());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

download 现在的实现估计扛不住,没有根据 packageId 来聚合的。

比如 antd 这种版本特别多的包,单次查询就要有上千条数据 感觉得换个方案。

Copy link
Contributor Author

@Beace Beace Aug 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

外网的包可以直接调下 npmjs 的 api,内网包现有实现确实没啥好办法。是不是可以在 redis 里给每个 package 整个 total download 计数器,写 DB 的时候同时给对应 packageId(key) 累加。

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感觉 es 索引更新时机也是个问题,现在索引更新是依赖版本或者 tag 变更,下载量更新感觉需要在现在统计下载量的地方去触发一下 es 索引更新

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我看你提了个新的 ISSUE 在搞单包下载量的事情,要等你搞完吗。下载量不一定要非常准确,趋势是对的,就不会影响搜索的排名的

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Beace 要不先合并我们跑下数据看看?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Beace 在申请了,我先尝试在 opensearch 导入下数据看看

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Beace opensearch 试了下兼容性问题较多,没法直接用,还是继续申请 es

看计划里有同步脚本,哪里可以找呀?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

哦脚本遗漏了,其实就是批量调这接口

$ curl --location --request PUT 'http://localhost:7001/-/package/:package/syncs'

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/-/v1/search/sync/:fullname(${FULLNAME_REG_STRING})

👌🏻 合并完我试试单独调这个接口

// Add up the d01..d31 counters of every download row into a single total.
let downloadsAll = 0;
for (const entity of entities) {
for (let i = 1; i <= 31; i++) {
const day = String(i).padStart(2, '0');
const field = `d${day}`;
const counter = entity[field];
// Missing or zero counters contribute nothing.
if (!counter) continue;
downloadsAll += counter;
}
}

const { data: manifest } = fullManifests;

// The version the `latest` dist-tag points at supplies `version` and `date` below.
const latestVersion = manifest['dist-tags'].latest;
const latestManifest = manifest.versions[latestVersion];

// Package-level fields of the search document.
const packageDoc: SearchMappingType = {
name: manifest.name,
version: latestVersion,
_rev: manifest._rev,
// Scope is stored with every '@' removed by replace('@', ''); packages without a scope use 'unscoped'.
scope: scope ? scope.replace('@', '') : 'unscoped',
keywords: manifest.keywords || [],
versions: Object.keys(manifest.versions),
description: manifest.description,
license: manifest.license,
maintainers: manifest.maintainers,
// A string author is wrapped as `{ name }` by formatAuthor.
author: formatAuthor(manifest.author),
'dist-tags': manifest['dist-tags'],
date: manifest.time[latestVersion],
created: manifest.time.created,
modified: manifest.time.modified,
Beace marked this conversation as resolved.
Show resolved Hide resolved
// Owning registry; a keyword-style enum value
_source_registry_name: manifest._source_registry_name,
// Publisher of the latest version (_npmUser)
_npmUser: latestManifest?._npmUser,
// Publish info of the latest version
publish_time: latestManifest?.publish_time,
};

// Full search document: the package fields plus the summed download counter.
const document: SearchManifestType = {
package: packageDoc,
downloads: {
all: downloadsAll,
},
};

return await this.searchRepository.upsertPackage(document);
}

/**
 * Search indexed packages by free text.
 *
 * The query is a `function_score` whose inner `bool.should` clauses come from
 * `_buildMatchQueries` and whose score is replaced by the script from
 * `_buildScriptScore` (scoreEffect 0.25).
 *
 * @param text - the user's search text
 * @param from - offset of the first hit to return
 * @param size - maximum number of hits to return
 * @returns the `_source` of every hit and the total hit count; `objects` is an
 *   empty array when the response has no hits, and `total` is 0 when the
 *   response carries no total
 */
async searchPackage(text: string, from: number, size: number): Promise<{ objects: (SearchManifestType | undefined)[], total: number }> {
  const matchQueries = this._buildMatchQueries(text);
  const scriptScore = this._buildScriptScore({
    text,
    scoreEffect: 0.25,
  });

  const res = await this.searchRepository.searchPackage({
    body: {
      size,
      from,
      query: {
        function_score: {
          boost_mode: 'replace',
          query: {
            bool: {
              should: matchQueries,
              minimum_should_match: matchQueries.length ? 1 : 0,
            },
          },
          script_score: scriptScore,
        },
      },
    },
  });
  const { hits, total } = res;

  // `hits.total` may be a `{ value, relation }` object, a plain number, or absent,
  // so normalize all three shapes instead of blindly reading `.value`.
  const totalHits = total as estypes.SearchTotalHits | number | undefined;
  const totalCount = typeof totalHits === 'number' ? totalHits : (totalHits?.value ?? 0);

  return {
    // Fall back to an empty list so `objects` always matches the declared array type.
    objects: hits?.map(item => item._source) ?? [],
    total: totalCount,
  };
}

/** Remove `fullname` through the search repository and return whatever that call resolves to. */
async removePackage(fullname: string) {
  const removed = await this.searchRepository.removePackage(fullname);
  return removed;
}

/**
 * Build the four `multi_match` clauses used as `should` conditions of the search query.
 * Each clause targets the name (^4), description and keywords (^2) sub-fields of one analyzer.
 * Adapted from https://github.com/npms-io/queries/blob/master/lib/search.js#L8C1-L78C2
 *
 * @param text - the user's search text, used as the `query` of every clause
 */
private _buildMatchQueries(text: string) {
  // Weighted field list for one analyzer sub-field suffix.
  const fieldsFor = (analyzer: string) => [
    `package.name.${analyzer}^4`,
    `package.description.${analyzer}`,
    `package.keywords.${analyzer}^2`,
  ];

  // Standard match using cross_fields
  const standardMatch = {
    multi_match: {
      query: text,
      operator: 'and',
      fields: fieldsFor('standard'),
      type: 'cross_fields',
      boost: 6,
      tie_breaker: 0.5,
    },
  };

  // Partial match using edge-ngram
  const edgeNgramMatch = {
    multi_match: {
      query: text,
      operator: 'and',
      fields: fieldsFor('edge_ngram'),
      type: 'phrase',
      slop: 3,
      boost: 3,
      tie_breaker: 0.5,
    },
  };

  // Normal term match with an english stemmer
  const englishMatch = {
    multi_match: {
      query: text,
      operator: 'and',
      fields: fieldsFor('english_docs'),
      type: 'cross_fields',
      boost: 3,
      tie_breaker: 0.5,
    },
  };

  // Normal term match with a more aggressive english stemmer (not so important)
  const aggressiveEnglishMatch = {
    multi_match: {
      query: text,
      operator: 'and',
      fields: fieldsFor('english_aggressive_docs'),
      type: 'cross_fields',
      tie_breaker: 0.5,
    },
  };

  return [
    standardMatch,
    edgeNgramMatch,
    englishMatch,
    aggressiveEnglishMatch,
  ];
}

/**
 * Build the `script_score` definition used to rank search hits.
 *
 * An exact match on `package.name.raw` scores `100000 + downloads`; every other
 * hit scores `_score * downloads ^ scoreEffect`.
 *
 * The search text and the exponent are passed to the script as script params
 * and read as `params.text` / `params.scoreEffect`. They are deliberately NOT
 * interpolated into the script source: the text is user input, so embedding it
 * would let a `"` break or alter the script, and every distinct query would
 * produce a different script to compile.
 *
 * @param params.text - the user's search text; sent as '' when undefined or empty
 * @param params.scoreEffect - exponent applied to the download counter
 */
private _buildScriptScore(params: { text: string | undefined, scoreEffect: number }) {
  // keep search simple, only download(popularity)
  const downloads = 'doc["downloads.all"].value';
  // The source is a constant string; `params.*` here are the script's own params, resolved by the search engine.
  const source = `doc["package.name.raw"].value.equals(params.text) ? 100000 + ${downloads} : _score * Math.pow(${downloads}, params.scoreEffect)`;
  return {
    script: {
      source,
      params: {
        text: params.text || '',
        scoreEffect: params.scoreEffect,
      },
    },
  };
}
}
1 change: 1 addition & 0 deletions app/core/service/PackageSyncerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ export class PackageSyncerService extends AbstractService {
if (!this.allowSyncDownloadData) {
return;
}

const fullname = pkg.fullname;
const start = '2011-01-01';
const end = this.config.cnpmcore.syncDownloadDataMaxDate;
Expand Down
Loading