Go é fundamentada no modelo CSP (Communicating sequential processes) proposto por Tony Hoare. Neste modelo os dados são compartilhados enviando mensagens através de canais.
As explicações e exemplos são altamente inspiradas na apresentação do @andrebq.
Uma outra influência é a artigo sobre pipelines e cancelamento em go.
Aqui serão apresentados alguns padrões de concorrência, porem sugiro também a leitura sobre context, select, canais com buffer e outros mecanismos de controle de concorrencia.
Canais (channels) são uma estrutura primitiva na linguagem, e você pode utilizá-los para envio e recebimento de valores entre rotinas (goroutines). Os valores podem ser de qualquer tipo, inclusive do tipo canal.
Um canal é um ponto de sincronização entre goroutines. Uma goroutine vai ficar bloqueada escrevendo em um canal até que aquele canal seja lido.
Ler de um canal é semelhante, uma goroutine vai ficar bloqueada lendo até que um valor seja enviado para o canal ou o canal seja fechado (quando isso ocorre, o valor zero do tipo é retornado).
Um canal pode ser fechado. Isso é útil para indicar que nenhum outro valor será escrito no canal.
Ler um canal fechado retorna um valor zero do tipo do canal.
Escrever em um canal fechado retorna um erro (panic).
O primeiro exemplo mostra como criar um canal, que será utilizado como ponte entre a aplicação principal e uma goroutine.
O programa principal fica bloqueado até que a mensagem "Olá mundo" seja enviada para o canal.
Quando isto ocorre, a goroutine é desbloqueada e a mensagem é exibida.
Quando o programa principal termina, a goroutine é também terminada.
package main
import "fmt"
func main() {
canal := make(chan string)
go func() {
canal <- "Olá, mundo!"
}()
fmt.Println(<-canal)
}
Geradores são funções que iniciam uma goroutine para escrever uma lista de valores em um canal que é retornado para quem acionou a função.
No exemplo uma sequência de números inteiros é gerada e enviada para um canal.
A função principal (main) irá realizar a leitura do canal e imprimir os valores. Essa é uma característica interessante sobre canais, quando utilizados com o range, a iteração continuará até que o canal seja fechado.
package main
import "fmt"
func sequenciaNumeros(inicial, final int) <-chan int {
saida := make(chan int)
go func() {
for i := inicial; i <= final; i++ {
saida <- i
}
// após gerar todos os valores, fecha o canal
close(saida)
}()
return saida
}
func main() {
valores := sequenciaNumeros(1, 1000)
for valor := range valores {
fmt.Printf("valor: %v\n", valor)
}
}
Um trabalhador é uma goroutine que recebe valores de um canal e os processa.
No exemplo valores inteiros são enviados pela função principal (main) através do canal de entrada e processados por um trabalhador.
É possível criar vários trabalhadores para processarem um mesmo canal.
package main
import "fmt"
func trabalhador(entrada <-chan int) {
for valor := range entrada {
fmt.Printf("valor: %v\n", valor)
}
}
func main() {
entrada := make(chan int)
// Um trabalhador é iniciado e aguarda por valores no canal de entrada
go trabalhador(entrada)
for i := 0; i < 10; i++ {
entrada <- i
}
// Após ter enviado todos os valores, fecha o canal de entrada
// avisando ao trabalhador que o trabalho terminou
close(entrada)
}
Uma coleção de goroutines que ficam esperando tarefas serem atribuídas a elas. Quando a goroutine finaliza a tarefa que foi atribuída, se torna disponível novamente para execução de uma nova tarefa.
No exemplo, um grupo de n trabalhadores aguardam a chegada de valores pelo canal de entrada. Cada trabalhador executa seu processmento e envia o resultado por um canal.
O tipo sync.WaitGroup fornece uma maneira simples de organizar o grupo de trabalhadores.
package main
import (
"fmt"
"sync"
)
func trabalhador(id int, entrada <-chan int, saida chan<- int, grupo *sync.WaitGroup) {
for valor := range entrada {
fmt.Printf("id: %d processou valor: %v\n", id, valor)
saida <- valor * 2
}
grupo.Done()
}
func grupoDeTrabalhadores(entrada <-chan int, nTrabalhadores int) chan int {
saida := make(chan int)
var wg sync.WaitGroup
for i := 0; i < nTrabalhadores; i++ {
go trabalhador(i+1, entrada, saida, &wg)
}
wg.Add(nTrabalhadores)
go func() {
// Quando todos os trabalhadores estiverem terminado
// informa que o grupo não vai mais enviar resultados
wg.Wait()
close(saida)
}()
return saida
}
func sequenciaNumeros(inicial, final int) <-chan int {
saida := make(chan int)
go func() {
for i := inicial; i <= final; i++ {
saida <- i
}
// após gerar todos os valores, fecha o canal
close(saida)
}()
return saida
}
func main() {
// Produz uma sequência de 10 valores
entrada := sequenciaNumeros(1, 10)
// Um grupo de trabalhadores irá processar esses números
saida := grupoDeTrabalhadores(entrada, 2)
// somente termina quando todo o trabalho for processado
for s := range saida {
fmt.Println(s)
}
}
Um pipeline trabalha recebendo valores de um canal e escrevendo em outro canal, normalmente após realizar alguma tranformação no valor.
No exemplo temos a função dobroFloat
atuando como um pipeline, que irá receber os valores enviados ao canal de entrada retornando os valores transformados.
Um canal pode ser definido como sendo apenas para leitura (<-chan
) ou apenas para escrita (chan<-
).
Os valores gerados pelo gerador sequenciaNumeros
são enviados para o canal de entrada do pipeline e seu valor transformado recebido pelo canal de saída na função principal e é impresso.
Vários pipelines poderiam ser encadeados para realizar múltiplas transformações.
package main
import "fmt"
func dobroFloat(entrada <-chan int) <-chan float64 {
saida := make(chan float64)
go func() {
for valor := range entrada {
saida <- float64(valor) * 2
}
// Após ter terminado de transformar os valores de entrada,
// fecha o canal de saida
close(saida)
}()
return saida
}
func sequenciaNumeros(inicial, final int) <-chan int {
saida := make(chan int)
go func() {
for i := inicial; i <= final; i++ {
saida <- i
}
// após gerar todos os valores, fecha o canal
close(saida)
}()
return saida
}
func main() {
for valor := range dobroFloat(sequenciaNumeros(1, 10)) {
fmt.Printf("valor: %v\n", valor)
}
}
Um fan-in copia dados de múltiplos canais de entrada e escreve em um único canal de saída. Normalmente um fan-in só termina quando todos os canais de entrada são fechados.
A função fan-in pode receber vários canais entrada através de parâmetros múltiplos.
No exemplo abaixo, enviamos vários geradores como entrada para a função fan-in e nos é retornado um único canal de saída. Internamente, uma goroutine é criada para ler os valores de cada canal de entrada, porém todas escrevem no mesmo canal de saída.
Envio de mensagem em um canal fechado causa um erro (panic), por isso é importante garantir que todos os canais de entrada estejam fechados antes de fechar o canal de saída. O tipo sync.WaitGroup fornece uma maneira simples de organizar essa sincronização.
Repare que temos uma goroutine que aguarda um sinal indicando que todas as todas entradas foram consumidas (wg.Wait), finalizando assim o canal de saída.
package main
import (
"fmt"
"sync"
)
func fanin(entradas ...<-chan int) <-chan int {
var wg sync.WaitGroup
// canal de saída que será compartilhado entre os canais de entrada
saida := make(chan int)
// lê os valores de cada canal de entrada e envia para o canal de saída
// quando todos os valores forem lidos, envia sinal avisando que terminou
enviarSaida := func(c <-chan int) {
for n := range c {
saida <- n
}
// aviso que terminou de ler os valores de um canal
wg.Done()
}
wg.Add(len(entradas))
// Inicializa uma goroutine de saída para cada canal de entrada em canais_entrada.
for _, c := range entradas {
go enviarSaida(c)
}
// Inicia uma goroutine para fechar o canal de saída quando todas as
// goroutines de entrada terminarem.
// isto deve ser feito após o wg.Add
go func() {
wg.Wait()
close(saida)
}()
return saida
}
func sequenciaNumeros(inicial, final int) <-chan int {
saida := make(chan int)
go func() {
for i := inicial; i <= final; i++ {
saida <- i
}
// após gerar todos os valores, fecha o canal
close(saida)
}()
return saida
}
func main() {
canal := fanin(
sequenciaNumeros(1, 10),
sequenciaNumeros(11, 20),
sequenciaNumeros(21, 30),
)
for valor := range canal {
fmt.Printf("valor: %v\n", valor)
}
}
Um fan-out copia dados de um canal de entrada para múltiplos canais de saída.
No exemplo uma sequência de números é gerada e enviada para múltiplos canais de saída. Estes canais possuem seus respectivos trabalhadores que irão fazer o processamento do valor.
Esta implementação de fan-out tenta garantir a entrega de todas as mensagens utilizando um agrupador (WaitGroup) para aguardar a publicação dos valores em todos os canais de saída. A publicação é feita em sua própria goroutine e conta também com um mecanismo(timer) de forma a previnir o bloqueio caso algum canal de saída não consiga consumir a mensagem. As mensagens não consumidas são descartadas.
package main
import (
"fmt"
"sync"
"time"
)
func publicar(saida chan<- int, valor int, wg *sync.WaitGroup) {
timer := time.NewTimer(1 * time.Second)
// Aguarda 1 segundo ou o canal ser lido
select {
case saida <- valor:
case <-timer.C:
}
// Independente do canal ser lido ou não,
// avisa que a publicação terminou
wg.Done()
timer.Stop()
}
func fanout(entrada <-chan int, saidas ...chan<- int) {
// O agrupamento das publicações é para evitar que
// o processamento fique bloquando enquanto um canal de saída não é lido
// e garante que todos os valores serão publicados
var wg sync.WaitGroup
for valor := range entrada {
wg.Add(len(saidas))
// Publica o valor de entrada em todas as saídas
for _, saida := range saidas {
go publicar(saida, valor, &wg)
}
wg.Wait()
}
// Como a entrada foi consumida, fecha os canais de saída
for _, saida := range saidas {
close(saida)
}
}
func sequenciaNumeros(inicial, final int) <-chan int {
saida := make(chan int)
go func() {
for i := inicial; i <= final; i++ {
saida <- i
}
// após gerar todos os valores, fecha o canal
close(saida)
}()
return saida
}
func trabalhador(in <-chan int, id int, wg *sync.WaitGroup) {
for v := range in {
fmt.Println("id: ", id, " valor: ", v)
}
wg.Done()
}
func main() {
saida1 := make(chan int)
saida2 := make(chan int)
// Agrupamos os trabalhadores de forma
// a aguardar o processamento de todos antes do programa principal
// ser finalizado
var wg sync.WaitGroup
wg.Add(2)
go trabalhador(saida1, 1, &wg)
go trabalhador(saida2, 2, &wg)
fanout(sequenciaNumeros(1, 10), saida1, saida2)
wg.Wait()
}
Uma janela deslizante (sliding window) é utilizada para prevenir que um leitor lento trave um escritor rápido. Ela funciona deslizando sobre os dados. A ordem de entregas é garantida porém dados antigos podem ser descartados se o consumidor for muito lento.
No exemplo uma sequência de números é gerada, porém nosso consumidor é mais lento que o produtor, logo a medida que a janela desliza os valores antigos são descartados.
Para fazer a janela deslizante, utilizamos um buffer, que possui um tamanho fixo. Utilizamos uma técnica de seleção (select) onde caso o canal de saída seja lido, enviamos o valor para o consumidor e o removemos do buffer. Caso o canal de entrada seja lido, o valor é adicionado ao buffer.
package main
import (
"container/list"
"fmt"
"time"
)
func janelaDeslizante(saida chan<- interface{}, entrada <-chan interface{}, tamanho int) {
buffer := list.New()
defer close(saida)
for entrada != nil || buffer.Len() > 0 {
if buffer.Len() == 0 {
// nós temos um buffer vazio
// e um canal de entrada válido
val := <-entrada
if val == nil { // assume que nil significa fechado
entrada = nil // não vai mais ler dados
continue
}
buffer.PushBack(val)
continue
}
select {
case saida <- buffer.Front().Value:
// consumidor lê o dado
buffer.Remove(buffer.Front()) // remove first item
case val := <-entrada:
// recebeu nova entrada
if val == nil {
// invalida entrada
entrada = nil
// continua já que podemos ter dados
// no buffer
continue
}
if buffer.Len() == tamanho {
// buffer cheio, descarta dados antigos
buffer.Remove(buffer.Front())
}
// adiciona novo dado no buffer
buffer.PushBack(val)
}
}
}
func leitorLento(in <-chan interface{}) {
for val := range in {
fmt.Printf("valor: %v\n", val)
time.Sleep(4 * time.Second)
}
}
func sequenciaNumeros(inicial, final int) <-chan interface{} {
saida := make(chan interface{})
go func() {
for i := inicial; i <= final; i++ {
saida <- i
time.Sleep(1 * time.Second)
}
// após gerar todos os valores, fecha o canal
close(saida)
}()
return saida
}
func main() {
valores := sequenciaNumeros(1, 10)
saida := make(chan interface{})
go leitorLento(saida)
janelaDeslizante(saida, valores, 3)
}
Um processamento em lote (batch processing) é usado quando uma goroutine gera itens um-por-um mas o consumidor deseja processar os items em blocos. Normalmente um canal de conclusão é usado para notificar o escritor que o item foi processado. Um canal de descarga (flush) pode user usado para forçar que o buffer seja enviado antes que ele esteja cheio.
Exemplo: Ao invés de salvar cada item no banco de dados assim que ele é recebido, é possível utilizar um buffer de 100 itens ou 100ms e salvar os itens em uma única requisição.
No exemplo, quando a terceira requisição (req) é enviada, o buffer percebe que ele está cheio e envia os dados para o canal de saída.
Há um canal que permite enviar os dados antes que o buffer esteja cheio, chamado descarga
.
Quando o canal de entrada é fechado, mas ainda há itens no buffer, o buffer é enviado para o canal de saída.
package main
import (
"fmt"
)
type req struct {
valor int
}
func processar(lote []req) {
fmt.Println("processando lote com valores: ", lote)
}
func processadorLotes(entrada <-chan []req) chan bool {
pronto := make(chan bool)
go func() {
for lote := range entrada {
processar(lote)
}
pronto <- true
}()
return pronto
}
func processamentoLotes(entrada <-chan req, descarga <-chan struct{}, tamanhoLote int) chan []req {
saida := make(chan []req)
go func() {
buf := make([]req, 0, tamanhoLote)
valorZero := req{}
var fechado bool
// enquanto houver itens para processar
for !fechado {
var deveDescarregar bool
select {
case r := <-entrada:
if r == valorZero {
// close on zero value
fechado = true
continue
}
// Adiciona o item no buffer
buf = append(buf, r)
deveDescarregar = len(buf) == tamanhoLote
case <-descarga:
deveDescarregar = true
}
if deveDescarregar {
saida <- buf
buf = make([]req, 0, tamanhoLote)
}
}
// garante que caso a entrada seja fechada sem preencher o buffer
// os ultimos itens sejam enviados para processamento
if len(buf) > 0 {
saida <- buf
}
close(saida)
}()
return saida
}
func main() {
entrada := make(chan req)
descarga := make(chan struct{})
// inicia de forma concorrente o processamento em lotes
saida := processamentoLotes(entrada, descarga, 3)
// O consumidor de lotes será iniciado de forma concorrente
pronto := processadorLotes(saida)
entrada <- req{valor: 1}
entrada <- req{valor: 2}
entrada <- req{valor: 3}
// Envia mais dois itens, porém força a descarga
// através de um sinal
entrada <- req{valor: 4}
entrada <- req{valor: 5}
descarga <- struct{}{}
// Envia mais dois itens, não o suficiente para descarregar
// o lote.
entrada <- req{valor: 6}
entrada <- req{valor: 7}
// Eles serão processados mesmo assim.
close(entrada)
// Aguarda todo o processamento do processador de lotes
// antes de encerrar o programa
<-pronto
}
Um sistema de ticket é usado para controlar quando um determinado trabalho pode ser executado, normalmente é utilizado para limitar o uso de um recurso sobre um período de tempo.
Exemplo: Uma API pode ser acionada apenas 15 vezes em um perído de 15 minutos. A utilização é medida em blocos de 15 minutos.
No exemplo, a bilheteria é um sistema de ticket que garante que apenas 15 "tickets" sejam processados a cada segundo.
Enviamos através de um canal 30 processamentos a serem feitos, mas o sistema de ticket garante que apenas 15 processamentos sejam executados por segundo.
Como pode ser visto, o trabalhador fica bloqueado, até que um ticket seja enviado através do canal.
package main
import (
"fmt"
"time"
)
type (
Trabalho func()
ticket int
)
func trabalhador(tickets <-chan ticket, work <-chan Trabalho) {
for w := range work {
<-tickets // espera por um ticket
w() // executa um trabalho
}
}
func bilheteria(tickets chan<- ticket, timeout time.Duration, nTickets int) {
for {
for i := 0; i < nTickets; i++ {
tickets <- ticket(i)
}
// espera até que mais tickets possam ser emitidos
<-time.After(timeout)
}
}
func main() {
tickets := make(chan ticket)
trabalhos := make(chan Trabalho)
go bilheteria(tickets, 1*time.Second, 10)
go trabalhador(tickets, trabalhos)
for i := 0; i <= 30; i++ {
trabalhos <- func() {
fmt.Println("processando ticket")
}
fmt.Println("trabalho ", i, " enviado")
}
close(trabalhos)
close(tickets)
}