8e7a3d3793b611a09745c28553b517f0df4bd422
[scm/test.git] / tq / basic_upload.go
1 package tq
2
3 import (
4         "io"
5         "io/ioutil"
6         "net/http"
7         "os"
8         "path/filepath"
9         "strconv"
10         "strings"
11
12         "github.com/git-lfs/git-lfs/errors"
13         "github.com/git-lfs/git-lfs/lfsapi"
14         "github.com/git-lfs/git-lfs/tools"
15 )
16
17 const (
18         BasicAdapterName   = "basic"
19         defaultContentType = "application/octet-stream"
20 )
21
22 // Adapter for basic uploads (non resumable)
23 type basicUploadAdapter struct {
24         *adapterBase
25 }
26
27 func (a *basicUploadAdapter) ClearTempStorage() error {
28         // Should be empty already but also remove dir
29         return os.RemoveAll(a.tempDir())
30 }
31
32 func (a *basicUploadAdapter) tempDir() string {
33         // Must be dedicated to this adapter as deleted by ClearTempStorage
34         d := filepath.Join(os.TempDir(), "git-lfs-basic-temp")
35         if err := os.MkdirAll(d, 0755); err != nil {
36                 return os.TempDir()
37         }
38         return d
39 }
40
41 func (a *basicUploadAdapter) WorkerStarting(workerNum int) (interface{}, error) {
42         return nil, nil
43 }
44 func (a *basicUploadAdapter) WorkerEnding(workerNum int, ctx interface{}) {
45 }
46
47 func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressCallback, authOkFunc func()) error {
48         rel, err := t.Rel("upload")
49         if err != nil {
50                 return err
51         }
52         if rel == nil {
53                 return errors.Errorf("No upload action for object: %s", t.Oid)
54         }
55
56         req, err := a.newHTTPRequest("PUT", rel)
57         if err != nil {
58                 return err
59         }
60
61         if req.Header.Get("Transfer-Encoding") == "chunked" {
62                 req.TransferEncoding = []string{"chunked"}
63         } else {
64                 req.Header.Set("Content-Length", strconv.FormatInt(t.Size, 10))
65         }
66
67         req.ContentLength = t.Size
68
69         f, err := os.OpenFile(t.Path, os.O_RDONLY, 0644)
70         if err != nil {
71                 return errors.Wrap(err, "basic upload")
72         }
73         defer f.Close()
74
75         if err := setContentTypeFor(req, f); err != nil {
76                 return err
77         }
78
79         // Ensure progress callbacks made while uploading
80         // Wrap callback to give name context
81         ccb := func(totalSize int64, readSoFar int64, readSinceLast int) error {
82                 if cb != nil {
83                         return cb(t.Name, totalSize, readSoFar, readSinceLast)
84                 }
85                 return nil
86         }
87
88         cbr := tools.NewBodyWithCallback(f, t.Size, ccb)
89         var reader lfsapi.ReadSeekCloser = cbr
90
91         // Signal auth was ok on first read; this frees up other workers to start
92         if authOkFunc != nil {
93                 reader = newStartCallbackReader(reader, func() error {
94                         authOkFunc()
95                         return nil
96                 })
97         }
98
99         req.Body = reader
100
101         req = a.apiClient.LogRequest(req, "lfs.data.upload")
102         res, err := a.doHTTP(t, req)
103         if err != nil {
104                 // We're about to return a retriable error, meaning that this
105                 // transfer will either be retried, or it will fail.
106                 //
107                 // Either way, let's decrement the number of bytes that we've
108                 // read _so far_, so that the next iteration doesn't re-transfer
109                 // those bytes, according to the progress meter.
110                 if perr := cbr.ResetProgress(); perr != nil {
111                         err = errors.Wrap(err, perr.Error())
112                 }
113
114                 return errors.NewRetriableError(err)
115         }
116
117         // A status code of 403 likely means that an authentication token for the
118         // upload has expired. This can be safely retried.
119         if res.StatusCode == 403 {
120                 err = errors.New("http: received status 403")
121                 return errors.NewRetriableError(err)
122         }
123
124         if res.StatusCode > 299 {
125                 return errors.Wrapf(nil, "Invalid status for %s %s: %d",
126                         req.Method,
127                         strings.SplitN(req.URL.String(), "?", 2)[0],
128                         res.StatusCode,
129                 )
130         }
131
132         io.Copy(ioutil.Discard, res.Body)
133         res.Body.Close()
134
135         return verifyUpload(a.apiClient, a.remote, t)
136 }
137
138 // startCallbackReader is a reader wrapper which calls a function as soon as the
139 // first Read() call is made. This callback is only made once
140 type startCallbackReader struct {
141         cb     func() error
142         cbDone bool
143         lfsapi.ReadSeekCloser
144 }
145
146 func (s *startCallbackReader) Read(p []byte) (n int, err error) {
147         if !s.cbDone && s.cb != nil {
148                 if err := s.cb(); err != nil {
149                         return 0, err
150                 }
151                 s.cbDone = true
152         }
153         return s.ReadSeekCloser.Read(p)
154 }
155 func newStartCallbackReader(r lfsapi.ReadSeekCloser, cb func() error) *startCallbackReader {
156         return &startCallbackReader{
157                 ReadSeekCloser: r,
158                 cb:             cb,
159         }
160 }
161
162 func configureBasicUploadAdapter(m *Manifest) {
163         m.RegisterNewAdapterFunc(BasicAdapterName, Upload, func(name string, dir Direction) Adapter {
164                 switch dir {
165                 case Upload:
166                         bu := &basicUploadAdapter{newAdapterBase(m.fs, name, dir, nil)}
167                         // self implements impl
168                         bu.transferImpl = bu
169                         return bu
170                 case Download:
171                         panic("Should never ask this func for basic download")
172                 }
173                 return nil
174         })
175 }
176
177 func setContentTypeFor(req *http.Request, r io.ReadSeeker) error {
178         if len(req.Header.Get("Content-Type")) != 0 {
179                 return nil
180         }
181
182         buffer := make([]byte, 512)
183         n, err := r.Read(buffer)
184         if err != nil && err != io.EOF {
185                 return errors.Wrap(err, "content type detect")
186         }
187
188         contentType := http.DetectContentType(buffer[:n])
189         if _, err := r.Seek(0, 0); err != nil {
190                 return errors.Wrap(err, "content type rewind")
191         }
192
193         if contentType == "" {
194                 contentType = defaultContentType
195         }
196
197         req.Header.Set("Content-Type", contentType)
198         return nil
199 }