1 // Copyright 2012 SocialCode. All rights reserved.
2 // Use of this source code is governed by the MIT
3 // license that can be found in the LICENSE file.
// NewReader resolves addr as a UDP address and opens a UDP listener on it.
// A failure in either step is reported as an error naming the failing call
// ("ResolveUDPAddr(...)" or "ListenUDP").
// NOTE(review): the error guards and the construction of the returned *Reader
// are not visible in this excerpt; presumably the listener is stored as the
// Reader's conn field — confirm against the full file.
24 func NewReader(addr string) (*Reader, error) {
// Turn the textual address into a *net.UDPAddr.
26 udpAddr, err := net.ResolveUDPAddr("udp", addr)
28 return nil, fmt.Errorf("ResolveUDPAddr('%s'): %s", addr, err)
// Bind a UDP socket on the resolved address.
31 conn, err := net.ListenUDP("udp", udpAddr)
33 return nil, fmt.Errorf("ListenUDP: %s", err)
// Addr returns the local network address of the Reader's connection,
// formatted as a string.
41 func (r *Reader) Addr() string {
42 return r.conn.LocalAddr().String()
// Read fetches one message with ReadMessage and copies string data taken from
// it into p, returning the byte count and error of that single copy.
45 // FIXME: this will discard data if p isn't big enough to hold the
// full message: the data is wrapped in a throwaway strings.Reader and read
// into p exactly once, so anything beyond len(p) is lost.
47 func (r *Reader) Read(p []byte) (int, error) {
48 msg, err := r.ReadMessage()
// NOTE(review): the handling of err and the way data is chosen from msg are
// not visible in this excerpt.
61 return strings.NewReader(data).Read(p)
// ReadMessage reads datagrams from r.conn until one message can be assembled,
// wraps the payload in a gzip or zlib reader when its two-byte header says so
// (otherwise the bytes are used as-is), and decodes the result as JSON into a
// Message.
//
// Errors are returned when the connection read fails, when a datagram that
// does not belong to the message being assembled arrives, when the
// decompressing reader cannot be created, or when JSON decoding fails.
//
// NOTE(review): the variable declarations, several branches and the final
// return lie outside this excerpt; the comments below describe only the
// visible lines.
64 func (r *Reader) ReadMessage() (*Message, error) {
// Receive buffer sized for a single chunk.
65 cBuf := make([]byte, ChunkSize)
// Read at most 128 datagrams; once total is known (non-zero), stop after that
// many datagrams have been received.
76 for got := 0; got < 128 && (total == 0 || got < int(total)); got++ {
77 if n, err = r.conn.Read(cBuf); err != nil {
78 return nil, fmt.Errorf("Read: %s", err)
// cHead keeps the first two bytes (the magic number); cBuf is trimmed to the
// n bytes actually received.
// NOTE(review): cBuf stays trimmed to n when the loop reads into it again, so
// a later datagram longer than this one looks like it would be cut short —
// confirm whether the buffer is restored on a line not shown here.
80 cHead, cBuf = cBuf[:2], cBuf[:n]
82 if bytes.Equal(cHead, magicChunked) {
83 //fmt.Printf("chunked %v\n", cBuf[:14])
// Chunk header as read here: 2-byte magic, 8-byte message id, 1-byte sequence
// number, 1-byte chunk count.
// NOTE(review): no visible check that n is large enough to cover these header
// bytes before indexing.
84 cid, seq, total = cBuf[2:2+8], cBuf[2+8], cBuf[2+8+1]
// Every chunk must carry the same message id as the first chunk seen (ocid).
85 if ocid != nil && !bytes.Equal(cid, ocid) {
86 return nil, fmt.Errorf("out-of-band message %v (awaited %v)", cid, ocid)
87 } else if ocid == nil {
// First chunk of a message: allocate one slot per announced chunk.
89 chunks = make([][]byte, total)
// n now holds the payload length of this chunk (datagram minus chunk header).
91 n = len(cBuf) - chunkedHeaderLen
92 //fmt.Printf("setting chunks[%d]: %d\n", seq, n)
// Store a fresh copy of the payload, because cBuf is reused by the next read.
// NOTE(review): seq is not visibly checked against total before indexing.
93 chunks[seq] = append(make([]byte, 0, n), cBuf[chunkedHeaderLen:]...)
95 } else { //not chunked
// An unchunked datagram is rejected here.
// NOTE(review): the guarding condition is not visible; presumably this applies
// only while a chunked message is being assembled.
97 return nil, fmt.Errorf("out-of-band message (not chunked)")
102 //fmt.Printf("\nchunks: %v\n", chunks)
105 if cap(cBuf) < length {
// NOTE(review): the slice being appended has length 0, so this append adds no
// elements and does not appear to enlarge cBuf — confirm the intent.
106 cBuf = append(cBuf, make([]byte, 0, length-cap(cBuf))...)
// Concatenate the chunk payloads in sequence-number order.
109 for i := range chunks {
110 //fmt.Printf("appending %d %v\n", i, chunks[i])
111 cBuf = append(cBuf, chunks[i]...)
116 // the data we get from the wire is compressed
// Choose a reader from the two header bytes held in cHead.
117 if bytes.Equal(cHead, magicGzip) {
118 cReader, err = gzip.NewReader(bytes.NewReader(cBuf))
// zlib is accepted when the first byte matches magicZlib[0] and the two header
// bytes, read as a big-endian 16-bit value, are a multiple of 31.
119 } else if cHead[0] == magicZlib[0] &&
120 (int(cHead[0])*256+int(cHead[1]))%31 == 0 {
121 // zlib is slightly more complicated, but correct
122 cReader, err = zlib.NewReader(bytes.NewReader(cBuf))
124 // compliance with https://github.com/Graylog2/graylog2-server
125 // treating all messages as uncompressed if they are not gzip, zlib or
// Fallback: hand the bytes to the decoder unchanged.
127 cReader = bytes.NewReader(cBuf)
// NOTE(review): presumably reached when creating the gzip/zlib reader failed;
// the guard is not visible in this excerpt.
131 return nil, fmt.Errorf("NewReader: %s", err)
// Decode the (possibly decompressed) stream as JSON into msg.
135 if err := json.NewDecoder(cReader).Decode(&msg); err != nil {
136 return nil, fmt.Errorf("json.Unmarshal: %s", err)