goroutineを使ってdatabaseに並列でInsertするサンプル
databaseにgoroutineで並行にinsertするサンプルプログラムです。仕事で必要になって実験しました。
database 接続
sql.Open() を使ってDB Instanceを取得します。DB Instanceは並行安全で、このInstanceがConnection Poolを管理するので Singletonにしてアプリケーション全体で共通利用することにします。
// database.go // かんたんsingleton var singletonDB *sql.DB var lock sync.Mutex func Connect() *sql.DB { lock.Lock() defer lock.Unlock() if singletonDB != nil { return singletonDB } // サンプルなのでerr無視 singletonDB, _ = sql.Open( "postgres", fmt.Sprintf( "host=%s port=%s user=%s password=%s dbname=%s sslmode=disable", os.Getenv("host"), os.Getenv("port"), os.Getenv("user"), os.Getenv("password"), os.Getenv("dbname"), ), ) // connection poolingの設定をする singletonDB.SetMaxOpenConns(10) singletonDB.SetConnMaxIdleTime(10 * time.Second) singletonDB.SetConnMaxLifetime(10 * time.Second) return singletonDB }
goroutine fan-out
Sample tableを適当に用意してInsertしていきます。PrepareするとDB InstanceはConnectionを発行して、Stmt Instanceを返します。
Stmtも並行安全なので、goroutineで共用します。自分のLocal環境のテストでは結果は変わらなかったのですが、
きちんとした環境ではgoroutineごとにStmtを発行 (= goroutineごとにConnectionをもつ) ほうが早いかもしれません。ただ、Transaction管理とかしだすと辛いと思います。
あと、公式の説明によるとgoではDB InstanceをCloseするということは基本行わないらしいのでStmtのCloseしかしていません。StmtをCloseするとConnectionはIdleになります。
func () Insert(entityCh <-chan entity.SampleEntity, gophers int) <-chan error { errCh := make(chan error) // singleton DB instanceを取得する db := Connect() // stmtは並行安全なのでgoroutineで共用する stmt, err := db.Prepare("insert into sample (id, label) values ($1, $2)") if err != nil { log.Fatal(err) } var wg sync.WaitGroup wg.Add(gophers) // fan-out. goroutineを複数生成する for i := 0; i < gophers; i++ { go func() { defer wg.Done() // 並行処理でInsertしていく... for ent := range entityCh { _, err := stmt.Exec(ent.Id, ent.Label) if err != nil { errCh <- err } } }() } go func() { defer stmt.Close() wg.Wait() close(errCh) }() return errCh }
性能
以下の環境で試験しています。
software | version |
---|---|
os | macOS Catalina |
go | 1.15 |
postgres | postgres:13-alpine |
50,000件Insertしてみた結果は以下
並行数 | 時間 |
---|---|
1 | 101 sec |
3 | 52 sec |
goroutine の数を増やしたらちゃんと早くなりました 😌