/
writer.go
77 lines (71 loc) · 2.02 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package bercon
import (
"sync/atomic"
"time"
"github.com/golang/glog"
)
func (c *Client) writerLoop(disc chan int, cmd chan transmission) {
defer func(disc chan int) { disc <- 1 }(disc)
for {
glog.V(10).Infoln("Looping in writerLoop")
if !c.looping {
glog.V(4).Infoln("WriterLoop ended by watcher. Exiting.")
return
}
if c.con == nil {
glog.Errorln(ErrConnectionNil)
return
}
timeout := time.After(time.Second * time.Duration(c.keepAliveTimer))
select {
case trm := <-cmd:
glog.V(4).Infoln("Preparing Command: ", trm)
err := c.writeCommand(trm)
if err != nil {
glog.Error(err)
return
}
case <-timeout:
if c.con != nil {
glog.V(3).Infof("Sending Keepalive")
c.con.SetWriteDeadline(time.Now().Add(time.Millisecond * 100))
_, err := c.con.Write(buildKeepAlivePacket(c.sequence.s))
if err != nil {
glog.Errorln(err)
return
}
keepAliveCount := atomic.AddInt64(&c.keepAliveCount, 1)
pinbackCount := atomic.LoadInt64(&c.pingbackCount)
if diff := keepAliveCount - pinbackCount; diff > c.keepAliveTolerance || diff < c.keepAliveTolerance*-1 {
glog.Errorf("KeepAlive Packets are out of sync by %v", diff)
return
}
// Experimental change to check if growing count is causing performance leak
if keepAliveCount > 20 {
atomic.SwapInt64(&c.keepAliveCount, 0)
atomic.SwapInt64(&c.pingbackCount, 0)
}
}
}
}
}
func (c *Client) writeCommand(trm transmission) error {
c.sequence.Lock()
if c.con != nil {
c.con.SetWriteDeadline(time.Now().Add(time.Second * 2)) //TODO: Evaluate Deadlines
trm.packet = buildCmdPacket(trm.command, c.sequence.s)
glog.V(3).Infof("Sending Packet: %v - Command: %v - Sequence: %v", string(trm.packet), string(trm.command), c.sequence.s)
_, err := c.con.Write(trm.packet)
if err != nil {
c.sequence.Unlock()
return err
}
trm.sequence = c.sequence.s
c.cmdLock.Lock()
c.cmdMap[c.sequence.s] = trm
c.cmdLock.Unlock()
c.sequence.s = c.sequence.s + 1
}
c.sequence.Unlock()
return nil
}