aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPetter Rasmussen2016-02-20 23:11:23 +0100
committerPetter Rasmussen2016-02-20 23:39:11 +0100
commita9e9da783481fcb8022eb52fb944cb9ee13997de (patch)
treef817301b74f0f1787e310c8d6f2560bbb9cce457
parent308c7dceac93e7496453332139aac006da408629 (diff)
downloadgdrive-a9e9da783481fcb8022eb52fb944cb9ee13997de.tar.bz2
Wrap media uploads in TimeoutReader
-rw-r--r--drive/sync_upload.go14
-rw-r--r--drive/timeout_reader.go5
-rw-r--r--drive/update.go7
-rw-r--r--drive/upload.go14
4 files changed, 30 insertions, 10 deletions
diff --git a/drive/sync_upload.go b/drive/sync_upload.go
index a94e507..7a0833a 100644
--- a/drive/sync_upload.go
+++ b/drive/sync_upload.go
@@ -305,9 +305,12 @@ func (self *Drive) uploadMissingFile(parentId string, lf *LocalFile, args Upload
chunkSize := googleapi.ChunkSize(int(args.ChunkSize))
// Wrap file in progress reader
- srcReader := getProgressReader(srcFile, args.Progress, lf.info.Size())
+ progressReader := getProgressReader(srcFile, args.Progress, lf.info.Size())
- _, err = self.service.Files.Create(dstFile).Fields("id", "name", "size", "md5Checksum").Media(srcReader, chunkSize).Do()
+ // Wrap reader in timeout reader
+ reader, ctx := getTimeoutReaderContext(progressReader)
+
+ _, err = self.service.Files.Create(dstFile).Fields("id", "name", "size", "md5Checksum").Context(ctx).Media(reader, chunkSize).Do()
if err != nil {
if isBackendError(err) && try < MaxBackendErrorRetries {
exponentialBackoffSleep(try)
@@ -341,9 +344,12 @@ func (self *Drive) updateChangedFile(cf *changedFile, args UploadSyncArgs, try i
chunkSize := googleapi.ChunkSize(int(args.ChunkSize))
// Wrap file in progress reader
- srcReader := getProgressReader(srcFile, args.Progress, cf.local.info.Size())
+ progressReader := getProgressReader(srcFile, args.Progress, cf.local.info.Size())
+
+ // Wrap reader in timeout reader
+ reader, ctx := getTimeoutReaderContext(progressReader)
- _, err = self.service.Files.Update(cf.remote.file.Id, dstFile).Media(srcReader, chunkSize).Do()
+ _, err = self.service.Files.Update(cf.remote.file.Id, dstFile).Context(ctx).Media(reader, chunkSize).Do()
if err != nil {
if isBackendError(err) && try < MaxBackendErrorRetries {
exponentialBackoffSleep(try)
diff --git a/drive/timeout_reader.go b/drive/timeout_reader.go
index e228160..ba2bb83 100644
--- a/drive/timeout_reader.go
+++ b/drive/timeout_reader.go
@@ -10,6 +10,11 @@ import (
const MaxIdleTimeout = time.Second * 120
const TimeoutTimerInterval = time.Second * 10
+func getTimeoutReaderContext(r io.Reader) (io.Reader, context.Context) {
+ ctx, cancel := context.WithCancel(context.TODO())
+ return getTimeoutReader(r, cancel), ctx
+}
+
func getTimeoutReader(r io.Reader, cancel context.CancelFunc) io.Reader {
return &TimeoutReader{
reader: r,
diff --git a/drive/update.go b/drive/update.go
index c4ee341..5bdd040 100644
--- a/drive/update.go
+++ b/drive/update.go
@@ -54,12 +54,15 @@ func (self *Drive) Update(args UpdateArgs) error {
chunkSize := googleapi.ChunkSize(int(args.ChunkSize))
// Wrap file in progress reader
- srcReader := getProgressReader(srcFile, args.Progress, srcFileInfo.Size())
+ progressReader := getProgressReader(srcFile, args.Progress, srcFileInfo.Size())
+
+ // Wrap reader in timeout reader
+ reader, ctx := getTimeoutReaderContext(progressReader)
fmt.Fprintf(args.Out, "Uploading %s\n", args.Path)
started := time.Now()
- f, err := self.service.Files.Update(args.Id, dstFile).Fields("id", "name", "size").Media(srcReader, chunkSize).Do()
+ f, err := self.service.Files.Update(args.Id, dstFile).Fields("id", "name", "size").Context(ctx).Media(reader, chunkSize).Do()
if err != nil {
return fmt.Errorf("Failed to upload file: %s", err)
}
diff --git a/drive/upload.go b/drive/upload.go
index 05f52cd..2b8c7c3 100644
--- a/drive/upload.go
+++ b/drive/upload.go
@@ -158,12 +158,15 @@ func (self *Drive) uploadFile(args UploadArgs) (*drive.File, int64, error) {
chunkSize := googleapi.ChunkSize(int(args.ChunkSize))
// Wrap file in progress reader
- srcReader := getProgressReader(srcFile, args.Progress, srcFileInfo.Size())
+ progressReader := getProgressReader(srcFile, args.Progress, srcFileInfo.Size())
+
+ // Wrap reader in timeout reader
+ reader, ctx := getTimeoutReaderContext(progressReader)
fmt.Fprintf(args.Out, "Uploading %s\n", args.Path)
started := time.Now()
- f, err := self.service.Files.Create(dstFile).Fields("id", "name", "size", "md5Checksum", "webContentLink").Media(srcReader, chunkSize).Do()
+ f, err := self.service.Files.Create(dstFile).Fields("id", "name", "size", "md5Checksum", "webContentLink").Context(ctx).Media(reader, chunkSize).Do()
if err != nil {
return nil, 0, fmt.Errorf("Failed to upload file: %s", err)
}
@@ -205,12 +208,15 @@ func (self *Drive) UploadStream(args UploadStreamArgs) error {
chunkSize := googleapi.ChunkSize(int(args.ChunkSize))
// Wrap file in progress reader
- srcReader := getProgressReader(args.In, args.Progress, 0)
+ progressReader := getProgressReader(args.In, args.Progress, 0)
+
+ // Wrap reader in timeout reader
+ reader, ctx := getTimeoutReaderContext(progressReader)
fmt.Fprintf(args.Out, "Uploading %s\n", dstFile.Name)
started := time.Now()
- f, err := self.service.Files.Create(dstFile).Fields("id", "name", "size", "webContentLink").Media(srcReader, chunkSize).Do()
+ f, err := self.service.Files.Create(dstFile).Fields("id", "name", "size", "webContentLink").Context(ctx).Media(reader, chunkSize).Do()
if err != nil {
return fmt.Errorf("Failed to upload file: %s", err)
}