¿Cómo esperar a que terminen todas las gorutinas sin gastar tiempo?

108

Este código selecciona todos los archivos xml en la misma carpeta, ya que el ejecutable invocado y aplica de forma asincrónica el procesamiento a cada resultado en el método de devolución de llamada (en el ejemplo siguiente, solo se imprime el nombre del archivo).

¿Cómo evito usar el método de dormir para evitar que salga el método principal? Tengo problemas para entender los canales (supongo que eso es lo que se necesita para sincronizar los resultados), ¡así que agradecemos cualquier ayuda!

package main

import (
    "fmt"
    "io/ioutil"
    "path"
    "path/filepath"
    "os"
    "runtime"
    "time"
)

func eachFile(extension string, callback func(file string)) {
    exeDir := filepath.Dir(os.Args[0])
    files, _ := ioutil.ReadDir(exeDir)
    for _, f := range files {
            fileName := f.Name()
            if extension == path.Ext(fileName) {
                go callback(fileName)
            }
    }
}


func main() {
    maxProcs := runtime.NumCPU()
    runtime.GOMAXPROCS(maxProcs)

    eachFile(".xml", func(fileName string) {
                // Custom logic goes in here
                fmt.Println(fileName)
            })

    // This is what i want to get rid of
    time.Sleep(100 * time.Millisecond)
}
Dante
fuente

Respuestas:

173

Puede utilizar sync.WaitGroup . Citando el ejemplo vinculado:

package main

import (
        "net/http"
        "sync"
)

func main() {
        var wg sync.WaitGroup
        var urls = []string{
                "http://www.golang.org/",
                "http://www.google.com/",
                "http://www.somestupidname.com/",
        }
        for _, url := range urls {
                // Increment the WaitGroup counter.
                wg.Add(1)
                // Launch a goroutine to fetch the URL.
                go func(url string) {
                        // Decrement the counter when the goroutine completes.
                        defer wg.Done()
                        // Fetch the URL.
                        http.Get(url)
                }(url)
        }
        // Wait for all HTTP fetches to complete.
        wg.Wait()
}
zzzz
fuente
11
¿Alguna razón por la que tienes que hacer wg.Add (1) fuera de la rutina de ir? ¿Podemos hacerlo adentro justo antes del aplazamiento wg.Done ()?
sábado
18
sábado, sí, hay una razón, se describe en sincronía EspereGroup.Agregar documentos: Note that calls with positive delta must happen before the call to Wait, or else Wait may wait for too small a group. Typically this means the calls to Add should execute before the statement creating the goroutine or other event to be waited for. See the WaitGroup example.
wobmene
15
La adaptación de este código me provocó una larga sesión de depuración porque mi goroutine era una función con nombre y pasar el WaitGroup como valor lo copiará y hará que wg.Done () sea ineficaz. Si bien esto podría solucionarse pasando un puntero & wg, una mejor manera de prevenir tales errores es declarar la variable WaitGroup como un puntero en primer lugar: en wg := new(sync.WaitGroup)lugar de var wg sync.WaitGroup.
Robert Jack Will
Supongo que es válido escribir wg.Add(len(urls))justo encima de la línea for _, url := range urls, creo que es mejor ya que usa Agregar solo una vez.
Victor
@RobertJackWill: ¡Buena nota! Por cierto, esto se trata en los documentos : "Un WaitGroup no se debe copiar después del primer uso. Lástima que Go no tiene una forma de hacer cumplir esto . En realidad, sin embargo, go vetdetecta este caso y advierte con" func pasa bloqueo por valor : sync.WaitGroup contiene sync.noCopy ".
Brent Bradburn
56

WaitGroups es definitivamente la forma canónica de hacer esto. Sin embargo, solo en aras de la integridad, aquí está la solución que se usaba comúnmente antes de que se introdujeran WaitGroups. La idea básica es usar un canal para decir "He terminado" y hacer que la rutina principal espere hasta que cada rutina generada haya informado de su finalización.

func main() {
    c := make(chan struct{}) // We don't need any data to be passed, so use an empty struct
    for i := 0; i < 100; i++ {
        go func() {
            doSomething()
            c <- struct{}{} // signal that the routine has completed
        }()
    }

    // Since we spawned 100 routines, receive 100 messages.
    for i := 0; i < 100; i++ {
        <- c
    }
}
Joshlf
fuente
9
Es bueno ver una solución con canales simples. Una ventaja adicional: si doSomething()devuelve algún resultado, puede ponerlo en el canal, y puede recopilar y procesar los resultados en el segundo ciclo for (tan pronto como estén listos)
andras
4
Solo funciona si ya sabe la cantidad de gorutines que le gustaría comenzar. ¿Qué sucede si está escribiendo algún tipo de rastreador html y comienza gorutines de manera recursiva para cada enlace de la página?
shinydev
Deberá realizar un seguimiento de esto de alguna manera independientemente. Con WaitGroups es un poco más fácil porque cada vez que generas una nueva goroutine, primero puedes hacerlo wg.Add(1)y, por lo tanto, las rastreará. Con canales sería algo más complicado.
joshlf
c se bloqueará ya que todas las rutinas de Go intentarán acceder a él, y no tiene búfer
Edwin Ikechukwu Okonkwo
Si por "bloquear" te refieres a que el programa se interbloqueará, eso no es cierto. Puede intentar ejecutarlo usted mismo. La razón es que las únicas goroutines en las que escriben cson diferentes de la goroutine principal, que lee c. Por lo tanto, la goroutine principal siempre está disponible para leer un valor del canal, lo que sucederá cuando una de las goroutines esté disponible para escribir un valor en el canal. Tiene razón en que si este código no generara goroutines, sino que ejecutaba todo en una sola goroutine, se bloquearía.
joshlf
8

