Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactored the compute package into several interfaces #386

Merged
merged 2 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions wren/.mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,19 @@ packages:
MaterializationClient:
OwnerClient:
PrepareJobClient:
github.com/kaskada-ai/kaskada/wren/compute:
config:
dir: "{{.InterfaceDir}}"
interfaces:
CompileManager:
ComputeManager:
FileManager:
MaterializationManager:
PrepareManager:
github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha:
config:
dir: "{{.InterfaceDir}}"
interfaces:
ComputeServiceClient:
FileServiceClient:
PreparationServiceClient:
18 changes: 12 additions & 6 deletions wren/client/compute_clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,30 @@ import (
"github.com/rs/zerolog/log"
)

type ComputeClients interface {
NewFileServiceClient(ctx context.Context) FileServiceClient
NewPrepareServiceClient(ctx context.Context) PrepareServiceClient
NewComputeServiceClient(ctx context.Context) ComputeServiceClient
}

// ComputeClients is the container to hold client for communicating with compute services
type ComputeClients struct {
type computeClients struct {
fileServiceConfig *HostConfig
prepareServiceConfig *HostConfig
computeServiceConfig *HostConfig
}

// CreateComputeClients initializes the computeClients
func CreateComputeClients(fileServiceConfig *HostConfig, prepareServiceConfig *HostConfig, computeServiceConfig *HostConfig) *ComputeClients {
return &ComputeClients{
func CreateComputeClients(fileServiceConfig *HostConfig, prepareServiceConfig *HostConfig, computeServiceConfig *HostConfig) ComputeClients {
return &computeClients{
fileServiceConfig: fileServiceConfig,
prepareServiceConfig: prepareServiceConfig,
computeServiceConfig: computeServiceConfig,
}
}

// FileServiceClient creates a new FileServiceClient from the configuration and context
func (c *ComputeClients) FileServiceClient(ctx context.Context) FileServiceClient {
func (c *computeClients) NewFileServiceClient(ctx context.Context) FileServiceClient {
conn, err := connection(ctx, c.fileServiceConfig)
if err != nil {
log.Ctx(ctx).Fatal().Err(err).Interface("host_config", c.fileServiceConfig).Msg("unable to dial FileServiceClient")
Expand All @@ -35,7 +41,7 @@ func (c *ComputeClients) FileServiceClient(ctx context.Context) FileServiceClien
}

// PrepareServiceClient creates a new PrepareServiceClient from the configuration and context
func (c *ComputeClients) PrepareServiceClient(ctx context.Context) PrepareServiceClient {
func (c *computeClients) NewPrepareServiceClient(ctx context.Context) PrepareServiceClient {
conn, err := connection(ctx, c.prepareServiceConfig)
if err != nil {
log.Ctx(ctx).Fatal().Err(err).Interface("host_config", c.prepareServiceConfig).Msg("unable to dial PrepareServiceClient")
Expand All @@ -47,7 +53,7 @@ func (c *ComputeClients) PrepareServiceClient(ctx context.Context) PrepareServic
}

// ComputeServiceClient creates a new ComputeServiceClient from the configuration and context
func (c *ComputeClients) ComputeServiceClient(ctx context.Context) ComputeServiceClient {
func (c *computeClients) NewComputeServiceClient(ctx context.Context) ComputeServiceClient {
conn, err := connection(ctx, c.computeServiceConfig)
if err != nil {
log.Ctx(ctx).Fatal().Err(err).Interface("host_config", c.computeServiceConfig).Msg("unable to dial ComputeServiceClient")
Expand Down
274 changes: 274 additions & 0 deletions wren/compute/compile_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
package compute

import (
"context"
"fmt"
"strings"
"time"

v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha"
v2alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v2alpha"
"github.com/kaskada-ai/kaskada/wren/client"
"github.com/kaskada-ai/kaskada/wren/ent"
"github.com/kaskada-ai/kaskada/wren/internal"
"github.com/rs/zerolog/log"
"golang.org/x/exp/maps"
)

type compileRequest struct {
Expression string
Views []*v1alpha.WithView
SliceRequest *v1alpha.SliceRequest
ResultBehavior v1alpha.Query_ResultBehavior
}

type compileOptions struct {
IsFormula bool
IsExperimental bool
}

type CompileManager interface {
CompileEntMaterialization(ctx context.Context, owner *ent.Owner, materialization *ent.Materialization) (*v1alpha.CompileResponse, []*v1alpha.View, error)
CompileV1Materialization(ctx context.Context, owner *ent.Owner, materialization *v1alpha.Materialization) (*v1alpha.CompileResponse, []*v1alpha.View, error)
CompileV1Query(ctx context.Context, owner *ent.Owner, query *v1alpha.Query, queryOptions *v1alpha.QueryOptions) (*v1alpha.CompileResponse, []*v1alpha.View, error)
CompileV2Query(ctx context.Context, owner *ent.Owner, expression string, views []*v2alpha.QueryView, queryConfig *v2alpha.QueryConfig) (*v1alpha.CompileResponse, []*v1alpha.View, error)
CompileV1View(ctx context.Context, owner *ent.Owner, view *v1alpha.View) (*v1alpha.CompileResponse, error)
}

type compileManager struct {
computeClients client.ComputeClients
kaskadaTableClient internal.KaskadaTableClient
kaskadaViewClient internal.KaskadaViewClient
}

func NewCompileManager(computeClients *client.ComputeClients, kaskadaTableClient *internal.KaskadaTableClient, kaskadaViewClient *internal.KaskadaViewClient) CompileManager {
return &compileManager{
computeClients: *computeClients,
kaskadaTableClient: *kaskadaTableClient,
kaskadaViewClient: *kaskadaViewClient,
}
}

func (m *compileManager) CompileEntMaterialization(ctx context.Context, owner *ent.Owner, materialization *ent.Materialization) (*v1alpha.CompileResponse, []*v1alpha.View, error) {
compileRequest := &compileRequest{
Expression: materialization.Expression,
Views: materialization.WithViews.Views,
SliceRequest: materialization.SliceRequest,
ResultBehavior: v1alpha.Query_RESULT_BEHAVIOR_FINAL_RESULTS,
}

compileOptions := &compileOptions{
IsFormula: false,
IsExperimental: false,
}
return m.compile(ctx, owner, compileRequest, compileOptions)
}

func (m *compileManager) CompileV1Materialization(ctx context.Context, owner *ent.Owner, materialization *v1alpha.Materialization) (*v1alpha.CompileResponse, []*v1alpha.View, error) {
compileRequest := &compileRequest{
Expression: materialization.Expression,
Views: materialization.WithViews,
SliceRequest: materialization.Slice,
ResultBehavior: v1alpha.Query_RESULT_BEHAVIOR_FINAL_RESULTS,
}

compileOptions := &compileOptions{
IsFormula: false,
IsExperimental: false,
}
return m.compile(ctx, owner, compileRequest, compileOptions)
}

func (m *compileManager) CompileV1Query(ctx context.Context, owner *ent.Owner, query *v1alpha.Query, queryOptions *v1alpha.QueryOptions) (*v1alpha.CompileResponse, []*v1alpha.View, error) {
compileRequest := &compileRequest{
Expression: query.Expression,
Views: []*v1alpha.WithView{},
SliceRequest: query.Slice,
ResultBehavior: query.ResultBehavior,
}

compileOptions := &compileOptions{
IsFormula: false,
IsExperimental: queryOptions != nil && queryOptions.ExperimentalFeatures,
}

return m.compile(ctx, owner, compileRequest, compileOptions)
}

func (m *compileManager) CompileV2Query(ctx context.Context, owner *ent.Owner, expression string, views []*v2alpha.QueryView, queryConfig *v2alpha.QueryConfig) (*v1alpha.CompileResponse, []*v1alpha.View, error) {

compileRequest := &compileRequest{
Expression: expression,
Views: make([]*v1alpha.WithView, len(views)),
SliceRequest: queryConfig.Slice,
}

for i, view := range views {
compileRequest.Views[i] = &v1alpha.WithView{
Name: view.ViewName,
Expression: view.Expression,
}
}

switch queryConfig.ResultBehavior.ResultBehavior.(type) {
case *v2alpha.ResultBehavior_AllResults:
compileRequest.ResultBehavior = v1alpha.Query_RESULT_BEHAVIOR_ALL_RESULTS
case *v2alpha.ResultBehavior_FinalResults:
compileRequest.ResultBehavior = v1alpha.Query_RESULT_BEHAVIOR_FINAL_RESULTS
case *v2alpha.ResultBehavior_FinalResultsAtTime:
compileRequest.ResultBehavior = v1alpha.Query_RESULT_BEHAVIOR_FINAL_RESULTS_AT_TIME
default:
subLogger := log.Ctx(ctx).With().Str("method", "compileManager.CompileV2Query").Logger()
subLogger.Error().Str("resultBehavior", fmt.Sprintf("%T", queryConfig.ResultBehavior.ResultBehavior)).Msg("unexpected resultBehavior")
return nil, nil, fmt.Errorf("unexpected resultBehavior: %T", queryConfig.ResultBehavior.ResultBehavior)
}

incrementalQueryExperiment := false
for _, experimentalFeature := range queryConfig.ExperimentalFeatures {
switch {
case strings.EqualFold("incremental", experimentalFeature):
incrementalQueryExperiment = true
}
}

compileOptions := &compileOptions{
IsFormula: false,
IsExperimental: incrementalQueryExperiment,
}
return m.compile(ctx, owner, compileRequest, compileOptions)
}

func (m *compileManager) CompileV1View(ctx context.Context, owner *ent.Owner, view *v1alpha.View) (*v1alpha.CompileResponse, error) {
compileRequest := &compileRequest{
Expression: view.Expression,
Views: []*v1alpha.WithView{},
ResultBehavior: v1alpha.Query_RESULT_BEHAVIOR_ALL_RESULTS,
}

compileOptions := &compileOptions{
IsFormula: true,
IsExperimental: false,
}

compileResponse, _, err := m.compile(ctx, owner, compileRequest, compileOptions)
return compileResponse, err
}

func (m *compileManager) compile(ctx context.Context, owner *ent.Owner, request *compileRequest, options *compileOptions) (*v1alpha.CompileResponse, []*v1alpha.View, error) {
subLogger := log.Ctx(ctx).With().Str("method", "compileManager.compileQuery").Logger()

var perEntityBehavior v1alpha.PerEntityBehavior
switch request.ResultBehavior {
case v1alpha.Query_RESULT_BEHAVIOR_UNSPECIFIED:
perEntityBehavior = v1alpha.PerEntityBehavior_PER_ENTITY_BEHAVIOR_UNSPECIFIED
case v1alpha.Query_RESULT_BEHAVIOR_ALL_RESULTS:
perEntityBehavior = v1alpha.PerEntityBehavior_PER_ENTITY_BEHAVIOR_ALL
case v1alpha.Query_RESULT_BEHAVIOR_FINAL_RESULTS:
perEntityBehavior = v1alpha.PerEntityBehavior_PER_ENTITY_BEHAVIOR_FINAL
case v1alpha.Query_RESULT_BEHAVIOR_FINAL_RESULTS_AT_TIME:
perEntityBehavior = v1alpha.PerEntityBehavior_PER_ENTITY_BEHAVIOR_FINAL_AT_TIME
default:
subLogger.Error().Str("resultBehavior", request.ResultBehavior.String()).Msg("unexpected resultBehavior")
return nil, nil, fmt.Errorf("unexpected resultBehavior: %s", request.ResultBehavior.String())
}

formulaMap, err := m.getFormulaMap(ctx, owner, request.Views)
if err != nil {
subLogger.Error().Err(err).Msg("issue getting formulas")
return nil, nil, err
}

computeTables := []*v1alpha.ComputeTable{}
kaskadaTables, err := m.kaskadaTableClient.GetAllKaskadaTables(ctx, owner)
if err != nil {
subLogger.Error().Err(err).Msg("error getting all tables")
return nil, nil, err
}

for _, kaskadaTable := range kaskadaTables {
// if merged schema not set, table still contains no data
if kaskadaTable.MergedSchema != nil {
computeTables = append(computeTables, convertKaskadaTableToComputeTable(kaskadaTable))
}
}

compileRequest := &v1alpha.CompileRequest{
Experimental: options.IsExperimental,
FeatureSet: &v1alpha.FeatureSet{
Formulas: maps.Values(formulaMap),
Query: request.Expression,
},
PerEntityBehavior: perEntityBehavior,
SliceRequest: request.SliceRequest,
Tables: computeTables,
}

if options.IsFormula {
compileRequest.ExpressionKind = v1alpha.CompileRequest_EXPRESSION_KIND_FORMULA
} else {
compileRequest.ExpressionKind = v1alpha.CompileRequest_EXPRESSION_KIND_COMPLETE
}

computeClient := m.computeClients.NewComputeServiceClient(ctx)
defer computeClient.Close()

subLogger.Info().Interface("request", compileRequest).Msg("sending compile request")
compileTimeoutCtx, compileTimeoutCancel := context.WithTimeout(ctx, time.Second*compileTimeoutSeconds)
defer compileTimeoutCancel()

compileResponse, err := computeClient.Compile(compileTimeoutCtx, compileRequest)
subLogger.Info().Err(err).
Interface("fenl_diagnostics", compileResponse.FenlDiagnostics).
Bool("incremental_enabled", compileResponse.IncrementalEnabled).
Strs("free_names", compileResponse.FreeNames).
Strs("missing_names", compileResponse.MissingNames).
Interface("plan_hash", compileResponse.PlanHash).
Interface("result_type", compileResponse.ResultType).
Interface("slices", compileResponse.TableSlices).Msg("received compile response")
if err != nil {
return nil, nil, err
}

views := []*v1alpha.View{}
for _, freeName := range compileResponse.FreeNames {
if formula, ok := formulaMap[freeName]; ok {
views = append(views, &v1alpha.View{
ViewName: formula.Name,
Expression: formula.Formula,
})
}
}

return compileResponse, views, nil
}

// returns map of formulaName to formula, including all persisted views in the owner, and all requested views
// if a requestView and a persisted view have the same name, the requestView will be used
func (m *compileManager) getFormulaMap(ctx context.Context, owner *ent.Owner, requestViews []*v1alpha.WithView) (map[string]*v1alpha.Formula, error) {
subLogger := log.Ctx(ctx).With().Str("method", "manager.getFormulas").Logger()
persistedViews, err := m.kaskadaViewClient.GetAllKaskadaViews(ctx, owner)
if err != nil {
subLogger.Error().Err(err).Msg("issue getting persisted views")
return nil, err
}

formulaMap := map[string]*v1alpha.Formula{}

for _, persistedView := range persistedViews {
formulaMap[persistedView.Name] = &v1alpha.Formula{
Name: persistedView.Name,
Formula: persistedView.Expression,
SourceLocation: fmt.Sprintf("Persisted View: %s", persistedView.Name),
}
}

for _, requestView := range requestViews {
formulaMap[requestView.Name] = &v1alpha.Formula{
Name: requestView.Name,
Formula: requestView.Expression,
SourceLocation: fmt.Sprintf("Requested View: %s", requestView.Name),
}
}

return formulaMap, nil
}
Loading