Skip to content

Commit

Permalink
Merge pull request #1558 from xushiwei/rpc
Browse files Browse the repository at this point in the history
jsonrpc: stdio
  • Loading branch information
xushiwei committed Nov 26, 2023
2 parents 87b526f + 5eb6b9d commit 911c819
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 306 deletions.
129 changes: 129 additions & 0 deletions x/fakenet/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package fakenet

import (
"io"
"net"
"sync"
"time"
)

// NewConn returns a net.Conn built on top of the supplied reader and writer.
// It decouples the read and write on the conn from the underlying stream
// to enable Close to abort ones that are in progress.
// It's primary use is to fake a network connection from stdin and stdout.
func NewConn(name string, in io.ReadCloser, out io.WriteCloser) net.Conn {
c := &fakeConn{
name: name,
reader: newFeeder(in.Read),
writer: newFeeder(out.Write),
in: in,
out: out,
}
go c.reader.run()
go c.writer.run()
return c
}

type fakeConn struct {
name string
reader *connFeeder
writer *connFeeder
in io.ReadCloser
out io.WriteCloser
}

type fakeAddr string

// connFeeder serializes calls to the source function (io.Reader.Read or
// io.Writer.Write) by delegating them to a channel. This also allows calls to
// be intercepted when the connection is closed, and cancelled early if the
// connection is closed while the calls are still outstanding.
type connFeeder struct {
source func([]byte) (int, error)
input chan []byte
result chan feedResult
mu sync.Mutex
closed bool
done chan struct{}
}

type feedResult struct {
n int
err error
}

func (c *fakeConn) Close() error {
c.reader.close()
c.writer.close()
c.in.Close()
c.out.Close()
return nil
}

func (c *fakeConn) Read(b []byte) (n int, err error) { return c.reader.do(b) }
func (c *fakeConn) Write(b []byte) (n int, err error) { return c.writer.do(b) }
func (c *fakeConn) LocalAddr() net.Addr { return fakeAddr(c.name) }
func (c *fakeConn) RemoteAddr() net.Addr { return fakeAddr(c.name) }
func (c *fakeConn) SetDeadline(t time.Time) error { return nil }
func (c *fakeConn) SetReadDeadline(t time.Time) error { return nil }
func (c *fakeConn) SetWriteDeadline(t time.Time) error { return nil }
func (a fakeAddr) Network() string { return "fake" }
func (a fakeAddr) String() string { return string(a) }

func newFeeder(source func([]byte) (int, error)) *connFeeder {
return &connFeeder{
source: source,
input: make(chan []byte),
result: make(chan feedResult),
done: make(chan struct{}),
}
}

func (f *connFeeder) close() {
f.mu.Lock()
if !f.closed {
f.closed = true
close(f.done)
}
f.mu.Unlock()
}

func (f *connFeeder) do(b []byte) (n int, err error) {
// send the request to the worker
select {
case f.input <- b:
case <-f.done:
return 0, io.EOF
}
// get the result from the worker
select {
case r := <-f.result:
return r.n, r.err
case <-f.done:
return 0, io.EOF
}
}

