Skip to content

Commit

Permalink
fix: use system client to get config
Browse files Browse the repository at this point in the history
Signed-off-by: Abirdcfly <fp544037857@gmail.com>
  • Loading branch information
Abirdcfly committed Apr 7, 2024
1 parent f259628 commit 7ffacbe
Show file tree
Hide file tree
Showing 24 changed files with 68 additions and 53 deletions.
2 changes: 1 addition & 1 deletion apiserver/pkg/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,7 @@ func UploadIcon(ctx context.Context, client client.Client, icon, appName, namesp
return "", err
}

system, err := config.GetSystemDatasource(ctx, client)
system, err := config.GetSystemDatasource(ctx)
if err != nil {
return "", err
}
Expand Down
2 changes: 1 addition & 1 deletion apiserver/pkg/chat/chat_docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (cs *ChatServer) BuildConversationKnowledgeBase(ctx context.Context, req Co
return err
}
// systemDatasource which stores the document
systemDatasource, err := config.GetSystemDatasource(ctx, cs.cli)
systemDatasource, err := config.GetSystemDatasource(ctx)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion apiserver/pkg/chat/chat_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (cs *ChatServer) Storage() storage.Storage {
if cs.storage == nil {
cs.once.Do(func() {
ctx := context.TODO()
ds, err := pkgconfig.GetRelationalDatasource(ctx, cs.cli)
ds, err := pkgconfig.GetRelationalDatasource(ctx)
if err != nil || ds == nil {
if err != nil {
klog.Infof("get relational datasource failed: %s, use memory storage for chat", err.Error())
Expand Down
8 changes: 4 additions & 4 deletions apiserver/pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ var (
)

func SystemDatasourceOSS(ctx context.Context, mgrClient client.Client) (*datasource.OSS, error) {
systemDatasource, err := config.GetSystemDatasource(ctx, mgrClient)
systemDatasource, err := config.GetSystemDatasource(ctx)
if err != nil {
return nil, err
}
Expand All @@ -77,7 +77,7 @@ func SystemDatasourceOSS(ctx context.Context, mgrClient client.Client) (*datasou
// Embedder and vectorstore are both required when generating a new embedding.That's why we call it a `EmbeddingSuit`
func SystemEmbeddingSuite(ctx context.Context, cli client.Client) (*v1alpha1.Embedder, *v1alpha1.VectorStore, error) {
// get the built-in system embedder
emd, err := config.GetEmbedder(ctx, cli)
emd, err := config.GetEmbedder(ctx)
if err != nil {
return nil, nil, err
}
Expand All @@ -86,7 +86,7 @@ func SystemEmbeddingSuite(ctx context.Context, cli client.Client) (*v1alpha1.Emb
return nil, nil, err
}
// get the built-in system vectorstore
vs, err := config.GetVectorStore(ctx, cli)
vs, err := config.GetVectorStore(ctx)
if err != nil {
return nil, nil, err
}
Expand All @@ -100,7 +100,7 @@ func SystemEmbeddingSuite(ctx context.Context, cli client.Client) (*v1alpha1.Emb
// GetAPIServer returns the api server url to access arcadia's worker
// if external is true,then this func will return the external api server
func GetAPIServer(ctx context.Context, cli client.Client, external bool) (string, error) {
gateway, err := config.GetGateway(ctx, cli)
gateway, err := config.GetGateway(ctx)
if err != nil {
return "", err
}
Expand Down
2 changes: 1 addition & 1 deletion apiserver/pkg/knowledgebase/knowledgebase.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func knowledgebase2model(ctx context.Context, c client.Client, knowledgebase *v1
func CreateKnowledgeBase(ctx context.Context, c client.Client, input generated.CreateKnowledgeBaseInput) (*generated.KnowledgeBase, error) {
var filegroups []v1alpha1.FileGroup
var vectorstore v1alpha1.TypedObjectReference
vector, _ := config.GetVectorStore(ctx, c)
vector, _ := config.GetVectorStore(ctx)
displayname, description, embedder := "", "", ""
if input.DisplayName != nil {
displayname = *input.DisplayName
Expand Down
2 changes: 1 addition & 1 deletion apiserver/pkg/ray/raycluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

func ListRayClusters(ctx context.Context, c client.Client, input generated.ListCommonInput) (*generated.PaginatedResult, error) {
clusters, err := config.GetRayClusters(ctx, c)
clusters, err := config.GetRayClusters(ctx)
if err != nil {
return nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions apiserver/service/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (

"github.com/kubeagi/arcadia/apiserver/config"
"github.com/kubeagi/arcadia/apiserver/docs"
"github.com/kubeagi/arcadia/apiserver/pkg/client"
"github.com/kubeagi/arcadia/apiserver/pkg/oidc"
pkgconfig "github.com/kubeagi/arcadia/pkg/config"
)

func Cors() gin.HandlerFunc {
Expand Down Expand Up @@ -55,6 +57,8 @@ func NewServerAndRun(conf config.ServerConfig) {
if conf.EnableOIDC {
oidc.InitOIDCArgs(conf.IssuerURL, conf.MasterURL, conf.ClientSecret, conf.ClientID)
}
systemcli, _ := client.GetClient(nil)
pkgconfig.InitSystemClient(systemcli)

bffGroup := r.Group("/bff")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (r *RerankRetrieverReconciler) reconcile(ctx context.Context, log logr.Logg
}
}
if instance.Spec.Model == nil {
model, err := config.GetDefaultRerankModel(ctx, r.Client)
model, err := config.GetDefaultRerankModel(ctx)
if err != nil {
instance.Status.SetConditions(instance.Status.ErrorCondition(fmt.Sprintf("no model provided. please set model in reranker or set system default reranking model in config :%s", err))...)
return instance, ctrl.Result{RequeueAfter: 30 * time.Second}, err
Expand Down
5 changes: 3 additions & 2 deletions controllers/base/knowledgebase_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(
kb *arcadiav1alpha1.KnowledgeBase,
vectorStore *arcadiav1alpha1.VectorStore,
embedder *arcadiav1alpha1.Embedder,
groupIndex, fileIndex int) (err error) {
groupIndex, fileIndex int,
) (err error) {
defer func() {
if err != nil {
err = fmt.Errorf("failed to reconcile FileGroup: %w", err)
Expand Down Expand Up @@ -436,7 +437,7 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(
if !versionedDataset.Status.IsReady() {
return errDataSourceNotReady
}
system, err := config.GetSystemDatasource(ctx, r.Client)
system, err := config.GetSystemDatasource(ctx)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions controllers/base/model_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (r *ModelReconciler) CheckModel(ctx context.Context, logger logr.Logger, in
// otherwise we consider the model file for the trans-core service to be ready.
if instance.Spec.Source == nil && (instance.Spec.HuggingFaceRepo == "" && instance.Spec.ModelScopeRepo == "") {
logger.V(5).Info(fmt.Sprintf("model %s source is empty, check minio status.", instance.Name))
system, err := config.GetSystemDatasource(ctx, r.Client)
system, err := config.GetSystemDatasource(ctx)
if err != nil {
return r.UpdateStatus(ctx, instance, err)
}
Expand Down Expand Up @@ -223,7 +223,7 @@ func (r *ModelReconciler) RemoveModel(ctx context.Context, logger logr.Logger, i
var ds datasource.Datasource
var info any

system, err := config.GetSystemDatasource(ctx, r.Client)
system, err := config.GetSystemDatasource(ctx)
if err != nil {
return r.UpdateStatus(ctx, instance, err)
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/base/namespace_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (r *NamespaceReconciler) SetupWithManager(mgr ctrl.Manager) error {
}

func (r *NamespaceReconciler) ossClient(ctx context.Context) (*datasource.OSS, error) {
systemDatasource, err := config.GetSystemDatasource(ctx, r.Client)
systemDatasource, err := config.GetSystemDatasource(ctx)
if err != nil {
klog.Errorf("get system datasource error %s", err)
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions controllers/base/versioneddataset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (r *VersionedDatasetReconciler) preUpdate(ctx context.Context, logger logr.
func (r *VersionedDatasetReconciler) checkStatus(ctx context.Context, logger logr.Logger, instance *v1alpha1.VersionedDataset) (bool, []v1alpha1.FileStatus, error) {
// TODO: Currently, we think there is only one default minio environment,
// so we get the minio client directly through the configuration.
systemDatasource, err := config.GetSystemDatasource(ctx, r.Client)
systemDatasource, err := config.GetSystemDatasource(ctx)
if err != nil {
logger.Error(err, "Failed to get system datasource")
return false, nil, err
Expand All @@ -233,7 +233,7 @@ func (r *VersionedDatasetReconciler) checkStatus(ctx context.Context, logger log
}

func (r *VersionedDatasetReconciler) removeBucketFiles(ctx context.Context, logger logr.Logger, instance *v1alpha1.VersionedDataset) error {
systemDatasource, err := config.GetSystemDatasource(ctx, r.Client)
systemDatasource, err := config.GetSystemDatasource(ctx)
if err != nil {
logger.Error(err, "Failed to get system datasource")
return err
Expand Down
2 changes: 1 addition & 1 deletion controllers/base/worker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (r *WorkerReconciler) reconcile(ctx context.Context, logger logr.Logger, wo
return worker, errors.Wrap(err, "model config datasource, but get it failed.")
}
} else {
datasource, err = config.GetSystemDatasource(ctx, r.Client)
datasource, err = config.GetSystemDatasource(ctx)
if err != nil {
return worker, errors.Wrap(err, "Failed to get system datasource")
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/evaluation/rag_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func (r *RAGReconciler) WhenJobChanged(job *batchv1.Job) {

func (r *RAGReconciler) RemoveRAGFiles(ctx context.Context, rag *evaluationarcadiav1alpha1.RAG) {
logger := log.FromContext(ctx, "RAG", rag.Name, "Namespace", rag.Namespace, "Action", "DeleteRAGFiles")
systemDatasource, err := config.GetSystemDatasource(ctx, r.Client)
systemDatasource, err := config.GetSystemDatasource(ctx)
if err != nil {
logger.Error(err, "failed to get system datasource")
return
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ func main() {
_ = mgr.AddMetricsExtraHandler("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
}

config.InitSystemClient(mgr.GetClient())
setupLog.Info("starting manager")
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
Expand Down
2 changes: 1 addition & 1 deletion pkg/appruntime/documentloader/documentloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (dl *DocumentLoader) Run(ctx context.Context, cli client.Client, args map[s
if err := cli.Get(ctx, types.NamespacedName{Namespace: dl.RefNamespace(), Name: dl.Ref.Name}, dl.Instance); err != nil {
return args, fmt.Errorf("can't find the documentloader in cluster: %w", err)
}
system, err := config.GetSystemDatasource(ctx, cli)
system, err := config.GetSystemDatasource(ctx)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/arctl/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ var (

func SysatemDatasource(ctx context.Context, kubeClient client.Client) (*basev1alpha1.Datasource, error) {
once.Do(func() {
systemDatasource, systemError = config.GetSystemDatasource(ctx, kubeClient)
systemDatasource, systemError = config.GetSystemDatasource(ctx)
})
return systemDatasource, systemError
}
Expand Down
55 changes: 32 additions & 23 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ var (
ErrNoConfigStreamlit = fmt.Errorf("config Streamlit in comfigmap is not found")
ErrNoConfigRayClusters = fmt.Errorf("config RayClusters in comfigmap is not found")
ErrNoConfigRerank = fmt.Errorf("config rerankDefaultEndpoint in comfigmap is not found")
ErrSystemCliNotFound = fmt.Errorf("systemCli is not found")
)
var systemCli client.Client

func getDatasource(ctx context.Context, ref arcadiav1alpha1.TypedObjectReference, c client.Client) (ds *arcadiav1alpha1.Datasource, err error) {
name := ref.Name
Expand All @@ -57,24 +59,24 @@ func getDatasource(ctx context.Context, ref arcadiav1alpha1.TypedObjectReference
return source, err
}

func GetSystemDatasource(ctx context.Context, c client.Client) (*arcadiav1alpha1.Datasource, error) {
config, err := GetConfig(ctx, c)
func GetSystemDatasource(ctx context.Context) (*arcadiav1alpha1.Datasource, error) {
config, err := getConfig(ctx)
if err != nil {
return nil, err
}
return getDatasource(ctx, config.SystemDatasource, c)
return getDatasource(ctx, config.SystemDatasource, systemCli)
}

func GetRelationalDatasource(ctx context.Context, c client.Client) (*arcadiav1alpha1.Datasource, error) {
config, err := GetConfig(ctx, c)
func GetRelationalDatasource(ctx context.Context) (*arcadiav1alpha1.Datasource, error) {
config, err := getConfig(ctx)
if err != nil {
return nil, err
}
return getDatasource(ctx, config.RelationalDatasource, c)
return getDatasource(ctx, config.RelationalDatasource, systemCli)
}

func GetGateway(ctx context.Context, c client.Client) (*Gateway, error) {
config, err := GetConfig(ctx, c)
func GetGateway(ctx context.Context) (*Gateway, error) {
config, err := getConfig(ctx)
if err != nil {
return nil, err
}
Expand All @@ -84,14 +86,17 @@ func GetGateway(ctx context.Context, c client.Client) (*Gateway, error) {
return config.Gateway, nil
}

func GetConfig(ctx context.Context, c client.Client) (config *Config, err error) {
func getConfig(ctx context.Context) (config *Config, err error) {
if systemCli == nil {
return nil, ErrSystemCliNotFound
}
cmName := env.GetString(EnvConfigKey, EnvConfigDefaultValue)
if cmName == "" {
return nil, ErrNoConfigEnv
}
cmNamespace := utils.GetCurrentNamespace()
cm := &corev1.ConfigMap{}
if err = c.Get(ctx, client.ObjectKey{Name: cmName, Namespace: cmNamespace}, cm); err != nil {
if err = systemCli.Get(ctx, client.ObjectKey{Name: cmName, Namespace: cmNamespace}, cm); err != nil {
return nil, err
}
value, ok := cm.Data["config"]
Expand All @@ -105,8 +110,8 @@ func GetConfig(ctx context.Context, c client.Client) (config *Config, err error)
}

// GetEmbedder get the default embedder from config
func GetEmbedder(ctx context.Context, c client.Client) (*arcadiav1alpha1.TypedObjectReference, error) {
config, err := GetConfig(ctx, c)
func GetEmbedder(ctx context.Context) (*arcadiav1alpha1.TypedObjectReference, error) {
config, err := getConfig(ctx)
if err != nil {
return nil, err
}
Expand All @@ -117,8 +122,8 @@ func GetEmbedder(ctx context.Context, c client.Client) (*arcadiav1alpha1.TypedOb
}

// GetVectorStore get the default vector store from config
func GetVectorStore(ctx context.Context, c client.Client) (*arcadiav1alpha1.TypedObjectReference, error) {
config, err := GetConfig(ctx, c)
func GetVectorStore(ctx context.Context) (*arcadiav1alpha1.TypedObjectReference, error) {
config, err := getConfig(ctx)
if err != nil {
return nil, err
}
Expand All @@ -129,8 +134,8 @@ func GetVectorStore(ctx context.Context, c client.Client) (*arcadiav1alpha1.Type
}

// Get the configuration of streamlit tool
func GetStreamlit(ctx context.Context, c client.Client) (*Streamlit, error) {
config, err := GetConfig(ctx, c)
func GetStreamlit(ctx context.Context) (*Streamlit, error) {
config, err := getConfig(ctx)
if err != nil {
return nil, err
}
Expand All @@ -141,8 +146,8 @@ func GetStreamlit(ctx context.Context, c client.Client) (*Streamlit, error) {
}

// Get the ray cluster that can be used a resource pool
func GetRayClusters(ctx context.Context, c client.Client) ([]RayCluster, error) {
config, err := GetConfig(ctx, c)
func GetRayClusters(ctx context.Context) ([]RayCluster, error) {
config, err := getConfig(ctx)
if err != nil {
return nil, err
}
Expand All @@ -153,8 +158,8 @@ func GetRayClusters(ctx context.Context, c client.Client) ([]RayCluster, error)
}

// GetDefaultRerankModel gets the default reranking model which is recommended by kubeagi
func GetDefaultRerankModel(ctx context.Context, c client.Client) (*arcadiav1alpha1.TypedObjectReference, error) {
config, err := GetConfig(ctx, c)
func GetDefaultRerankModel(ctx context.Context) (*arcadiav1alpha1.TypedObjectReference, error) {
config, err := getConfig(ctx)
if err != nil {
return nil, err
}
Expand All @@ -164,14 +169,18 @@ func GetDefaultRerankModel(ctx context.Context, c client.Client) (*arcadiav1alph
return config.Rerank, nil
}

func GetSystemDatasourceOSS(ctx context.Context, mgrClient client.Client) (*datasource.OSS, error) {
systemDatasource, err := GetSystemDatasource(ctx, mgrClient)
func GetSystemDatasourceOSS(ctx context.Context) (*datasource.OSS, error) {
systemDatasource, err := GetSystemDatasource(ctx)
if err != nil {
return nil, err
}
endpoint := systemDatasource.Spec.Endpoint.DeepCopy()
if endpoint.AuthSecret != nil && endpoint.AuthSecret.Namespace == nil {
endpoint.AuthSecret.WithNameSpace(systemDatasource.Namespace)
}
return datasource.NewOSS(ctx, mgrClient, endpoint)
return datasource.NewOSS(ctx, systemCli, endpoint)
}

func InitSystemClient(cli client.Client) {
systemCli = cli
}
4 changes: 2 additions & 2 deletions pkg/evaluation/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func PhaseJobName(instance *evav1alpha1.RAG, phase evav1alpha1.RAGPhase) string
}
func systemEmbeddingSuite(ctx context.Context, mgrClient client.Client) (*v1alpha1.Embedder, error) {
// get the built-in system embedder
emd, err := config.GetEmbedder(ctx, mgrClient)
emd, err := config.GetEmbedder(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -303,7 +303,7 @@ func JudgeJobGenerator(ctx context.Context, c client.Client) func(*evav1alpha1.R

func UploadJobGenerator(ctx context.Context, client client.Client) func(*evav1alpha1.RAG) (*batchv1.Job, error) {
return func(instance *evav1alpha1.RAG) (*batchv1.Job, error) {
datasource, err := config.GetSystemDatasource(ctx, client)
datasource, err := config.GetSystemDatasource(ctx)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/langchainwrap/embedder.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func GetLangchainEmbedder(ctx context.Context, e *v1alpha1.Embedder, c client.Cl
return langchaingoembeddings.NewEmbedder(llm, opts...)
}
case v1alpha1.ProviderTypeWorker:
gateway, err := config.GetGateway(ctx, c)
gateway, err := config.GetGateway(ctx)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/langchainwrap/llm.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func GetLangchainLLM(ctx context.Context, llm *v1alpha1.LLM, c client.Client, mo
return googleLLM, nil
}
case v1alpha1.ProviderTypeWorker:
gateway, err := config.GetGateway(ctx, c)
gateway, err := config.GetGateway(ctx)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 7ffacbe

Please sign in to comment.