[Go] goroutine/channel/waitgroup
개요
Go의 동시성 모델은 고루틴(goroutine)과 채널(channel)을 중심으로 설계되었습니다.
주요 특징:
- 고루틴: 경량 쓰레드, 수천~수만 개 동시 실행 가능
- 채널: 고루틴 간 통신 및 동기화
- CSP 모델: Communicating Sequential Processes
- 메모리 공유 대신 통신: “Don’t communicate by sharing memory; share memory by communicating”
- 스케줄러: Go 런타임이 M:N 스케줄링 수행
- 동기화 도구: WaitGroup, Mutex, Once 등 제공
고루틴 (Goroutine)
1. 기본 개념
package main
import (
"fmt"
"time"
)
func sayHello() {
fmt.Println("Hello from goroutine")
}
func main() {
// 일반 함수 호출
sayHello()
// 고루틴으로 실행
go sayHello()
// 익명 함수를 고루틴으로
go func() {
fmt.Println("Anonymous goroutine")
}()
// 메인 고루틴이 종료되면 모든 고루틴도 종료됨
time.Sleep(100 * time.Millisecond)
fmt.Println("Main exits")
}
2. 고루틴 특징
import "runtime"
func goroutineCharacteristics() {
// 1. 경량: 초기 스택 2KB (vs 쓰레드 1-2MB)
for i := 0; i < 10000; i++ {
go func(n int) {
time.Sleep(time.Hour)
}(i)
}
fmt.Printf("Number of goroutines: %d\n", runtime.NumGoroutine())
// 2. 동적 스택: 필요에 따라 자동으로 증가/감소
// 3. M:N 스케줄링: M개 고루틴을 N개 OS 쓰레드에 매핑
fmt.Printf("Number of CPUs: %d\n", runtime.NumCPU())
// 4. 고루틴 ID 없음 (의도적 설계)
}
3. 클로저와 고루틴
func closureIssue() {
// ❌ 잘못된 사용 (모두 같은 값 출력)
for i := 0; i < 5; i++ {
go func() {
fmt.Println(i) // i를 캡처 (참조)
}()
}
// 출력: 5, 5, 5, 5, 5 (또는 다른 순서)
time.Sleep(100 * time.Millisecond)
// ✅ 올바른 사용 1: 파라미터로 전달
for i := 0; i < 5; i++ {
go func(n int) {
fmt.Println(n)
}(i)
}
time.Sleep(100 * time.Millisecond)
// ✅ 올바른 사용 2: 로컬 변수 복사
for i := 0; i < 5; i++ {
i := i // 섀도잉
go func() {
fmt.Println(i)
}()
}
time.Sleep(100 * time.Millisecond)
}
채널 (Channel)
1. 채널 기본
func channelBasics() {
// 채널 생성
ch := make(chan int)
// 송신 (다른 고루틴에서)
go func() {
ch <- 42 // 값 전송
}()
// 수신
value := <-ch
fmt.Println("Received:", value)
// 송수신은 블로킹
// 상대방이 준비될 때까지 대기
}
2. Unbuffered Channel (버퍼 없는 채널)
func unbufferedChannel() {
ch := make(chan string)
// ❌ 데드락 발생
// ch <- "hello" // 수신자가 없어서 영원히 블록됨
// ✅ 고루틴에서 송신
go func() {
ch <- "hello"
fmt.Println("Sent")
}()
msg := <-ch
fmt.Println("Received:", msg)
// 특징:
// - 수신자가 있어야 송신 가능 (동기화 보장)
// - 송신과 수신이 정확히 동시에 일어남
}
3. Buffered Channel (버퍼 있는 채널)
func bufferedChannel() {
ch := make(chan int, 3) // 버퍼 크기 3
// 버퍼가 가득 차기 전까지 블록 안됨
ch <- 1
ch <- 2
ch <- 3
fmt.Println("Sent 3 values")
// ❌ 4번째는 블록됨 (버퍼 가득)
// ch <- 4
// 수신
fmt.Println(<-ch) // 1
fmt.Println(<-ch) // 2
// 이제 다시 송신 가능
ch <- 4
fmt.Println(<-ch) // 3
fmt.Println(<-ch) // 4
// 특징:
// - 비동기 통신 가능
// - 버퍼 크기만큼 송신 가능
// - len(ch): 현재 버퍼의 원소 수
// - cap(ch): 버퍼 용량
}
4. 채널 닫기
func channelClose() {
ch := make(chan int, 3)
// 송신
ch <- 1
ch <- 2
ch <- 3
close(ch) // 채널 닫기
// ❌ 닫힌 채널에 송신하면 패닉
// ch <- 4 // panic: send on closed channel
// ✅ 닫힌 채널에서 수신은 가능
fmt.Println(<-ch) // 1
fmt.Println(<-ch) // 2
fmt.Println(<-ch) // 3
fmt.Println(<-ch) // 0 (제로값)
// 채널이 닫혔는지 확인
value, ok := <-ch
if !ok {
fmt.Println("Channel is closed")
}
// range로 순회 (채널이 닫힐 때까지)
ch2 := make(chan int, 3)
ch2 <- 10
ch2 <- 20
ch2 <- 30
close(ch2)
for v := range ch2 {
fmt.Println(v)
}
}
5. 채널 방향성
// 송신 전용 채널
func sendOnly(ch chan<- int) {
ch <- 42
// val := <-ch // 컴파일 에러
}
// 수신 전용 채널
func receiveOnly(ch <-chan int) {
val := <-ch
fmt.Println(val)
// ch <- 42 // 컴파일 에러
}
// 양방향 채널
func bidirectional(ch chan int) {
ch <- 42
val := <-ch
fmt.Println(val)
}
func channelDirections() {
ch := make(chan int, 1)
// 양방향 채널을 단방향으로 전달
go sendOnly(ch)
receiveOnly(ch)
bidirectional(ch)
}
select 문
1. 기본 사용법
func selectBasics() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(100 * time.Millisecond)
ch1 <- "from ch1"
}()
go func() {
time.Sleep(200 * time.Millisecond)
ch2 <- "from ch2"
}()
// 여러 채널 중 하나가 준비되면 실행
select {
case msg1 := <-ch1:
fmt.Println(msg1)
case msg2 := <-ch2:
fmt.Println(msg2)
}
// 먼저 준비된 ch1의 메시지 출력
}
2. default 케이스 (비블로킹)
func selectDefault() {
ch := make(chan int)
select {
case val := <-ch:
fmt.Println("Received:", val)
default:
fmt.Println("No data available")
}
// default가 있으면 블록되지 않음
}
3. 타임아웃 패턴
func selectTimeout() {
ch := make(chan string)
go func() {
time.Sleep(2 * time.Second)
ch <- "result"
}()
select {
case res := <-ch:
fmt.Println(res)
case <-time.After(1 * time.Second):
fmt.Println("Timeout!")
}
}
4. 다중 수신
func selectMultiple() {
ch1 := make(chan int)
ch2 := make(chan int)
done := make(chan bool)
go func() {
for i := 0; i < 3; i++ {
ch1 <- i
}
}()
go func() {
for i := 0; i < 3; i++ {
ch2 <- i * 10
}
}()
go func() {
time.Sleep(500 * time.Millisecond)
done <- true
}()
for {
select {
case v1 := <-ch1:
fmt.Println("ch1:", v1)
case v2 := <-ch2:
fmt.Println("ch2:", v2)
case <-done:
fmt.Println("Done!")
return
}
}
}
5. nil 채널 활용
func selectNilChannel() {
ch1 := make(chan int)
ch2 := make(chan int)
go func() {
for i := 0; i < 3; i++ {
ch1 <- i
}
close(ch1)
}()
go func() {
for i := 0; i < 3; i++ {
ch2 <- i * 10
}
close(ch2)
}()
for ch1 != nil || ch2 != nil {
select {
case v, ok := <-ch1:
if !ok {
ch1 = nil // nil 채널은 select에서 무시됨
} else {
fmt.Println("ch1:", v)
}
case v, ok := <-ch2:
if !ok {
ch2 = nil
} else {
fmt.Println("ch2:", v)
}
}
}
}
WaitGroup
1. 기본 사용법
import "sync"
func waitGroupBasic() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1) // 대기 카운터 증가
go func(n int) {
defer wg.Done() // 완료 시 카운터 감소
fmt.Printf("Goroutine %d\n", n)
time.Sleep(100 * time.Millisecond)
}(i)
}
wg.Wait() // 모든 고루틴 완료 대기
fmt.Println("All goroutines completed")
}
2. WaitGroup 전달
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func waitGroupPointer() {
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go worker(i, &wg) // 포인터로 전달
}
wg.Wait()
}
3. WaitGroup 주의사항
func waitGroupMistakes() {
var wg sync.WaitGroup
// ❌ 잘못된 사용 1: Add를 고루틴 안에서 호출
for i := 0; i < 5; i++ {
go func(n int) {
wg.Add(1) // 레이스 컨디션 발생 가능
defer wg.Done()
fmt.Println(n)
}(i)
}
// wg.Wait() // Wait이 Add보다 먼저 실행될 수 있음
// ✅ 올바른 사용: Add를 고루틴 밖에서 호출
for i := 0; i < 5; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
fmt.Println(n)
}(i)
}
wg.Wait()
}
실전 패턴
1. 워커 풀 (Worker Pool)
type Job struct {
ID int
Data string
}
type Result struct {
Job Job
Output string
}
func workerPool() {
numWorkers := 3
jobs := make(chan Job, 10)
results := make(chan Result, 10)
// 워커 시작
var wg sync.WaitGroup
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
// 작업 제출
go func() {
for j := 1; j <= 9; j++ {
jobs <- Job{ID: j, Data: fmt.Sprintf("task-%d", j)}
}
close(jobs)
}()
// 결과 수집
go func() {
wg.Wait()
close(results)
}()
// 결과 출력
for result := range results {
fmt.Printf("Job %d: %s\n", result.Job.ID, result.Output)
}
}
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job.ID)
time.Sleep(100 * time.Millisecond)
results <- Result{
Job: job,
Output: fmt.Sprintf("processed by worker %d", id),
}
}
}
2. 파이프라인 (Pipeline)
func pipeline() {
// Stage 1: 숫자 생성
gen := func(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// Stage 2: 제곱
sq := func(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// Stage 3: 출력
print := func(in <-chan int) {
for n := range in {
fmt.Println(n)
}
}
// 파이프라인 실행
numbers := gen(1, 2, 3, 4, 5)
squared := sq(numbers)
print(squared)
}
3. Fan-out, Fan-in
func fanOutFanIn() {
// 입력 생성
input := make(chan int)
go func() {
for i := 1; i <= 10; i++ {
input <- i
}
close(input)
}()
// Fan-out: 여러 워커에게 분산
numWorkers := 3
channels := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
channels[i] = process(input)
}
// Fan-in: 여러 채널을 하나로 합침
merged := merge(channels...)
for result := range merged {
fmt.Println(result)
}
}
func process(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
time.Sleep(50 * time.Millisecond)
}
close(out)
}()
return out
}
func merge(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
out <- n
}
}
wg.Add(len(channels))
for _, c := range channels {
go output(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
4. 컨텍스트를 사용한 취소
import "context"
func contextCancellation() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := make(chan int)
go func() {
n := 1
for {
select {
case <-ctx.Done():
fmt.Println("Goroutine cancelled")
return
case ch <- n:
n++
time.Sleep(100 * time.Millisecond)
}
}
}()
for i := 0; i < 5; i++ {
fmt.Println(<-ch)
}
cancel() // 고루틴 종료
time.Sleep(200 * time.Millisecond)
}
5. Rate Limiting
func rateLimiting() {
requests := make(chan int, 10)
for i := 1; i <= 10; i++ {
requests <- i
}
close(requests)
// 200ms마다 하나씩 처리
limiter := time.Tick(200 * time.Millisecond)
for req := range requests {
<-limiter // 다음 틱까지 대기
fmt.Println("Request", req, time.Now())
}
}
func burstRateLimiting() {
requests := make(chan int, 10)
for i := 1; i <= 10; i++ {
requests <- i
}
close(requests)
// 버스트 허용 (최대 3개까지 즉시 처리)
burstyLimiter := make(chan time.Time, 3)
for i := 0; i < 3; i++ {
burstyLimiter <- time.Now()
}
go func() {
for t := range time.Tick(200 * time.Millisecond) {
burstyLimiter <- t
}
}()
for req := range requests {
<-burstyLimiter
fmt.Println("Request", req, time.Now())
}
}
6. 세마포어 (Semaphore)
func semaphore() {
const maxConcurrent = 3
sem := make(chan struct{}, maxConcurrent)
var wg sync.WaitGroup
for i := 1; i <= 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
sem <- struct{}{} // 세마포어 획득
defer func() { <-sem }() // 세마포어 해제
fmt.Printf("Task %d starting\n", id)
time.Sleep(500 * time.Millisecond)
fmt.Printf("Task %d done\n", id)
}(i)
}
wg.Wait()
}
7. 타임아웃과 함께 작업 실행
func doWorkWithTimeout(timeout time.Duration) error {
done := make(chan bool, 1)
go func() {
// 시간이 걸리는 작업
time.Sleep(2 * time.Second)
done <- true
}()
select {
case <-done:
return nil
case <-time.After(timeout):
return fmt.Errorf("operation timed out")
}
}
func main() {
err := doWorkWithTimeout(1 * time.Second)
if err != nil {
fmt.Println("Error:", err) // Timeout
}
err = doWorkWithTimeout(3 * time.Second)
if err != nil {
fmt.Println("Error:", err)
} else {
fmt.Println("Success")
}
}
8. 고루틴 풀 재사용
type Pool struct {
work chan func()
sem chan struct{}
}
func NewPool(workers int) *Pool {
p := &Pool{
work: make(chan func()),
sem: make(chan struct{}, workers),
}
for i := 0; i < workers; i++ {
go p.worker()
}
return p
}
func (p *Pool) worker() {
for fn := range p.work {
fn()
<-p.sem
}
}
func (p *Pool) Schedule(fn func()) {
p.sem <- struct{}{}
p.work <- fn
}
func (p *Pool) Shutdown() {
close(p.work)
}
func poolExample() {
pool := NewPool(3)
for i := 1; i <= 10; i++ {
i := i
pool.Schedule(func() {
fmt.Printf("Task %d\n", i)
time.Sleep(500 * time.Millisecond)
})
}
time.Sleep(6 * time.Second)
pool.Shutdown()
}
동기화 프리미티브
1. Mutex (상호 배제)
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func mutexExample() {
counter := &Counter{}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Println("Counter:", counter.Value()) // 1000
}
2. RWMutex (읽기-쓰기 잠금)
type Cache struct {
mu sync.RWMutex
data map[string]string
}
func (c *Cache) Get(key string) (string, bool) {
c.mu.RLock() // 읽기 잠금
defer c.mu.RUnlock()
val, ok := c.data[key]
return val, ok
}
func (c *Cache) Set(key, value string) {
c.mu.Lock() // 쓰기 잠금
defer c.mu.Unlock()
c.data[key] = value
}
3. sync.Once (한 번만 실행)
var (
instance *Singleton
once sync.Once
)
type Singleton struct {
value string
}
func GetInstance() *Singleton {
once.Do(func() {
fmt.Println("Creating instance")
instance = &Singleton{value: "singleton"}
})
return instance
}
func onceExample() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
s := GetInstance()
fmt.Println(s.value)
}()
}
wg.Wait()
// "Creating instance"는 한 번만 출력됨
}
4. sync.Cond (조건 변수)
func condExample() {
cond := sync.NewCond(&sync.Mutex{})
ready := false
// 대기하는 고루틴들
for i := 0; i < 3; i++ {
go func(id int) {
cond.L.Lock()
for !ready {
cond.Wait() // 조건이 만족될 때까지 대기
}
fmt.Printf("Goroutine %d proceeding\n", id)
cond.L.Unlock()
}(i)
}
time.Sleep(1 * time.Second)
// 조건 만족 후 알림
cond.L.Lock()
ready = true
cond.Broadcast() // 모든 대기 중인 고루틴 깨우기
cond.L.Unlock()
time.Sleep(1 * time.Second)
}
일반적인 실수
1. 고루틴 누수
func goroutineLeakBad() {
ch := make(chan int)
// ❌ 고루틴이 영원히 블록됨
go func() {
val := <-ch // 아무도 보내지 않음
fmt.Println(val)
}()
// 고루틴이 메모리에 남음
}
func goroutineLeakGood() {
ch := make(chan int)
done := make(chan bool)
// ✅ 취소 가능한 고루틴
go func() {
select {
case val := <-ch:
fmt.Println(val)
case <-done:
return
}
}()
time.Sleep(1 * time.Second)
close(done) // 고루틴 종료
}
2. 닫힌 채널에 송신
func closedChannelSend() {
ch := make(chan int, 1)
close(ch)
// ❌ 패닉 발생
// ch <- 1 // panic: send on closed channel
// ✅ 채널 상태 확인 또는 컨텍스트 사용
}
3. WaitGroup 카운터 오류
func waitGroupCounterBad() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
go func() {
wg.Add(1) // ❌ 레이스 컨디션
defer wg.Done()
time.Sleep(100 * time.Millisecond)
}()
}
wg.Wait() // Wait이 Add보다 먼저 실행될 수 있음
}
func waitGroupCounterGood() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1) // ✅ 고루틴 시작 전에 Add
go func() {
defer wg.Done()
time.Sleep(100 * time.Millisecond)
}()
}
wg.Wait()
}
4. 버퍼 크기 오류
func bufferSizeMistake() {
ch := make(chan int, 1)
ch <- 1
ch <- 2 // ❌ 블록됨 (버퍼 가득)
// ✅ 적절한 버퍼 크기 또는 비동기 수신
}
5. select에서 nil 채널
func nilChannelSelect() {
var ch chan int // nil
select {
case val := <-ch: // nil 채널은 영원히 블록
fmt.Println(val)
case <-time.After(1 * time.Second):
fmt.Println("Timeout")
}
// Timeout 출력
}
6. 데이터 레이스
func dataRace() {
counter := 0
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // ❌ 레이스 컨디션
}()
}
wg.Wait()
fmt.Println(counter) // 1000이 아닐 수 있음
// go run -race program.go로 검출
}
7. 포인터 없이 WaitGroup 전달
func waitGroupValueBad(wg sync.WaitGroup) {
defer wg.Done() // ❌ 복사본에만 영향
time.Sleep(100 * time.Millisecond)
}
func waitGroupValueGood(wg *sync.WaitGroup) {
defer wg.Done() // ✅ 원본에 영향
time.Sleep(100 * time.Millisecond)
}
성능 최적화
1. 고루틴 수 제한
func limitGoroutines() {
tasks := make([]int, 100)
results := make(chan int, 100)
// ❌ 100개 고루틴 생성 (과도함)
// for _, task := range tasks {
// go process(task, results)
// }
// ✅ 워커 풀로 제한
const maxWorkers = 10
sem := make(chan struct{}, maxWorkers)
for _, task := range tasks {
sem <- struct{}{}
go func(t int) {
defer func() { <-sem }()
results <- t * 2
}(task)
}
for i := 0; i < len(tasks); i++ {
<-results
}
}
2. 버퍼 크기 최적화
func bufferOptimization() {
// ❌ 버퍼 없음 (동기화 오버헤드)
ch1 := make(chan int)
// ✅ 적절한 버퍼 (비동기 처리)
ch2 := make(chan int, 100)
// 너무 큰 버퍼는 메모리 낭비
// 작업 특성에 맞게 조정
_ = ch1
_ = ch2
}
3. sync.Pool로 객체 재사용
var bufferPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func processData(data []byte) {
buf := bufferPool.Get().(*bytes.Buffer)
defer func() {
buf.Reset()
bufferPool.Put(buf)
}()
buf.Write(data)
// 처리...
}
베스트 프랙티스
1. 채널 소유권
// ✅ 채널을 생성한 쪽에서 닫기
func producer() <-chan int {
ch := make(chan int)
go func() {
defer close(ch) // 생산자가 닫음
for i := 0; i < 10; i++ {
ch <- i
}
}()
return ch
}
func consumer(ch <-chan int) {
for val := range ch {
fmt.Println(val)
}
// 소비자는 닫지 않음
}
2. 컨텍스트 사용
func longRunningTask(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
// 작업 수행
time.Sleep(100 * time.Millisecond)
}
}
}
3. 에러 그룹
import "golang.org/x/sync/errgroup"
func errorGroupExample() error {
g, ctx := errgroup.WithContext(context.Background())
urls := []string{
"http://example.com",
"http://example.org",
"http://example.net",
}
for _, url := range urls {
url := url
g.Go(func() error {
return fetch(ctx, url)
})
}
return g.Wait() // 첫 에러 반환
}
func fetch(ctx context.Context, url string) error {
// HTTP 요청
return nil
}
정리
- 고루틴: 경량 쓰레드,
go키워드로 실행 - 채널: 고루틴 간 통신,
make(chan Type, buffer) - Unbuffered: 동기 통신, 송수신 모두 블로킹
- Buffered: 비동기 통신, 버퍼 가득 차면 블로킹
- close(): 송신 종료 신호, 수신은 계속 가능
- select: 여러 채널 중 준비된 것 처리
- WaitGroup: 고루틴 완료 대기, Add/Done/Wait
- Mutex: 상호 배제, 임계 영역 보호
- 패턴: 워커 풀, 파이프라인, Fan-out/in
- 주의: 고루틴 누수, 데이터 레이스, 데드락
- 최적화: 고루틴 수 제한, 버퍼 크기, sync.Pool
- 원칙: 채널로 통신, 메모리 공유 최소화