すきま風

勉強したことのメモとか

goでfile splitterを実装する あるいは AWS ECSによるbatch fan-out

仕事で↓の画像みたいな仕組みを作りました。

f:id:radiochemical:20201006152546p:plain
fan-out

S3にアップロードしたcsv fileをLambdaで複数ファイルに分割して再度アップロードし、それぞれのs3 eventを別のLambdaで補足して、分割したファイルごとにFargateによるバッチ処理を起動してDynamoDBにデータを登録します。


条件が噛み合えばお好みの数のFargateを並列実行できます。
また、FargateのアプリケーションコードもGoで書かれていて、ここでもCSPによる並行処理を実装しています。
画像の例だと 3 ECS × 3 goroutineで 9並列!界王拳9倍!うぉおおおお!!


今回の記事では元ファイルを分割するLambda用に書いた splitter についてコードを載せます。GoによるCSPの練習として書いたものです。 一応機能していたので、多分そんなに間違っていないはず。。 (s3 関連の処理は適当に省略しています)

import (
    "bufio"
    "context"
    "fmt"
    "github.com/abc/def/s3"
    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
    "io/ioutil"
    "log"
    "os"
    "strings"
    "sync"
)

const UploadFolder = "SPLIT_CSV_FILES"

const MaxMultiplicity = 3

func main() {
    lambda.Start(splitter)
}

// eventで取得したs3 objectをsplitしてs3にuploadするLambda function
func splitter(_ context.Context, event events.S3Event) {
    s3Downloader := s3.InitS3Downloader()

    var wg sync.WaitGroup
    wg.Add(len(event.Records))

    for _, r := range event.Records {
        bucketName := r.S3.Bucket.Name
        key := r.S3.Object.Key

        log.Printf("download bucket:%s\n", bucketName)
        log.Printf("download key:%s\n", key)

        go func() {
            defer wg.Done()
            // file download
            fn, n, err := s3Downloader.Download(bucketName, key)

            if err != nil {
                log.Panic(err.Error())
            }

            log.Printf("DownloadedSize: %d byte\n", n)

            uploadFileCh := splitFile(fn, MaxMultiplicity)

            uploadFiles(uploadFileCh, bucketName)
        }()
    }

    wg.Wait()
}

func splitFile(fn, mu int) <-chan string {
    f, err := os.Open(fn)
    if err != nil {
        log.Panic(err.Error())
    }

    scanner := bufio.NewScanner(f)

    // read header
    scanner.Scan()
    header := scanner.Text()

    // produce channel
    lineCh := produceLineCh(scanner)

    // temporary file create
    tempFiles := make([]*os.File, 0, mu)
    for i := 0; i < mu; i++ {
        f, err := ioutil.TempFile("", fmt.Sprintf("split%d.csv", i))

        if err != nil {
            log.Panic(err.Error())
        }

        tempFiles = append(tempFiles, f)
    }

    // fan-out
    var wg sync.WaitGroup
    uploadFileCh := make(chan string)

    for j := 0; j < mu; j++ {
        wg.Add(1)
        tf := tempFiles[j]
        go func() {
            defer wg.Done()

            // write header
            _, err = tf.WriteString(fmt.Sprintln(header))
            if err != nil {
                log.Panic(err.Error())
            }

            // write body
            for line := range lineCh {
                _, err := tf.WriteString(fmt.Sprintln(line))
                if err != nil {
                    log.Panic(err.Error())
                }
            }

            uploadFileCh <- tf.Name()
        }()
    }

    go func() {
        wg.Wait()
        close(uploadFileCh)

        for _, v := range tempFiles {
            v.Close()
        }
    }()

    return uploadFileCh
}

func uploadFiles(uploadFileCh <-chan string, bucketName string) {
    s3Uploader := s3.InitS3Uploader()

    var wg sync.WaitGroup
    for fn := range uploadFileCh {
        wg.Add(1)
        fn := fn

        go func() {
            defer wg.Done()

            // tempFileはsuffixにランダムな数値がつくのでここで除去する
            var rfn string
            i := strings.LastIndex(fn, ".csv")
            if i != -1 {
                rfn = fn[:i+4]
                err := os.Rename(fn, rfn)
                if err != nil {
                    log.Panic(err.Error())
                }
            } else {
                // add suffix
                rfn = fn + ".csv"
            }

            key, err := s3Uploader.Upload(bucketName, UploadFolder, rfn)

            if err != nil {
                log.Panic(err.Error())
            }

            fmt.Printf("upload file end successfully. Key=[%s]\n", key)
        }()
    }

    wg.Wait()
}

func produceLineCh(scanner *bufio.Scanner) <-chan string {
    lineCh := make(chan string)

    go func() {
        defer close(lineCh)

        for scanner.Scan() {
            lineCh <- scanner.Text()
        }

        if err := scanner.Err(); err != nil {
            log.Panic(err.Error())
        }
    }()

    return lineCh
}

まとめ

Goの並行処理実装は本当に面白いので色んな所で使っていきたい。