Imported Upstream version 2.5.1
[scm/test.git] / tq / basic_upload.go
1 package tq
2
3 import (
4         "io"
5         "io/ioutil"
6         "net/http"
7         "os"
8         "path/filepath"
9         "strconv"
10         "strings"
11
12         "github.com/git-lfs/git-lfs/config"
13         "github.com/git-lfs/git-lfs/errors"
14         "github.com/git-lfs/git-lfs/lfsapi"
15         "github.com/git-lfs/git-lfs/tools"
16 )
17
18 const (
19         BasicAdapterName   = "basic"
20         defaultContentType = "application/octet-stream"
21 )
22
23 // Adapter for basic uploads (non resumable)
24 type basicUploadAdapter struct {
25         *adapterBase
26 }
27
28 func (a *basicUploadAdapter) ClearTempStorage() error {
29         // Should be empty already but also remove dir
30         return os.RemoveAll(a.tempDir())
31 }
32
33 func (a *basicUploadAdapter) tempDir() string {
34         // Must be dedicated to this adapter as deleted by ClearTempStorage
35         d := filepath.Join(os.TempDir(), "git-lfs-basic-temp")
36         if err := os.MkdirAll(d, 0755); err != nil {
37                 return os.TempDir()
38         }
39         return d
40 }
41
42 func (a *basicUploadAdapter) WorkerStarting(workerNum int) (interface{}, error) {
43         return nil, nil
44 }
45 func (a *basicUploadAdapter) WorkerEnding(workerNum int, ctx interface{}) {
46 }
47
48 func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressCallback, authOkFunc func()) error {
49         rel, err := t.Rel("upload")
50         if err != nil {
51                 return err
52         }
53         if rel == nil {
54                 return errors.Errorf("No upload action for object: %s", t.Oid)
55         }
56
57         req, err := a.newHTTPRequest("PUT", rel)
58         if err != nil {
59                 return err
60         }
61
62         if req.Header.Get("Transfer-Encoding") == "chunked" {
63                 req.TransferEncoding = []string{"chunked"}
64         } else {
65                 req.Header.Set("Content-Length", strconv.FormatInt(t.Size, 10))
66         }
67
68         req.ContentLength = t.Size
69
70         f, err := os.OpenFile(t.Path, os.O_RDONLY, 0644)
71         if err != nil {
72                 return errors.Wrap(err, "basic upload")
73         }
74         defer f.Close()
75
76         if err := a.setContentTypeFor(req, f); err != nil {
77                 return err
78         }
79
80         // Ensure progress callbacks made while uploading
81         // Wrap callback to give name context
82         ccb := func(totalSize int64, readSoFar int64, readSinceLast int) error {
83                 if cb != nil {
84                         return cb(t.Name, totalSize, readSoFar, readSinceLast)
85                 }
86                 return nil
87         }
88
89         cbr := tools.NewBodyWithCallback(f, t.Size, ccb)
90         var reader lfsapi.ReadSeekCloser = cbr
91
92         // Signal auth was ok on first read; this frees up other workers to start
93         if authOkFunc != nil {
94                 reader = newStartCallbackReader(reader, func() error {
95                         authOkFunc()
96                         return nil
97                 })
98         }
99
100         req.Body = reader
101
102         req = a.apiClient.LogRequest(req, "lfs.data.upload")
103         res, err := a.doHTTP(t, req)
104         if err != nil {
105                 if errors.IsUnprocessableEntityError(err) {
106                         // If we got an HTTP 422, we do _not_ want to retry the
107                         // request later below, because it is likely that the
108                         // implementing server does not support non-standard
109                         // Content-Type headers.
110                         //
111                         // Instead, return immediately and wait for the
112                         // *tq.TransferQueue to report an error message.
113                         return err
114                 }
115
116                 // We're about to return a retriable error, meaning that this
117                 // transfer will either be retried, or it will fail.
118                 //
119                 // Either way, let's decrement the number of bytes that we've
120                 // read _so far_, so that the next iteration doesn't re-transfer
121                 // those bytes, according to the progress meter.
122                 if perr := cbr.ResetProgress(); perr != nil {
123                         err = errors.Wrap(err, perr.Error())
124                 }
125
126                 return errors.NewRetriableError(err)
127         }
128
129         // A status code of 403 likely means that an authentication token for the
130         // upload has expired. This can be safely retried.
131         if res.StatusCode == 403 {
132                 err = errors.New("http: received status 403")
133                 return errors.NewRetriableError(err)
134         }
135
136         if res.StatusCode > 299 {
137                 return errors.Wrapf(nil, "Invalid status for %s %s: %d",
138                         req.Method,
139                         strings.SplitN(req.URL.String(), "?", 2)[0],
140                         res.StatusCode,
141                 )
142         }
143
144         io.Copy(ioutil.Discard, res.Body)
145         res.Body.Close()
146
147         return verifyUpload(a.apiClient, a.remote, t)
148 }
149
150 func (a *adapterBase) setContentTypeFor(req *http.Request, r io.ReadSeeker) error {
151         uc := config.NewURLConfig(a.apiClient.GitEnv())
152         disabled := !uc.Bool("lfs", req.URL.String(), "contenttype", true)
153         if len(req.Header.Get("Content-Type")) != 0 || disabled {
154                 return nil
155         }
156
157         buffer := make([]byte, 512)
158         n, err := r.Read(buffer)
159         if err != nil && err != io.EOF {
160                 return errors.Wrap(err, "content type detect")
161         }
162
163         contentType := http.DetectContentType(buffer[:n])
164         if _, err := r.Seek(0, 0); err != nil {
165                 return errors.Wrap(err, "content type rewind")
166         }
167
168         if contentType == "" {
169                 contentType = defaultContentType
170         }
171
172         req.Header.Set("Content-Type", contentType)
173         return nil
174 }
175
176 // startCallbackReader is a reader wrapper which calls a function as soon as the
177 // first Read() call is made. This callback is only made once
178 type startCallbackReader struct {
179         cb     func() error
180         cbDone bool
181         lfsapi.ReadSeekCloser
182 }
183
184 func (s *startCallbackReader) Read(p []byte) (n int, err error) {
185         if !s.cbDone && s.cb != nil {
186                 if err := s.cb(); err != nil {
187                         return 0, err
188                 }
189                 s.cbDone = true
190         }
191         return s.ReadSeekCloser.Read(p)
192 }
193 func newStartCallbackReader(r lfsapi.ReadSeekCloser, cb func() error) *startCallbackReader {
194         return &startCallbackReader{
195                 ReadSeekCloser: r,
196                 cb:             cb,
197         }
198 }
199
200 func configureBasicUploadAdapter(m *Manifest) {
201         m.RegisterNewAdapterFunc(BasicAdapterName, Upload, func(name string, dir Direction) Adapter {
202                 switch dir {
203                 case Upload:
204                         bu := &basicUploadAdapter{newAdapterBase(m.fs, name, dir, nil)}
205                         // self implements impl
206                         bu.transferImpl = bu
207                         return bu
208                 case Download:
209                         panic("Should never ask this func for basic download")
210                 }
211                 return nil
212         })
213 }