Skip to content

Commit

Permalink
0.0.5
Browse files Browse the repository at this point in the history
  • Loading branch information
Immueggpain authored and Immueggpain committed Dec 28, 2019
1 parent 3697ffa commit 048e165
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 70 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.github.immueggpain</groupId>
<artifactId>simple-streaming</artifactId>
<version>0.0.4</version>
<version>0.0.5</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
version = Launcher.VERSTR, subcommands = { HelpCommand.class, StreamServer.class, Serve.class })
public class Launcher implements Callable<Void> {

public static final String VERSTR = "0.0.4";
public static final String VERSTR = "0.0.5";
public static final int LOCAL_PORT = 2233;
public static final int LOCAL_OVPN_PORT = 1194;
public static final int BUFLEN = 1024 * 16;

public static void main(String[] args) {
int exitCode = new CommandLine(new Launcher()).setCaseInsensitiveEnumValuesAllowed(true).execute(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private void accept_thread(int listen_port) {
private void send_thread(Socket socket) {
try {
OutputStream os = socket.getOutputStream();
byte[] buf = new byte[1024 * 8];
byte[] buf = new byte[Launcher.BUFLEN];

RandomAccessFile file = new RandomAccessFile(filepath, "r");
long length = file.length();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;

@Command(description = "Start BMP client", name = "client", mixinStandardHelpOptions = true, version = Launcher.VERSTR)
public class BMPPeer implements Callable<Void> {
@Command(description = "Start uploader", name = "upload", mixinStandardHelpOptions = true, version = Launcher.VERSTR)
public class StreamDownloader implements Callable<Void> {

@Option(names = { "-p", "--port" }, required = true, description = "server's port")
public int serverPort;
Expand Down
183 changes: 118 additions & 65 deletions src/main/java/com/github/immueggpain/simplestreaming/StreamServer.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.github.immueggpain.simplestreaming;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
Expand All @@ -17,106 +17,159 @@
version = Launcher.VERSTR)
public class StreamServer implements Callable<Void> {

@Option(names = { "-p", "--port" }, required = true, description = "listening port")
public int serverPort;
@Option(names = { "-u", "--upload-port" }, required = true, description = "upload listening port")
public int uploadPort;

private static class Player {
/** time of the last packet received from this player */
@Option(names = { "-d", "--download-port" }, required = true, description = "download listening port")
public int downloadPort;

private static class Downloader {
/** time of the last packet send to this downloader */
public long t;
public InetSocketAddress saddr;
public long pktCount = 0;
public byte[] buf;
public volatile int buflen;
}

private HashMap<InetSocketAddress, Player> activePlayers = new HashMap<>();
private DatagramSocket socket;
private HashMap<Socket, Downloader> activeDownloaders = new HashMap<>();

@Override
public Void call() throws Exception {
Thread recvThread = Util.execAsync("recv_thread", () -> recv_thread(serverPort));
Thread uploadThread = Util.execAsync("upload_thread", () -> upload_thread());
Thread downloadThread = Util.execAsync("download_thread", () -> download_thread());
Thread removeExpiredPlayerThread = Util.execAsync("remove_expired_player_thread",
() -> remove_expired_player_thread());

recvThread.join();
uploadThread.join();
downloadThread.join();
removeExpiredPlayerThread.join();
return null;
}

private void recv_thread(int listen_port) {
private void download_thread() {
ServerSocket serverSocket = null;
try {
// setup sockets
InetAddress allbind_addr = InetAddress.getByName("0.0.0.0");
socket = new DatagramSocket(listen_port, allbind_addr);
serverSocket = new ServerSocket(downloadPort);

while (true) {
Socket socket = serverSocket.accept();
System.out.println("new downloader");
Util.execAsync("download_thread", () -> download_thread(socket));
}
} catch (Exception e) {
e.printStackTrace();
}
if (serverSocket != null)
try {
serverSocket.close();
} catch (IOException e) {
}
}

private void download_thread(Socket socket) {
try (Socket socket_ = socket) {
OutputStream os = socket.getOutputStream();
byte[] buf = new byte[Launcher.BUFLEN];

// add me to active downloaders
Downloader me = new Downloader();
me.t = System.currentTimeMillis();
me.buf = buf;
me.buflen = 0;
synchronized (activeDownloaders) {
activeDownloaders.put(socket, me);
}

while (true) {
// wait for buf
if (me.buflen == 0) {
Thread.sleep(200);
continue;
}

// buf ok, send it
os.write(me.buf, 0, me.buflen);

me.t = System.currentTimeMillis();
me.buflen = 0;
}
} catch (Exception e) {
e.printStackTrace();
}
}

// setup packet
byte[] recvBuf = new byte[4096];
DatagramPacket p = new DatagramPacket(recvBuf, recvBuf.length);
private void upload_thread() {
ServerSocket serverSocket = null;
try {
// setup sockets
serverSocket = new ServerSocket(uploadPort);

// recv loop
while (true) {
p.setData(recvBuf);
socket.receive(p);
InetSocketAddress saddr = (InetSocketAddress) p.getSocketAddress();
updatePlayerInfo(saddr, System.currentTimeMillis());
broadcastPacket(saddr, p);
Socket socket = serverSocket.accept();
System.out.println("new uploader");
Thread uploadThread = Util.execAsync("upload_thread", () -> upload_thread(socket));
uploadThread.join();
}
} catch (Exception e) {
e.printStackTrace();
}
if (serverSocket != null)
try {
serverSocket.close();
} catch (IOException e) {
}
}

private void upload_thread(Socket socket) {
try (Socket socket_ = socket) {
InputStream is = socket.getInputStream();
byte[] buf = new byte[Launcher.BUFLEN];

int len = is.read(buf);
if (len == -1)
return;

synchronized (activeDownloaders) {
for (Downloader downloader : activeDownloaders.values()) {
if (downloader.buflen == 0) {
System.arraycopy(buf, 0, downloader.buf, 0, len);
downloader.buflen = len;
}
}
}

socket.close();
} catch (Exception e) {
e.printStackTrace();
}
}

/** daemon cleaning expired players */
private void remove_expired_player_thread() {
while (true) {
synchronized (activePlayers) {
synchronized (activeDownloaders) {
long now = System.currentTimeMillis();
System.out.println("==player check==" + now);
for (Iterator<Entry<InetSocketAddress, Player>> iterator = activePlayers.entrySet().iterator(); iterator
for (Iterator<Entry<Socket, Downloader>> iterator = activeDownloaders.entrySet().iterator(); iterator
.hasNext();) {
Entry<InetSocketAddress, Player> entry = iterator.next();
Player playerInfo = entry.getValue();
Entry<Socket, Downloader> entry = iterator.next();
Socket key = entry.getKey();
Downloader playerInfo = entry.getValue();
long last = playerInfo.t;
if (now - last > 60000) {
System.out.println(String.format("dead player: %s, %d", playerInfo.saddr, playerInfo.pktCount));
if (playerInfo.buflen != 0 && now - last > 20000) {
System.out.println(String.format("dead player: %s", key.getRemoteSocketAddress()));
iterator.remove();
try {
key.close();
} catch (IOException e) {
}
} else {
System.out
.println(String.format("active player: %s, %d", playerInfo.saddr, playerInfo.pktCount));
System.out.println(String.format("active player: %s", key.getRemoteSocketAddress()));
}
}
}
Util.sleep(5000);
}
}

private void updatePlayerInfo(InetSocketAddress saddr, long t) {
synchronized (activePlayers) {
Player player = activePlayers.get(saddr);
if (player == null) {
player = new Player();
player.saddr = saddr;
activePlayers.put(saddr, player);
}
player.t = t;
player.pktCount++;
}
}

private void broadcastPacket(InetSocketAddress source, DatagramPacket p) {
synchronized (activePlayers) {
for (Entry<InetSocketAddress, Player> entry : activePlayers.entrySet()) {
InetSocketAddress dest = entry.getValue().saddr;

if (dest.equals(source))
continue;

p.setSocketAddress(dest);
try {
socket.send(p);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.github.immueggpain.simplestreaming;

import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.Socket;
import java.util.concurrent.Callable;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;

@Command(description = "Start uploader", name = "upload", mixinStandardHelpOptions = true, version = Launcher.VERSTR)
public class StreamUpload implements Callable<Void> {

@Option(names = { "-p", "--port" }, required = true, description = "server's upload port")
public int serverPort;
@Option(names = { "-s", "--server" }, required = true, description = "server's name(ip or domain)")
public String serverName;
@Option(names = { "-f", "--file" }, required = true, description = "path to ts file")
public String filepath;

@Override
public Void call() throws Exception {
try {
Socket socket = new Socket(serverName, serverPort);

OutputStream os = socket.getOutputStream();
byte[] buf = new byte[Launcher.BUFLEN];

RandomAccessFile file = new RandomAccessFile(filepath, "r");
file.seek(file.length());

try {
while (true) {
long pos = copyLarge(file, os, buf);
// to EOF, check if file is overwriten
if (file.length() < pos) {
file.close();
file = new RandomAccessFile(filepath, "r");
file.seek(file.length());
}
}
} catch (Exception e) {
e.printStackTrace();
}

System.out.println("close socket");
file.close();
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}

private static long copyLarge(final RandomAccessFile input, final OutputStream output, final byte[] buffer)
throws IOException {
long count = 0;
int n;
while (-1 != (n = input.read(buffer))) {
output.write(buffer, 0, n);
count += n;
}
return input.getFilePointer();
}

}

0 comments on commit 048e165

Please sign in to comment.