Enable reading interleaved messages in RtspMessageChannel.

RTSP interleaving allows RTP packets to be sent over the RTSP TCP connection.
Interleaved messages contain binary data only and always start with a '$',
whereas normal RTSP messages consist of lines that are each terminated by a
line break (CRLF).

#minor-release

PiperOrigin-RevId: 372990181
This commit is contained in:
claincly 2021-05-10 20:41:58 +01:00 committed by Oliver Woodman
parent 89cfa4df32
commit beeb6e829d
4 changed files with 408 additions and 31 deletions

View File

@ -15,23 +15,31 @@
*/
package com.google.android.exoplayer2.source.rtsp;
import static com.google.android.exoplayer2.source.rtsp.RtspMessageUtil.isRtspStartLine;
import static com.google.android.exoplayer2.util.Assertions.checkArgument;
import static com.google.android.exoplayer2.util.Assertions.checkStateNotNull;
import android.os.Handler;
import android.os.HandlerThread;
import android.os.Looper;
import androidx.annotation.IntDef;
import androidx.annotation.Nullable;
import com.google.android.exoplayer2.C;
import com.google.android.exoplayer2.ParserException;
import com.google.android.exoplayer2.upstream.Loader;
import com.google.android.exoplayer2.upstream.Loader.LoadErrorAction;
import com.google.android.exoplayer2.upstream.Loader.Loadable;
import com.google.android.exoplayer2.util.Log;
import com.google.android.exoplayer2.util.Util;
import com.google.common.base.Ascii;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import java.io.BufferedReader;
import com.google.common.collect.ImmutableList;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
@ -42,7 +50,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
/* package */ final class RtspMessageChannel implements Closeable {
private static final String TAG = "RtspMessageChannel";
private static final boolean LOG_RTSP_MESSAGES = false;
private static final boolean LOG_RTSP_MESSAGES = true;
/** A listener for received RTSP messages and possible failures. */
public interface MessageListener {
@ -54,6 +62,15 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
*/
void onRtspMessageReceived(List<String> message);
/**
* Called when interleaved binary data is received on RTSP.
*
* <p>The default implementation does nothing.
*
* @param data The received binary data. The byte array will not be reused by {@link
* RtspMessageChannel}, and will always be full.
* @param channel The channel on which the data is received.
*/
default void onInterleavedBinaryDataReceived(byte[] data, int channel) {}
/**
* Called when failed to send an RTSP message.
*
@ -87,7 +104,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
private @MonotonicNonNull Sender sender;
private @MonotonicNonNull Socket socket;
private boolean closed;
private volatile boolean closed;
/**
* Constructs a new instance.
@ -135,18 +152,25 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
*/
@Override
public void close() throws IOException {
// TODO(internal b/172331505) Make sure most resources are closed before throwing, and close()
// can be called again to close the resources that are still open.
// Nothing to do if the channel has already been marked closed.
if (closed) {
return;
}
try {
// The sender is only closed if it has been created.
if (sender != null) {
sender.close();
}
receiverLoader.release();
// Drop listener callbacks that are still queued on the handler.
messageListenerHandler.removeCallbacksAndMessages(/* token= */ null);
// The socket is only closed if it has been assigned.
if (socket != null) {
socket.close();
}
// NOTE(review): queued callbacks are cleared a second time here; confirm both calls are
// intended.
messageListenerHandler.removeCallbacksAndMessages(/* token= */ null);
} finally {
// Mark the channel closed even if one of the steps above threw.
closed = true;
}
}
/**
* Sends a serialized RTSP message.
@ -159,6 +183,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
}
private static void logMessage(List<String> rtspMessage) {
// TODO(b/172331505) Remove before release.
if (LOG_RTSP_MESSAGES) {
Log.d(TAG, Joiner.on('\n').join(rtspMessage));
}
@ -224,8 +249,12 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
/** A {@link Loadable} for receiving RTSP responses. */
private final class Receiver implements Loadable {
private final BufferedReader inputStreamReader;
/** ASCII dollar encapsulates the RTP packets in interleaved mode (RFC2326 Section 10.12). */
private static final byte RTSP_INTERLEAVED_MESSAGE_MARKER = '$';
private final DataInputStream dataInputStream;
private final RtspMessageBuilder messageBuilder;
private volatile boolean loadCanceled;
/**
@ -236,7 +265,8 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
* InputStream}.
*/
public Receiver(InputStream inputStream) {
inputStreamReader = new BufferedReader(new InputStreamReader(inputStream, Charsets.UTF_8));
dataInputStream = new DataInputStream(inputStream);
messageBuilder = new RtspMessageBuilder();
}
@Override
@ -246,26 +276,66 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@Override
public void load() throws IOException {
List<String> messageLines = new ArrayList<>();
while (!loadCanceled) {
String line;
while (inputStreamReader.ready() && (line = inputStreamReader.readLine()) != null) {
messageLines.add(line);
// TODO(internal b/172331505) Use a buffered read.
byte firstByte = dataInputStream.readByte();
if (firstByte == RTSP_INTERLEAVED_MESSAGE_MARKER) {
handleInterleavedBinaryData();
} else {
handleRtspMessage(firstByte);
}
}
}
if (!messageLines.isEmpty()) {
List<String> message = new ArrayList<>(messageLines);
logMessage(message);
/** Handles an entire RTSP message. */
private void handleRtspMessage(byte firstByte) throws IOException {
@Nullable
ImmutableList<String> messageLines = messageBuilder.addLine(handleRtspMessageLine(firstByte));
while (messageLines == null) {
messageLines = messageBuilder.addLine(handleRtspMessageLine(dataInputStream.readByte()));
}
logMessage(messageLines);
ImmutableList<String> messageLinesToPost = ImmutableList.copyOf(messageLines);
messageListenerHandler.post(
() -> {
if (!closed) {
messageListener.onRtspMessageReceived(message);
messageListener.onRtspMessageReceived(messageLinesToPost);
}
});
// Resets for the next response.
messageLines.clear();
}
/**
 * Reads bytes from the input stream until a CRLF pair has been read, and returns everything read.
 *
 * @param firstByte The first byte of the line, which the caller has already consumed.
 * @return The bytes of the line, starting with {@code firstByte} and ending with CRLF.
 * @throws IOException If reading from the input stream fails.
 */
private byte[] handleRtspMessageLine(byte firstByte) throws IOException {
  ByteArrayOutputStream lineBuffer = new ByteArrayOutputStream();
  // Track the two most recently read bytes so that the CRLF terminator can be detected.
  byte previousByte = firstByte;
  byte currentByte = dataInputStream.readByte();
  lineBuffer.write(previousByte);
  lineBuffer.write(currentByte);

  while (!(previousByte == Ascii.CR && currentByte == Ascii.LF)) {
    previousByte = currentByte;
    currentByte = dataInputStream.readByte();
    lineBuffer.write(currentByte);
  }
  return lineBuffer.toByteArray();
}
/**
 * Reads one interleaved binary frame and posts it to the message listener.
 *
 * <p>The frame is read as a one-byte channel, a two-byte unsigned size, and then exactly that
 * many bytes of data. The marker byte is expected to have been consumed by the caller.
 *
 * @throws IOException If reading from the input stream fails.
 */
private void handleInterleavedBinaryData() throws IOException {
  final int channel = dataInputStream.readUnsignedByte();
  final int payloadSize = dataInputStream.readUnsignedShort();
  final byte[] payload = new byte[payloadSize];
  // Blocks until the whole payload is available.
  dataInputStream.readFully(payload);

  messageListenerHandler.post(
      () -> {
        // Skip delivery if the channel was closed in the meantime.
        if (closed) {
          return;
        }
        messageListener.onInterleavedBinaryDataReceived(payload, channel);
      });
}
}
@ -288,4 +358,93 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
return Loader.DONT_RETRY;
}
}
/** Assembles complete RTSP messages from lines that are fed in one at a time. */
private static final class RtspMessageBuilder {

  @IntDef({STATE_READING_FIRST_LINE, STATE_READING_RTSP_HEADER, STATE_READING_RTSP_BODY})
  @interface ReadingState {}

  private static final int STATE_READING_FIRST_LINE = 1;
  private static final int STATE_READING_RTSP_HEADER = 2;
  private static final int STATE_READING_RTSP_BODY = 3;

  /** Lines accumulated since the last completed message, without CRLF terminators. */
  private final List<String> messageLines;

  @ReadingState private int state;
  /** Body length taken from the most recent Content-Length header of the current message. */
  private long messageBodyLength;
  /** Number of body bytes, including CRLF terminators, received for the current message. */
  private long receivedMessageBodyLength;

  /** Creates a builder that is waiting for the first line of a message. */
  public RtspMessageBuilder() {
    messageLines = new ArrayList<>();
    state = STATE_READING_FIRST_LINE;
  }

  /**
   * Feeds one line into the builder.
   *
   * @param lineBytes A complete line as UTF-8 bytes, terminated by CRLF.
   * @return The lines of the completed message with the CRLF terminators removed, or {@code null}
   *     if more lines are needed to complete the message.
   * @throws ParserException If a Content-Length header in the line cannot be parsed.
   */
  @Nullable
  public ImmutableList<String> addLine(byte[] lineBytes) throws ParserException {
    int lineLength = lineBytes.length;
    boolean endsWithCrLf =
        lineLength >= 2
            && lineBytes[lineLength - 2] == Ascii.CR
            && lineBytes[lineLength - 1] == Ascii.LF;
    checkArgument(endsWithCrLf);

    // Decode the line without its trailing CRLF.
    String line =
        new String(lineBytes, /* offset= */ 0, /* length= */ lineLength - 2, Charsets.UTF_8);
    messageLines.add(line);

    if (state == STATE_READING_FIRST_LINE) {
      if (isRtspStartLine(line)) {
        state = STATE_READING_RTSP_HEADER;
      }
      return null;
    }

    if (state == STATE_READING_RTSP_HEADER) {
      // Remember the body length if this header line is a Content-Length header.
      long contentLength = RtspMessageUtil.parseContentLengthHeader(line);
      if (contentLength != C.LENGTH_UNSET) {
        messageBodyLength = contentLength;
      }
      if (!line.isEmpty()) {
        return null;
      }
      // An empty line terminates the header section.
      if (messageBodyLength > 0) {
        state = STATE_READING_RTSP_BODY;
        return null;
      }
      return takeCompletedMessage();
    }

    if (state == STATE_READING_RTSP_BODY) {
      // The CRLF terminator is counted as part of the body.
      receivedMessageBodyLength += lineBytes.length;
      return receivedMessageBodyLength >= messageBodyLength ? takeCompletedMessage() : null;
    }

    throw new IllegalStateException();
  }

  /** Returns a copy of the accumulated lines and resets the builder for the next message. */
  private ImmutableList<String> takeCompletedMessage() {
    ImmutableList<String> completedMessage = ImmutableList.copyOf(messageLines);
    reset();
    return completedMessage;
  }

  /** Clears all per-message state. */
  private void reset() {
    messageLines.clear();
    state = STATE_READING_FIRST_LINE;
    messageBodyLength = 0;
    receivedMessageBodyLength = 0;
  }
}
}

View File

@ -30,6 +30,7 @@ import static com.google.android.exoplayer2.source.rtsp.RtspRequest.METHOD_TEARD
import static com.google.android.exoplayer2.source.rtsp.RtspRequest.METHOD_UNSET;
import static com.google.android.exoplayer2.util.Assertions.checkArgument;
import static com.google.android.exoplayer2.util.Assertions.checkNotNull;
import static java.util.regex.Pattern.CASE_INSENSITIVE;
import android.net.Uri;
import androidx.annotation.Nullable;
@ -72,6 +73,10 @@ import java.util.regex.Pattern;
// Status line pattern, see RFC2326 Section 7.1.
private static final Pattern STATUS_LINE_PATTERN = Pattern.compile("RTSP/1\\.0 (\\d+) (.+)");
// Content length header pattern, see RFC2326 Section 12.14.
private static final Pattern CONTENT_LENGTH_HEADER_PATTERN =
Pattern.compile("Content-Length:\\s?(\\d+)", CASE_INSENSITIVE);
// Session header pattern, see RFC2326 Section 12.37.
private static final Pattern SESSION_HEADER_PATTERN =
Pattern.compile("(\\w+)(?:;\\s?timeout=(\\d+))?");
@ -260,6 +265,31 @@ import java.util.regex.Pattern;
return new RtspRequest(requestUri, method, headers, messageBody);
}
/** Returns whether {@code line} matches the RTSP request line or status line pattern. */
public static boolean isRtspStartLine(String line) {
  if (REQUEST_LINE_PATTERN.matcher(line).matches()) {
    return true;
  }
  return STATUS_LINE_PATTERN.matcher(line).matches();
}
/**
 * Extracts the value of a Content-Length header from a line.
 *
 * @param line The line to inspect.
 * @return The content length in bytes, or {@link C#LENGTH_UNSET} if the line does not contain a
 *     Content-Length header.
 * @throws ParserException If the Content-Length value cannot be parsed as a {@code long}.
 */
