¿Cómo escuchar N canales? (declaración de selección dinámica)

116

para iniciar un ciclo interminable de ejecución de dos goroutines, puedo usar el siguiente código:

después de recibir el mensaje, se iniciará una nueva goroutine y continuará para siempre.

c1 := make(chan string)
c2 := make(chan string)

go DoStuff(c1, 5)
go DoStuff(c2, 2)

for ; true;  {
    select {
    case msg1 := <-c1:
        fmt.Println("received ", msg1)
        go DoStuff(c1, 1)
    case msg2 := <-c2:
        fmt.Println("received ", msg2)
        go DoStuff(c2, 9)
    }
}

Ahora me gustaría tener el mismo comportamiento para N goroutines, pero ¿cómo se verá la instrucción select en ese caso?

Este es el bit de código con el que comencé, pero estoy confundido sobre cómo codificar la declaración de selección

numChans := 2

//I keep the channels in this slice, and want to "loop" over them in the select statemnt
var chans = [] chan string{}

for i:=0;i<numChans;i++{
    tmp := make(chan string);
    chans = append(chans, tmp);
    go DoStuff(tmp, i + 1)

//How shall the select statment be coded for this case?  
for ; true;  {
    select {
    case msg1 := <-c1:
        fmt.Println("received ", msg1)
        go DoStuff(c1, 1)
    case msg2 := <-c2:
        fmt.Println("received ", msg2)
        go DoStuff(c2, 9)
    }
}
John Smith
fuente
4
Creo que lo que quieres es multiplexación de canales. golang.org/doc/effective_go.html#chan_of_chan Básicamente, tienes un solo canal que escuchas y luego varios canales secundarios que se canalizan hacia el canal principal. Pregunta SO relacionada: stackoverflow.com/questions/10979608/…
Brenden

Respuestas:

152

Para ello, puede utilizar la Selectfunción de la reflejan paquete:

func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

Seleccionar ejecuta una operación de selección descrita por la lista de casos. Al igual que la instrucción Go select, se bloquea hasta que al menos uno de los casos pueda continuar, hace una elección pseudoaleatoria uniforme y luego ejecuta ese caso. Devuelve el índice del caso elegido y, si ese caso fue una operación de recepción, el valor recibido y un booleano que indica si el valor corresponde a un envío en el canal (en contraposición a un valor cero recibido porque el canal está cerrado).

Pasas una matriz de SelectCaseestructuras que identifican el canal para seleccionar, la dirección de la operación y un valor para enviar en el caso de una operación de envío.

Entonces podrías hacer algo como esto:

cases := make([]reflect.SelectCase, len(chans))
for i, ch := range chans {
    cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
chosen, value, ok := reflect.Select(cases)
// ok will be true if the channel has not been closed.
ch := chans[chosen]
msg := value.String()

Puede experimentar con un ejemplo más desarrollado aquí: http://play.golang.org/p/8zwvSk4kjx

James Henstridge
fuente
4
¿Existe un límite práctico para el número de casos en tal selección? ¿El que si va más allá, el rendimiento se ve gravemente afectado?
Maxim Vladimirsky
4
Tal vez sea mi incompetencia, pero encontré este patrón realmente difícil de trabajar cuando estás enviando y recibiendo estructuras complejas a través del canal. Pasar un canal "agregado" compartido, como dijo Tim Allclair, fue mucho más fácil en mi caso.
Bora M. Alper
90

Puede lograr esto envolviendo cada canal en una goroutine que "reenvía" mensajes a un canal "agregado" compartido. Por ejemplo:

agg := make(chan string)
for _, ch := range chans {
  go func(c chan string) {
    for msg := range c {
      agg <- msg
    }
  }(ch)
}

select {
case msg <- agg:
    fmt.Println("received ", msg)
}

Si necesita saber de qué canal se originó el mensaje, puede envolverlo en una estructura con cualquier información adicional antes de reenviarlo al canal agregado.

En mis pruebas (limitadas), este método supera con creces el uso del paquete reflect:

$ go test dynamic_select_test.go -test.bench=.
...
BenchmarkReflectSelect         1    5265109013 ns/op
BenchmarkGoSelect             20      81911344 ns/op
ok      command-line-arguments  9.463s

Código de referencia aquí

Tim Allclair
fuente
2
Su código de referencia es incorrecto, debe realizar un bucleb.N dentro de una referencia. De lo contrario, los resultados (que se dividen entre b.N1 y 2000000000 en su salida) no tendrán ningún significado.
Dave C
2
@DaveC ¡Gracias! La conclusión no cambia, pero los resultados son mucho más cuerdos.
Tim Allclair
1
De hecho, hice un truco rápido en su código de referencia para obtener algunos números reales . Es muy posible que aún haya algo faltante / incorrecto en este punto de referencia, pero lo único que tiene el código reflect más complicado es que la configuración es más rápida (con GOMAXPROCS = 1) ya que no necesita un montón de gorutinas. En todos los demás casos, un canal de fusión de goroutine simple destruye la solución reflectante (en ~ 2 órdenes de magnitud).
Dave C
2
Una desventaja importante (en comparación con el reflect.Selectenfoque) es que las gorutinas hacen el búfer de fusión al mínimo un valor único en cada canal que se fusiona. Por lo general, eso no será un problema, pero en algunas aplicaciones específicas puede ser un factor decisivo :(.
Dave C
1
un canal de combinación con búfer empeora el problema. El problema es que solo la solución reflect puede tener una semántica sin búfer. Seguí adelante y publiqué el código de prueba con el que estaba experimentando como una respuesta separada para (con suerte) aclarar lo que estaba tratando de decir.
Dave C
22

Para ampliar algunos comentarios sobre respuestas anteriores y proporcionar una comparación más clara, aquí hay un ejemplo de ambos enfoques presentados hasta ahora con la misma entrada, un segmento de canales para leer y una función para llamar para cada valor que también necesita saber cuál canal del que proviene el valor.

Hay tres diferencias principales entre los enfoques:

  • Complejidad. Aunque puede ser parcialmente una preferencia del lector, encuentro que el enfoque del canal es más idiomático, directo y legible.

  • Actuación. En mi sistema Xeon amd64, los canales goroutines + realizan la solución de reflexión en aproximadamente dos órdenes de magnitud (en general, la reflexión en Go suele ser más lenta y solo debe usarse cuando sea absolutamente necesario). Por supuesto, si hay un retraso significativo en la función que procesa los resultados o en la escritura de valores en los canales de entrada, esta diferencia de rendimiento puede fácilmente volverse insignificante.

  • Semántica de bloqueo / almacenamiento en búfer. La importancia de esto depende del caso de uso. La mayoría de las veces, no importa o el ligero almacenamiento en búfer adicional en la solución de fusión de goroutine puede ser útil para el rendimiento. Sin embargo, si es deseable tener la semántica de que solo se desbloquea un solo escritor y su valor se maneja completamente antes de que se desbloquee cualquier otro escritor, entonces eso solo se puede lograr con la solución reflect.

Tenga en cuenta que ambos enfoques se pueden simplificar si no se requiere el "id" del canal de envío o si los canales de origen nunca se cerrarán.

Canal de fusión de Goroutine:

// Process1 calls `fn` for each value received from any of the `chans`
// channels. The arguments to `fn` are the index of the channel the
// value came from and the string value. Process1 returns once all the
// channels are closed.
func Process1(chans []<-chan string, fn func(int, string)) {
    // Setup
    type item struct {
        int    // index of which channel this came from
        string // the actual string item
    }
    merged := make(chan item)
    var wg sync.WaitGroup
    wg.Add(len(chans))
    for i, c := range chans {
        go func(i int, c <-chan string) {
            // Reads and buffers a single item from `c` before
            // we even know if we can write to `merged`.
            //
            // Go doesn't provide a way to do something like:
            //     merged <- (<-c)
            // atomically, where we delay the read from `c`
            // until we can write to `merged`. The read from
            // `c` will always happen first (blocking as
            // required) and then we block on `merged` (with
            // either the above or the below syntax making
            // no difference).
            for s := range c {
                merged <- item{i, s}
            }
            // If/when this input channel is closed we just stop
            // writing to the merged channel and via the WaitGroup
            // let it be known there is one fewer channel active.
            wg.Done()
        }(i, c)
    }
    // One extra goroutine to watch for all the merging goroutines to
    // be finished and then close the merged channel.
    go func() {
        wg.Wait()
        close(merged)
    }()

    // "select-like" loop
    for i := range merged {
        // Process each value
        fn(i.int, i.string)
    }
}

Selección de reflexión:

// Process2 is identical to Process1 except that it uses the reflect
// package to select and read from the input channels which guarantees
// there is only one value "in-flight" (i.e. when `fn` is called only
// a single send on a single channel will have succeeded, the rest will
// be blocked). It is approximately two orders of magnitude slower than
// Process1 (which is still insignificant if their is a significant
// delay between incoming values or if `fn` runs for a significant
// time).
func Process2(chans []<-chan string, fn func(int, string)) {
    // Setup
    cases := make([]reflect.SelectCase, len(chans))
    // `ids` maps the index within cases to the original `chans` index.
    ids := make([]int, len(chans))
    for i, c := range chans {
        cases[i] = reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(c),
        }
        ids[i] = i
    }

    // Select loop
    for len(cases) > 0 {
        // A difference here from the merging goroutines is
        // that `v` is the only value "in-flight" that any of
        // the workers have sent. All other workers are blocked
        // trying to send the single value they have calculated
        // where-as the goroutine version reads/buffers a single
        // extra value from each worker.
        i, v, ok := reflect.Select(cases)
        if !ok {
            // Channel cases[i] has been closed, remove it
            // from our slice of cases and update our ids
            // mapping as well.
            cases = append(cases[:i], cases[i+1:]...)
            ids = append(ids[:i], ids[i+1:]...)
            continue
        }

        // Process each value
        fn(ids[i], v.String())
    }
}

[Código completo en el patio de juegos de Go ].

Dave C
fuente
1
También vale la pena señalar que la solución goroutines + canales no puede hacer todo selecto lo reflect.Selecthace. Las gorutinas seguirán girando hasta que consuman todo lo de los canales, por lo que no hay una forma clara de poder Process1salir temprano. También existe la posibilidad de que surjan problemas si tiene varios lectores, ya que las goroutines almacenan un elemento de cada uno de los canales, lo que no sucederá con select.
James Henstridge
@JamesHenstridge, tu primera nota sobre detenerte no es cierta. Haría arreglos para detener Process1 exactamente de la misma manera que arreglaría para detener Process2; por ejemplo, agregando un canal de "parada" que se cierra cuando las rutinas gordas deben detenerse. Process1 necesitaría dos casos selectdentro de un forciclo en lugar del for rangeciclo más simple que se usa actualmente. Process2 necesitaría insertar otro caso casesy un manejo especial de ese valor de i.
Dave C
Eso todavía no resuelve el problema de que está leyendo valores de los canales que no se utilizarán en el caso de parada anticipada.
James Henstridge
0

¿Por qué este enfoque no funcionaría asumiendo que alguien está enviando eventos?

func main() {
    numChans := 2
    var chans = []chan string{}

    for i := 0; i < numChans; i++ {
        tmp := make(chan string)
        chans = append(chans, tmp)
    }

    for true {
        for i, c := range chans {
            select {
            case x = <-c:
                fmt.Printf("received %d \n", i)
                go DoShit(x, i)
            default: continue
            }
        }
    }
}
noonex
fuente
8
Este es un bucle giratorio. Mientras espera que un canal de entrada tenga un valor, este consume toda la CPU disponible. El objetivo de selectlos canales múltiples (sin una defaultcláusula) es que espera de manera eficiente hasta que al menos uno esté listo sin girar.
Dave C
0

Opción posiblemente más sencilla:

En lugar de tener una serie de canales, ¿por qué no pasar solo un canal como parámetro a las funciones que se ejecutan en goroutines separados, y luego escuchar el canal en un goroutine consumidor?

Esto le permite seleccionar un solo canal en su oyente, lo que hace que sea una selección simple y evite la creación de nuevas gorutinas para agregar mensajes de múltiples canales.

Fernando Sanchez
fuente