Tizen_4.0 base
[platform/upstream/docker-engine.git] / vendor / github.com / Graylog2 / go-gelf / gelf / reader.go
1 // Copyright 2012 SocialCode. All rights reserved.
2 // Use of this source code is governed by the MIT
3 // license that can be found in the LICENSE file.
4
5 package gelf
6
7 import (
8         "bytes"
9         "compress/gzip"
10         "compress/zlib"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net"
15         "strings"
16         "sync"
17 )
18
19 type Reader struct {
20         mu   sync.Mutex
21         conn net.Conn
22 }
23
24 func NewReader(addr string) (*Reader, error) {
25         var err error
26         udpAddr, err := net.ResolveUDPAddr("udp", addr)
27         if err != nil {
28                 return nil, fmt.Errorf("ResolveUDPAddr('%s'): %s", addr, err)
29         }
30
31         conn, err := net.ListenUDP("udp", udpAddr)
32         if err != nil {
33                 return nil, fmt.Errorf("ListenUDP: %s", err)
34         }
35
36         r := new(Reader)
37         r.conn = conn
38         return r, nil
39 }
40
41 func (r *Reader) Addr() string {
42         return r.conn.LocalAddr().String()
43 }
44
45 // FIXME: this will discard data if p isn't big enough to hold the
46 // full message.
47 func (r *Reader) Read(p []byte) (int, error) {
48         msg, err := r.ReadMessage()
49         if err != nil {
50                 return -1, err
51         }
52
53         var data string
54
55         if msg.Full == "" {
56                 data = msg.Short
57         } else {
58                 data = msg.Full
59         }
60
61         return strings.NewReader(data).Read(p)
62 }
63
64 func (r *Reader) ReadMessage() (*Message, error) {
65         cBuf := make([]byte, ChunkSize)
66         var (
67                 err        error
68                 n, length  int
69                 cid, ocid  []byte
70                 seq, total uint8
71                 cHead      []byte
72                 cReader    io.Reader
73                 chunks     [][]byte
74         )
75
76         for got := 0; got < 128 && (total == 0 || got < int(total)); got++ {
77                 if n, err = r.conn.Read(cBuf); err != nil {
78                         return nil, fmt.Errorf("Read: %s", err)
79                 }
80                 cHead, cBuf = cBuf[:2], cBuf[:n]
81
82                 if bytes.Equal(cHead, magicChunked) {
83                         //fmt.Printf("chunked %v\n", cBuf[:14])
84                         cid, seq, total = cBuf[2:2+8], cBuf[2+8], cBuf[2+8+1]
85                         if ocid != nil && !bytes.Equal(cid, ocid) {
86                                 return nil, fmt.Errorf("out-of-band message %v (awaited %v)", cid, ocid)
87                         } else if ocid == nil {
88                                 ocid = cid
89                                 chunks = make([][]byte, total)
90                         }
91                         n = len(cBuf) - chunkedHeaderLen
92                         //fmt.Printf("setting chunks[%d]: %d\n", seq, n)
93                         chunks[seq] = append(make([]byte, 0, n), cBuf[chunkedHeaderLen:]...)
94                         length += n
95                 } else { //not chunked
96                         if total > 0 {
97                                 return nil, fmt.Errorf("out-of-band message (not chunked)")
98                         }
99                         break
100                 }
101         }
102         //fmt.Printf("\nchunks: %v\n", chunks)
103
104         if length > 0 {
105                 if cap(cBuf) < length {
106                         cBuf = append(cBuf, make([]byte, 0, length-cap(cBuf))...)
107                 }
108                 cBuf = cBuf[:0]
109                 for i := range chunks {
110                         //fmt.Printf("appending %d %v\n", i, chunks[i])
111                         cBuf = append(cBuf, chunks[i]...)
112                 }
113                 cHead = cBuf[:2]
114         }
115
116         // the data we get from the wire is compressed
117         if bytes.Equal(cHead, magicGzip) {
118                 cReader, err = gzip.NewReader(bytes.NewReader(cBuf))
119         } else if cHead[0] == magicZlib[0] &&
120                 (int(cHead[0])*256+int(cHead[1]))%31 == 0 {
121                 // zlib is slightly more complicated, but correct
122                 cReader, err = zlib.NewReader(bytes.NewReader(cBuf))
123         } else {
124                 // compliance with https://github.com/Graylog2/graylog2-server
125                 // treating all messages as uncompressed if  they are not gzip, zlib or
126                 // chunked
127                 cReader = bytes.NewReader(cBuf)
128         }
129
130         if err != nil {
131                 return nil, fmt.Errorf("NewReader: %s", err)
132         }
133
134         msg := new(Message)
135         if err := json.NewDecoder(cReader).Decode(&msg); err != nil {
136                 return nil, fmt.Errorf("json.Unmarshal: %s", err)
137         }
138
139         return msg, nil
140 }