goでfile splitterを実装する あるいは AWS ECSによるbatch fan-out
仕事で↓の画像みたいな仕組みを作りました。
S3にアップロードしたcsv fileをLambdaで複数ファイルに分割して再度アップロードし、それぞれのs3 eventを別のLambdaで補足して、分割したファイルごとにFargateによるバッチ処理を起動してDynamoDBにデータを登録します。
条件が噛み合えばお好みの数のFargateを並列実行できます。
また、FargateのアプリケーションコードもGoで書かれていて、ここでもCSPによる並行処理を実装しています。
画像の例だと 3 ECS × 3 goroutineで 9並列!界王拳9倍!うぉおおおお!!
今回の記事では元ファイルを分割するLambda用に書いた splitter についてコードを載せます。GoによるCSPの練習として書いたものです。 一応機能していたので、多分そんなに間違っていないはず。。 (s3 関連の処理は適当に省略しています)
import ( "bufio" "context" "fmt" "github.com/abc/def/s3" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" "io/ioutil" "log" "os" "strings" "sync" ) const UploadFolder = "SPLIT_CSV_FILES" const MaxMultiplicity = 3 func main() { lambda.Start(splitter) } // eventで取得したs3 objectをsplitしてs3にuploadするLambda function func splitter(_ context.Context, event events.S3Event) { s3Downloader := s3.InitS3Downloader() var wg sync.WaitGroup wg.Add(len(event.Records)) for _, r := range event.Records { bucketName := r.S3.Bucket.Name key := r.S3.Object.Key log.Printf("download bucket:%s\n", bucketName) log.Printf("download key:%s\n", key) go func() { defer wg.Done() // file download fn, n, err := s3Downloader.Download(bucketName, key) if err != nil { log.Panic(err.Error()) } log.Printf("DownloadedSize: %d byte\n", n) uploadFileCh := splitFile(fn, MaxMultiplicity) uploadFiles(uploadFileCh, bucketName) }() } wg.Wait() } func splitFile(fn, mu int) <-chan string { f, err := os.Open(fn) if err != nil { log.Panic(err.Error()) } scanner := bufio.NewScanner(f) // read header scanner.Scan() header := scanner.Text() // produce channel lineCh := produceLineCh(scanner) // temporary file create tempFiles := make([]*os.File, 0, mu) for i := 0; i < mu; i++ { f, err := ioutil.TempFile("", fmt.Sprintf("split%d.csv", i)) if err != nil { log.Panic(err.Error()) } tempFiles = append(tempFiles, f) } // fan-out var wg sync.WaitGroup uploadFileCh := make(chan string) for j := 0; j < mu; j++ { wg.Add(1) tf := tempFiles[j] go func() { defer wg.Done() // write header _, err = tf.WriteString(fmt.Sprintln(header)) if err != nil { log.Panic(err.Error()) } // write body for line := range lineCh { _, err := tf.WriteString(fmt.Sprintln(line)) if err != nil { log.Panic(err.Error()) } } uploadFileCh <- tf.Name() }() } go func() { wg.Wait() close(uploadFileCh) for _, v := range tempFiles { v.Close() } }() return uploadFileCh } func uploadFiles(uploadFileCh <-chan string, bucketName string) { s3Uploader := s3.InitS3Uploader() var wg sync.WaitGroup for fn := range uploadFileCh { wg.Add(1) fn := fn go func() { defer wg.Done() // tempFileはsuffixにランダムな数値がつくのでここで除去する var rfn string i := strings.LastIndex(fn, ".csv") if i != -1 { rfn = fn[:i+4] err := os.Rename(fn, rfn) if err != nil { log.Panic(err.Error()) } } else { // add suffix rfn = fn + ".csv" } key, err := s3Uploader.Upload(bucketName, UploadFolder, rfn) if err != nil { log.Panic(err.Error()) } fmt.Printf("upload file end successfully. Key=[%s]\n", key) }() } wg.Wait() } func produceLineCh(scanner *bufio.Scanner) <-chan string { lineCh := make(chan string) go func() { defer close(lineCh) for scanner.Scan() { lineCh <- scanner.Text() } if err := scanner.Err(); err != nil { log.Panic(err.Error()) } }() return lineCh }
まとめ
Goの並行処理実装は本当に面白いので色んな所で使っていきたい。