sync.WaitGroup puede ayudarlo aquí.

package main

import (
    "fmt"
    "sync"
    "time"
)


func wait(seconds int, wg * sync.WaitGroup) {
    defer wg.Done()

    time.Sleep(time.Duration(seconds) * time.Second)
    fmt.Println("Slept ", seconds, " seconds ..")
}


func main() {
    var wg sync.WaitGroup

    for i := 0; i <= 5; i++ {
        wg.Add(1)   
        go wait(i, &wg)
    }
    wg.Wait()
}
atenuar
fuente
1

Aunque sync.waitGroup(wg) es la forma canónica a seguir, requiere que haga al menos algunas de sus wg.Addllamadas antes que usted wg.Waitpara que todas las complete. Es posible que esto no sea factible para cosas simples como un rastreador web, donde no se conoce la cantidad de llamadas recursivas de antemano y lleva un tiempo recuperar los datos que impulsan las wg.Addllamadas. Después de todo, debe cargar y analizar la primera página antes de saber el tamaño del primer lote de páginas secundarias.

Escribí una solución utilizando canales, evitando waitGroupen mi solución el ejercicio Tour of Go - web crawler . Cada vez que se inician una o más rutinas, envía el número al childrencanal. Cada vez que una rutina está a punto de completarse, envía un 1al donecanal. Cuando la suma de los niños es igual a la suma de hecho, hemos terminado.

Mi única preocupación restante es el tamaño codificado del resultscanal, pero esa es una limitación (actual) de Go.


// recursionController is a data structure with three channels to control our Crawl recursion.
// Tried to use sync.waitGroup in a previous version, but I was unhappy with the mandatory sleep.
// The idea is to have three channels, counting the outstanding calls (children), completed calls 
// (done) and results (results).  Once outstanding calls == completed calls we are done (if you are
// sufficiently careful to signal any new children before closing your current one, as you may be the last one).
//
type recursionController struct {
    results  chan string
    children chan int
    done     chan int
}

// instead of instantiating one instance, as we did above, use a more idiomatic Go solution
func NewRecursionController() recursionController {
    // we buffer results to 1000, so we cannot crawl more pages than that.  
    return recursionController{make(chan string, 1000), make(chan int), make(chan int)}
}

// recursionController.Add: convenience function to add children to controller (similar to waitGroup)
func (rc recursionController) Add(children int) {
    rc.children <- children
}

// recursionController.Done: convenience function to remove a child from controller (similar to waitGroup)
func (rc recursionController) Done() {
    rc.done <- 1
}

// recursionController.Wait will wait until all children are done
func (rc recursionController) Wait() {
    fmt.Println("Controller waiting...")
    var children, done int
    for {
        select {
        case childrenDelta := <-rc.children:
            children += childrenDelta
            // fmt.Printf("children found %v total %v\n", childrenDelta, children)
        case <-rc.done:
            done += 1
            // fmt.Println("done found", done)
        default:
            if done > 0 && children == done {
                fmt.Printf("Controller exiting, done = %v, children =  %v\n", done, children)
                close(rc.results)
                return
            }
        }
    }
}

Código fuente completo de la solución

dirkjot
fuente
1

Aquí hay una solución que emplea WaitGroup.

Primero, defina 2 métodos de utilidad:

package util

import (
    "sync"
)

var allNodesWaitGroup sync.WaitGroup

func GoNode(f func()) {
    allNodesWaitGroup.Add(1)
    go func() {
        defer allNodesWaitGroup.Done()
        f()
    }()
}

func WaitForAllNodes() {
    allNodesWaitGroup.Wait()
}

Luego, reemplace la invocación de callback:

go callback(fileName)

Con una llamada a su función de utilidad:

util.GoNode(func() { callback(fileName) })

Último paso, agregue esta línea al final de su main, en lugar de su sleep. Esto asegurará que el hilo principal esté esperando a que finalicen todas las rutinas antes de que el programa pueda detenerse.

func main() {
  // ...
  util.WaitForAllNodes()
}
gamliela
fuente