Allow reading RTSP message body by Content-Length.

Related to Issue: #8941.

The format of an RTSP message body is not regulated by the RTSP spec, meaning it can
use either CRLF or LF as its line terminator. The old code assumed that every line
(in both the RTSP message and the message body) ends with CRLF; the new code relies
on the Content-Length header to receive the bytes of the message body.

#minor-release

PiperOrigin-RevId: 377475565
This commit is contained in:
claincly 2021-06-04 11:07:02 +01:00 committed by Oliver Woodman
parent 4d3b98c212
commit c15acdf0db
5 changed files with 163 additions and 65 deletions

View File

@ -17,6 +17,7 @@ package com.google.android.exoplayer2.source.rtsp;
import static com.google.android.exoplayer2.source.rtsp.RtspMessageUtil.isRtspStartLine;
import static com.google.android.exoplayer2.util.Assertions.checkArgument;
import static com.google.android.exoplayer2.util.Assertions.checkState;
import static com.google.android.exoplayer2.util.Assertions.checkStateNotNull;
import android.os.Handler;
@ -31,6 +32,7 @@ import com.google.android.exoplayer2.upstream.Loader.Loadable;
import com.google.common.base.Ascii;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
@ -251,10 +253,10 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
private final class Receiver implements Loadable {
/** ASCII dollar encapsulates the RTP packets in interleaved mode (RFC2326 Section 10.12). */
private static final byte RTSP_INTERLEAVED_MESSAGE_MARKER = '$';
private static final byte INTERLEAVED_MESSAGE_MARKER = '$';
private final DataInputStream dataInputStream;
private final RtspMessageBuilder messageBuilder;
private final MessageParser messageParser;
private volatile boolean loadCanceled;
/**
@ -266,7 +268,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
*/
public Receiver(InputStream inputStream) {
dataInputStream = new DataInputStream(inputStream);
messageBuilder = new RtspMessageBuilder();
messageParser = new MessageParser();
}
@Override
@ -279,7 +281,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
while (!loadCanceled) {
// TODO(internal b/172331505) Use a buffered read.
byte firstByte = dataInputStream.readByte();
if (firstByte == RTSP_INTERLEAVED_MESSAGE_MARKER) {
if (firstByte == INTERLEAVED_MESSAGE_MARKER) {
handleInterleavedBinaryData();
} else {
handleRtspMessage(firstByte);
@ -289,36 +291,11 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
/** Handles an entire RTSP message. */
private void handleRtspMessage(byte firstByte) throws IOException {
@Nullable
ImmutableList<String> messageLines = messageBuilder.addLine(handleRtspMessageLine(firstByte));
while (messageLines == null) {
messageLines = messageBuilder.addLine(handleRtspMessageLine(dataInputStream.readByte()));
}
if (!closed) {
messageListener.onRtspMessageReceived(messageLines);
messageListener.onRtspMessageReceived(messageParser.parseNext(firstByte, dataInputStream));
}
}
/** Returns the byte representation of a complete RTSP line, with CRLF line terminator. */
private byte[] handleRtspMessageLine(byte firstByte) throws IOException {
ByteArrayOutputStream messageByteStream = new ByteArrayOutputStream();
byte[] peekedBytes = new byte[2];
peekedBytes[0] = firstByte;
peekedBytes[1] = dataInputStream.readByte();
messageByteStream.write(peekedBytes);
while (peekedBytes[0] != Ascii.CR || peekedBytes[1] != Ascii.LF) {
// Shift the CRLF buffer.
peekedBytes[0] = peekedBytes[1];
peekedBytes[1] = dataInputStream.readByte();
messageByteStream.write(peekedBytes[1]);
}
return messageByteStream.toByteArray();
}
private void handleInterleavedBinaryData() throws IOException {
int channel = dataInputStream.readUnsignedByte();
int size = dataInputStream.readUnsignedShort();
@ -354,38 +331,91 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
return Loader.DONT_RETRY;
}
}
/** Processes RTSP messages line-by-line. */
private static final class RtspMessageBuilder {
@IntDef({STATE_READING_FIRST_LINE, STATE_READING_RTSP_HEADER, STATE_READING_RTSP_BODY})
/** Processes RTSP messages line-by-line. */
private static final class MessageParser {
@IntDef({STATE_READING_FIRST_LINE, STATE_READING_HEADER, STATE_READING_BODY})
@interface ReadingState {}
private static final int STATE_READING_FIRST_LINE = 1;
private static final int STATE_READING_RTSP_HEADER = 2;
private static final int STATE_READING_RTSP_BODY = 3;
private static final int STATE_READING_HEADER = 2;
private static final int STATE_READING_BODY = 3;
private final List<String> messageLines;
@ReadingState private int state;
private long messageBodyLength;
private long receivedMessageBodyLength;
/** Creates a new instance. */
public RtspMessageBuilder() {
public MessageParser() {
messageLines = new ArrayList<>();
state = STATE_READING_FIRST_LINE;
}
/**
* Add a line to the builder.
* Receives and parses an entire RTSP message.
*
* @param lineBytes The complete RTSP message line in UTF-8 byte array, including CRLF.
* @return A list of completed RTSP message lines, without the CRLF line terminators; or {@code
* null} if the message is not yet complete.
* @param firstByte The first byte received for the RTSP message.
* @param dataInputStream The {@link DataInputStream} on which RTSP messages are received.
* @return An {@link ImmutableList} of the lines that make up an RTSP message.
*/
public ImmutableList<String> parseNext(byte firstByte, DataInputStream dataInputStream)
    throws IOException {
  // The caller has already consumed the first byte, so the first line is assembled from it.
  @Nullable
  ImmutableList<String> completedMessage =
      addMessageLine(parseNextLine(firstByte, dataInputStream));

  // Keep consuming input until one of the add* methods reports a complete message.
  while (completedMessage == null) {
    if (state != STATE_READING_BODY) {
      // Start line and header lines are read one CRLF-terminated line at a time.
      byte nextLineFirstByte = dataInputStream.readByte();
      completedMessage = addMessageLine(parseNextLine(nextLineFirstByte, dataInputStream));
      continue;
    }

    if (messageBodyLength <= 0) {
      throw new IllegalStateException("Expects a greater than zero Content-Length.");
    }

    // The message body's format is not regulated under RTSP, so it may use LF (rather than
    // RTSP's CRLF) as its line ending. The body is therefore read as a single chunk whose size
    // is the value recorded from the Content-Length header.
    // Assume the message body length fits in a 32-bit integer.
    int bodyLength = Ints.checkedCast(messageBodyLength);
    checkState(bodyLength != C.LENGTH_UNSET);
    byte[] bodyBytes = new byte[bodyLength];
    dataInputStream.readFully(bodyBytes, /* off= */ 0, bodyLength);
    completedMessage = addMessageBody(bodyBytes);
  }
  return completedMessage;
}
/**
 * Reads bytes from the stream until a CRLF pair has been seen.
 *
 * @param firstByte The already-consumed first byte of the line.
 * @param dataInputStream The stream from which the remaining bytes of the line are read.
 * @return All bytes of the line, starting with {@code firstByte} and ending with the CRLF
 *     terminator.
 * @throws IOException If reading from {@code dataInputStream} fails.
 */
private static byte[] parseNextLine(byte firstByte, DataInputStream dataInputStream)
    throws IOException {
  ByteArrayOutputStream lineBuffer = new ByteArrayOutputStream();
  // Track the two most recently read bytes so that a CR followed by LF can be detected.
  byte previousByte = firstByte;
  byte currentByte = dataInputStream.readByte();
  lineBuffer.write(previousByte);
  lineBuffer.write(currentByte);

  while (!(previousByte == Ascii.CR && currentByte == Ascii.LF)) {
    // Slide the two-byte window forward by one byte.
    previousByte = currentByte;
    currentByte = dataInputStream.readByte();
    lineBuffer.write(currentByte);
  }
  return lineBuffer.toByteArray();
}
/**
* Returns a list of completed RTSP message lines, without the CRLF line terminators; or {@code
* null} if the message is not yet complete.
*/
@Nullable
public ImmutableList<String> addLine(byte[] lineBytes) throws ParserException {
// Trim CRLF.
private ImmutableList<String> addMessageLine(byte[] lineBytes) throws ParserException {
// Trim CRLF. RTSP lines are terminated by a CRLF.
checkArgument(
lineBytes.length >= 2
&& lineBytes[lineBytes.length - 2] == Ascii.CR
@ -397,11 +427,11 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
switch (state) {
case STATE_READING_FIRST_LINE:
if (isRtspStartLine(line)) {
state = STATE_READING_RTSP_HEADER;
state = STATE_READING_HEADER;
}
break;
case STATE_READING_RTSP_HEADER:
case STATE_READING_HEADER:
// Check if the line contains RTSP Content-Length header.
long contentLength = RtspMessageUtil.parseContentLengthHeader(line);
if (contentLength != C.LENGTH_UNSET) {
@ -411,7 +441,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
if (line.isEmpty()) {
// An empty line signals the end of the header section.
if (messageBodyLength > 0) {
state = STATE_READING_RTSP_BODY;
state = STATE_READING_BODY;
} else {
ImmutableList<String> linesToReturn = ImmutableList.copyOf(messageLines);
reset();
@ -420,14 +450,8 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
}
break;
case STATE_READING_RTSP_BODY:
receivedMessageBodyLength += lineBytes.length;
if (receivedMessageBodyLength >= messageBodyLength) {
ImmutableList<String> linesToReturn = ImmutableList.copyOf(messageLines);
reset();
return linesToReturn;
}
break;
case STATE_READING_BODY:
// Message body must be handled by addMessageBody().
default:
throw new IllegalStateException();
@ -435,11 +459,45 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
return null;
}
/**
 * Appends the message body to the collected lines and completes the current message.
 *
 * <p>Only the trailing line terminator (CRLF or LF) is stripped from the body; any line
 * terminators inside the body are kept as-is.
 *
 * @param messageBodyBytes The bytes of the message body, which must end with a LF.
 * @return The lines of the completed RTSP message, with the body as the last element.
 * @throws IllegalStateException If the parser is not in the body-reading state.
 * @throws IllegalArgumentException If the body is empty or does not end with a LF.
 */
private ImmutableList<String> addMessageBody(byte[] messageBodyBytes) {
  checkState(state == STATE_READING_BODY);

  int bodyLength = messageBodyBytes.length;
  boolean endsWithLf = bodyLength > 0 && messageBodyBytes[bodyLength - 1] == Ascii.LF;
  if (!endsWithLf) {
    throw new IllegalArgumentException("Message body is empty or does not end with a LF.");
  }

  // The terminator is two bytes long for CRLF, one byte long for a bare LF.
  boolean endsWithCrLf = bodyLength > 1 && messageBodyBytes[bodyLength - 2] == Ascii.CR;
  int terminatorLength = endsWithCrLf ? 2 : 1;
  String messageBody =
      new String(
          messageBodyBytes, /* offset= */ 0, /* length= */ bodyLength - terminatorLength, CHARSET);

  messageLines.add(messageBody);
  // Snapshot the lines before reset() clears the backing list.
  ImmutableList<String> completedMessage = ImmutableList.copyOf(messageLines);
  reset();
  return completedMessage;
}
/**
 * Returns the parser to its initial state: drops the collected lines, goes back to expecting a
 * first line, and zeroes the body length bookkeeping.
 */
private void reset() {
messageLines.clear();
state = STATE_READING_FIRST_LINE;
messageBodyLength = 0;
receivedMessageBodyLength = 0;
}
}
}

View File

@ -38,6 +38,7 @@ import androidx.annotation.Nullable;
import com.google.android.exoplayer2.C;
import com.google.android.exoplayer2.ParserException;
import com.google.android.exoplayer2.util.Util;
import com.google.common.base.Ascii;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
@ -107,6 +108,8 @@ import java.util.regex.Pattern;
Pattern.compile("Basic realm=\"([\\w\\s@.]+)\"");
private static final String RTSP_VERSION = "RTSP/1.0";
private static final String LF = new String(new byte[] {Ascii.LF});
private static final String CRLF = new String(new byte[] {Ascii.CR, Ascii.LF});
/**
* Serializes an {@link RtspRequest} to an {@link ImmutableList} of strings.
@ -167,7 +170,7 @@ import java.util.regex.Pattern;
* removed.
*/
public static byte[] convertMessageToByteArray(List<String> message) {
return Joiner.on("\r\n").join(message).getBytes(RtspMessageChannel.CHARSET);
return Joiner.on(CRLF).join(message).getBytes(RtspMessageChannel.CHARSET);
}
/** Removes the user info from the supplied {@link Uri}. */
@ -211,7 +214,7 @@ import java.util.regex.Pattern;
/** Returns the corresponding String representation of the {@link RtspRequest.Method} argument. */
public static String toMethodString(@RtspRequest.Method int method) {
switch (method) {
case RtspRequest.METHOD_ANNOUNCE:
case METHOD_ANNOUNCE:
return "ANNOUNCE";
case METHOD_DESCRIBE:
return "DESCRIBE";
@ -291,7 +294,7 @@ import java.util.regex.Pattern;
List<String> headerLines = lines.subList(1, messageBodyOffset);
RtspHeaders headers = new RtspHeaders.Builder().addAll(headerLines).build();
String messageBody = Joiner.on("\r\n").join(lines.subList(messageBodyOffset + 1, lines.size()));
String messageBody = Joiner.on(CRLF).join(lines.subList(messageBodyOffset + 1, lines.size()));
return new RtspResponse(statusCode, headers, messageBody);
}
@ -314,7 +317,7 @@ import java.util.regex.Pattern;
List<String> headerLines = lines.subList(1, messageBodyOffset);
RtspHeaders headers = new RtspHeaders.Builder().addAll(headerLines).build();
String messageBody = Joiner.on("\r\n").join(lines.subList(messageBodyOffset + 1, lines.size()));
String messageBody = Joiner.on(CRLF).join(lines.subList(messageBodyOffset + 1, lines.size()));
return new RtspRequest(requestUri, method, headers, messageBody);
}
@ -324,6 +327,11 @@ import java.util.regex.Pattern;
|| STATUS_LINE_PATTERN.matcher(line).matches();
}
/**
 * Splits an RTSP message body into lines.
 *
 * <p>CRLF is used as the delimiter if the body contains at least one CRLF; otherwise LF is used.
 *
 * @param body The message body to split.
 * @return The result of splitting {@code body} on the selected line terminator.
 */
public static String[] splitRtspMessageBody(String body) {
  String lineTerminator = LF;
  if (body.contains(CRLF)) {
    lineTerminator = CRLF;
  }
  return Util.split(body, lineTerminator);
}
/**
* Returns the length in bytes if the line contains a Content-Length header, otherwise {@link
* C#LENGTH_UNSET}.

View File

@ -41,8 +41,6 @@ import java.util.regex.Pattern;
private static final Pattern MEDIA_DESCRIPTION_PATTERN =
Pattern.compile("(\\S+)\\s(\\S+)\\s(\\S+)\\s(\\S+)");
private static final String CRLF = "\r\n";
private static final String VERSION_TYPE = "v";
private static final String ORIGIN_TYPE = "o";
private static final String SESSION_TYPE = "s";
@ -71,7 +69,7 @@ import java.util.regex.Pattern;
@Nullable MediaDescription.Builder mediaDescriptionBuilder = null;
// Lines are separated by either CRLF or LF.
for (String line : Util.split(sdpString, CRLF)) {
for (String line : RtspMessageUtil.splitRtspMessageBody(sdpString)) {
if ("".equals(line)) {
continue;
}

View File

@ -66,11 +66,21 @@ public final class RtspMessageChannelTest {
.build(),
"v=安卓アンドロイド\r\n");
RtspResponse describeResponse2 =
new RtspResponse(
200,
new RtspHeaders.Builder()
.add(RtspHeaders.CSEQ, "4")
.add(RtspHeaders.CONTENT_TYPE, "application/sdp")
.add(RtspHeaders.CONTENT_LENGTH, "73")
.build(),
"v=安卓アンドロイド\n" + "o=test 2890844526 2890842807 IN IP4 127.0.0.1\n");
RtspResponse setupResponse =
new RtspResponse(
200,
new RtspHeaders.Builder()
.add(RtspHeaders.CSEQ, "3")
.add(RtspHeaders.CSEQ, "5")
.add(RtspHeaders.TRANSPORT, "RTP/AVP/TCP;unicast;interleaved=0-1")
.build(),
"");
@ -97,6 +107,8 @@ public final class RtspMessageChannelTest {
convertMessageToByteArray(serializeResponse(optionsResponse)));
serverOutputStream.write(
convertMessageToByteArray(serializeResponse(describeResponse)));
serverOutputStream.write(
convertMessageToByteArray(serializeResponse(describeResponse2)));
serverOutputStream.write(Bytes.concat(new byte[] {'$'}, interleavedData1));
serverOutputStream.write(Bytes.concat(new byte[] {'$'}, interleavedData2));
serverOutputStream.write(
@ -118,7 +130,7 @@ public final class RtspMessageChannelTest {
new RtspMessageChannel(
message -> {
receivedRtspResponses.add(message);
if (receivedRtspResponses.size() == 3 && receivedInterleavedData.size() == 2) {
if (receivedRtspResponses.size() == 4 && receivedInterleavedData.size() == 2) {
receivingFinished.set(true);
}
});
@ -148,9 +160,17 @@ public final class RtspMessageChannelTest {
"content-length: 28",
"",
"v=安卓アンドロイド"),
/* describeResponse2 */
ImmutableList.of(
"RTSP/1.0 200 OK",
"cseq: 4",
"content-type: application/sdp",
"content-length: 73",
"",
"v=安卓アンドロイド\n" + "o=test 2890844526 2890842807 IN IP4 127.0.0.1"),
/* setupResponse */
ImmutableList.of(
"RTSP/1.0 200 OK", "cseq: 3", "transport: RTP/AVP/TCP;unicast;interleaved=0-1", ""))
"RTSP/1.0 200 OK", "cseq: 5", "transport: RTP/AVP/TCP;unicast;interleaved=0-1", ""))
.inOrder();
assertThat(receivedInterleavedData)
.containsExactly(

View File

@ -480,4 +480,18 @@ public final class RtspMessageUtilTest {
assertThat(authenticationInfo.realm).isEqualTo("LIVE555 Streaming Media");
assertThat(authenticationInfo.opaque).isEmpty();
}
/** Verifies that a body using CRLF line terminators is split into its individual lines. */
@Test
public void splitRtspMessageBody_withCrLfLineTerminatorMessageBody_splitsMessageBody() {
  String crLfBody = "line1\r\nline2\r\nline3";

  String[] splitLines = RtspMessageUtil.splitRtspMessageBody(crLfBody);

  assertThat(splitLines).asList().containsExactly("line1", "line2", "line3").inOrder();
}
/** Verifies that a body using bare LF line terminators is split into its individual lines. */
@Test
public void splitRtspMessageBody_withLfLineTerminatorMessageBody_splitsMessageBody() {
  String lfBody = "line1\nline2\nline3";

  String[] splitLines = RtspMessageUtil.splitRtspMessageBody(lfBody);

  assertThat(splitLines).asList().containsExactly("line1", "line2", "line3").inOrder();
}
}