public static long parseContentLengthHeader(String line) throws ParserException {
  Matcher contentLengthMatcher = CONTENT_LENGTH_HEADER_PATTERN.matcher(line);
  if (!contentLengthMatcher.find()) {
    return C.LENGTH_UNSET;
  }
  try {
    return Long.parseLong(checkNotNull(contentLengthMatcher.group(1)));
  } catch (NumberFormatException e) {
    throw new ParserException(e);
  }
}
/**
* Parses the RTSP PUBLIC header into a list of RTSP methods.
*

View File

@ -0,0 +1,164 @@
/*
* Copyright 2021 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.android.exoplayer2.source.rtsp;
import static com.google.android.exoplayer2.source.rtsp.RtspMessageUtil.convertMessageToByteArray;
import static com.google.android.exoplayer2.source.rtsp.RtspMessageUtil.serializeResponse;
import static com.google.common.truth.Truth.assertThat;
import android.net.Uri;
import androidx.test.ext.junit.runners.AndroidJUnit4;
import com.google.android.exoplayer2.robolectric.RobolectricUtil;
import com.google.android.exoplayer2.source.rtsp.RtspMessageChannel.MessageListener;
import com.google.android.exoplayer2.util.Util;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.primitives.Bytes;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
/** Unit test for {@link RtspMessageChannel}. */
@RunWith(AndroidJUnit4.class)
public final class RtspMessageChannelTest {
/**
* Sends three RTSP responses with two interleaved binary frames in between over a local socket,
* and verifies that the channel's listener receives all of them with the expected content.
*/
@Test
public void rtspMessageChannelReceive_threeRtspMessagesAndTwoInterleavedBinary_postsToListener()
throws Exception {
// A response without a message body.
RtspResponse optionsResponse =
new RtspResponse(
200,
new RtspHeaders.Builder()
.add(RtspHeaders.CSEQ, "2")
.add(RtspHeaders.PUBLIC, "OPTIONS")
.build(),
"");
// A response with a body containing multi-byte characters. The Content-Length of 28 is the
// UTF-8 byte length of the body: 2 ASCII bytes, 8 three-byte characters and the CRLF.
RtspResponse describeResponse =
new RtspResponse(
200,
new RtspHeaders.Builder()
.add(RtspHeaders.CSEQ, "3")
.add(RtspHeaders.CONTENT_TYPE, "application/sdp")
.add(RtspHeaders.CONTENT_LENGTH, "28")
.build(),
"v=安卓アンドロイド\r\n");
// Another response without a message body, sent after the interleaved data.
RtspResponse setupResponse =
new RtspResponse(
200,
new RtspHeaders.Builder()
.add(RtspHeaders.CSEQ, "3")
.add(RtspHeaders.TRANSPORT, "RTP/AVP/TCP;unicast;interleaved=0-1")
.build(),
"");
// Interleaved frames without the leading '$' marker, which is prepended when sending.
// Channel: 0, size: 5, data: 01 02 03 04 05.
byte[] interleavedData1 = Util.getBytesFromHexString("0000050102030405");
// Channel: 1, size: 4, data: AA BB CC DD.
byte[] interleavedData2 = Util.getBytesFromHexString("010004AABBCCDD");
AtomicBoolean receivingFinished = new AtomicBoolean();
AtomicReference<Exception> sendingException = new AtomicReference<>();
List<List<String>> receivedRtspResponses = new ArrayList<>(/* initialCapacity= */ 3);
Multimap<Integer, List<Byte>> receivedInterleavedData = LinkedListMultimap.create();
// Port 0 lets the system pick a free port; a null host resolves to the loopback address.
ServerSocket serverSocket =
new ServerSocket(/* port= */ 0, /* backlog= */ 1, InetAddress.getByName(/* host= */ null));
// The server side accepts a single connection and writes all messages back to back.
Thread serverListenThread =
new Thread(
() -> {
try {
Socket socket = serverSocket.accept();
OutputStream serverOutputStream = socket.getOutputStream();
serverOutputStream.write(
convertMessageToByteArray(serializeResponse(optionsResponse)));
serverOutputStream.write(
convertMessageToByteArray(serializeResponse(describeResponse)));
serverOutputStream.write(Bytes.concat(new byte[] {'$'}, interleavedData1));
serverOutputStream.write(Bytes.concat(new byte[] {'$'}, interleavedData2));
serverOutputStream.write(
convertMessageToByteArray(serializeResponse(setupResponse)));
} catch (IOException e) {
// Recorded here and asserted on the test thread after the server thread has finished.
sendingException.set(e);
}
},
"RtspMessageChannelTest:ServerListenThread");
serverListenThread.start();
int serverRtspPortNumber = serverSocket.getLocalPort();
Uri connectionUri =
Uri.parse(Util.formatInvariant("rtsp://localhost:%d/test", serverRtspPortNumber));
Socket clientSideSocket =
SocketFactory.getDefault().createSocket(connectionUri.getHost(), connectionUri.getPort());
RtspMessageChannel rtspMessageChannel =
new RtspMessageChannel(
new MessageListener() {
@Override
public void onRtspMessageReceived(List<String> message) {
receivedRtspResponses.add(message);
// Completion is only checked here, so the test relies on the last RTSP message
// being delivered after both interleaved frames.
if (receivedRtspResponses.size() == 3 && receivedInterleavedData.size() == 2) {
receivingFinished.set(true);
}
}
@Override
public void onInterleavedBinaryDataReceived(byte[] data, int channel) {
receivedInterleavedData.put(channel, Bytes.asList(data));
}
});
rtspMessageChannel.openSocket(clientSideSocket);
// Presumably runs the main looper until every expected message has been delivered.
RobolectricUtil.runMainLooperUntil(receivingFinished::get);
Util.closeQuietly(rtspMessageChannel);
serverListenThread.join();
serverSocket.close();
assertThat(sendingException.get()).isNull();
assertThat(receivedRtspResponses)
.containsExactly(
/* optionsResponse */
ImmutableList.of("RTSP/1.0 200 OK", "CSeq: 2", "Public: OPTIONS", ""),
/* describeResponse */
ImmutableList.of(
"RTSP/1.0 200 OK",
"CSeq: 3",
"Content-Type: application/sdp",
"Content-Length: 28",
"",
"v=安卓アンドロイド"),
/* setupResponse */
ImmutableList.of(
"RTSP/1.0 200 OK", "CSeq: 3", "Transport: RTP/AVP/TCP;unicast;interleaved=0-1", ""))
.inOrder();
// The listener receives only the payload; the channel and size bytes are not part of the data.
assertThat(receivedInterleavedData)
.containsExactly(
/* channel */ 0,
Bytes.asList(Util.getBytesFromHexString("0102030405")),
/* channel */ 1,
Bytes.asList(Util.getBytesFromHexString("AABBCCDD")));
}
}

