すきま風

勉強したことのメモとか

goroutineを使ってdatabaseに並列でInsertするサンプル

databaseにgoroutineで並行にinsertするサンプルプログラムです。仕事で必要になって実験しました。

database 接続

sql.Open() を使ってDB Instanceを取得します。DB Instanceは並行安全で、このInstanceがConnection Poolを管理するので Singletonにしてアプリケーション全体で共通利用することにします。

// database.go

// かんたんsingleton
var singletonDB *sql.DB
var lock sync.Mutex

func Connect() *sql.DB {
    lock.Lock()
    defer lock.Unlock()
    if singletonDB != nil {
        return singletonDB
    }
    // サンプルなのでerr無視
    singletonDB, _ = sql.Open(
        "postgres",
        fmt.Sprintf(
            "host=%s port=%s user=%s password=%s dbname=%s sslmode=disable",
            os.Getenv("host"),
            os.Getenv("port"),
            os.Getenv("user"),
            os.Getenv("password"),
            os.Getenv("dbname"),
        ),
    )

    // connection poolingの設定をする
    singletonDB.SetMaxOpenConns(10)
    singletonDB.SetConnMaxIdleTime(10 * time.Second)
    singletonDB.SetConnMaxLifetime(10 * time.Second)

    return singletonDB
}

goroutine fan-out

Sample tableを適当に用意してInsertしていきます。PrepareするとDB InstanceはConnectionを発行して、Stmt Instanceを返します。 Stmtも並行安全なので、goroutineで共用します。自分のLocal環境のテストでは結果は変わらなかったのですが、 きちんとした環境ではgoroutineごとにStmtを発行 (= goroutineごとにConnectionをもつ) ほうが早いかもしれません。ただ、Transaction管理とかしだすと辛いと思います。
あと、公式の説明によるとgoではDB InstanceをCloseするということは基本行わないらしいのでStmtのCloseしかしていません。StmtをCloseするとConnectionはIdleになります。

func () Insert(entityCh <-chan entity.SampleEntity, gophers int) <-chan error {
    errCh := make(chan error)

    // singleton DB instanceを取得する
    db := Connect()

    // stmtは並行安全なのでgoroutineで共用する
    stmt, err := db.Prepare("insert into sample (id, label) values ($1, $2)")
    if err != nil {
        log.Fatal(err)
    }

    var wg sync.WaitGroup
    wg.Add(gophers)

    // fan-out. goroutineを複数生成する
    for i := 0; i < gophers; i++ {
        go func() {
            defer wg.Done()

            // 並行処理でInsertしていく...
            for ent := range entityCh {
                _, err := stmt.Exec(ent.Id, ent.Label)
                if err != nil {
                    errCh <- err
                }
            }
        }()
    }

    go func() {
        defer stmt.Close()
        wg.Wait()
        close(errCh)
    }()

    return errCh
}

性能

以下の環境で試験しています。

software version
os macOS Catalina
go 1.15
postgres postgres:13-alpine


50,000件Insertしてみた結果は以下

並行数 時間
1 101 sec
3 52 sec

goroutine の数を増やしたらちゃんと早くなりました 😌

参考

golang.org