func (f *connFeeder) run() {
var b []byte
for {
// wait for an input request
select {
case b = <-f.input:
case <-f.done:
return
}
// invoke the underlying method
n, err := f.source(b)
// send the result back to the requester
select {
case f.result <- feedResult{n: n, err: err}:
case <-f.done:
return
}
}
}
5 changes: 1 addition & 4 deletions x/jsonrpc2/jsonrpc2test/jsonrpc2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ import (
func TestNetPipe(t *testing.T) {
jsonrpc2.SetDebug(jsonrpc2.DbgFlagCall)
ctx := context.Background()
listener, err := jsonrpc2test.NetPipeListener(ctx)
if err != nil {
t.Fatal(err)
}
listener := jsonrpc2test.NetPipeListener()
cases.Test(t, ctx, listener, jsonrpc2.HeaderFramer(), true)
}
4 changes: 2 additions & 2 deletions x/jsonrpc2/jsonrpc2test/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ import (
// It is only possibly to connect to it using the Dialer returned by the
// Dialer method, each call to that method will generate a new pipe the other
// side of which will be returned from the Accept call.
func NetPipeListener(ctx context.Context) (jsonrpc2.Listener, error) {
func NetPipeListener() jsonrpc2.Listener {
return &netPiper{
done: make(chan struct{}),
dialed: make(chan io.ReadWriteCloser),
}, nil
}
}

// netPiper is the implementation of Listener build on top of net.Pipes.
Expand Down
32 changes: 0 additions & 32 deletions x/jsonrpc2/stdio/jsonrpc2_test.go

This file was deleted.

138 changes: 10 additions & 128 deletions x/jsonrpc2/stdio/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,162 +18,44 @@ package stdio

import (
"context"
"errors"
"io"
"log"
"net"
"os"
"sync/atomic"
"time"

"github.com/goplus/gop/x/fakenet"
"github.com/goplus/gop/x/jsonrpc2"
"github.com/goplus/gop/x/jsonrpc2/jsonrpc2test"
)

// -----------------------------------------------------------------------------

var (
ErrTooManyConnections = errors.New("too many connections")
)

var (
connCnt int32
)

type serverConn struct {
closed bool
}

func (p *serverConn) Read(b []byte) (n int, err error) {
if p.closed {
return 0, net.ErrClosed
}
n, err = os.Stdin.Read(b)
for n == 0 { // retry (to support localDailer)
time.Sleep(time.Millisecond)
n, err = os.Stdin.Read(b)
}
return
}

func (p *serverConn) Write(b []byte) (n int, err error) {
if p.closed {
return 0, net.ErrClosed
}
return os.Stdout.Write(b)
}

func (p *serverConn) Close() error {
if jsonrpc2.Verbose {
log.Println("==> stdio.conn.Close")
}
if p.closed {
return net.ErrClosed
}
p.closed = true
atomic.AddInt32(&connCnt, -1)
return nil
}

// -----------------------------------------------------------------------------

type newConn struct {
in, out *os.File
r, w *os.File
oStdin *os.File
oStdout *os.File
closed bool
}

func (p *newConn) Read(b []byte) (n int, err error) {
if p.closed {
return 0, net.ErrClosed
}
return p.in.Read(b)
}

func (p *newConn) Write(b []byte) (n int, err error) {
if p.closed {
return 0, net.ErrClosed
}
return p.out.Write(b)
}

func (p *newConn) Close() error {
if jsonrpc2.Verbose {
log.Println("==> stdio.newConn.Close")
}
if p.closed {
return net.ErrClosed
}
p.closed = true
atomic.AddInt32(&dailCnt, -1)
os.Stdin, os.Stdout = p.oStdin, p.oStdout
p.in.Close()
p.w.Close()
p.r.Close()
p.out.Close()
return nil
}

// -----------------------------------------------------------------------------

type listener struct {
closed bool
localDail bool
}

func (p *listener) Close() error {
if jsonrpc2.Verbose {
log.Println("==> stdio.listener.Close")
}
if p.closed {
return net.ErrClosed
}
p.closed = true
return nil
}

// Accept blocks waiting for an incoming connection to the listener.
func (p *listener) Accept(context.Context) (io.ReadWriteCloser, error) {
if atomic.AddInt32(&connCnt, 1) != 1 {
atomic.AddInt32(&connCnt, -1)
connCnt := &connCnt[server]
if atomic.AddInt32(connCnt, 1) != 1 {
atomic.AddInt32(connCnt, -1)
return nil, ErrTooManyConnections
}
return new(serverConn), nil
}

// Dial returns a new communication byte stream to a listening server.
func (p *listener) Dial(ctx context.Context) (ret io.ReadWriteCloser, err error) {
if atomic.AddInt32(&dailCnt, 1) != 1 {
atomic.AddInt32(&dailCnt, -1)
return nil, ErrTooManyConnections
}
in, w, err := os.Pipe()
if err != nil {
return
}
r, out, err := os.Pipe()
if err != nil {
in.Close()
w.Close()
return
}
oStdin, oStdout := os.Stdin, os.Stdout
os.Stdin, os.Stdout = r, w
return &newConn{in: in, out: out, r: r, w: w, oStdin: oStdin, oStdout: oStdout}, nil
return fakenet.NewConn("stdio", os.Stdin, os.Stdout), nil
}

func (p *listener) Dialer() jsonrpc2.Dialer {
if p.localDail {
return p
}
return nil
}

// Listener returns a jsonrpc2.Listener based on stdin and stdout.
func Listener(allowLocalDail bool) jsonrpc2.Listener {
l := &listener{localDail: allowLocalDail}
return l
if allowLocalDail {
return jsonrpc2test.NetPipeListener()
}
return &listener{}
}

// -----------------------------------------------------------------------------
Loading

0 comments on commit 911c819

Please sign in to comment.