Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use ConcurrentMap to replace map and mutex #187

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/xtaci/kcp-go/v5

require (
github.com/OneOfOne/xxhash v1.2.2
github.com/cespare/xxhash v1.1.0 // indirect
github.com/klauspost/cpuid v1.3.1 // indirect
github.com/klauspost/reedsolomon v1.9.9
github.com/mmcloughlin/avo v0.0.0-20200803215136-443f81d77104 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/klauspost/cpuid v1.2.4/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.3.1 h1:5JNjFYYQrZeKRJ0734q51WCEEn2huer72Dc7K+R/b6s=
github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4=
Expand All @@ -7,6 +11,8 @@ github.com/mmcloughlin/avo v0.0.0-20200803215136-443f81d77104 h1:ULR/QWMgcgRiZLU
github.com/mmcloughlin/avo v0.0.0-20200803215136-443f81d77104/go.mod h1:wqKykBG2QzQDJEzvRkcS8x6MiSJkF52hXZsXcjaB3ls=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/templexxx/cpu v0.0.1 h1:hY4WdLOgKdc8y13EYklu9OUTXik80BkxHoWvTO6MQQY=
github.com/templexxx/cpu v0.0.1/go.mod h1:w7Tb+7qgcAlIyX4NhLuDKt78AHA5SzPmq0Wj6HiEnnk=
github.com/templexxx/cpu v0.0.7 h1:pUEZn8JBy/w5yzdYWgx+0m0xL9uk6j4K91C5kOViAzo=
Expand Down
106 changes: 106 additions & 0 deletions map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package kcp

import (
"github.com/OneOfOne/xxhash"
"sync"
"sync/atomic"
)

const (
segmentCount = 256
segmentMask = 255
)

type ConcurrentMap struct {
segments [segmentCount]map[string]interface{}
locks [segmentCount]sync.RWMutex
length int64
}

func (m *ConcurrentMap) Put(key string, val interface{}) {
keyHash := xxhash.Checksum32([]byte(key))
segIdx := keyHash & segmentMask
m.locks[segIdx].Lock()
defer m.locks[segIdx].Unlock()
if seg := m.segments[segIdx]; seg != nil {
if _, ok := seg[key]; !ok {
atomic.AddInt64(&m.length, 1)
}
seg[key] = val
} else {
seg = make(map[string]interface{})
seg[key] = val
m.segments[segIdx] = seg
atomic.AddInt64(&m.length, 1)
}
}

func (m *ConcurrentMap) Get(key string) (interface{}, bool) {
keyHash := xxhash.Checksum32([]byte(key))
segIdx := keyHash & segmentMask
m.locks[segIdx].RLock()
defer m.locks[segIdx].RUnlock()
if seg := m.segments[segIdx]; seg == nil {
return nil, false
} else {
val, ok := seg[key]
return val, ok
}
}

func (m *ConcurrentMap) Del(key string) {
keyHash := xxhash.Checksum32([]byte(key))
segIdx := keyHash & segmentMask
m.locks[segIdx].Lock()
defer m.locks[segIdx].Unlock()
if seg := m.segments[segIdx]; seg != nil {
if _, ok := seg[key]; ok {
atomic.AddInt64(&m.length, -1)
}
delete(seg, key)
}
}

type IterFun func(key string, val interface{}) bool

func (m *ConcurrentMap) Iterate(f IterFun) {
for i := 0; i < segmentCount; i++ {
func(i int) {
m.locks[i].RLock()
defer m.locks[i].RUnlock()
tmp := m.segments[i]
for k, v := range tmp {
if !f(k, v) {
break
}
}
}(i)
}
}

func (m *ConcurrentMap) Length() int {
return int(m.length)
}

func (m *ConcurrentMap) Filter(f IterFun) {
for i := 0; i < segmentCount; i++ {
func(i int) {
m.locks[i].Lock()
defer m.locks[i].Unlock()
tmp := m.segments[i]
var deleteKeys []string
for k, v := range tmp {
if !f(k, v) {
deleteKeys = append(deleteKeys, k)
}
}
for _, k := range deleteKeys {
delete(tmp, k)
}
}(i)
}
}

func NewConcurrentMap() *ConcurrentMap {
return &ConcurrentMap{}
}
247 changes: 247 additions & 0 deletions map_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
package kcp

import (
"math/rand"
"strconv"
"sync"
"testing"
)

func BenchmarkConcurrentMap_Write(b *testing.B) {
dataLen := b.N
concurrency := 8
workLoad := dataLen / concurrency
newMap := NewConcurrentMap()
wait := &sync.WaitGroup{}

b.ResetTimer()
for i := 0; i < concurrency; i++ {
wait.Add(1)
go func(m *ConcurrentMap, start, end int, wait *sync.WaitGroup) {
for i := start; i < end; i++ {
m.Put(strconv.Itoa(i), i)
}
wait.Done()
}(newMap, i*workLoad, (i+1)*workLoad, wait)
}
wait.Wait()
}

func BenchmarkSyncMap_Write(b *testing.B) {
oldMap := &sync.Map{}
dataLen := b.N
concurrency := 8
workLoad := dataLen / concurrency
wait := &sync.WaitGroup{}
b.ResetTimer()
for i := 0; i < concurrency; i++ {
wait.Add(1)
go func(m *sync.Map, start, end int, wait *sync.WaitGroup) {
for i := start; i < end; i++ {
m.Store(strconv.Itoa(i), i)
}
wait.Done()
}(oldMap, i*workLoad, (i+1)*workLoad, wait)
}
wait.Wait()
}

func BenchmarkConcurrentMap_ReadWrite(b *testing.B) {
m := NewConcurrentMap()
dataLen := 65536
for i := 0; i < dataLen; i++ {
m.Put(strconv.Itoa(i), i)
}
concurrency := 8
wait := &sync.WaitGroup{}

b.ResetTimer()
for i := 0; i < concurrency; i++ {
wait.Add(1)
go func(w *sync.WaitGroup, m *ConcurrentMap) {
for j := 0; j < b.N; j++ {
l := rand.Intn(dataLen)
if l%2 == 0 {
m.Get(strconv.Itoa(l))
} else {
m.Put(strconv.Itoa(l), l+1)
}
}
w.Done()
}(wait, m)
}
wait.Wait()
}

func BenchmarkSyncMap_ReadWrite(b *testing.B) {
m := &sync.Map{}
dataLen := 65536
for i := 0; i < dataLen; i++ {
m.Store(strconv.Itoa(i), i)
}
concurrency := 8
wait := &sync.WaitGroup{}

b.ResetTimer()
for i := 0; i < concurrency; i++ {
wait.Add(1)
go func(w *sync.WaitGroup, m *sync.Map) {
for j := 0; j < b.N; j++ {
l := rand.Intn(dataLen)
if l%2 == 0 {
m.Load(strconv.Itoa(l))
} else {
m.Store(strconv.Itoa(l), l+1)
}
}
w.Done()
}(wait, m)
}
wait.Wait()
}

func BenchmarkMapMutex_ReadWrite(b *testing.B) {
m := make(map[string]int)
lock := sync.Mutex{}
dataLen := 65536
for i := 0; i < dataLen; i++ {
m[strconv.Itoa(i)] = i
}
concurrency := 8
data := make([]string, dataLen)
for i := 0; i < dataLen; i++ {
data[i] = strconv.Itoa(i)
}
wait := &sync.WaitGroup{}

b.ResetTimer()
for i := 0; i < concurrency; i++ {
wait.Add(1)
go func(w *sync.WaitGroup, m map[string]int, ) {
for j := 0; j < b.N; j++ {
l := rand.Intn(dataLen)
lock.Lock()
if l%2 == 0 {
_ = m[strconv.Itoa(l)]
} else {
m[strconv.Itoa(l)] = l+1
}
lock.Unlock()
}
w.Done()
}(wait, m)
}
wait.Wait()
}

func TestConcurrentMap_DelAndLength(t *testing.T) {
m := NewConcurrentMap()
w := &sync.WaitGroup{}
l := 1000
c := 8
for i := 0; i < c; i++ {
w.Add(1)
go func(start int, wait *sync.WaitGroup) {
for j := start * l; j < (start+1)*l; j++ {
m.Put(strconv.Itoa(j), j)
}
wait.Done()
}(i, w)
}
w.Wait()

if m.Length() != l*c {
t.Errorf("Length not expected. Length returns %d, expected %d", m.Length(), l*c)
}

for i := 0; i < c; i++ {
w.Add(1)
go func(start int, wait *sync.WaitGroup) {
for j := start * l; j < (start+1)*l; j++ {
m.Del(strconv.Itoa(j))
}
wait.Done()
}(i, w)
}
w.Wait()
if m.Length() != 0 {
t.Errorf("Length not expected. Length returns %d, expected %d", m.Length(), 0)
}
}

func TestConcurrentMap_PutAndGet(t *testing.T) {
m := NewConcurrentMap()
w := &sync.WaitGroup{}
l := 10000
c := 8
workLoad := l / c

for i := 0; i < c; i++ {
w.Add(1)
go func(w *sync.WaitGroup, start, end int) {
for j := start; j < end; j++ {
m.Put(strconv.Itoa(j), j)
}
w.Done()
}(w, i*workLoad, (i+1)*workLoad)
}
w.Wait()

for i := 0; i < c; i++ {
w.Add(1)
go func(w *sync.WaitGroup, start, end int) {
for j := start; j < end; j++ {
tmp, ok := m.Get(strconv.Itoa(j))
if !ok || tmp.(int) != j {
t.Errorf("Get not expexted. Get return: %v, expected:%d", tmp, j)
}
}
w.Done()
}(w, i*workLoad, (i+1)*workLoad)
}
w.Wait()
}

func TestConcurrentMap_Iterate(t *testing.T) {
m := NewConcurrentMap()
l := 10000
type Value struct {
Num int
}
for i := 0; i < l; i++ {
m.Put(strconv.Itoa(i), &Value{Num: i})
}
m.Iterate(func(key string, val interface{}) bool {
val.(*Value).Num++
return true
})
for i := 0; i < l; i++ {
tmp, _ := m.Get(strconv.Itoa(i))
if tmp.(*Value).Num != i+1 {
t.Errorf("Iterate not expected.")
break
}
}
}

func TestConcurrentMap_Filter(t *testing.T) {
m := NewConcurrentMap()
l := 10000
type Value struct {
Num int
}
for i := 0; i < l; i++ {
m.Put(strconv.Itoa(i), &Value{Num: i})
}
m.Filter(func(key string, val interface{}) bool {
return val.(*Value).Num%2 == 0
})
for i := 0; i < l; i++ {
_, ok := m.Get(strconv.Itoa(i))
isEven := i%2 == 0
if (isEven && !ok) || (!isEven && ok) {
t.Errorf("Filter not expected")

}
}
}
Loading