-
Notifications
You must be signed in to change notification settings - Fork 54
/
api.go
107 lines (96 loc) · 2.63 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package websocket
import (
"strings"
"sync"
_ "unsafe"
tp "github.com/henrylee2cn/teleport"
"github.com/xiaoenai/tp-micro/gateway/logic"
"github.com/xiaoenai/tp-micro/gateway/types"
)
// Gw long connection controller.
type Gw struct {
tp.CallCtx
}
// totalConn returns the long connections total.
//go:linkname totalConn github.com/xiaoenai/tp-micro/gateway.TotalConn
//go:nosplit
func wsTotalConn() int32 {
if outerPeer == nil {
return 0
}
return int32(outerPeer.CountSession())
}
// WsTotal returns the long connections total.
func (g *Gw) WsTotal(*types.WsTotalArgs) (*types.WsTotalReply, *tp.Status) {
return &types.WsTotalReply{ConnTotal: wsTotalConn()}, nil
}
// innerPush pushes the message to the specified user.
func innerPush(uid string, uri string, args interface{}, bodyCodec byte) *tp.Status {
sess, stat := logic.WebSocketHooks().GetSession(outerPeer, uid)
if stat != nil {
return stat
}
return sess.Push(uri, args, tp.WithBodyCodec(bodyCodec))
}
var wsPushReply = new(types.WsPushReply)
// WsPush pushes message to the specified user.
func (g *Gw) WsPush(args *types.WsPushArgs) (*types.WsPushReply, *tp.Status) {
stat := innerPush(args.SessionId, args.Uri, args.Body, byte(args.BodyCodec))
if stat != nil {
return nil, stat
}
return wsPushReply, nil
}
// WsMpush multi-push messages to the specified users.
func (g *Gw) WsMpush(args *types.WsMpushArgs) (*types.WsMpushReply, *tp.Status) {
var (
wg sync.WaitGroup
sep = "?"
failureSessionIds = make([]string, 0, len(args.Target))
lock sync.Mutex
body = args.Body
bodyCodec = byte(args.BodyCodec)
)
if strings.Contains(args.Uri, "?") {
sep = "&"
}
wg.Add(len(args.Target))
for _, t := range args.Target {
var uri string
if t.AdditionalQuery != "" {
uri = args.Uri + sep + t.AdditionalQuery
} else {
uri = args.Uri
}
sessId := t.SessionId
tp.TryGo(func() {
defer wg.Done()
stat := innerPush(sessId, uri, body, bodyCodec)
if stat != nil {
lock.Lock()
failureSessionIds = append(failureSessionIds, sessId)
lock.Unlock()
tp.Tracef("SocketMpush: %s", stat.String())
}
})
}
wg.Wait()
return &types.WsMpushReply{
FailureSessionIds: failureSessionIds,
}, nil
}
// Kick kicks the uid offline.
func Kick(uid string) (existed bool, err error) {
sess, existed := outerPeer.GetSession(uid)
if existed {
err = sess.Close()
}
return existed, err
}
// SocketKick kicks the uid offline.
func (g *Gw) WsKick(args *types.SocketKickArgs) (*types.SocketKickReply, *tp.Status) {
existed, _ := Kick(args.SessionId)
return &types.SocketKickReply{
Existed: existed,
}, nil
}