Współbieżność
23 March 2017
Filip Borkiewicz
Filip Borkiewicz
Świat jest paralelny.
Wszystko co dzieje się wokół nas jest zbiorem niezależnie toczących się
procesów.
To samo jest prawdą w informatyce:
Języki programowania powinny jakoś to odzwierciedlać.
Sposób kompozycji programów, ogólny koncept zbioru niezależnie wykonywanych
zadań (funkcji?).
Jednoczesne wykonywanie wielu zadań (funkcji?), związanych ze sobą lub nie.
"My program is slower when using multiple threads."
Współbieżność polega na poradzeniu sobie z wieloma zadaniami jednocześnie,
paralelizm polega na wykonaniu wielu zadań jednocześnie.
Współbieżność dotyczy sposobu kompozycji i projektowania oprogramowania.
Współbieżność może (ale nie musi) prowadzić do rozwiązania problemu, które
łatwo będzie sparalelizować.
"Do not communicate by sharing memory; instead, share memory by communicating."
Tony Hoare: Communicating Sequential Processes
"This paper suggests that input and ouput are basic primitives of programming
and that parallel composition of communicating sequential processes is a
fundamental program structuring method."
Compiled, statically typed, garbage-collected, object-oriented (?) programming
language with CSP-inspired concurrency features.
Packages, rich official tooling, no generics.
a := 5
var a int = 5
Hello, world!
package main import "fmt" func main() { fmt.Println("Hello, 世界") }
Zwykłe wywołanie funkcji:
foo() // foo blocks
Keyword go
:
go foo() // foo doesn't block bar()
Podobne do wątków, ale znacznie lżejsze.
Normalnym jest tworzenie tysięcy lub dziesiątek tysięcy goroutines podczas
działania programu.
Runtime dynamically schedules goroutines for execution on OS threads.
Channels are used to communicate between running goroutines and synchronize
them.
Similar to pipes in the shell.
Channels are typed.
ch := make(chan int) go func() { time.Sleep(5 * time.Second) ch <- 10 } number := <-ch
Podobne do instrukcji switch
, gałęzie wybierane są w oparciu o gotowość
do komunikacji, a nie o wartość jakiejś zmiennej.
Blokuje do momentu kiedy jeden z warunków zostanie spełniony.
Jeśli żaden kanał nie jest gotowy do komunikacji zostaje wybrany przypadek
default
.
select { case value1 := <-ch1: fmt.Println("Received from ch1:", value1) case value2 := <-ch2: fmt.Println("Received from ch2:", value2) case ch3 <- 7: fmt.Println("Sent to ch3:", 7) default: fmt.Println("Nobody was ready!") }
Praca wykonywana w pętli blokuje wykonywanie programu.
func main() { for i := 0; i < 5; i++ { fmt.Println("Hello", i) time.Sleep(time.Duration(rand.Float32() * float32(time.Second))) } fmt.Println("Exiting.") }
Wykonujemy pracę w goroutine, możemy kontynuować od razu.
func main() { go func() { for i := 0; ; i++ { fmt.Println("Hello", i) time.Sleep(time.Duration(rand.Float32() * float32(time.Second))) } }() fmt.Println("Exiting.") }
Kiedy program zakańcza działanie, zabijane są wszystkie goroutines.
func main() { go func() { for i := 0; ; i++ { fmt.Println("Hello", i) time.Sleep(time.Duration(rand.Float32() * float32(time.Second))) } }() fmt.Println("Exiting.") time.Sleep(5 * time.Second) }
Wykorzystujemy kanał do przesłania informacji pomiędzy osobnymi goroutines.
func main() { ch := make(chan int) go func() { for i := 0; ; i++ { ch <- i time.Sleep(time.Duration(rand.Float32() * float32(time.Second))) } }() for i := 0; i < 5; i++ { fmt.Println("Hello", <-ch) } fmt.Println("Exiting.") }
Channels are first-class values.
func generate(ch chan int) { for i := 0; ; i++ { ch <- i time.Sleep(time.Duration(rand.Float32() * float32(time.Second))) } } func main() { ch := make(chan int) go generate(ch) for i := 0; i < 5; i++ { fmt.Println("Hello", <-ch) } fmt.Println("Exiting.") }
Kiedy funkcja main
usiłuje przeczytać wartość <-ch
blokuje do momentu kiedy
jest to możliwe.
Kiedy funkcja generate
stara się wysłać ch
<-
wartość do kanału także czeka do
momentu kiedy jest to możliwe.
Kanały pozwalają nam przesyłać informacje, a także synchronizować program.
Istnieją kanały buforowane, które nie mogą służyć do synchronizacji.
"Do not communicate by sharing memory; instead, share memory by communicating."
Kanały to zwykłe wartości, możemy zwracać je z funkcji lub przekazywać je
jako argumenty.
func generate() <-chan int { ch := make(chan int) go func() { for i := 0; ; i++ { ch <- i time.Sleep(time.Duration(rand.Float32() * float32(time.Second))) } }() return ch } func main() { ch := generate() for i := 0; i < 5; i++ { fmt.Println("Hello", <-ch) } fmt.Println("Exiting.") }
To przypomina jakiś serwis, zmieńmy nazwę funkcji i odbierajmy string
.
func communicate(name string) <-chan string { ch := make(chan string) go func() { for i := 0; ; i++ { ch <- fmt.Sprintf("%s: %d", name, i) time.Sleep(time.Duration(rand.Float32() * float32(time.Second))) } }() return ch } func main() { ch1 := communicate("Bob") ch2 := communicate("Alice") for i := 0; i < 5; i++ { fmt.Println(<-ch1) fmt.Println(<-ch2) } fmt.Println("Exiting.") }
func fanIn(ch1, ch2 <-chan string) <-chan string { out := make(chan string) go func() { for { out <- <-ch1 } }() go func() { for { out <- <-ch2 } }() return out } func main() { ch := fanIn(communicate("Bob"), communicate("Alice")) for i := 0; i < 10; i++ { fmt.Println(<-ch) } fmt.Println("Exiting.") }
func fanIn(ch1, ch2 <-chan string) <-chan string { out := make(chan string) go func() { for { select { case v := <-ch1: out <- v case v := <-ch2: out <- v } } }() return out } func main() { ch := fanIn(communicate("Bob"), communicate("Alice")) for i := 0; i < 10; i++ { fmt.Println(<-ch) } fmt.Println("Exiting.") }
Używamy kanału, który blokuje przez określony czas.
func main() { ch := fanIn(communicate("Bob"), communicate("Alice")) for { select { case message := <-ch: fmt.Println(message) case <-time.After(500 * time.Millisecond): fmt.Println("I can't wait anymore!") return } } }
Stworzenie kanału przed pętlą doprowadzi do zakończenia całej komunikacji po
upłynięciu podanego czasu.
func main() { ch := fanIn(communicate("Bob"), communicate("Alice")) timeout := time.After(2 * time.Second) for { select { case message := <-ch: fmt.Println(message) case <-timeout: fmt.Println("I can't wait anymore!") return } } }
type Task struct { Id int } func Worker(in <-chan Task) { for { task := <-in fmt.Println("Processing task", task.Id) time.Sleep(time.Duration(rand.Float32() * float32(time.Second))) } } func main() { ch := make(chan Task) for i := 0; i < 4; i++ { go Worker(ch) } for i := 0; i < 10; i++ { ch <- Task{Id: i} } }
Periodycznie pobieramy dane z listy adresów.
type Resource struct { url string polling bool lastPolled int64 } type Resources struct { data []*Resource lock *sync.Mutex }
func Poller(res *Resources) { for { res.lock.Lock() var r *Resource for _, v := range res.data { if v.polling { continue } if r == nil || v.lastPolled < r.lastPolled { r = v } } if r != nil { r.polling = true } res.lock.Unlock() if r == nil { continue } // poll the URL res.lock.Lock() r.polling = false r.lastPolled = time.Nanoseconds() res.lock.Unlock() } }
Fan out + fan in.
type Resource string func Poller(in, out chan *Resource) { for r := range in { // poll the URL out <- r } }
"Do not communicate by sharing memory; instead, share memory by communicating."
Przekazujemy wartość przez łańcuch goroutines.
const n = 100 * 1000 func main() { leftmost := make(chan int) right := leftmost // just init left := leftmost for i := 0; i < n; i++ { right = make(chan int) go func(left, right chan int) { right <- 1 + <-left }(left, right) left = right } go func() { leftmost <- 1 }() fmt.Println(<-left) }
Współbieżność pozwala na wprowadzenie paralelizmu.
Współbieżność pozwala łatwo skalować problem.
Dzięki współbieżność tworzenie praktycznych, prostych i paralelnych rozwiązań
jest prymitywne.
Dzięki zastosowaniu odpowiedniego sposobu komunikacji nie musimy martwić się
o synchronizację, jest ona skutkiem ubocznym sposobu w który piszemy program.
Users:
313 000 000 monthly active users.
Requests:
1 000 000 000 monthly visits to sites with embedded tweets.
Tweets:
6000 tweets per second.
350 000 tweets per minute.
500 000 000 tweets per day.
180 000 000 000 tweets per year.
Wysyłamy zapytanie do wszystkich serwerów które mogą ją zwrócić i odbieramy
pierwszą odpowiedź.
func First(query string, replicas ...TimelineServer) Timeline { c := make(chan Timeline, len(replicas)) for _, replica := range replicas { go func(s TimelineServer) { c <- s.Query(query) }(replica) } return <-c }
type Task struct { Id int } func Worker(in <-chan Task) { for { task := <-in fmt.Println("Processing task", task.Id) time.Sleep(time.Duration(rand.Float32() * float32(time.Second))) } } func main() { ch := make(chan Task) for i := 0; i < 4; i++ { go Worker(ch) } for i := 0; i < 10; i++ { ch <- Task{Id: i} } }
Workery sygnalizują, że praca została wykonana wysyłając samych siebie przez
dostarczony im kanał.
type Task struct { Id int } type Worker struct { tasks chan *Task load int } func (w *Worker) work(done chan<- *Worker) { for { task := <-w.tasks fmt.Println("Processing task", task.Id) time.Sleep(time.Duration(rand.Float32() * float32(time.Second))) done <- w } }
Kanały pozwalają na synchronizację - nie używamy mutex'ów.
type Pool []*Worker type LoadBalancer struct { pool *Pool done chan *Worker } func (b *LoadBalancer) Run(tasks chan *Task) { for { select { case task := <-tasks: b.dispatch(task) case worker := <-b.done: b.completed(worker) } } }
Dostęp do listy worker'ów i ich pól jest synchronizowany i nie ma możliwych
zjawisk race condition.
func (b *LoadBalancer) dispatch(task *Task) { worker := b.pool.getLeastLoadedWorker() worker.tasks <- task worker.load++ } func (b *LoadBalancer) completed(worker *Worker) { worker.load-- }
Tworzymy potrzebne podstawowe struktury danych.
type UserID int type Tweet struct { text string date time.Time } type User struct { timeline []*Tweet followers []UserID }
Polecenia z zewnątrz przesyłane są kanałami.
type PutTweet struct { tweet *Tweet target []UserID } type PutFollower struct { user UserID target []UserID } type TimelineServer struct { Tweets <-chan *PutTweet Follows <-chan *PutFollower users map[UserID]*User }
Funkcja dodaje tweety i subskrypcje.
func (s *TimelineServer) run() { for { select { case putTweet := <-s.Tweets: for _, userID := range putTweet.target { s.users[userID].timeline = append(s.users[userID].timeline, putTweet.tweet) } case putFollower := <-s.Follows: for _, userID := range putFollower.target { s.users[userID].followers = append(s.users[userID].followers, putFollower.user) } } } }
Przesyłamy polecenia PutTweet
i PutFollower
do odpowiednich serwerów.
type Dispatcher struct { replicas []*TimelineServer } func (d *Dispatcher) follow(id, target UserID) { for replica := range d.findReplicas(target) { replica.Follows <- PutFollower{user: id, target: []UserID{target}} } } func (d *Dispatcher) tweet(id UserID, tweet *Tweet) { author := d.getFirstByID(id) for _, followerID := range author.followers { for replica := range d.findReplicas(followerID) { replica.Tweets <- PutTweet{tweet: tweet, target: author.followers} } } }
Goroutines:
- tanie
- powszechnie używane
Channels:
- komunikacja
- synchronizacja
Dobrze wprowadzona komunikacja rozwiązuje problemy synchronizacji.
Złożone problemy łatwo rozbić na małe proste elementy.
Te elementy skomponowane współbieżnie prowadzą to prostych do zrozumienia,
logicznych, skalujących się, a co najważniejsze poprawnych rozwiązań.
Współbieżność nie jest paralelizmem, ale potencjalnie prowadzi do paralelizmu.
Współbieżność jest prosta i sprawia, że inne rzeczy są proste.
"Concurrency is not parallelism" - Rob Pike
"Go Concurrency Patterns" - Rob Pike
Filip Borkiewicz