From c15acdf0dbaa5d553cd923f2d15d8502906d70f4 Mon Sep 17 00:00:00 2001 From: claincly Date: Fri, 4 Jun 2021 11:07:02 +0100 Subject: [PATCH] Allow reading RTSP message body by Content-Length. Related to Issue: #8941. RTSP message body's format is not regulated by the RTSP spec, meaning it can use either CRLF or LF as its line terminator. The old code assumes every line ends with CRLF (RTSP message and the message body); the new code will rely on the Content-Length information to receive the bytes for the message body. #minor-release PiperOrigin-RevId: 377475565 --- .../source/rtsp/RtspMessageChannel.java | 168 ++++++++++++------ .../source/rtsp/RtspMessageUtil.java | 16 +- .../source/rtsp/SessionDescriptionParser.java | 4 +- .../source/rtsp/RtspMessageChannelTest.java | 26 ++- .../source/rtsp/RtspMessageUtilTest.java | 14 ++ 5 files changed, 163 insertions(+), 65 deletions(-) diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannel.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannel.java index ab8d776033..d45af667dc 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannel.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannel.java @@ -17,6 +17,7 @@ package com.google.android.exoplayer2.source.rtsp; import static com.google.android.exoplayer2.source.rtsp.RtspMessageUtil.isRtspStartLine; import static com.google.android.exoplayer2.util.Assertions.checkArgument; +import static com.google.android.exoplayer2.util.Assertions.checkState; import static com.google.android.exoplayer2.util.Assertions.checkStateNotNull; import android.os.Handler; @@ -31,6 +32,7 @@ import com.google.android.exoplayer2.upstream.Loader.Loadable; import com.google.common.base.Ascii; import com.google.common.base.Charsets; import com.google.common.collect.ImmutableList; +import com.google.common.primitives.Ints; import 
java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.DataInputStream; @@ -251,10 +253,10 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; private final class Receiver implements Loadable { /** ASCII dollar encapsulates the RTP packets in interleaved mode (RFC2326 Section 10.12). */ - private static final byte RTSP_INTERLEAVED_MESSAGE_MARKER = '$'; + private static final byte INTERLEAVED_MESSAGE_MARKER = '$'; private final DataInputStream dataInputStream; - private final RtspMessageBuilder messageBuilder; + private final MessageParser messageParser; private volatile boolean loadCanceled; /** @@ -266,7 +268,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; */ public Receiver(InputStream inputStream) { dataInputStream = new DataInputStream(inputStream); - messageBuilder = new RtspMessageBuilder(); + messageParser = new MessageParser(); } @Override @@ -279,7 +281,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; while (!loadCanceled) { // TODO(internal b/172331505) Use a buffered read. byte firstByte = dataInputStream.readByte(); - if (firstByte == RTSP_INTERLEAVED_MESSAGE_MARKER) { + if (firstByte == INTERLEAVED_MESSAGE_MARKER) { handleInterleavedBinaryData(); } else { handleRtspMessage(firstByte); @@ -289,36 +291,11 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; /** Handles an entire RTSP message. */ private void handleRtspMessage(byte firstByte) throws IOException { - @Nullable - ImmutableList messageLines = messageBuilder.addLine(handleRtspMessageLine(firstByte)); - while (messageLines == null) { - messageLines = messageBuilder.addLine(handleRtspMessageLine(dataInputStream.readByte())); - } - if (!closed) { - messageListener.onRtspMessageReceived(messageLines); + messageListener.onRtspMessageReceived(messageParser.parseNext(firstByte, dataInputStream)); } } - /** Returns the byte representation of a complete RTSP line, with CRLF line terminator. 
*/ - private byte[] handleRtspMessageLine(byte firstByte) throws IOException { - ByteArrayOutputStream messageByteStream = new ByteArrayOutputStream(); - - byte[] peekedBytes = new byte[2]; - peekedBytes[0] = firstByte; - peekedBytes[1] = dataInputStream.readByte(); - messageByteStream.write(peekedBytes); - - while (peekedBytes[0] != Ascii.CR || peekedBytes[1] != Ascii.LF) { - // Shift the CRLF buffer. - peekedBytes[0] = peekedBytes[1]; - peekedBytes[1] = dataInputStream.readByte(); - messageByteStream.write(peekedBytes[1]); - } - - return messageByteStream.toByteArray(); - } - private void handleInterleavedBinaryData() throws IOException { int channel = dataInputStream.readUnsignedByte(); int size = dataInputStream.readUnsignedShort(); @@ -354,38 +331,91 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; return Loader.DONT_RETRY; } } - /** Processes RTSP messages line-by-line. */ - private static final class RtspMessageBuilder { - @IntDef({STATE_READING_FIRST_LINE, STATE_READING_RTSP_HEADER, STATE_READING_RTSP_BODY}) + /** Processes RTSP messages line-by-line. */ + private static final class MessageParser { + + @IntDef({STATE_READING_FIRST_LINE, STATE_READING_HEADER, STATE_READING_BODY}) @interface ReadingState {} private static final int STATE_READING_FIRST_LINE = 1; - private static final int STATE_READING_RTSP_HEADER = 2; - private static final int STATE_READING_RTSP_BODY = 3; + private static final int STATE_READING_HEADER = 2; + private static final int STATE_READING_BODY = 3; private final List messageLines; @ReadingState private int state; private long messageBodyLength; - private long receivedMessageBodyLength; /** Creates a new instance. */ - public RtspMessageBuilder() { + public MessageParser() { messageLines = new ArrayList<>(); state = STATE_READING_FIRST_LINE; } /** - * Add a line to the builder. + * Receives and parses an entire RTSP message. 
* - * @param lineBytes The complete RTSP message line in UTF-8 byte array, including CRLF. - * @return A list of completed RTSP message lines, without the CRLF line terminators; or {@code - * null} if the message is not yet complete. + * @param firstByte The first byte received for the RTSP message. + * @param dataInputStream The {@link DataInputStream} on which RTSP messages are received. + * @return An {@link ImmutableList} of the lines that make up an RTSP message. + */ + public ImmutableList parseNext(byte firstByte, DataInputStream dataInputStream) + throws IOException { + @Nullable + ImmutableList parsedMessageLines = + addMessageLine(parseNextLine(firstByte, dataInputStream)); + + while (parsedMessageLines == null) { + if (state == STATE_READING_BODY) { + if (messageBodyLength > 0) { + // Message body's format is not regulated under RTSP, so it could use LF (instead of + // RTSP's CRLF) as line ending. The length of the message body is included in the RTSP + // Content-Length header. + // Assume the message body length is within a 32-bit integer. + int messageBodyLengthInt = Ints.checkedCast(messageBodyLength); + checkState(messageBodyLengthInt != C.LENGTH_UNSET); + byte[] messageBodyBytes = new byte[messageBodyLengthInt]; + dataInputStream.readFully(messageBodyBytes, /* off= */ 0, messageBodyLengthInt); + parsedMessageLines = addMessageBody(messageBodyBytes); + } else { + throw new IllegalStateException("Expects a greater than zero Content-Length."); + } + } else { + parsedMessageLines = + addMessageLine(parseNextLine(dataInputStream.readByte(), dataInputStream)); + } + } + return parsedMessageLines; + } + + /** Returns the byte representation of a complete RTSP line, with CRLF line terminator. 
*/ + private static byte[] parseNextLine(byte firstByte, DataInputStream dataInputStream) + throws IOException { + ByteArrayOutputStream messageByteStream = new ByteArrayOutputStream(); + + byte[] peekedBytes = new byte[2]; + peekedBytes[0] = firstByte; + peekedBytes[1] = dataInputStream.readByte(); + messageByteStream.write(peekedBytes); + + while (peekedBytes[0] != Ascii.CR || peekedBytes[1] != Ascii.LF) { + // Shift the CRLF buffer. + peekedBytes[0] = peekedBytes[1]; + peekedBytes[1] = dataInputStream.readByte(); + messageByteStream.write(peekedBytes[1]); + } + + return messageByteStream.toByteArray(); + } + + /** + * Returns a list of completed RTSP message lines, without the CRLF line terminators; or {@code + * null} if the message is not yet complete. */ @Nullable - public ImmutableList addLine(byte[] lineBytes) throws ParserException { - // Trim CRLF. + private ImmutableList addMessageLine(byte[] lineBytes) throws ParserException { + // Trim CRLF. RTSP lines are terminated by a CRLF. checkArgument( lineBytes.length >= 2 && lineBytes[lineBytes.length - 2] == Ascii.CR @@ -397,11 +427,11 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; switch (state) { case STATE_READING_FIRST_LINE: if (isRtspStartLine(line)) { - state = STATE_READING_RTSP_HEADER; + state = STATE_READING_HEADER; } break; - case STATE_READING_RTSP_HEADER: + case STATE_READING_HEADER: // Check if the line contains RTSP Content-Length header. long contentLength = RtspMessageUtil.parseContentLengthHeader(line); if (contentLength != C.LENGTH_UNSET) { @@ -411,7 +441,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; if (line.isEmpty()) { // An empty line signals the end of the header section. 
if (messageBodyLength > 0) { - state = STATE_READING_RTSP_BODY; + state = STATE_READING_BODY; } else { ImmutableList linesToReturn = ImmutableList.copyOf(messageLines); reset(); @@ -420,14 +450,8 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; } break; - case STATE_READING_RTSP_BODY: - receivedMessageBodyLength += lineBytes.length; - if (receivedMessageBodyLength >= messageBodyLength) { - ImmutableList linesToReturn = ImmutableList.copyOf(messageLines); - reset(); - return linesToReturn; - } - break; + case STATE_READING_BODY: + // Message body must be handled by addMessageBody(). default: throw new IllegalStateException(); @@ -435,11 +459,45 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; return null; } + /** Returns a list of completed RTSP message lines, without the line terminators. */ + private ImmutableList addMessageBody(byte[] messageBodyBytes) { + checkState(state == STATE_READING_BODY); + + String messageBody; + if (messageBodyBytes.length > 0 + && messageBodyBytes[messageBodyBytes.length - 1] == Ascii.LF) { + if (messageBodyBytes.length > 1 + && messageBodyBytes[messageBodyBytes.length - 2] == Ascii.CR) { + // Line ends with CRLF. + messageBody = + new String( + messageBodyBytes, + /* offset= */ 0, + /* length= */ messageBodyBytes.length - 2, + CHARSET); + } else { + // Line ends with LF. 
+ messageBody = + new String( + messageBodyBytes, + /* offset= */ 0, + /* length= */ messageBodyBytes.length - 1, + CHARSET); + } + } else { + throw new IllegalArgumentException("Message body is empty or does not end with a LF."); + } + + messageLines.add(messageBody); + ImmutableList linesToReturn = ImmutableList.copyOf(messageLines); + reset(); + return linesToReturn; + } + private void reset() { messageLines.clear(); state = STATE_READING_FIRST_LINE; messageBodyLength = 0; - receivedMessageBodyLength = 0; } } } diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtil.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtil.java index b385dc64b4..a6636effee 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtil.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtil.java @@ -38,6 +38,7 @@ import androidx.annotation.Nullable; import com.google.android.exoplayer2.C; import com.google.android.exoplayer2.ParserException; import com.google.android.exoplayer2.util.Util; +import com.google.common.base.Ascii; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableListMultimap; @@ -107,6 +108,8 @@ import java.util.regex.Pattern; Pattern.compile("Basic realm=\"([\\w\\s@.]+)\""); private static final String RTSP_VERSION = "RTSP/1.0"; + private static final String LF = new String(new byte[] {Ascii.LF}); + private static final String CRLF = new String(new byte[] {Ascii.CR, Ascii.LF}); /** * Serializes an {@link RtspRequest} to an {@link ImmutableList} of strings. @@ -167,7 +170,7 @@ import java.util.regex.Pattern; * removed. 
*/ public static byte[] convertMessageToByteArray(List message) { - return Joiner.on("\r\n").join(message).getBytes(RtspMessageChannel.CHARSET); + return Joiner.on(CRLF).join(message).getBytes(RtspMessageChannel.CHARSET); } /** Removes the user info from the supplied {@link Uri}. */ @@ -211,7 +214,7 @@ import java.util.regex.Pattern; /** Returns the corresponding String representation of the {@link RtspRequest.Method} argument. */ public static String toMethodString(@RtspRequest.Method int method) { switch (method) { - case RtspRequest.METHOD_ANNOUNCE: + case METHOD_ANNOUNCE: return "ANNOUNCE"; case METHOD_DESCRIBE: return "DESCRIBE"; @@ -291,7 +294,7 @@ import java.util.regex.Pattern; List headerLines = lines.subList(1, messageBodyOffset); RtspHeaders headers = new RtspHeaders.Builder().addAll(headerLines).build(); - String messageBody = Joiner.on("\r\n").join(lines.subList(messageBodyOffset + 1, lines.size())); + String messageBody = Joiner.on(CRLF).join(lines.subList(messageBodyOffset + 1, lines.size())); return new RtspResponse(statusCode, headers, messageBody); } @@ -314,7 +317,7 @@ import java.util.regex.Pattern; List headerLines = lines.subList(1, messageBodyOffset); RtspHeaders headers = new RtspHeaders.Builder().addAll(headerLines).build(); - String messageBody = Joiner.on("\r\n").join(lines.subList(messageBodyOffset + 1, lines.size())); + String messageBody = Joiner.on(CRLF).join(lines.subList(messageBodyOffset + 1, lines.size())); return new RtspRequest(requestUri, method, headers, messageBody); } @@ -324,6 +327,11 @@ import java.util.regex.Pattern; || STATUS_LINE_PATTERN.matcher(line).matches(); } + /** Returns the lines in an RTSP message body split by the line terminator used in body. */ + public static String[] splitRtspMessageBody(String body) { + return Util.split(body, body.contains(CRLF) ? CRLF : LF); + } + /** * Returns the length in bytes if the line contains a Content-Length header, otherwise {@link * C#LENGTH_UNSET}. 
diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/SessionDescriptionParser.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/SessionDescriptionParser.java index bff26c1703..b38c72d8ca 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/SessionDescriptionParser.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/SessionDescriptionParser.java @@ -41,8 +41,6 @@ import java.util.regex.Pattern; private static final Pattern MEDIA_DESCRIPTION_PATTERN = Pattern.compile("(\\S+)\\s(\\S+)\\s(\\S+)\\s(\\S+)"); - private static final String CRLF = "\r\n"; - private static final String VERSION_TYPE = "v"; private static final String ORIGIN_TYPE = "o"; private static final String SESSION_TYPE = "s"; @@ -71,7 +69,7 @@ import java.util.regex.Pattern; @Nullable MediaDescription.Builder mediaDescriptionBuilder = null; // Lines are separated by a CRLF or an LF. - for (String line : Util.split(sdpString, CRLF)) { + for (String line : RtspMessageUtil.splitRtspMessageBody(sdpString)) { if ("".equals(line)) { continue; } diff --git a/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannelTest.java b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannelTest.java index b7d060ef95..cc2f4d24a3 100644 --- a/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannelTest.java +++ b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannelTest.java @@ -66,11 +66,21 @@ public final class RtspMessageChannelTest { .build(), "v=安卓アンドロイド\r\n"); + RtspResponse describeResponse2 = + new RtspResponse( + 200, + new RtspHeaders.Builder() + .add(RtspHeaders.CSEQ, "4") + .add(RtspHeaders.CONTENT_TYPE, "application/sdp") + .add(RtspHeaders.CONTENT_LENGTH, "73") + .build(), + "v=安卓アンドロイド\n" + "o=test 2890844526 2890842807 IN IP4 127.0.0.1\n"); + RtspResponse setupResponse = new 
RtspResponse( 200, new RtspHeaders.Builder() - .add(RtspHeaders.CSEQ, "3") + .add(RtspHeaders.CSEQ, "5") .add(RtspHeaders.TRANSPORT, "RTP/AVP/TCP;unicast;interleaved=0-1") .build(), ""); @@ -97,6 +107,8 @@ public final class RtspMessageChannelTest { convertMessageToByteArray(serializeResponse(optionsResponse))); serverOutputStream.write( convertMessageToByteArray(serializeResponse(describeResponse))); + serverOutputStream.write( + convertMessageToByteArray(serializeResponse(describeResponse2))); serverOutputStream.write(Bytes.concat(new byte[] {'$'}, interleavedData1)); serverOutputStream.write(Bytes.concat(new byte[] {'$'}, interleavedData2)); serverOutputStream.write( @@ -118,7 +130,7 @@ public final class RtspMessageChannelTest { new RtspMessageChannel( message -> { receivedRtspResponses.add(message); - if (receivedRtspResponses.size() == 3 && receivedInterleavedData.size() == 2) { + if (receivedRtspResponses.size() == 4 && receivedInterleavedData.size() == 2) { receivingFinished.set(true); } }); @@ -148,9 +160,17 @@ public final class RtspMessageChannelTest { "content-length: 28", "", "v=安卓アンドロイド"), + /* describeResponse2 */ + ImmutableList.of( + "RTSP/1.0 200 OK", + "cseq: 4", + "content-type: application/sdp", + "content-length: 73", + "", + "v=安卓アンドロイド\n" + "o=test 2890844526 2890842807 IN IP4 127.0.0.1"), /* setupResponse */ ImmutableList.of( - "RTSP/1.0 200 OK", "cseq: 3", "transport: RTP/AVP/TCP;unicast;interleaved=0-1", "")) + "RTSP/1.0 200 OK", "cseq: 5", "transport: RTP/AVP/TCP;unicast;interleaved=0-1", "")) .inOrder(); assertThat(receivedInterleavedData) .containsExactly( diff --git a/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtilTest.java b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtilTest.java index 4025a971ee..c0611dddca 100644 --- a/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtilTest.java +++ 
b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtilTest.java @@ -480,4 +480,18 @@ public final class RtspMessageUtilTest { assertThat(authenticationInfo.realm).isEqualTo("LIVE555 Streaming Media"); assertThat(authenticationInfo.opaque).isEmpty(); } + + @Test + public void splitRtspMessageBody_withCrLfLineTerminatorMessageBody_splitsMessageBody() { + String[] lines = RtspMessageUtil.splitRtspMessageBody("line1\r\nline2\r\nline3"); + + assertThat(lines).asList().containsExactly("line1", "line2", "line3").inOrder(); + } + + @Test + public void splitRtspMessageBody_withLfLineTerminatorMessageBody_splitsMessageBody() { + String[] lines = RtspMessageUtil.splitRtspMessageBody("line1\nline2\nline3"); + + assertThat(lines).asList().containsExactly("line1", "line2", "line3").inOrder(); + } }