Model razcepi-pridruži
// wg is the WaitGroup on which every hello call signals its completion.
var wg sync.WaitGroup

// hello prints s followed by a single space five times, pausing for one
// millisecond after each print. When it returns it reports one finished
// unit of work on wg, so the caller must have called wg.Add beforehand.
func hello(s string) {
	const repeats = 5

	defer wg.Done()
	for remaining := repeats; remaining > 0; remaining-- {
		fmt.Print(s, " ")
		time.Sleep(time.Millisecond)
	}
}
func main() {
wg.Add(1)
go hello("hello")
wg.Add(1)
go hello("world")
wg.Add(1)
hello("!")
wg.Wait()
fmt.Println()
}Kanali - osnove
// Channel basics. This is an illustrative fragment, not a complete program:
// the statements below belong inside a function body.

// Declare a channel of ints and then create it with make (unbuffered).
var dataStream chan int
dataStream = make(chan int)
// A channel created with a capacity (here 4) is buffered.
var bufferStream = make(chan int, 4)
// Writing
// NOTE: a send on an unbuffered channel blocks until another goroutine
// receives, so in a real program the write and the read below run in
// different goroutines.
dataStream <- 314
// Reading
value, ok := <-dataStream // 314, true
close(dataStream)
// value and ok are already declared above, so assign with "=";
// a second ":=" with no new variables on the left does not compile.
value, ok = <-dataStream // 0, false
// Ranging over a channel receives values until the channel is closed.
for msg := range stringStream {
	// ...
}
Kanali - sinhronizacija
Funkcija speaker da gorutinam listener signal, kdaj lahko nadaljujejo
S struct{} poudarimo, da kanal ni namenjen prenašanju sporočil, temveč sinhronizaciji
// speaker returns a receive-only signalling channel and starts a goroutine
// that waits five seconds, prints the announcement, and then closes the
// channel. Closing (rather than sending) releases every goroutine blocked
// on a receive from the channel at once.
func speaker(message string) <-chan struct{} {
	const delay = 5 * time.Second

	announced := make(chan struct{})
	announce := func() {
		// Runs after the announcement has been printed.
		defer close(announced)
		time.Sleep(delay)
		fmt.Println("Announcement:", message)
	}
	go announce()
	return announced
}
// listener announces that listener id is waiting, blocks until a receive
// from broadcastStream succeeds (a value arrives or the channel is closed),
// and then reports completion.
func listener(id int, broadcastStream <-chan struct{}) {
	// Deferred so the completion line is printed once the wait is over.
	defer fmt.Println("Listener", id, "completed.")

	fmt.Println("Listener", id, "is waiting for an announcement.")
	<-broadcastStream
}
func main() {
broadcastStream := speaker("Hello world for the last time!")
for i := 1; i < 5; i++ {
go listener(i, broadcastStream)
}
listener(0, broadcastStream)
fmt.Println("Great!")
time.Sleep(1 * time.Second)
}Gorutini writer neprestano pišeta vsaka v svoj kanal
Gorutina reader bere in obdeluje sporočila iz več kanalov
Dodamo kanal za zaustavitev gorutine reader - glavna gorutina sproži zaustavitev po 20 sekundah
// wg is the package-level WaitGroup declared alongside writer.
var wg sync.WaitGroup

// writer starts a goroutine that repeatedly sends "message from <id>" on a
// fresh unbuffered channel, sleeping id*id+1 seconds after every send, and
// returns the receive side of that channel.
//
// The send loop has no exit condition, so the deferred close is not reached
// on any normal path and the goroutine runs until the program ends.
func writer(id int) <-chan string {
	pause := time.Duration(id*id+1) * time.Second
	text := "message from " + strconv.Itoa(id)

	out := make(chan string)
	go func() {
		defer close(out)
		for {
			out <- text
			time.Sleep(pause)
		}
	}()
	return out
}
// reader multiplexes two message streams. It prints every message received
// from stream1 or stream2, prints "timeout" whenever one second passes
// without any channel becoming ready, and prints "Done" and returns once a
// receive from streamDone succeeds (for example because it was closed).
// On return it reports completion on the package-level wg.
func reader(stream1 <-chan string, stream2 <-chan string, streamDone <-chan struct{}) {
	defer wg.Done()
	for {
		// The select deliberately has no default case. With a default the
		// select never blocks: the loop would spin at full speed, create a
		// new timer on every iteration, and the timeout case could never be
		// chosen because its timer is discarded before it fires.
		select {
		case msg1 := <-stream1:
			fmt.Println(msg1)
		case msg2 := <-stream2:
			fmt.Println(msg2)
		case <-streamDone:
			fmt.Println("Done")
			return
		case <-time.After(1 * time.Second):
			// The timer is re-armed on each pass, so this measures one
			// second of inactivity rather than a fixed period.
			fmt.Println("timeout")
		}
	}
}
func main() {
writer1Stream := writer(1)
writer2Stream := writer(2)
streamDone := make(chan struct{})
wg.Add(1)
go reader(writer1Stream, writer2Stream, streamDone)
time.Sleep(20 * time.Second)
close(streamDone)
wg.Wait()
}Sinhronizacija - ključavnice
Uporaba ključavnice:
var (
	// result is the shared counter incremented by the workers.
	result int
	// wg lets the caller wait for all workers to finish.
	wg sync.WaitGroup
	// lock serialises access to result.
	lock sync.Mutex
)