View File

@ -363,4 +363,28 @@ public final class RtspMessageUtilTest {
assertThat(RtspMessageUtil.removeUserInfo(uri))
.isEqualTo(Uri.parse("rtsp://foo.bar:5050/foo.mkv"));
}
@Test
public void parseContentLengthHeader_withContentLengthOver31Bits_succeeds() throws Exception {
  // The value does not fit in a 32-bit int, so it must be returned as a long.
  assertThat(RtspMessageUtil.parseContentLengthHeader("Content-Length: 1000000000000000"))
      .isEqualTo(1000000000000000L);
}
@Test
public void isRtspStartLine_onValidRequestLine_succeeds() {
  // A request line is accepted as a start line.
  String requestLine = "OPTIONS rtsp://localhost/test RTSP/1.0";

  assertThat(RtspMessageUtil.isRtspStartLine(requestLine)).isTrue();
}
@Test
public void isRtspStartLine_onValidResponseLine_succeeds() {
  // A status line is accepted as a start line.
  String statusLine = "RTSP/1.0 456 Header Field Not Valid for Resource";

  assertThat(RtspMessageUtil.isRtspStartLine(statusLine)).isTrue();
}
@Test
public void isRtspStartLine_onValidHeaderLine_succeeds() {
  // A header line is well-formed RTSP but is not a start line, so the check returns false.
  String headerLine = "Transport: RTP/AVP;unicast;client_port=1000-1001";

  assertThat(RtspMessageUtil.isRtspStartLine(headerLine)).isFalse();
}
}