// Copyright 2012 SocialCode. All rights reserved. // Use of this source code is governed by the MIT // license that can be found in the LICENSE file. package gelf import ( "bytes" "compress/gzip" "compress/zlib" "encoding/json" "fmt" "io" "net" "strings" "sync" ) type Reader struct { mu sync.Mutex conn net.Conn } func NewReader(addr string) (*Reader, error) { var err error udpAddr, err := net.ResolveUDPAddr("udp", addr) if err != nil { return nil, fmt.Errorf("ResolveUDPAddr('%s'): %s", addr, err) } conn, err := net.ListenUDP("udp", udpAddr) if err != nil { return nil, fmt.Errorf("ListenUDP: %s", err) } r := new(Reader) r.conn = conn return r, nil } func (r *Reader) Addr() string { return r.conn.LocalAddr().String() } // FIXME: this will discard data if p isn't big enough to hold the // full message. func (r *Reader) Read(p []byte) (int, error) { msg, err := r.ReadMessage() if err != nil { return -1, err } var data string if msg.Full == "" { data = msg.Short } else { data = msg.Full } return strings.NewReader(data).Read(p) } func (r *Reader) ReadMessage() (*Message, error) { cBuf := make([]byte, ChunkSize) var ( err error n, length int cid, ocid []byte seq, total uint8 cHead []byte cReader io.Reader chunks [][]byte ) for got := 0; got < 128 && (total == 0 || got < int(total)); got++ { if n, err = r.conn.Read(cBuf); err != nil { return nil, fmt.Errorf("Read: %s", err) } cHead, cBuf = cBuf[:2], cBuf[:n] if bytes.Equal(cHead, magicChunked) { //fmt.Printf("chunked %v\n", cBuf[:14]) cid, seq, total = cBuf[2:2+8], cBuf[2+8], cBuf[2+8+1] if ocid != nil && !bytes.Equal(cid, ocid) { return nil, fmt.Errorf("out-of-band message %v (awaited %v)", cid, ocid) } else if ocid == nil { ocid = cid chunks = make([][]byte, total) } n = len(cBuf) - chunkedHeaderLen //fmt.Printf("setting chunks[%d]: %d\n", seq, n) chunks[seq] = append(make([]byte, 0, n), cBuf[chunkedHeaderLen:]...) length += n } else { //not chunked if total > 0 { return nil, fmt.Errorf("out-of-band message (not chunked)") } break } } //fmt.Printf("\nchunks: %v\n", chunks) if length > 0 { if cap(cBuf) < length { cBuf = append(cBuf, make([]byte, 0, length-cap(cBuf))...) } cBuf = cBuf[:0] for i := range chunks { //fmt.Printf("appending %d %v\n", i, chunks[i]) cBuf = append(cBuf, chunks[i]...) } cHead = cBuf[:2] } // the data we get from the wire is compressed if bytes.Equal(cHead, magicGzip) { cReader, err = gzip.NewReader(bytes.NewReader(cBuf)) } else if cHead[0] == magicZlib[0] && (int(cHead[0])*256+int(cHead[1]))%31 == 0 { // zlib is slightly more complicated, but correct cReader, err = zlib.NewReader(bytes.NewReader(cBuf)) } else { // compliance with https://github.com/Graylog2/graylog2-server // treating all messages as uncompressed if they are not gzip, zlib or // chunked cReader = bytes.NewReader(cBuf) } if err != nil { return nil, fmt.Errorf("NewReader: %s", err) } msg := new(Message) if err := json.NewDecoder(cReader).Decode(&msg); err != nil { return nil, fmt.Errorf("json.Unmarshal: %s", err) } return msg, nil }