Skip to content

Commit

Permalink
Merge pull request #1 from bcpeinhardt/add_tasks_example
Browse files Browse the repository at this point in the history
parallel letter frequency example in tasks
  • Loading branch information
bcpeinhardt authored May 9, 2024
2 parents af7f29f + 4ddd374 commit 183f864
Show file tree
Hide file tree
Showing 8 changed files with 31,209 additions and 10 deletions.
2 changes: 2 additions & 0 deletions gleam.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ gleam_stdlib = "~> 0.34 or ~> 1.0"
gleam_erlang = "~> 0.24"
gleam_otp = "~> 0.9"
prng = "~> 3.0"
simplifile = ">= 1.7.0 and < 2.0.0"
birl = ">= 1.5.0 and < 2.0.0"

[dev-dependencies]
gleeunit = "~> 1.0"
10 changes: 8 additions & 2 deletions manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,23 @@
# You typically do not need to edit this file

packages = [
{ name = "birl", version = "1.5.0", build_tools = ["gleam"], requirements = ["gleam_stdlib", "ranger"], otp_app = "birl", source = "hex", outer_checksum = "23BFE5AB0D7D9E4ECC5BB89B7ABDDF8E976D98C65D2E173D116E6AAFBF24E633" },
{ name = "filepath", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "filepath", source = "hex", outer_checksum = "EFB6FF65C98B2A16378ABC3EE2B14124168C0CE5201553DE652E2644DCFDB594" },
{ name = "gleam_bitwise", version = "1.3.1", build_tools = ["gleam"], requirements = [], otp_app = "gleam_bitwise", source = "hex", outer_checksum = "B36E1D3188D7F594C7FD4F43D0D2CE17561DE896202017548578B16FE1FE9EFC" },
{ name = "gleam_erlang", version = "0.24.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "26BDB52E61889F56A291CB34167315780EE4AA20961917314446542C90D1C1A0" },
{ name = "gleam_otp", version = "0.9.0", build_tools = ["gleam"], requirements = ["gleam_stdlib", "gleam_erlang"], otp_app = "gleam_otp", source = "hex", outer_checksum = "5FADBBEC5ECF3F8B6BE91101D432758503192AE2ADBAD5602158977341489F71" },
{ name = "gleam_otp", version = "0.9.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "5FADBBEC5ECF3F8B6BE91101D432758503192AE2ADBAD5602158977341489F71" },
{ name = "gleam_stdlib", version = "0.35.1", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "5443EEB74708454B65650FEBBB1EF5175057D1DEC62AEA9D7C6D96F41DA79152" },
{ name = "gleeunit", version = "1.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "D364C87AFEB26BDB4FB8A5ABDE67D635DC9FA52D6AB68416044C35B096C6882D" },
{ name = "prng", version = "3.0.1", build_tools = ["gleam"], requirements = ["gleam_bitwise", "gleam_stdlib"], otp_app = "prng", source = "hex", outer_checksum = "C78A80DE41469A0BB1AB3B0B0610CCE5DB70C5659A540E2E0E6C54FA38134290" },
{ name = "ranger", version = "1.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "ranger", source = "hex", outer_checksum = "28E615AE7590ED922AF1510DDF606A2ECBBC2A9609AF36D412EDC925F06DFD20" },
{ name = "simplifile", version = "1.7.0", build_tools = ["gleam"], requirements = ["filepath", "gleam_stdlib"], otp_app = "simplifile", source = "hex", outer_checksum = "1D5DFA3A2F9319EC85825F6ED88B8E449F381B0D55A62F5E61424E748E7DDEB0" },
]

[requirements]
birl = { version = ">= 1.5.0 and < 2.0.0"}
gleam_erlang = { version = "~> 0.24" }
gleam_otp = { version = "~> 0.9" }
gleam_stdlib = { version = "~> 0.34 or ~> 1.0" }
gleeunit = { version = "~> 1.0" }
prng = { version = "~> 3.0"}
prng = { version = "~> 3.0" }
simplifile = { version = ">= 1.7.0 and < 2.0.0" }
2 changes: 1 addition & 1 deletion src/actors/pantry.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
////
////

import gleam/otp/actor
import gleam/erlang/process.{type Subject}
import gleam/otp/actor
import gleam/set.{type Set}

// Below this comment are the public functions that we want to expose to other modules.
Expand Down
4 changes: 2 additions & 2 deletions src/concurrency_primitives.gleam
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import gleam/io
import gleam/erlang/process.{type Subject}
import gleam/string
import gleam/function
import gleam/int
import gleam/io
import gleam/string

pub fn main() {
// A "process" in gleam is a lightweight, concurrent unit of execution.
Expand Down
2 changes: 1 addition & 1 deletion src/supervisors.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@
////
//// Back? Great, read on.

import gleam/erlang/process.{type Subject}
import gleam/io
import gleam/otp/supervisor
import gleam/erlang/process.{type Subject}
import supervisors/a_shit_actor as duckduckgoose

pub fn main() {
Expand Down
2 changes: 1 addition & 1 deletion src/supervisors/a_shit_actor.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
//// around the circle. Interestingly in the midwest of the United States the
//// game is often called "Duck, Duck, Grey Duck".)

import gleam/otp/actor
import gleam/erlang/process.{type Subject}
import gleam/function
import gleam/otp/actor
import prng/random

/// Okay, well this is new.
Expand Down
93 changes: 90 additions & 3 deletions src/tasks.gleam
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
//// Tasks are one off processes meant to easily make synchronous work async.
//// They're really straightforward to use, just fire them off and check back later.

import gleam/io
import gleam/otp/task
import gleam/dict.{type Dict}
import gleam/erlang
import gleam/erlang/process
import gleam/list
import gleam/int
import gleam/io
import gleam/list
import gleam/option.{None, Some}
import gleam/otp/task
import gleam/result
import gleam/string
import simplifile
import birl
import birl/duration

pub fn main() {
// Do a thing in a different process
Expand Down Expand Up @@ -78,4 +85,84 @@ pub fn main() {
Ok(val) -> io.println("The 100th item is" <> int.to_string(val))
Error(Nil) -> io.println_error("The list has fewer than 100 items")
}

// Alright, let's do a concurrency hello world example: Parallel Letter Frequency.
// We'll write a function that takes a list of codepoints and counts the frequency each one
// appears. We use a list of codepoints instead of a string because dealing with graphemes
// properly just distracts from the point of this exercise.

use workload <- result.try(simplifile.read("./src/tasks/king_james_bible.txt"))
let workload = string.to_utf_codepoints(workload)

// Doing work concurrently is about finding work that can be split into repeatable
// chunks. Therefore when trying to split work into parts, it's usually a good idea
// to start with the simple linear version then try to reuse it :)

let linear_freq = time("linear frequency", fn() { linear_letter_frequency(workload) })

// Okay, now that that's working, let's split the work into appropriate chunks
// and do the chunks in separate tasks

let parallel_freq = time("parallel frequency", fn() { parallel_letter_frequency(workload, 200_000) })

// Little sanity check
case linear_freq == parallel_freq {
True -> io.println("Our parallel and linear frequency functions produced the same output")
False -> io.println("Our parallel and linear frequency functions produced different output")
}

// Returning an OK because we used result.try
Ok(Nil)
}

// This is our base linear implementation for comparison. Hopefully it makes sense.
// We fold over the list and for each codepoint we increment it's value in the list.
fn linear_letter_frequency(input: List(UtfCodepoint)) -> Dict(UtfCodepoint, Int) {
use acc, letter <- list.fold(input, dict.new())
use entry <- dict.update(acc, update: letter)
case entry {
Some(n) -> n + 1
None -> 1
}
}

// This is our parallel/concurrent implementation
// (If your computer has multiple cores, Erlang should automagically use them and make
// this properly parallel)
fn parallel_letter_frequency(
input: List(UtfCodepoint),
chunk_size: Int,
) -> Dict(UtfCodepoint, Int) {

// Create chunks of work and pass them to separate tasks to be worked on
let handles = list.map(list.sized_chunk(input, chunk_size), fn(chunk) {
task.async(fn() { linear_letter_frequency(chunk) })
})

// Fold over the handles to the tasks to await their results
use total_freq, partial_freq_handle <- list.fold(handles, dict.new())
let partial_freq = task.await(partial_freq_handle, 1000)

// Merge the results into a single structure as they come in.
// We fold over the partial mapping we got back from the task
// and update the total count with it.
//
// Notice we do this inside the fold of the tasks. We don't want to
// await the next task until we're out of work to do.
use total_freq, letter, count <- dict.fold(partial_freq, total_freq)
use entry <- dict.update(total_freq, letter)
case entry {
Some(old_count) -> old_count + count
None -> count
}
}

// This is just a little timer function to help use see the results of our work.
fn time(name: String, f: fn() -> a) -> a {
let start = birl.now()
let x = f()
let end = birl.now()
let difference = birl.difference(end, start) |> duration.blur_to(duration.MilliSecond)
io.println(name <> " took: " <> int.to_string(difference) <> "ms")
x
}
Loading

0 comments on commit 183f864

Please sign in to comment.