File upload using Goroutines and MinIO

October 6, 2024

Hi there!

I've been working on a side project for a while. (Click here to see the project.) This project led me to explore Go.

For my use case, I needed an object storage solution similar to S3 or Google Cloud Storage. I found an open-source alternative, MinIO, which fit my needs perfectly.

I tested MinIO using their publicly hosted version. You can access it from here. For credentials and login details, refer here.

Next, I integrated MinIO with my Go application using the MinIO Go SDK. Since I wanted to upload multiple files simultaneously, I used goroutines and channels for concurrent uploads.

The full source code is here, but I'll explain a few key snippets.

Go initial setup

Create a simple Go project. (If you haven't installed Go yet, refer here.)

go mod init minio-example

To begin, I wrote a simple API using the Gin framework to handle file uploads:

// Endpoint to upload the files
r.POST("/upload-files", func(ctx *gin.Context) {
	// Call service to handle the request
	result := service.UploadFiles(ctx)
	ctx.JSON(result.Status_code, result)
})

This API accepts a POST request with files (e.g., images) in the payload. The actual file handling occurs in the service layer, which:

  1. Connects to MinIO
  2. Retrieves the files from the multipart request
  3. Iterates through each file
  4. Fires off a goroutine to upload each file, using a channel to track the status
  5. Listens to the channel for file upload statuses
  6. Returns the status and closes the request
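
Step 2 can be tried out without Gin or MinIO. The sketch below uses only the standard library: it builds a multipart request in memory and reads back the parts sent under the "files" field. As far as I know, Gin's ctx.MultipartForm() wraps the same net/http parsing, but treat that as an assumption. The helper names buildUploadRequest and filesFromRequest are made up for this illustration and are not part of the project.

```go
package main

import (
	"bytes"
	"fmt"
	"mime/multipart"
	"net/http"
	"net/http/httptest"
)

// buildUploadRequest creates an in-memory multipart POST request that
// carries one part per name under the form field "files".
// (Hypothetical helper, used here only to have something to parse.)
func buildUploadRequest(names []string) (*http.Request, error) {
	var body bytes.Buffer
	w := multipart.NewWriter(&body)
	for _, name := range names {
		part, err := w.CreateFormFile("files", name)
		if err != nil {
			return nil, err
		}
		if _, err := part.Write([]byte("dummy content for " + name)); err != nil {
			return nil, err
		}
	}
	// Close writes the trailing boundary; without it the body is incomplete.
	if err := w.Close(); err != nil {
		return nil, err
	}
	req := httptest.NewRequest(http.MethodPost, "/upload-files", &body)
	req.Header.Set("Content-Type", w.FormDataContentType())
	return req, nil
}

// filesFromRequest parses the multipart body and returns the headers of
// every file sent under the "files" field.
func filesFromRequest(req *http.Request) ([]*multipart.FileHeader, error) {
	// Keep up to 32 MiB in memory; larger parts spill to temporary files.
	if err := req.ParseMultipartForm(32 << 20); err != nil {
		return nil, err
	}
	return req.MultipartForm.File["files"], nil
}

func main() {
	req, err := buildUploadRequest([]string{"a.png", "b.png"})
	if err != nil {
		panic(err)
	}
	files, err := filesFromRequest(req)
	if err != nil {
		panic(err)
	}
	for _, f := range files {
		fmt.Println(f.Filename, f.Size)
	}
}
```

Each *multipart.FileHeader carries the file name, the size and an Open() method, which is what the upload function needs later on.
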
Steps 2 through 5 look like this in the service layer:

// Get the files from the parsed multipart form
files := form.File["files"]

// Create a buffered channel with one slot per file,
// so that no goroutine blocks when it sends its status
ch := make(chan model.Uploadstatus, len(files))

// Slice to store the file upload statuses
var FileUploadStatus []model.Uploadstatus

// Upload into the bucket
for _, file := range files {
	// Start one goroutine per file to upload it into the bucket.
	// Each goroutine reports its status when it finishes.
	// To avoid a race condition on the shared slice we could use either
	// a mutex or a channel; I'm using a channel to collect the results.
	go utils.PutImageInBucket(ctx, bucket_name, file, client, FileUploadStatus, ch)
}

// Receive exactly one status per file from the channel
for i := 0; i < len(files); i++ {
	FileUploadStatus = append(FileUploadStatus, <-ch)
}
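
For comparison, here is a minimal sketch of the mutex route mentioned in the comments above, which I did not take in the project. It uses sync.WaitGroup to wait for all goroutines and sync.Mutex to protect the shared slice. UploadStatus and fakeUpload are hypothetical stand-ins for model.Uploadstatus and the real upload call, so the example runs without MinIO.

```go
package main

import (
	"fmt"
	"sync"
)

// UploadStatus is a hypothetical stand-in for model.Uploadstatus.
type UploadStatus struct {
	ObjectName string
	Status     bool
}

// fakeUpload stands in for the real upload call; it fails on empty names.
func fakeUpload(name string) bool {
	return name != ""
}

// uploadAllWithMutex starts one goroutine per name and appends each result
// to a shared slice. The mutex serializes the appends, and the WaitGroup
// blocks until every goroutine has finished.
func uploadAllWithMutex(names []string) []UploadStatus {
	var (
		mu       sync.Mutex
		wg       sync.WaitGroup
		statuses = make([]UploadStatus, 0, len(names))
	)
	for _, name := range names {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			ok := fakeUpload(name) // do the slow work outside the lock
			mu.Lock()
			statuses = append(statuses, UploadStatus{ObjectName: name, Status: ok})
			mu.Unlock()
		}(name)
	}
	wg.Wait()
	return statuses
}

func main() {
	for _, s := range uploadAllWithMutex([]string{"a.png", "", "c.png"}) {
		fmt.Printf("%q uploaded=%v\n", s.ObjectName, s.Status)
	}
}
```

Both approaches are safe. The channel version needs no lock because only the handler appends to the slice, which is the main reason I preferred it.
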

Each goroutine runs PutImageInBucket(), which wraps the MinIO SDK's PutObject function and sends the upload status back over the channel:

func PutImageInBucket(ctx *gin.Context, bucket_name string, file *multipart.FileHeader, client *minio.Client, FileUploadStatus []model.Uploadstatus, ch chan model.Uploadstatus) {
	var uploadStatusOfGoRoutine model.Uploadstatus
	// Create a unique object name: uuid + file name
	object_name := fmt.Sprintf("%s-%s", uuid.NewString(), file.Filename)

	// Fill in the fields that do not depend on the upload result
	uploadStatusOfGoRoutine.BucketName = bucket_name
	uploadStatusOfGoRoutine.ObjectName = object_name

	// Open the file
	reader, err := file.Open()
	if err != nil {
		uploadStatusOfGoRoutine.Status = false
		slog.Error("Error processing file", "filename", file.Filename, "error", err.Error())
		// Send the failed status and return, so that nothing else runs
		// and the channel receives exactly one value per file
		ch <- uploadStatusOfGoRoutine
		return
	}
	defer reader.Close()

	// Use the content type the client declared for this part
	info, err := client.PutObject(ctx, bucket_name, object_name, reader, file.Size, minio.PutObjectOptions{ContentType: file.Header.Get("Content-Type")})
	if err != nil {
		uploadStatusOfGoRoutine.Status = false
		slog.Error("Error while uploading file", "filename", file.Filename, "error", err.Error())
		// Send the failed status and return
		ch <- uploadStatusOfGoRoutine
		return
	}

	// slog takes key/value pairs, not printf-style verbs
	slog.Info("Successfully uploaded file", "filename", file.Filename, "size", info.Size)
	uploadStatusOfGoRoutine.Status = true
	// Send the successful status
	ch <- uploadStatusOfGoRoutine
}

After uploading, I generate pre-signed URLs to allow clients to access the uploaded files. Pre-signed URLs require you to specify the validity period (from 1 second to 7 days).

func GeneratePresignedURL(ctx *gin.Context, client *minio.Client, bucket_name string) []model.Files {
	var filePaths []model.Files

	// Read the object information from the given bucket and use each
	// object name to generate a pre-signed URL
	for message := range client.ListObjects(ctx, bucket_name, minio.ListObjectsOptions{Prefix: "", Recursive: true}) {
		// Listing errors arrive on the same channel as the objects
		if message.Err != nil {
			slog.Error("Error while listing objects", "error", message.Err.Error())
			continue
		}
		objectName := message.Key

		// Ask the browser to download the object under its own name
		reqParams := make(url.Values)
		reqParams.Set("response-content-disposition", fmt.Sprintf("attachment; filename=\"%s\"", objectName))

		// Create a pre-signed URL that stays valid for 60 seconds
		presignedURL, err := client.PresignedGetObject(ctx, bucket_name, objectName, 60*time.Second, reqParams)
		if err != nil {
			slog.Error("Error while retrieving pre-signed URL", "error", err.Error())
			continue
		}
		filePaths = append(filePaths, model.Files{FilePath: presignedURL.String()})
	}
	slog.Info("Successfully generated pre-signed URLs for objects", "bucket_name", bucket_name)
	return filePaths
}
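
If the validity period ever comes from configuration or user input, it may be worth checking it against the 1 second to 7 days window before asking for a URL. The sketch below is one way to do that with the standard library only. validateExpiry and its constants are hypothetical names for this post, not part of the MinIO SDK.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Bounds of the validity window described above (1 second to 7 days).
const (
	minExpiry = time.Second
	maxExpiry = 7 * 24 * time.Hour
)

var errExpiryOutOfRange = errors.New("expiry must be between 1 second and 7 days")

// validateExpiry reports whether d is an acceptable validity period.
// (Hypothetical helper; it only checks the range and makes no SDK calls.)
func validateExpiry(d time.Duration) error {
	if d < minExpiry || d > maxExpiry {
		return errExpiryOutOfRange
	}
	return nil
}

func main() {
	for _, d := range []time.Duration{0, 60 * time.Second, 8 * 24 * time.Hour} {
		fmt.Println(d, validateExpiry(d))
	}
}
```

Failing early like this gives the caller a clear error instead of one failed request per object.
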

But Why??

By using goroutines, the application can upload multiple files concurrently instead of one after another. Channels gather the upload status from each goroutine without a race condition, because only the receiving loop ever appends to the results slice.
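
To play with the pattern on its own, here is a small self-contained sketch that runs without MinIO or Gin. UploadStatus and fakeUpload are hypothetical stand-ins for model.Uploadstatus and the real upload function; the fake one simply "fails" for names ending in ".tmp".

```go
package main

import (
	"fmt"
	"strings"
)

// UploadStatus is a hypothetical stand-in for model.Uploadstatus.
type UploadStatus struct {
	ObjectName string
	Status     bool
}

// fakeUpload pretends to upload a file and sends exactly one status.
// Names ending in ".tmp" are treated as failed uploads.
func fakeUpload(name string, ch chan<- UploadStatus) {
	ch <- UploadStatus{ObjectName: name, Status: !strings.HasSuffix(name, ".tmp")}
}

// uploadAll fans out one goroutine per name and fans the results back in
// through a buffered channel. Only this function touches the slice.
func uploadAll(names []string) []UploadStatus {
	// One slot per file, so a send never blocks.
	ch := make(chan UploadStatus, len(names))
	for _, name := range names {
		go fakeUpload(name, ch)
	}
	statuses := make([]UploadStatus, 0, len(names))
	// Receive exactly as many values as goroutines were started.
	for range names {
		statuses = append(statuses, <-ch)
	}
	return statuses
}

func main() {
	for _, s := range uploadAll([]string{"a.png", "b.tmp", "c.png"}) {
		fmt.Printf("%s uploaded=%v\n", s.ObjectName, s.Status)
	}
}
```

The results arrive in completion order, not in request order. The important rule is that every goroutine sends exactly one value on every code path: with fewer sends the receiving loop blocks forever, and with more sends the extra statuses are never read.
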