/
api.go
112 lines (100 loc) · 2.88 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package socket
import (
"strings"
"sync"
_ "unsafe"
tp "github.com/henrylee2cn/teleport"
"github.com/xiaoenai/tp-micro/gateway/logic"
"github.com/xiaoenai/tp-micro/gateway/logic/hosts"
"github.com/xiaoenai/tp-micro/gateway/types"
)
// gw long connection controller.
type gw struct {
tp.PullCtx
}
// Hosts returns the gateway seriver hosts.
func (g *gw) Hosts(*struct{}) (*types.GwHosts, *tp.Rerror) {
return hosts.GwHosts(), nil
}
// totalConn returns the long connections total.
//go:linkname totalConn github.com/xiaoenai/tp-micro/gateway.TotalConn
//go:nosplit
func totalConn() int32 {
if outerPeer == nil {
return 0
}
return int32(outerPeer.CountSession())
}
// SocketTotal returns the long connections total.
func (g *gw) SocketTotal(*types.SocketTotalArgs) (*types.SocketTotalReply, *tp.Rerror) {
return &types.SocketTotalReply{ConnTotal: totalConn()}, nil
}
// innerPush pushes the message to the specified user.
func innerPush(uid string, uri string, args interface{}, bodyCodec byte) *tp.Rerror {
sess, rerr := logic.SocketHooks().GetSession(outerPeer, uid)
if rerr != nil {
return rerr
}
return sess.Push(uri, args, tp.WithBodyCodec(bodyCodec))
}
var socketPushReply = new(types.SocketPushReply)
// SocketPush pushes message to the specified user.
func (g *gw) SocketPush(args *types.SocketPushArgs) (*types.SocketPushReply, *tp.Rerror) {
rerr := innerPush(args.SessionId, args.Uri, args.Body, byte(args.BodyCodec))
if rerr != nil {
return nil, rerr
}
return socketPushReply, nil
}
// SocketMpush multi-push messages to the specified users.
func (g *gw) SocketMpush(args *types.SocketMpushArgs) (*types.SocketMpushReply, *tp.Rerror) {
var (
uri string
wg sync.WaitGroup
sep = "?"
failureSessionIds = make([]string, 0, len(args.Target))
lock sync.Mutex
body = args.Body
bodyCodec = byte(args.BodyCodec)
)
if strings.Contains(args.Uri, "?") {
sep = "&"
}
wg.Add(len(args.Target))
for _, t := range args.Target {
if t.AdditionalQuery != "" {
uri = args.Uri + sep + t.AdditionalQuery
} else {
uri = args.Uri
}
tp.TryGo(func() {
defer wg.Done()
rerr := innerPush(t.SessionId, uri, body, bodyCodec)
if rerr != nil {
lock.Lock()
failureSessionIds = append(failureSessionIds, t.SessionId)
lock.Unlock()
tp.Tracef("SocketMpush: %s", rerr.String())
}
})
}
wg.Wait()
return &types.SocketMpushReply{
FailureSessionIds: failureSessionIds,
}, nil
}
// Kick kicks the uid offline.
func Kick(uid string) (existed bool, err error) {
sess, existed := outerPeer.GetSession(uid)
if existed {
err = sess.Close()
}
return existed, err
}
// SocketKick kicks the uid offline.
func (g *gw) SocketKick(args *types.SocketKickArgs) (*types.SocketKickReply, *tp.Rerror) {
existed, _ := Kick(args.SessionId)
return &types.SocketKickReply{
Existed: existed,
}, nil
}