Varias rutinas gordas escuchando en un canal

82

Tengo múltiples goroutines tratando de recibir en el mismo canal simultáneamente. Parece que la última goroutine que comienza a recibir en el canal obtiene el valor. ¿Está esto en alguna parte de la especificación del idioma o es un comportamiento indefinido?

c := make(chan string)
for i := 0; i < 5; i++ {
    go func(i int) {
        <-c
        c <- fmt.Sprintf("goroutine %d", i)
    }(i)
}
c <- "hi"
fmt.Println(<-c)

Salida:

goroutine 4

Ejemplo en el patio de recreo

EDITAR:

Me acabo de dar cuenta de que es más complicado de lo que pensaba. El mensaje pasa por todas las goroutines.

c := make(chan string)
for i := 0; i < 5; i++ {
    go func(i int) {
        msg := <-c
        c <- fmt.Sprintf("%s, hi from %d", msg, i)
    }(i)
}
c <- "original"
fmt.Println(<-c)

Salida:

original, hi from 0, hi from 1, hi from 2, hi from 3, hi from 4

Ejemplo en el patio de recreo

Ilia Choly
fuente
6
Probé su último fragmento y (para mi gran alivio) solo salió original, hi from 4...
Chang Qian
1
@ChangQian agregar un time.Sleep(time.Millisecond)entre el envío y la recepción del canal devuelve el comportamiento anterior.
Ilia Choly

Respuestas:

75

Sí, es complicado, pero hay un par de reglas generales que deberían hacer que las cosas se sientan mucho más sencillas.

  • prefiera usar argumentos formales para los canales que pasa a las rutinas go en lugar de acceder a los canales en el ámbito global De esta manera, puede obtener más comprobaciones del compilador y también una mejor modularidad.
  • evite leer y escribir en el mismo canal en una rutina de marcha en particular (incluida la 'principal'). De lo contrario, el estancamiento es un riesgo mucho mayor.

Aquí hay una versión alternativa de su programa, aplicando estas dos pautas. Este caso demuestra muchos escritores y un lector en un canal:

c := make(chan string)

for i := 1; i <= 5; i++ {
    go func(i int, co chan<- string) {
        for j := 1; j <= 5; j++ {
            co <- fmt.Sprintf("hi from %d.%d", i, j)
        }
    }(i, c)
}

for i := 1; i <= 25; i++ {
    fmt.Println(<-c)
}

http://play.golang.org/p/quQn7xePLw

Crea las cinco rutinas go escribiendo en un solo canal, cada una escribiendo cinco veces. La rutina principal lee los veinticinco mensajes; puede notar que el orden en el que aparecen a menudo no es secuencial (es decir, la concurrencia es evidente).

Este ejemplo demuestra una característica de los canales Go: es posible tener varios escritores compartiendo un canal; Go intercalará los mensajes automáticamente.

Lo mismo se aplica para un escritor y varios lectores en un canal, como se ve en el segundo ejemplo aquí:

c := make(chan int)
var w sync.WaitGroup
w.Add(5)

for i := 1; i <= 5; i++ {
    go func(i int, ci <-chan int) {
        j := 1
        for v := range ci {
            time.Sleep(time.Millisecond)
            fmt.Printf("%d.%d got %d\n", i, j, v)
            j += 1
        }
        w.Done()
    }(i, c)
}

for i := 1; i <= 25; i++ {
    c <- i
}
close(c)
w.Wait()

Este segundo ejemplo incluye una espera impuesta a la goroutine principal, que de otro modo saldría rápidamente y provocaría que las otras cinco gorutinas se cancelen antes (gracias a olov por esta corrección) .

En ambos ejemplos, no se necesitó almacenamiento en búfer. En general, es un buen principio considerar el almacenamiento en búfer únicamente como un potenciador del rendimiento. Si su programa no se bloquea sin búfer, tampoco lo hará con búfer (pero lo contrario no siempre es cierto). Entonces, como otra regla general, comience sin almacenar en búfer y luego agréguelo más tarde según sea necesario .

Rick-777
fuente
¿No necesitas esperar a que terminen todas las rutinas?
mlbright
Depende de lo que quieras decir. Eche un vistazo a los ejemplos de play.golang.org; tienen una mainfunción que termina una vez que llega al final, independientemente de lo que estén haciendo otras gorutinas. En el primer ejemplo anterior, mainestá sincronizado con las otras rutinas de gor, por lo que no hay problema. El segundo ejemplo también funciona sin problemas porque todos los mensajes se envían a través de c antes declose que se llame a la función y esto sucede antes de quemain finalice la goroutine. (Se podría argumentar que llamar closees superfluo en el caso, pero es una buena práctica)
Rick-777
1
asumiendo que desea (determinísticamente) ver 15 impresiones en el último ejemplo, debe esperar. Para demostrar eso, aquí está el mismo ejemplo pero con un tiempo. Duerma justo antes de Printf: play.golang.org/p/cEP-UBPLv6
olov
Y aquí está el mismo ejemplo con time.Sleep y arreglado con un WaitGroup para esperar las goroutines: play.golang.org/p/ESq9he_WzS
olov
No creo que sea una buena recomendación omitir el almacenamiento en búfer al principio. Sin almacenamiento en búfer, en realidad no escribe código concurrente, y eso conduce no solo a que no se puede interbloquear, sino también a que el resultado de manejo del otro lado del canal ya está disponible en la siguiente instrucción después del envío, y puede que involuntariamente (o eventualmente intencionalmente en el caso de un novato) confíe en eso. Y una vez que confía en el hecho de que tiene un resultado de inmediato, sin esperarlo especialmente, y agrega un búfer, tiene una condición de carrera.
usuario
24

