Imported Upstream version 2.4.2
[scm/test.git] / test / cmd / lfstest-customadapter.go
1 // +build testtools
2
3 package main
4
5 import (
6         "bufio"
7         "encoding/json"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         "os"
13         "strconv"
14         "strings"
15         "time"
16
17         "github.com/git-lfs/git-lfs/config"
18         "github.com/git-lfs/git-lfs/lfsapi"
19         "github.com/git-lfs/git-lfs/tools"
20 )
21
22 var cfg = config.New()
23
24 // This test custom adapter just acts as a bridge for uploads/downloads
25 // in order to demonstrate & test the custom transfer adapter protocols
26 // All we actually do is relay the requests back to the normal storage URLs
27 // of our test server for simplicity, but this proves the principle
28 func main() {
29         scanner := bufio.NewScanner(os.Stdin)
30         writer := bufio.NewWriter(os.Stdout)
31         errWriter := bufio.NewWriter(os.Stderr)
32         apiClient, err := lfsapi.NewClient(cfg)
33         if err != nil {
34                 writeToStderr("Error creating api client: "+err.Error(), errWriter)
35                 os.Exit(1)
36         }
37
38         for scanner.Scan() {
39                 line := scanner.Text()
40                 var req request
41                 if err := json.Unmarshal([]byte(line), &req); err != nil {
42                         writeToStderr(fmt.Sprintf("Unable to parse request: %v\n", line), errWriter)
43                         continue
44                 }
45
46                 switch req.Event {
47                 case "init":
48                         writeToStderr(fmt.Sprintf("Initialised test custom adapter for %s\n", req.Operation), errWriter)
49                         resp := &initResponse{}
50                         sendResponse(resp, writer, errWriter)
51                 case "download":
52                         writeToStderr(fmt.Sprintf("Received download request for %s\n", req.Oid), errWriter)
53                         performDownload(apiClient, req.Oid, req.Size, req.Action, writer, errWriter)
54                 case "upload":
55                         writeToStderr(fmt.Sprintf("Received upload request for %s\n", req.Oid), errWriter)
56                         performUpload(apiClient, req.Oid, req.Size, req.Action, req.Path, writer, errWriter)
57                 case "terminate":
58                         writeToStderr("Terminating test custom adapter gracefully.\n", errWriter)
59                         break
60                 }
61         }
62
63 }
64
65 func writeToStderr(msg string, errWriter *bufio.Writer) {
66         if !strings.HasSuffix(msg, "\n") {
67                 msg = msg + "\n"
68         }
69         errWriter.WriteString(msg)
70         errWriter.Flush()
71 }
72
73 func sendResponse(r interface{}, writer, errWriter *bufio.Writer) error {
74         b, err := json.Marshal(r)
75         if err != nil {
76                 return err
77         }
78         // Line oriented JSON
79         b = append(b, '\n')
80         _, err = writer.Write(b)
81         if err != nil {
82                 return err
83         }
84         writer.Flush()
85         writeToStderr(fmt.Sprintf("Sent message %v", string(b)), errWriter)
86         return nil
87 }
88
89 func sendTransferError(oid string, code int, message string, writer, errWriter *bufio.Writer) {
90         resp := &transferResponse{"complete", oid, "", &transferError{code, message}}
91         err := sendResponse(resp, writer, errWriter)
92         if err != nil {
93                 writeToStderr(fmt.Sprintf("Unable to send transfer error: %v\n", err), errWriter)
94         }
95 }
96
97 func sendProgress(oid string, bytesSoFar int64, bytesSinceLast int, writer, errWriter *bufio.Writer) {
98         resp := &progressResponse{"progress", oid, bytesSoFar, bytesSinceLast}
99         err := sendResponse(resp, writer, errWriter)
100         if err != nil {
101                 writeToStderr(fmt.Sprintf("Unable to send progress update: %v\n", err), errWriter)
102         }
103 }
104
105 func performDownload(apiClient *lfsapi.Client, oid string, size int64, a *action, writer, errWriter *bufio.Writer) {
106         // We just use the URLs we're given, so we're just a proxy for the direct method
107         // but this is enough to test intermediate custom adapters
108         req, err := http.NewRequest("GET", a.Href, nil)
109         if err != nil {
110                 sendTransferError(oid, 2, err.Error(), writer, errWriter)
111                 return
112         }
113
114         for k := range a.Header {
115                 req.Header.Set(k, a.Header[k])
116         }
117
118         res, err := apiClient.DoWithAuth("origin", req)
119         if err != nil {
120                 sendTransferError(oid, res.StatusCode, err.Error(), writer, errWriter)
121                 return
122         }
123         defer res.Body.Close()
124
125         dlFile, err := ioutil.TempFile("", "lfscustomdl")
126         if err != nil {
127                 sendTransferError(oid, 3, err.Error(), writer, errWriter)
128                 return
129         }
130         defer dlFile.Close()
131         dlfilename := dlFile.Name()
132         // Turn callback into progress messages
133         cb := func(totalSize int64, readSoFar int64, readSinceLast int) error {
134                 sendProgress(oid, readSoFar, readSinceLast, writer, errWriter)
135                 return nil
136         }
137         _, err = tools.CopyWithCallback(dlFile, res.Body, res.ContentLength, cb)
138         if err != nil {
139                 sendTransferError(oid, 4, fmt.Sprintf("cannot write data to tempfile %q: %v", dlfilename, err), writer, errWriter)
140                 os.Remove(dlfilename)
141                 return
142         }
143         if err := dlFile.Close(); err != nil {
144                 sendTransferError(oid, 5, fmt.Sprintf("can't close tempfile %q: %v", dlfilename, err), writer, errWriter)
145                 os.Remove(dlfilename)
146                 return
147         }
148
149         // completed
150         complete := &transferResponse{"complete", oid, dlfilename, nil}
151         err = sendResponse(complete, writer, errWriter)
152         if err != nil {
153                 writeToStderr(fmt.Sprintf("Unable to send completion message: %v\n", err), errWriter)
154         }
155 }
156
157 func performUpload(apiClient *lfsapi.Client, oid string, size int64, a *action, fromPath string, writer, errWriter *bufio.Writer) {
158         // We just use the URLs we're given, so we're just a proxy for the direct method
159         // but this is enough to test intermediate custom adapters
160         req, err := http.NewRequest("PUT", a.Href, nil)
161         if err != nil {
162                 sendTransferError(oid, 2, err.Error(), writer, errWriter)
163                 return
164         }
165
166         for k := range a.Header {
167                 req.Header.Set(k, a.Header[k])
168         }
169
170         if len(req.Header.Get("Content-Type")) == 0 {
171                 req.Header.Set("Content-Type", "application/octet-stream")
172         }
173
174         if req.Header.Get("Transfer-Encoding") == "chunked" {
175                 req.TransferEncoding = []string{"chunked"}
176         } else {
177                 req.Header.Set("Content-Length", strconv.FormatInt(size, 10))
178         }
179
180         req.ContentLength = size
181
182         f, err := os.OpenFile(fromPath, os.O_RDONLY, 0644)
183         if err != nil {
184                 sendTransferError(oid, 3, fmt.Sprintf("Cannot read data from %q: %v", fromPath, err), writer, errWriter)
185                 return
186         }
187         defer f.Close()
188
189         // Turn callback into progress messages
190         cb := func(totalSize int64, readSoFar int64, readSinceLast int) error {
191                 sendProgress(oid, readSoFar, readSinceLast, writer, errWriter)
192                 return nil
193         }
194         req.Body = tools.NewBodyWithCallback(f, size, cb)
195
196         res, err := apiClient.DoWithAuth("origin", req)
197         if err != nil {
198                 sendTransferError(oid, res.StatusCode, fmt.Sprintf("Error uploading data for %s: %v", oid, err), writer, errWriter)
199                 return
200         }
201
202         if res.StatusCode > 299 {
203                 msg := fmt.Sprintf("Invalid status for %s %s: %d",
204                         req.Method, strings.SplitN(req.URL.String(), "?", 2)[0], res.StatusCode)
205                 sendTransferError(oid, res.StatusCode, msg, writer, errWriter)
206                 return
207         }
208
209         io.Copy(ioutil.Discard, res.Body)
210         res.Body.Close()
211
212         // completed
213         complete := &transferResponse{"complete", oid, "", nil}
214         err = sendResponse(complete, writer, errWriter)
215         if err != nil {
216                 writeToStderr(fmt.Sprintf("Unable to send completion message: %v\n", err), errWriter)
217         }
218
219 }
220
221 // Structs reimplemented so closer to a real external implementation
222 type header struct {
223         Key   string `json:"key"`
224         Value string `json:"value"`
225 }
226 type action struct {
227         Href      string            `json:"href"`
228         Header    map[string]string `json:"header,omitempty"`
229         ExpiresAt time.Time         `json:"expires_at,omitempty"`
230 }
231 type transferError struct {
232         Code    int    `json:"code"`
233         Message string `json:"message"`
234 }
235
236 // Combined request struct which can accept anything
237 type request struct {
238         Event               string  `json:"event"`
239         Operation           string  `json:"operation"`
240         Concurrent          bool    `json:"concurrent"`
241         ConcurrentTransfers int     `json:"concurrenttransfers"`
242         Oid                 string  `json:"oid"`
243         Size                int64   `json:"size"`
244         Path                string  `json:"path"`
245         Action              *action `json:"action"`
246 }
247
248 type initResponse struct {
249         Error *transferError `json:"error,omitempty"`
250 }
251 type transferResponse struct {
252         Event string         `json:"event"`
253         Oid   string         `json:"oid"`
254         Path  string         `json:"path,omitempty"` // always blank for upload
255         Error *transferError `json:"error,omitempty"`
256 }
257 type progressResponse struct {
258         Event          string `json:"event"`
259         Oid            string `json:"oid"`
260         BytesSoFar     int64  `json:"bytesSoFar"`
261         BytesSinceLast int    `json:"bytesSinceLast"`
262 }