// worker increments result by one while holding lock and then reports
// completion on wg.
func worker() {
	defer wg.Done()

	lock.Lock()
	// Deferred calls run in reverse order, so the lock is released before
	// wg.Done is called.
	defer lock.Unlock()
	result++
}
func main() {
wg.Add(*gPtr)
for id := 0; id < 10; id++ {
go worker()
}
wg.Wait()
}Uporaba atomic spremenljivk:
var (
	// result is the shared counter; atomic.Int64 makes each increment safe
	// without a mutex.
	result atomic.Int64
	// wg lets the caller wait for all workers to finish.
	wg sync.WaitGroup
)

// worker atomically adds one to result and then reports completion on wg.
func worker() {
	result.Add(1)
	wg.Done()
}
func main() {
wg.Add(*gPtr)
for id := 0; id < 10; id++ {
go worker()
}
wg.Wait()
}Če gorutine veliko dostopajo do skupne spremenljivke:
- lokalna spremenljivka kot delni rezultat, kasneje uporaba le ene ključavnice
- delne rezultate se pošilja v kanal, glavna gorutina čaka na
štGorutin prebranih vrednosti
Bralno-pisalna ključavnica:
var (
	// wg lets the caller wait for the writers to finish.
	wg sync.WaitGroup
	// lockBook is the readers-writer lock taken exclusively by writer.
	lockBook sync.RWMutex
)

// writer performs cycles rounds of exclusive access to lockBook. In each
// round it takes the write lock, prints a start line, sleeps id
// milliseconds, prints a finish line and releases the lock; it then sleeps
// another id milliseconds outside the lock. It reports completion on wg
// when all rounds are done.
func writer(id int, cycles int) {
	defer wg.Done()

	pause := time.Duration(id) * time.Millisecond
	for cycle := 0; cycle < cycles; cycle++ {
		// The closure scopes the deferred Unlock to a single round.
		func() {
			lockBook.Lock()
			defer lockBook.Unlock()

			fmt.Println("Writer", id, "start", cycle)
			time.Sleep(pause)
			fmt.Println("Writer", id, "finish", cycle)
		}()
		time.Sleep(pause)
	}
}
// reader loops forever. In each round it takes lockBook's read lock, prints
// a start line, sleeps id milliseconds, prints a finish line and releases
// the lock; it then sleeps another id milliseconds outside the lock.
// The function never returns.
func reader(id int) {
	pause := time.Duration(id) * time.Millisecond
	for {
		// The closure scopes the deferred RUnlock to a single round.
		func() {
			lockBook.RLock()
			defer lockBook.RUnlock()

			fmt.Println("Reader", id, "start")
			time.Sleep(pause)
			fmt.Println("Reader", id, "finish")
		}()
		time.Sleep(pause)
	}
}
func main() {
// preberemo argumente
writersPtr := flag.Int("w", 2, "# of writers")
readersPtr := flag.Int("r", 4, "# of readers")
cyclesPtr := flag.Int("c", 10, "# of cycles")
flag.Parse()
// zaženemo pisatelje
wg.Add(*writersPtr)
for i := 1; i <= *writersPtr; i++ {
go writer(i, *cyclesPtr)
}
// zaženemo bralce
for i := 1; i <= *readersPtr; i++ {
go reader(i)
}
// počakamo, da pisatelji zaključijo
wg.Wait()
}Uporaba sync.Map kot slovar:
// writeToMap stores 0 under key id in dict and then overwrites that entry
// with each of the values 0, 1, ..., steps-1 in turn. It reports completion
// on wg when it returns.
func writeToMap(id int, steps int, dict *sync.Map) {
	defer wg.Done()

	dict.Store(id, 0)
	value := 0
	for value < steps {
		dict.Store(id, value)
		value++
	}
}
// readFromMap looks up key id in dict steps times, discarding every result,
// and reports completion on wg when it returns.
func readFromMap(id int, steps int, dict *sync.Map) {
	defer wg.Done()

	for remaining := steps; remaining > 0; remaining-- {
		// Only the lookup itself matters; the value and the found flag
		// are intentionally unused.
		dict.Load(id)
	}
}
// main exercises a sync.Map with concurrent writers and readers. It reads
// the number of writing goroutines (-gw), reading goroutines (-gr) and steps
// per goroutine (-s), pre-populates one entry per goroutine index, runs all
// goroutines to completion, and prints the final map contents together with
// the elapsed time.
func main() {
	var (
		writers = flag.Int("gw", 1, "# of writing goroutines")
		readers = flag.Int("gr", 1, "# of reading goroutines")
		steps   = flag.Int("s", 100, "# of read or write steps")
	)
	flag.Parse()

	// Seed keys 0..records-1 with 0 before any goroutine starts.
	var dict sync.Map
	for key, records := 0, max(*writers, *readers); key < records; key++ {
		dict.Store(key, 0)
	}

	started := time.Now()
	for id := 0; id < *writers; id++ {
		wg.Add(1)
		go writeToMap(id, *steps, &dict)
	}
	for id := 0; id < *readers; id++ {
		wg.Add(1)
		go readFromMap(id, *steps, &dict)
	}
	wg.Wait()

	// Prints one "key:value " pair; returning true keeps Range iterating.
	printEntry := func(k, v any) bool {
		fmt.Print(k, ":", v, " ")
		return true
	}
	fmt.Print("dict: map[")
	dict.Range(printEntry)
	// The leading backspace moves the cursor back over the space left by
	// the last printed pair.
	fmt.Println("\b] time:", time.Since(started))
}