Respuesta tardía, pero espero que esto ayude a otros en el futuro, como Long Polling, Botón "Global", ¿Transmitir a todos?

Effective Go explica el problema:

Los receptores siempre se bloquean hasta que haya datos para recibir.

Eso significa que no puede tener más de 1 goroutine escuchando 1 canal y esperar que TODAS las goroutines reciban el mismo valor.

Ejecute este ejemplo de código .

package main

import "fmt"

func main() {
    c := make(chan int)

    for i := 1; i <= 5; i++ {
        go func(i int) {
        for v := range c {
                fmt.Printf("count %d from goroutine #%d\n", v, i)
            }
        }(i)
    }

    for i := 1; i <= 25; i++ {
        c<-i
    }

    close(c)
}

No verá "count 1" más de una vez aunque haya 5 goroutines escuchando el canal. Esto se debe a que cuando la primera goroutine bloquea el canal, todas las demás goroutine deben esperar en fila. Cuando se desbloquea el canal, el recuento ya se ha recibido y eliminado del canal, por lo que la siguiente gorutina en línea obtiene el siguiente valor de recuento.

Brenden
fuente
1
Gracias, ahora este ejemplo tiene sentido github.com/goinaction/code/blob/master/chapter6/listing20/…
user31208
Ahh, esto fue útil. ¿Sería una buena alternativa crear un canal para cada rutina de Go que necesite la información y luego enviar un mensaje en todos los canales cuando sea necesario? Esa es la opción que puedo imaginar.
ThePartyTurtle
8

Es complicado.

Además, mira qué pasa con GOMAXPROCS = NumCPU+1. Por ejemplo,

package main

import (
    "fmt"
    "runtime"
)

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU() + 1)
    fmt.Print(runtime.GOMAXPROCS(0))
    c := make(chan string)
    for i := 0; i < 5; i++ {
        go func(i int) {
            msg := <-c
            c <- fmt.Sprintf("%s, hi from %d", msg, i)
        }(i)
    }
    c <- ", original"
    fmt.Println(<-c)
}

Salida:

5, original, hi from 4

Y vea lo que sucede con los canales almacenados en búfer. Por ejemplo,

package main

import "fmt"

func main() {
    c := make(chan string, 5+1)
    for i := 0; i < 5; i++ {
        go func(i int) {
            msg := <-c
            c <- fmt.Sprintf("%s, hi from %d", msg, i)
        }(i)
    }
    c <- "original"
    fmt.Println(<-c)
}

Salida:

original

También debería poder explicar estos casos.

peterSO
fuente
7

Estudié soluciones existentes y creé una biblioteca de transmisión simple https://github.com/grafov/bcast .

    group := bcast.NewGroup() // you created the broadcast group
    go bcast.Broadcasting(0) // the group accepts messages and broadcast it to all members

    member := group.Join() // then you join member(s) from other goroutine(s)
    member.Send("test message") // or send messages of any type to the group 

    member1 := group.Join() // then you join member(s) from other goroutine(s)
    val := member1.Recv() // and for example listen for messages
Alejandro I.Grafov
fuente
2
¡Gran lib que tienes ahí! También encontré github.com/asaskevich/EventBus
usuario
Y no es gran cosa, pero tal vez debería mencionar cómo deshacerse del archivo Léame.
usuario
Fuga de memoria allí
jhvaras
:( ¿Puedes explicar los detalles @jhvaras?
Alexander I.Grafov
2

Para escuchar múltiples gorutinas en un canal, sí, es posible. el punto clave es el mensaje en sí, puedes definir algún mensaje como ese:

package main

import (
    "fmt"
    "sync"
)

type obj struct {
    msg string
    receiver int
}

func main() {
    ch := make(chan *obj) // both block or non-block are ok
    var wg sync.WaitGroup
    receiver := 25 // specify receiver count

    sender := func() {
        o := &obj {
            msg: "hello everyone!",
            receiver: receiver,
        }
        ch <- o
    }
    recv := func(idx int) {
        defer wg.Done()
        o := <-ch
        fmt.Printf("%d received at %d\n", idx, o.receiver)
        o.receiver--
        if o.receiver > 0 {
            ch <- o // forward to others
        } else {
            fmt.Printf("last receiver: %d\n", idx)
        }
    }

    go sender()
    for i:=0; i<reciever; i++ {
        wg.Add(1)
        go recv(i)
    }

    wg.Wait()
}

La salida es aleatoria:

5 received at 25
24 received at 24
6 received at 23
7 received at 22
8 received at 21
9 received at 20
10 received at 19
11 received at 18
12 received at 17
13 received at 16
14 received at 15
15 received at 14
16 received at 13
17 received at 12
18 received at 11
19 received at 10
20 received at 9
21 received at 8
22 received at 7
23 received at 6
2 received at 5
0 received at 4
1 received at 3
3 received at 2
4 received at 1
last receiver 4
coanor
fuente