runtimes/js: worker pooling (#1652)
eandre authored Dec 18, 2024
1 parent 821c94d commit f39a609
Showing 38 changed files with 716 additions and 311 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions cli/daemon/apps/apps.go
@@ -383,6 +383,10 @@ func (i *Instance) Lang() appfile.Lang {
return appFile.Lang
}

func (i *Instance) AppFile() (*appfile.File, error) {
return appfile.ParseFile(filepath.Join(i.root, appfile.Name))
}

func (i *Instance) BuildSettings() (appfile.Build, error) {
appFile, err := appfile.ParseFile(filepath.Join(i.root, appfile.Name))
if err != nil {
19 changes: 19 additions & 0 deletions cli/daemon/run/runtime_config2.go
@@ -49,6 +49,8 @@ type RuntimeConfigGenerator struct {
PlatformID() string
PlatformOrLocalID() string
GlobalCORS() (appfile.CORS, error)
AppFile() (*appfile.File, error)
BuildSettings() (appfile.Build, error)
}

// The infra manager to use
@@ -139,6 +141,23 @@ func (g *RuntimeConfigGenerator) initialize() error {
})
}

appFile, err := g.app.AppFile()
if err != nil {
return errors.Wrap(err, "failed to get app's build settings")
}
for _, svc := range g.md.Svcs {
cfg := &runtimev1.HostedService{
Name: svc.Name,
LogConfig: ptrOrNil(appFile.LogLevel),
}

if appFile.Build.WorkerPooling {
n := int32(0)
cfg.WorkerThreads = &n
}
g.conf.ServiceConfig(cfg)
}

g.conf.AuthMethods([]*runtimev1.ServiceAuth{
{
AuthMethod: &runtimev1.ServiceAuth_EncoreAuth_{
5 changes: 5 additions & 0 deletions docs/menu.cue
@@ -792,6 +792,11 @@
text: "Middleware"
path: "/ts/develop/middleware"
file: "ts/develop/middleware"
}, {
kind: "basic"
text: "Multithreading"
path: "/ts/develop/multithreading"
file: "ts/develop/multithreading"
}]
},
{
13 changes: 6 additions & 7 deletions docs/ts/develop/middleware.md
@@ -3,7 +3,7 @@ seotitle: Using Middleware in your Encore.ts application
seodesc: See how you can use middleware in your Encore.ts application to handle cross-cutting generic functionality, like request logging, auth, or tracing.
title: Middleware
subtitle: Handling cross-cutting, generic functionality
lang: lang
lang: ts
---

Middleware is a way to write reusable code that runs before, after, or both before and after
@@ -21,14 +21,14 @@ However, when developing applications there's often some use cases where it can
reusable functionality that applies to multiple API endpoints, and middleware
is a good solution for this.

Encore provides built-in support for middleware by adding functions to the
Encore provides built-in support for middleware by adding functions to the
[Service definitions](/docs/ts/primitives/services) configuration.
Each middleware can be configured with a `target` option to specify what
API endpoints it applies to.

<GitHubLink
href="https://github.com/encoredev/examples/tree/main/ts/middleware"
desc="Example app with two middleware; a rate limiter and one for user authorization."
<GitHubLink
href="https://github.com/encoredev/examples/tree/main/ts/middleware"
desc="Example app with two middleware; a rate limiter and one for user authorization."
/>

## Middleware functions
@@ -94,6 +94,5 @@ export default new Service("myService", {

The target option specifies which endpoints within the service the middleware should run on. If not set, the middleware will run for all endpoints by default.

For better performance, use the `target` option instead of filtering within the middleware function.
For better performance, use the `target` option instead of filtering within the middleware function.
This enables calculating applicable middleware per endpoint during startup, reducing runtime overhead.
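
To make the `target` option concrete, here is a minimal sketch (illustrative only, not part of this commit's diff); it assumes the `middleware` helper from `encore.dev/api` and the `middlewares` service option that the surrounding examples in this doc rely on:

```ts
import { middleware } from "encore.dev/api";
import { Service } from "encore.dev/service";

// Runs only for endpoints that require auth; the applicable middleware set
// is computed per endpoint at startup rather than checked on every request.
const audit = middleware({ target: { auth: true } }, async (req, next) => {
  const start = Date.now();
  const resp = await next(req); // invoke the next middleware or the endpoint handler
  console.log(`request handled in ${Date.now() - start}ms`);
  return resp;
});

export default new Service("myService", {
  middlewares: [audit],
});
```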

40 changes: 40 additions & 0 deletions docs/ts/develop/multithreading.md
@@ -0,0 +1,40 @@
---
seotitle: Multithreading in Encore.ts
seodesc: See how Encore.ts provides true multithreading for JavaScript applications, and how to enable Worker Pooling for CPU-intensive workloads.
title: Multithreading
subtitle: True multithreading for JavaScript applications
lang: ts
---

Encore.ts runs using a high-performance Rust runtime that uses multiple threads to handle incoming requests.
The Encore.ts Rust runtime handles virtually everything outside of your core business logic:

* Parsing and validating incoming requests
* Making API calls to other services
* Serializing and writing API responses
* Observability integrations like distributed tracing
* Infrastructure integrations, like executing database queries, reading and writing from object storage, publishing and consuming messages from Pub/Sub, and more

This architecture allows for much higher performance and scalability compared to traditional JavaScript frameworks.
By offloading most of this to multithreaded Rust, the single-threaded JavaScript event loop becomes free to focus on executing your core business logic.

But for more CPU-intensive workloads, the single-threaded JavaScript event loop can still become a performance bottleneck.
For these use cases, Encore.ts offers Worker Pooling. With Worker Pooling enabled, Encore.ts starts up multiple NodeJS event loops
and load-balances incoming requests across them. This can provide a significant performance boost for CPU-intensive workloads.

<img src="https://encore.dev/assets/blog/worker-pooling/encore-pooling.png" />

## Enabling Worker Pooling

To enable Worker Pooling, add `"build": {"worker_pooling": true}` to your `encore.app` file.
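
For reference, a minimal `encore.app` sketch with Worker Pooling turned on might look like the following; the `id` value is a placeholder for illustration and not taken from this commit:

```json
{
  "id": "my-app-id",
  "build": {
    "worker_pooling": true
  }
}
```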

## Designing your application to work with Worker Pooling

Most application code will work with Worker Pooling without any changes. However, it's important to understand
the implications of running in a multi-threaded environment.

When utilizing Worker Pooling, Encore.ts will automatically spin up multiple NodeJS isolates (one per CPU) to handle incoming requests.
Each NodeJS isolate is a separate JavaScript runtime, with its own event loop and memory space.

This means that you cannot rely on global shared state that is shared across all incoming requests,
since each request may be handled by a different NodeJS isolate.
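
As a sketch of what this means in practice (illustrative only, not part of the commit), module-level state becomes per-isolate once Worker Pooling is enabled:

```ts
// Hypothetical in-memory counter: with Worker Pooling, each NodeJS isolate
// loads its own copy of this module, so each isolate keeps a separate count.
let requestCount = 0;

export function trackRequest(): number {
  // Two consecutive requests may be served by different isolates and therefore
  // observe different counts; this is not a process-wide total.
  return ++requestCount;
}

// If state must be shared across all requests, keep it outside the process,
// for example in a database or cache, so every isolate reads the same value.
```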
7 changes: 7 additions & 0 deletions pkg/appfile/appfile.go
@@ -60,6 +60,10 @@ type File struct {
//
// Deprecated: Use build.docker.base_image instead.
DockerBaseImage string `json:"docker_base_image,omitempty"`

// LogLevel is the minimum log level for the app.
// If empty it defaults to "trace".
LogLevel string `json:"log_level,omitempty"`
}

type Build struct {
@@ -69,6 +73,9 @@ type Build struct {
// Docker configures the docker images built
// by Encore's CI/CD system.
Docker Docker `json:"docker,omitempty"`

// WorkerPooling enables worker pooling for Encore.ts.
WorkerPooling bool `json:"worker_pooling,omitempty"`
}

type Docker struct {
34 changes: 26 additions & 8 deletions pkg/rtconfgen/base_builder.go
@@ -1,7 +1,9 @@
package rtconfgen

import (
"cmp"
"fmt"
"slices"
"time"

"github.com/cockroachdb/errors"
@@ -41,6 +43,7 @@ type Builder struct {
defaultDeployedAt time.Time

deployments map[string]*Deployment
services map[string]*runtimev1.HostedService
}

func NewBuilder() *Builder {
@@ -50,6 +53,7 @@ func NewBuilder() *Builder {
rs: rs,
obs: &runtimev1.Observability{},
deployments: make(map[string]*Deployment),
services: make(map[string]*runtimev1.HostedService),
}

return b
@@ -109,6 +113,10 @@ func (b *Builder) LogsProviderFn(rid string, fn func() *runtimev1.LogsProvider)
addResFunc(&b.obs.Logs, b.rs, rid, fn)
}

func (b *Builder) ServiceConfig(svc *runtimev1.HostedService) {
b.services[svc.Name] = svc
}

func (b *Builder) Deployment(rid string) *Deployment {
if d, ok := b.deployments[rid]; ok {
return d
@@ -138,8 +146,8 @@ type Deployment struct {
// The base URL for reaching this deployment from another service.
svc2svcBaseURL string

hostedGateways []string
hostedServices []string
hostedGateways []string
hostedServiceNames []string
}

// DeployID sets the deploy id.
@@ -162,7 +170,7 @@ func (d *Deployment) DynamicExperiments(experiments []string) *Deployment {
// HostsServices adds the given service names as being hosted by this deployment.
// It appends and doesn't overwrite any existing hosted services.
func (d *Deployment) HostsServices(names ...string) *Deployment {
d.hostedServices = append(d.hostedServices, names...)
d.hostedServiceNames = append(d.hostedServiceNames, names...)
return d
}

@@ -198,17 +206,27 @@ func (d *Deployment) BuildRuntimeConfig() (*runtimev1.RuntimeConfig, error) {
return nil, err
}
if reduced, ok := d.reduceWith.Get(); ok {
infra = reduceForServices(infra, reduced, d.hostedServices)
infra = reduceForServices(infra, reduced, d.hostedServiceNames)
}

graceful := d.gracefulShutdown.GetOrElse(d.b.defaultGracefulShutdown)

var hostedServices []*runtimev1.HostedService
for _, svcName := range d.hostedServices {
hostedServices = append(hostedServices, &runtimev1.HostedService{
Name: svcName,
})
{
for _, svcName := range d.hostedServiceNames {
// If we have a service config defined for this service, use it.
cfg := b.services[svcName]
if cfg == nil {
cfg = &runtimev1.HostedService{
Name: svcName,
}
}
hostedServices = append(hostedServices, cfg)
}
}
slices.SortFunc(hostedServices, func(a, b *runtimev1.HostedService) int {
return cmp.Compare(a.Name, b.Name)
})

gatewaysByName := make(map[string]*runtimev1.Gateway)
for _, gw := range infra.Resources.Gateways {
41 changes: 29 additions & 12 deletions pkg/rtconfgen/convert.go
@@ -7,6 +7,7 @@ import (
"slices"

"github.com/cockroachdb/errors"
"github.com/rs/zerolog"

"go.encore.dev/platform-sdk/pkg/auth"

@@ -55,14 +56,14 @@ func (c *legacyConverter) Convert() (*config.Runtime, error) {
CORS: &config.CORS{},
}

// Compute handling.
// Deployment handling.
{
compute := c.in.Deployment
cfg.HostedServices = fns.Map(compute.HostedServices, func(s *runtimev1.HostedService) string {
deployment := c.in.Deployment
cfg.HostedServices = fns.Map(deployment.HostedServices, func(s *runtimev1.HostedService) string {
return s.Name
})

cfg.ServiceAuth = fns.Map(compute.AuthMethods, func(sa *runtimev1.ServiceAuth) config.ServiceAuth {
cfg.ServiceAuth = fns.Map(deployment.AuthMethods, func(sa *runtimev1.ServiceAuth) config.ServiceAuth {
switch sa.AuthMethod.(type) {
case *runtimev1.ServiceAuth_EncoreAuth_:
return config.ServiceAuth{Method: "encore-auth"}
@@ -71,7 +72,7 @@
})

cfg.ServiceDiscovery = make(map[string]config.Service)
for key, value := range compute.ServiceDiscovery.Services {
for key, value := range deployment.ServiceDiscovery.Services {
method := config.ServiceAuth{Method: "noop"}
if len(value.AuthMethods) > 0 {
if _, ok := value.AuthMethods[0].AuthMethod.(*runtimev1.ServiceAuth_EncoreAuth_); ok {
@@ -86,22 +87,22 @@
}
}

if compute.GracefulShutdown != nil {
if deployment.GracefulShutdown != nil {
cfg.GracefulShutdown = &config.GracefulShutdownTimings{
Total: ptr(compute.GracefulShutdown.Total.AsDuration()),
ShutdownHooks: ptr(compute.GracefulShutdown.ShutdownHooks.AsDuration()),
Handlers: ptr(compute.GracefulShutdown.Handlers.AsDuration()),
Total: ptr(deployment.GracefulShutdown.Total.AsDuration()),
ShutdownHooks: ptr(deployment.GracefulShutdown.ShutdownHooks.AsDuration()),
Handlers: ptr(deployment.GracefulShutdown.Handlers.AsDuration()),
}
cfg.ShutdownTimeout = compute.GracefulShutdown.Total.AsDuration()
cfg.ShutdownTimeout = deployment.GracefulShutdown.Total.AsDuration()
}
cfg.DynamicExperiments = compute.DynamicExperiments
cfg.DynamicExperiments = deployment.DynamicExperiments

// Set the API Base URL if we have a gateway.
if len(c.in.Infra.Resources.Gateways) > 0 {
cfg.APIBaseURL = c.in.Infra.Resources.Gateways[0].BaseUrl
}

for _, gwRID := range compute.HostedGateways {
for _, gwRID := range deployment.HostedGateways {
idx := slices.IndexFunc(c.in.Infra.Resources.Gateways, func(gw *runtimev1.Gateway) bool {
return gw.Rid == gwRID
})
@@ -133,6 +134,22 @@
})
}
}

// Use the most verbose logging requested.
currLevel := zerolog.PanicLevel
foundLevel := false
for _, svc := range deployment.HostedServices {
if svc.LogConfig != nil {
if level, err := zerolog.ParseLevel(*svc.LogConfig); err == nil && level < currLevel {
currLevel = level
foundLevel = true
}
}
}
if !foundLevel {
currLevel = zerolog.TraceLevel
}
cfg.LogConfig = currLevel.String()
}

// Infrastructure handling.