Cleanup UdpDataSource.

This commit is contained in:
Oliver Woodman 2015-07-21 17:34:37 +01:00
parent a2f10399e7
commit ed2b65654c

View File

@ -44,11 +44,12 @@ public class UdpDataSource implements UriDataSource {
} }
/**
* The default maximum datagram packet size, in bytes.
*/
public static final int DEFAULT_MAX_PACKET_SIZE = 2000; public static final int DEFAULT_MAX_PACKET_SIZE = 2000;
public static final int TRANSFER_LISTENER_PACKET_INTERVAL = 1000; private final TransferListener listener;
private final TransferListener transferListener;
private final DatagramPacket packet; private final DatagramPacket packet;
private DataSpec dataSpec; private DataSpec dataSpec;
@ -58,17 +59,22 @@ public class UdpDataSource implements UriDataSource {
private InetSocketAddress socketAddress; private InetSocketAddress socketAddress;
private boolean opened; private boolean opened;
private int packetsReceived;
private byte[] packetBuffer; private byte[] packetBuffer;
private int packetRemaining; private int packetRemaining;
public UdpDataSource(TransferListener transferListener) { /**
this(transferListener, DEFAULT_MAX_PACKET_SIZE); * @param listener An optional listener.
*/
public UdpDataSource(TransferListener listener) {
this(listener, DEFAULT_MAX_PACKET_SIZE);
} }
public UdpDataSource(TransferListener transferListener, int maxPacketSize) { /**
this.transferListener = transferListener; * @param listener An optional listener.
* @param maxPacketSize The maximum datagram packet size, in bytes.
*/
public UdpDataSource(TransferListener listener, int maxPacketSize) {
this.listener = listener;
packetBuffer = new byte[maxPacketSize]; packetBuffer = new byte[maxPacketSize];
packet = new DatagramPacket(packetBuffer, 0, maxPacketSize); packet = new DatagramPacket(packetBuffer, 0, maxPacketSize);
} }
@ -83,7 +89,6 @@ public class UdpDataSource implements UriDataSource {
try { try {
address = InetAddress.getByName(host); address = InetAddress.getByName(host);
socketAddress = new InetSocketAddress(address, port); socketAddress = new InetSocketAddress(address, port);
if (address.isMulticastAddress()) { if (address.isMulticastAddress()) {
multicastSocket = new MulticastSocket(socketAddress); multicastSocket = new MulticastSocket(socketAddress);
multicastSocket.joinGroup(address); multicastSocket.joinGroup(address);
@ -96,17 +101,42 @@ public class UdpDataSource implements UriDataSource {
} }
opened = true; opened = true;
transferListener.onTransferStart(); if (listener != null) {
listener.onTransferStart();
}
return C.LENGTH_UNBOUNDED; return C.LENGTH_UNBOUNDED;
} }
@Override
public int read(byte[] buffer, int offset, int readLength) throws UdpDataSourceException {
if (packetRemaining == 0) {
// We've read all of the data from the current packet. Get another.
try {
socket.receive(packet);
} catch (IOException e) {
throw new UdpDataSourceException(e);
}
packetRemaining = packet.getLength();
if (listener != null) {
listener.onBytesTransferred(packetRemaining);
}
}
int packetOffset = packet.getLength() - packetRemaining;
int bytesToRead = Math.min(packetRemaining, readLength);
System.arraycopy(packetBuffer, packetOffset, buffer, offset, bytesToRead);
packetRemaining -= bytesToRead;
return bytesToRead;
}
@Override @Override
public void close() { public void close() {
if (multicastSocket != null) { if (multicastSocket != null) {
try { try {
multicastSocket.leaveGroup(address); multicastSocket.leaveGroup(address);
} catch (IOException e) { } catch (IOException e) {
// Do nothing // Do nothing.
} }
multicastSocket = null; multicastSocket = null;
} }
@ -117,44 +147,12 @@ public class UdpDataSource implements UriDataSource {
address = null; address = null;
socketAddress = null; socketAddress = null;
packetRemaining = 0; packetRemaining = 0;
packetsReceived = 0;
if (opened) { if (opened) {
opened = false; opened = false;
transferListener.onTransferEnd(); if (listener != null) {
} listener.onTransferEnd();
}
@Override
public int read(byte[] buffer, int offset, int readLength) throws UdpDataSourceException {
// if we've read all the data, get another packet
if (packetRemaining == 0) {
if (packetsReceived == TRANSFER_LISTENER_PACKET_INTERVAL) {
transferListener.onTransferEnd();
transferListener.onTransferStart();
packetsReceived = 0;
} }
try {
socket.receive(packet);
} catch (IOException e) {
throw new UdpDataSourceException(e);
}
packetRemaining = packet.getLength();
transferListener.onBytesTransferred(packetRemaining);
packetsReceived++;
} }
// don't try to read too much
if (packetRemaining < readLength) {
readLength = packetRemaining;
}
int packetOffset = packet.getLength() - packetRemaining;
System.arraycopy(packetBuffer, packetOffset, buffer, offset, readLength);
packetRemaining -= readLength;
return readLength;
} }
@Override @Override