Skip to content

Commit

Permalink
fix: use system client to get config
Browse files Browse the repository at this point in the history
Signed-off-by: Abirdcfly <fp544037857@gmail.com>
  • Loading branch information
Abirdcfly committed Apr 7, 2024
1 parent f259628 commit c8f9776
Show file tree
Hide file tree
Showing 22 changed files with 101 additions and 55 deletions.
2 changes: 1 addition & 1 deletion apiserver/pkg/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,7 @@ func UploadIcon(ctx context.Context, client client.Client, icon, appName, namesp
return "", err
}

system, err := config.GetSystemDatasource(ctx, client)
system, err := config.GetSystemDatasource(ctx)
if err != nil {
return "", err
}
Expand Down
2 changes: 1 addition & 1 deletion apiserver/pkg/chat/chat_docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (cs *ChatServer) BuildConversationKnowledgeBase(ctx context.Context, req Co
return err
}
// systemDatasource which stores the document
systemDatasource, err := config.GetSystemDatasource(ctx, cs.cli)
systemDatasource, err := config.GetSystemDatasource(ctx)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion apiserver/pkg/chat/chat_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (cs *ChatServer) Storage() storage.Storage {
if cs.storage == nil {
cs.once.Do(func() {
ctx := context.TODO()
ds, err := pkgconfig.GetRelationalDatasource(ctx, cs.cli)
ds, err := pkgconfig.GetRelationalDatasourceWithClient(ctx, cs.cli)
if err != nil || ds == nil {
if err != nil {
klog.Infof("get relational datasource failed: %s, use memory storage for chat", err.Error())
Expand Down
10 changes: 5 additions & 5 deletions apiserver/pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ var (
)

func SystemDatasourceOSS(ctx context.Context, mgrClient client.Client) (*datasource.OSS, error) {
systemDatasource, err := config.GetSystemDatasource(ctx, mgrClient)
systemDatasource, err := config.GetSystemDatasource(ctx)
if err != nil {
return nil, err
}
Expand All @@ -77,7 +77,7 @@ func SystemDatasourceOSS(ctx context.Context, mgrClient client.Client) (*datasou
// Embedder and vectorstore are both required when generating a new embedding.That's why we call it a `EmbeddingSuit`
func SystemEmbeddingSuite(ctx context.Context, cli client.Client) (*v1alpha1.Embedder, *v1alpha1.VectorStore, error) {
// get the built-in system embedder
emd, err := config.GetEmbedder(ctx, cli)
emd, err := config.GetEmbedder(ctx)
if err != nil {
return nil, nil, err
}
Expand All @@ -86,7 +86,7 @@ func SystemEmbeddingSuite(ctx context.Context, cli client.Client) (*v1alpha1.Emb
return nil, nil, err
}
// get the built-in system vectorstore
vs, err := config.GetVectorStore(ctx, cli)
vs, err := config.GetVectorStore(ctx)
if err != nil {
return nil, nil, err
}
Expand All @@ -99,8 +99,8 @@ func SystemEmbeddingSuite(ctx context.Context, cli client.Client) (*v1alpha1.Emb

// GetAPIServer returns the api server url to access arcadia's worker
// if external is true,then this func will return the external api server
func GetAPIServer(ctx context.Context, cli client.Client, external bool) (string, error) {
gateway, err := config.GetGateway(ctx, cli)
func GetAPIServer(ctx context.Context, _ client.Client, external bool) (string, error) {
gateway, err := config.GetGateway(ctx)
if err != nil {
return "", err
}
Expand Down
2 changes: 1 addition & 1 deletion apiserver/pkg/knowledgebase/knowledgebase.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func knowledgebase2model(ctx context.Context, c client.Client, knowledgebase *v1
func CreateKnowledgeBase(ctx context.Context, c client.Client, input generated.CreateKnowledgeBaseInput) (*generated.KnowledgeBase, error) {
var filegroups []v1alpha1.FileGroup
var vectorstore v1alpha1.TypedObjectReference
vector, _ := config.GetVectorStore(ctx, c)
vector, _ := config.GetVectorStore(ctx)
displayname, description, embedder := "", "", ""
if input.DisplayName != nil {
displayname = *input.DisplayName
Expand Down
2 changes: 1 addition & 1 deletion apiserver/pkg/ray/raycluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

func ListRayClusters(ctx context.Context, c client.Client, input generated.ListCommonInput) (*generated.PaginatedResult, error) {
clusters, err := config.GetRayClusters(ctx, c)
clusters, err := config.GetRayClusters(ctx)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (r *RerankRetrieverReconciler) reconcile(ctx context.Context, log logr.Logg
}
}
if instance.Spec.Model == nil {
model, err := config.GetDefaultRerankModel(ctx, r.Client)
model, err := config.GetDefaultRerankModelWithClient(ctx, r.Client)
if err != nil {
instance.Status.SetConditions(instance.Status.ErrorCondition(fmt.Sprintf("no model provided. please set model in reranker or set system default reranking model in config :%s", err))...)
return instance, ctrl.Result{RequeueAfter: 30 * time.Second}, err
Expand Down
5 changes: 3 additions & 2 deletions controllers/base/knowledgebase_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(
kb *arcadiav1alpha1.KnowledgeBase,
vectorStore *arcadiav1alpha1.VectorStore,
embedder *arcadiav1alpha1.Embedder,
groupIndex, fileIndex int) (err error) {
groupIndex, fileIndex int,
) (err error) {
defer func() {
if err != nil {
err = fmt.Errorf("failed to reconcile FileGroup: %w", err)
Expand Down Expand Up @@ -436,7 +437,7 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(
if !versionedDataset.Status.IsReady() {
return errDataSourceNotReady
}
system, err := config.GetSystemDatasource(ctx, r.Client)
system, err := config.GetSystemDatasourceWithClient(ctx, r.Client)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions controllers/base/model_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (r *ModelReconciler) CheckModel(ctx context.Context, logger logr.Logger, in
// otherwise we consider the model file for the trans-core service to be ready.
if instance.Spec.Source == nil && (instance.Spec.HuggingFaceRepo == "" && instance.Spec.ModelScopeRepo == "") {
logger.V(5).Info(fmt.Sprintf("model %s source is empty, check minio status.", instance.Name))
system, err := config.GetSystemDatasource(ctx, r.Client)
system, err := config.GetSystemDatasourceWithClient(ctx, r.Client)
if err != nil {
return r.UpdateStatus(ctx, instance, err)
}
Expand Down Expand Up @@ -223,7 +223,7 @@ func (r *ModelReconciler) RemoveModel(ctx context.Context, logger logr.Logger, i
var ds datasource.Datasource
var info any

system, err := config.GetSystemDatasource(ctx, r.Client)
system, err := config.GetSystemDatasourceWithClient(ctx, r.Client)
if err != nil {
return r.UpdateStatus(ctx, instance, err)
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/base/namespace_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (r *NamespaceReconciler) SetupWithManager(mgr ctrl.Manager) error {
}

func (r *NamespaceReconciler) ossClient(ctx context.Context) (*datasource.OSS, error) {
systemDatasource, err := config.GetSystemDatasource(ctx, r.Client)
systemDatasource, err := config.GetSystemDatasourceWithClient(ctx, r.Client)
if err != nil {
klog.Errorf("get system datasource error %s", err)
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions controllers/base/versioneddataset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (r *VersionedDatasetReconciler) preUpdate(ctx context.Context, logger logr.
func (r *VersionedDatasetReconciler) checkStatus(ctx context.Context, logger logr.Logger, instance *v1alpha1.VersionedDataset) (bool, []v1alpha1.FileStatus, error) {
// TODO: Currently, we think there is only one default minio environment,
// so we get the minio client directly through the configuration.
systemDatasource, err := config.GetSystemDatasource(ctx, r.Client)
systemDatasource, err := config.GetSystemDatasourceWithClient(ctx, r.Client)
if err != nil {
logger.Error(err, "Failed to get system datasource")
return false, nil, err
Expand All @@ -233,7 +233,7 @@ func (r *VersionedDatasetReconciler) checkStatus(ctx context.Context, logger log
}

func (r *VersionedDatasetReconciler) removeBucketFiles(ctx context.Context, logger logr.Logger, instance *v1alpha1.VersionedDataset) error {
systemDatasource, err := config.GetSystemDatasource(ctx, r.Client)
systemDatasource, err := config.GetSystemDatasourceWithClient(ctx, r.Client)
if err != nil {
logger.Error(err, "Failed to get system datasource")
return err
Expand Down
2 changes: 1 addition & 1 deletion controllers/base/worker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (r *WorkerReconciler) reconcile(ctx context.Context, logger logr.Logger, wo
return worker, errors.Wrap(err, "model config datasource, but get it failed.")
}
} else {
datasource, err = config.GetSystemDatasource(ctx, r.Client)
datasource, err = config.GetSystemDatasourceWithClient(ctx, r.Client)
if err != nil {
return worker, errors.Wrap(err, "Failed to get system datasource")
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/evaluation/rag_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func (r *RAGReconciler) WhenJobChanged(job *batchv1.Job) {

func (r *RAGReconciler) RemoveRAGFiles(ctx context.Context, rag *evaluationarcadiav1alpha1.RAG) {
logger := log.FromContext(ctx, "RAG", rag.Name, "Namespace", rag.Namespace, "Action", "DeleteRAGFiles")
systemDatasource, err := config.GetSystemDatasource(ctx, r.Client)
systemDatasource, err := config.GetSystemDatasourceWithClient(ctx, r.Client)
if err != nil {
logger.Error(err, "failed to get system datasource")
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/appruntime/documentloader/documentloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (dl *DocumentLoader) Run(ctx context.Context, cli client.Client, args map[s
if err := cli.Get(ctx, types.NamespacedName{Namespace: dl.RefNamespace(), Name: dl.Ref.Name}, dl.Instance); err != nil {
return args, fmt.Errorf("can't find the documentloader in cluster: %w", err)
}
system, err := config.GetSystemDatasource(ctx, cli)
system, err := config.GetSystemDatasource(ctx)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/arctl/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ var (

func SysatemDatasource(ctx context.Context, kubeClient client.Client) (*basev1alpha1.Datasource, error) {
once.Do(func() {
systemDatasource, systemError = config.GetSystemDatasource(ctx, kubeClient)
systemDatasource, systemError = config.GetSystemDatasourceWithClient(ctx, kubeClient)
})
return systemDatasource, systemError
}
Expand Down
93 changes: 69 additions & 24 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

arcadiav1alpha1 "github.com/kubeagi/arcadia/api/base/v1alpha1"
pkgclient "github.com/kubeagi/arcadia/apiserver/pkg/client"
"github.com/kubeagi/arcadia/pkg/datasource"
"github.com/kubeagi/arcadia/pkg/utils"
)
Expand Down Expand Up @@ -57,24 +58,32 @@ func getDatasource(ctx context.Context, ref arcadiav1alpha1.TypedObjectReference
return source, err
}

func GetSystemDatasource(ctx context.Context, c client.Client) (*arcadiav1alpha1.Datasource, error) {
config, err := GetConfig(ctx, c)
func GetSystemDatasourceWithClient(ctx context.Context, c client.Client) (*arcadiav1alpha1.Datasource, error) {
config, err := getConfig(ctx, c)
if err != nil {
return nil, err
}
return getDatasource(ctx, config.SystemDatasource, c)
}

func GetRelationalDatasource(ctx context.Context, c client.Client) (*arcadiav1alpha1.Datasource, error) {
config, err := GetConfig(ctx, c)
func GetSystemDatasource(ctx context.Context) (*arcadiav1alpha1.Datasource, error) {
return GetSystemDatasourceWithClient(ctx, nil)
}

func GetRelationalDatasourceWithClient(ctx context.Context, c client.Client) (*arcadiav1alpha1.Datasource, error) {
config, err := getConfig(ctx, c)
if err != nil {
return nil, err
}
return getDatasource(ctx, config.RelationalDatasource, c)
}

func GetGateway(ctx context.Context, c client.Client) (*Gateway, error) {
config, err := GetConfig(ctx, c)
func GetRelationalDatasource(ctx context.Context) (*arcadiav1alpha1.Datasource, error) {
return GetRelationalDatasourceWithClient(ctx, nil)
}

func GetGatewayWithClient(ctx context.Context, c client.Client) (*Gateway, error) {
config, err := getConfig(ctx, c)
if err != nil {
return nil, err
}
Expand All @@ -83,8 +92,18 @@ func GetGateway(ctx context.Context, c client.Client) (*Gateway, error) {
}
return config.Gateway, nil
}
func GetGateway(ctx context.Context) (*Gateway, error) {
return GetGatewayWithClient(ctx, nil)
}

func GetConfig(ctx context.Context, c client.Client) (config *Config, err error) {
// if c is nil, use system client
func getConfig(ctx context.Context, c client.Client) (config *Config, err error) {
if c == nil {
c, err = pkgclient.GetClient(nil)
if err != nil {
return nil, err
}
}
cmName := env.GetString(EnvConfigKey, EnvConfigDefaultValue)
if cmName == "" {
return nil, ErrNoConfigEnv
Expand All @@ -104,9 +123,9 @@ func GetConfig(ctx context.Context, c client.Client) (config *Config, err error)
return config, nil
}

// GetEmbedder get the default embedder from config
func GetEmbedder(ctx context.Context, c client.Client) (*arcadiav1alpha1.TypedObjectReference, error) {
config, err := GetConfig(ctx, c)
// GetEmbedderWithClient get the default embedder from config
func GetEmbedderWithClient(ctx context.Context, c client.Client) (*arcadiav1alpha1.TypedObjectReference, error) {
config, err := getConfig(ctx, c)
if err != nil {
return nil, err
}
Expand All @@ -115,10 +134,13 @@ func GetEmbedder(ctx context.Context, c client.Client) (*arcadiav1alpha1.TypedOb
}
return config.Embedder, nil
}
func GetEmbedder(ctx context.Context) (*arcadiav1alpha1.TypedObjectReference, error) {
return GetEmbedderWithClient(ctx, nil)
}

// GetVectorStore get the default vector store from config
func GetVectorStore(ctx context.Context, c client.Client) (*arcadiav1alpha1.TypedObjectReference, error) {
config, err := GetConfig(ctx, c)
// GetVectorStoreWithClient get the default vector store from config
func GetVectorStoreWithClient(ctx context.Context, c client.Client) (*arcadiav1alpha1.TypedObjectReference, error) {
config, err := getConfig(ctx, c)
if err != nil {
return nil, err
}
Expand All @@ -128,9 +150,14 @@ func GetVectorStore(ctx context.Context, c client.Client) (*arcadiav1alpha1.Type
return config.VectorStore, nil
}

// Get the configuration of streamlit tool
func GetStreamlit(ctx context.Context, c client.Client) (*Streamlit, error) {
config, err := GetConfig(ctx, c)
// GetVectorStore get the default vector store from config
func GetVectorStore(ctx context.Context) (*arcadiav1alpha1.TypedObjectReference, error) {
return GetVectorStoreWithClient(ctx, nil)
}

// GetStreamlitWithClient Get the configuration of streamlit tool
func GetStreamlitWithClient(ctx context.Context, c client.Client) (*Streamlit, error) {
config, err := getConfig(ctx, c)
if err != nil {
return nil, err
}
Expand All @@ -140,9 +167,14 @@ func GetStreamlit(ctx context.Context, c client.Client) (*Streamlit, error) {
return config.Streamlit, nil
}

// Get the ray cluster that can be used a resource pool
func GetRayClusters(ctx context.Context, c client.Client) ([]RayCluster, error) {
config, err := GetConfig(ctx, c)
// Get the configuration of streamlit tool
func GetStreamlit(ctx context.Context) (*Streamlit, error) {
return GetStreamlitWithClient(ctx, nil)
}

// GetRayClustersWithClient Get the ray cluster that can be used a resource pool
func GetRayClustersWithClient(ctx context.Context, c client.Client) ([]RayCluster, error) {
config, err := getConfig(ctx, c)
if err != nil {
return nil, err
}
Expand All @@ -152,9 +184,13 @@ func GetRayClusters(ctx context.Context, c client.Client) ([]RayCluster, error)
return config.RayClusters, nil
}

// GetDefaultRerankModel gets the default reranking model which is recommended by kubeagi
func GetDefaultRerankModel(ctx context.Context, c client.Client) (*arcadiav1alpha1.TypedObjectReference, error) {
config, err := GetConfig(ctx, c)
func GetRayClusters(ctx context.Context) ([]RayCluster, error) {
return GetRayClustersWithClient(ctx, nil)
}

// GetDefaultRerankModelWithClient gets the default reranking model which is recommended by kubeagi
func GetDefaultRerankModelWithClient(ctx context.Context, c client.Client) (*arcadiav1alpha1.TypedObjectReference, error) {
config, err := getConfig(ctx, c)
if err != nil {
return nil, err
}
Expand All @@ -164,8 +200,13 @@ func GetDefaultRerankModel(ctx context.Context, c client.Client) (*arcadiav1alph
return config.Rerank, nil
}

func GetSystemDatasourceOSS(ctx context.Context, mgrClient client.Client) (*datasource.OSS, error) {
systemDatasource, err := GetSystemDatasource(ctx, mgrClient)
// GetDefaultRerankModel gets the default reranking model which is recommended by kubeagi
func GetDefaultRerankModel(ctx context.Context) (*arcadiav1alpha1.TypedObjectReference, error) {
return GetDefaultRerankModelWithClient(ctx, nil)
}

func GetSystemDatasourceOSSWithClient(ctx context.Context, mgrClient client.Client) (*datasource.OSS, error) {
systemDatasource, err := GetSystemDatasourceWithClient(ctx, mgrClient)
if err != nil {
return nil, err
}
Expand All @@ -175,3 +216,7 @@ func GetSystemDatasourceOSS(ctx context.Context, mgrClient client.Client) (*data
}
return datasource.NewOSS(ctx, mgrClient, endpoint)
}

func GetSystemDatasourceOSS(ctx context.Context) (*datasource.OSS, error) {
return GetSystemDatasourceOSSWithClient(ctx, nil)
}
4 changes: 2 additions & 2 deletions pkg/evaluation/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func PhaseJobName(instance *evav1alpha1.RAG, phase evav1alpha1.RAGPhase) string
}
func systemEmbeddingSuite(ctx context.Context, mgrClient client.Client) (*v1alpha1.Embedder, error) {
// get the built-in system embedder
emd, err := config.GetEmbedder(ctx, mgrClient)
emd, err := config.GetEmbedderWithClient(ctx, mgrClient)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -303,7 +303,7 @@ func JudgeJobGenerator(ctx context.Context, c client.Client) func(*evav1alpha1.R

func UploadJobGenerator(ctx context.Context, client client.Client) func(*evav1alpha1.RAG) (*batchv1.Job, error) {
return func(instance *evav1alpha1.RAG) (*batchv1.Job, error) {
datasource, err := config.GetSystemDatasource(ctx, client)
datasource, err := config.GetSystemDatasourceWithClient(ctx, client)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/langchainwrap/embedder.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func GetLangchainEmbedder(ctx context.Context, e *v1alpha1.Embedder, c client.Cl
return langchaingoembeddings.NewEmbedder(llm, opts...)
}
case v1alpha1.ProviderTypeWorker:
gateway, err := config.GetGateway(ctx, c)
gateway, err := config.GetGateway(ctx)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit c8f9776

Please sign in to comment.