diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannel.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannel.java index ac66722cd9..1707ed0bb3 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannel.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannel.java @@ -15,23 +15,31 @@ */ package com.google.android.exoplayer2.source.rtsp; +import static com.google.android.exoplayer2.source.rtsp.RtspMessageUtil.isRtspStartLine; +import static com.google.android.exoplayer2.util.Assertions.checkArgument; import static com.google.android.exoplayer2.util.Assertions.checkStateNotNull; import android.os.Handler; import android.os.HandlerThread; import android.os.Looper; +import androidx.annotation.IntDef; +import androidx.annotation.Nullable; +import com.google.android.exoplayer2.C; +import com.google.android.exoplayer2.ParserException; import com.google.android.exoplayer2.upstream.Loader; import com.google.android.exoplayer2.upstream.Loader.LoadErrorAction; import com.google.android.exoplayer2.upstream.Loader.Loadable; import com.google.android.exoplayer2.util.Log; import com.google.android.exoplayer2.util.Util; +import com.google.common.base.Ascii; import com.google.common.base.Charsets; import com.google.common.base.Joiner; -import java.io.BufferedReader; +import com.google.common.collect.ImmutableList; +import java.io.ByteArrayOutputStream; import java.io.Closeable; +import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; import java.io.OutputStream; import java.net.Socket; import java.util.ArrayList; @@ -42,7 +50,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; /* package */ final class RtspMessageChannel implements Closeable { private static final String TAG = "RtspMessageChannel"; - private static final boolean LOG_RTSP_MESSAGES = false; + private static final boolean LOG_RTSP_MESSAGES = true; /** A listener for received RTSP messages and possible failures. */ public interface MessageListener { @@ -54,6 +62,15 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; */ void onRtspMessageReceived(List message); + /** + * Called when interleaved binary data is received on RTSP. + * + * @param data The received binary data. The byte array will not be reused by {@link + * RtspMessageChannel}, and will always be full. + * @param channel The channel on which the data is received. + */ + default void onInterleavedBinaryDataReceived(byte[] data, int channel) {} + /** * Called when failed to send an RTSP message. * @@ -87,7 +104,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; private @MonotonicNonNull Sender sender; private @MonotonicNonNull Socket socket; - private boolean closed; + private volatile boolean closed; /** * Constructs a new instance. @@ -135,17 +152,24 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; */ @Override public void close() throws IOException { - if (sender != null) { - sender.close(); + // TODO(internal b/172331505) Make sure most resources are closed before throwing, and close() + // can be called again to close the resources that are still open. + if (closed) { + return; } - receiverLoader.release(); + try { + if (sender != null) { + sender.close(); + } + receiverLoader.release(); + messageListenerHandler.removeCallbacksAndMessages(/* token= */ null); - if (socket != null) { - socket.close(); + if (socket != null) { + socket.close(); + } + } finally { + closed = true; } - - messageListenerHandler.removeCallbacksAndMessages(/* token= */ null); - closed = true; } /** @@ -159,6 +183,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; } private static void logMessage(List rtspMessage) { + // TODO(b/172331505) Remove before release. if (LOG_RTSP_MESSAGES) { Log.d(TAG, Joiner.on('\n').join(rtspMessage)); } @@ -224,8 +249,12 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; /** A {@link Loadable} for receiving RTSP responses. */ private final class Receiver implements Loadable { - private final BufferedReader inputStreamReader; + /** ASCII dollar encapsulates the RTP packets in interleaved mode (RFC2326 Section 10.12). */ + private static final byte RTSP_INTERLEAVED_MESSAGE_MARKER = '$'; + + private final DataInputStream dataInputStream; + private final RtspMessageBuilder messageBuilder; private volatile boolean loadCanceled; /** @@ -236,7 +265,8 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; * InputStream}. */ public Receiver(InputStream inputStream) { - inputStreamReader = new BufferedReader(new InputStreamReader(inputStream, Charsets.UTF_8)); + dataInputStream = new DataInputStream(inputStream); + messageBuilder = new RtspMessageBuilder(); } @Override @@ -246,27 +276,67 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @Override public void load() throws IOException { - List messageLines = new ArrayList<>(); while (!loadCanceled) { - String line; - while (inputStreamReader.ready() && (line = inputStreamReader.readLine()) != null) { - messageLines.add(line); - } - - if (!messageLines.isEmpty()) { - List message = new ArrayList<>(messageLines); - logMessage(message); - messageListenerHandler.post( - () -> { - if (!closed) { - messageListener.onRtspMessageReceived(message); - } - }); - // Resets for the next response. - messageLines.clear(); + // TODO(internal b/172331505) Use a buffered read. + byte firstByte = dataInputStream.readByte(); + if (firstByte == RTSP_INTERLEAVED_MESSAGE_MARKER) { + handleInterleavedBinaryData(); + } else { + handleRtspMessage(firstByte); } } } + + /** Handles an entire RTSP message. */ + private void handleRtspMessage(byte firstByte) throws IOException { + @Nullable + ImmutableList messageLines = messageBuilder.addLine(handleRtspMessageLine(firstByte)); + while (messageLines == null) { + messageLines = messageBuilder.addLine(handleRtspMessageLine(dataInputStream.readByte())); + } + + logMessage(messageLines); + ImmutableList messageLinesToPost = ImmutableList.copyOf(messageLines); + messageListenerHandler.post( + () -> { + if (!closed) { + messageListener.onRtspMessageReceived(messageLinesToPost); + } + }); + } + + /** Returns the byte representation of a complete RTSP line, with CRLF line terminator. */ + private byte[] handleRtspMessageLine(byte firstByte) throws IOException { + ByteArrayOutputStream messageByteStream = new ByteArrayOutputStream(); + + byte[] peekedBytes = new byte[2]; + peekedBytes[0] = firstByte; + peekedBytes[1] = dataInputStream.readByte(); + messageByteStream.write(peekedBytes); + + while (peekedBytes[0] != Ascii.CR || peekedBytes[1] != Ascii.LF) { + // Shift the CRLF buffer. + peekedBytes[0] = peekedBytes[1]; + peekedBytes[1] = dataInputStream.readByte(); + messageByteStream.write(peekedBytes[1]); + } + + return messageByteStream.toByteArray(); + } + + private void handleInterleavedBinaryData() throws IOException { + int channel = dataInputStream.readUnsignedByte(); + int size = dataInputStream.readUnsignedShort(); + byte[] data = new byte[size]; + dataInputStream.readFully(data, /* off= */ 0, size); + + messageListenerHandler.post( + () -> { + if (!closed) { + messageListener.onInterleavedBinaryDataReceived(data, channel); + } + }); + } } private final class LoaderCallbackImpl implements Loader.Callback { @@ -288,4 +358,93 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; return Loader.DONT_RETRY; } } + /** Processes RTSP messages line-by-line. */ + private static final class RtspMessageBuilder { + + @IntDef({STATE_READING_FIRST_LINE, STATE_READING_RTSP_HEADER, STATE_READING_RTSP_BODY}) + @interface ReadingState {} + + private static final int STATE_READING_FIRST_LINE = 1; + private static final int STATE_READING_RTSP_HEADER = 2; + private static final int STATE_READING_RTSP_BODY = 3; + + private final List messageLines; + + @ReadingState private int state; + private long messageBodyLength; + private long receivedMessageBodyLength; + + /** Creates a new instance. */ + public RtspMessageBuilder() { + messageLines = new ArrayList<>(); + state = STATE_READING_FIRST_LINE; + } + + /** + * Add a line to the builder. + * + * @param lineBytes The complete RTSP message line in UTF-8 byte array, including CRLF. + * @return A list of completed RTSP message lines, without the CRLF line terminators; or {@code + * null} if the message is not yet complete. + */ + @Nullable + public ImmutableList addLine(byte[] lineBytes) throws ParserException { + // Trim CRLF. + checkArgument( + lineBytes.length >= 2 + && lineBytes[lineBytes.length - 2] == Ascii.CR + && lineBytes[lineBytes.length - 1] == Ascii.LF); + String line = + new String( + lineBytes, /* offset= */ 0, /* length= */ lineBytes.length - 2, Charsets.UTF_8); + messageLines.add(line); + + switch (state) { + case STATE_READING_FIRST_LINE: + if (isRtspStartLine(line)) { + state = STATE_READING_RTSP_HEADER; + } + break; + + case STATE_READING_RTSP_HEADER: + // Check if the line contains RTSP Content-Length header. + long contentLength = RtspMessageUtil.parseContentLengthHeader(line); + if (contentLength != C.LENGTH_UNSET) { + messageBodyLength = contentLength; + } + + if (line.isEmpty()) { + // An empty line signals the end of the header section. + if (messageBodyLength > 0) { + state = STATE_READING_RTSP_BODY; + } else { + ImmutableList linesToReturn = ImmutableList.copyOf(messageLines); + reset(); + return linesToReturn; + } + } + break; + + case STATE_READING_RTSP_BODY: + receivedMessageBodyLength += lineBytes.length; + if (receivedMessageBodyLength >= messageBodyLength) { + ImmutableList linesToReturn = ImmutableList.copyOf(messageLines); + reset(); + return linesToReturn; + } + break; + + default: + throw new IllegalStateException(); + } + return null; + } + + private void reset() { + messageLines.clear(); + state = STATE_READING_FIRST_LINE; + messageBodyLength = 0; + receivedMessageBodyLength = 0; + } + } } diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtil.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtil.java index 4ee0abb90c..968f2edae0 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtil.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtil.java @@ -30,6 +30,7 @@ import static com.google.android.exoplayer2.source.rtsp.RtspRequest.METHOD_TEARD import static com.google.android.exoplayer2.source.rtsp.RtspRequest.METHOD_UNSET; import static com.google.android.exoplayer2.util.Assertions.checkArgument; import static com.google.android.exoplayer2.util.Assertions.checkNotNull; +import static java.util.regex.Pattern.CASE_INSENSITIVE; import android.net.Uri; import androidx.annotation.Nullable; @@ -72,6 +73,10 @@ import java.util.regex.Pattern; // Status line pattern, see RFC2326 Section 7.1. private static final Pattern STATUS_LINE_PATTERN = Pattern.compile("RTSP/1\\.0 (\\d+) (.+)"); + // Content length header pattern, see RFC2326 Section 12.14. + private static final Pattern CONTENT_LENGTH_HEADER_PATTERN = + Pattern.compile("Content-Length:\\s?(\\d+)", CASE_INSENSITIVE); + // Session header pattern, see RFC2326 Section 12.37. private static final Pattern SESSION_HEADER_PATTERN = Pattern.compile("(\\w+)(?:;\\s?timeout=(\\d+))?"); @@ -260,6 +265,31 @@ import java.util.regex.Pattern; return new RtspRequest(requestUri, method, headers, messageBody); } + /** Returns whether the line is a valid RTSP start line. */ + public static boolean isRtspStartLine(String line) { + return REQUEST_LINE_PATTERN.matcher(line).matches() + || STATUS_LINE_PATTERN.matcher(line).matches(); + } + + /** + * Returns the length in bytes if the line contains a Content-Length header, otherwise {@link + * C#LENGTH_UNSET}. + * + * @throws ParserException If Content-Length cannot be parsed to an integer. + */ + public static long parseContentLengthHeader(String line) throws ParserException { + try { + Matcher matcher = CONTENT_LENGTH_HEADER_PATTERN.matcher(line); + if (matcher.find()) { + return Long.parseLong(checkNotNull(matcher.group(1))); + } else { + return C.LENGTH_UNSET; + } + } catch (NumberFormatException e) { + throw new ParserException(e); + } + } + /** * Parses the RTSP PUBLIC header into a list of RTSP methods. * diff --git a/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannelTest.java b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannelTest.java new file mode 100644 index 0000000000..99b2572434 --- /dev/null +++ b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannelTest.java @@ -0,0 +1,164 @@ +/* + * Copyright 2021 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.android.exoplayer2.source.rtsp; + +import static com.google.android.exoplayer2.source.rtsp.RtspMessageUtil.convertMessageToByteArray; +import static com.google.android.exoplayer2.source.rtsp.RtspMessageUtil.serializeResponse; +import static com.google.common.truth.Truth.assertThat; + +import android.net.Uri; +import androidx.test.ext.junit.runners.AndroidJUnit4; +import com.google.android.exoplayer2.robolectric.RobolectricUtil; +import com.google.android.exoplayer2.source.rtsp.RtspMessageChannel.MessageListener; +import com.google.android.exoplayer2.util.Util; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.LinkedListMultimap; +import com.google.common.collect.Multimap; +import com.google.common.primitives.Bytes; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import javax.net.SocketFactory; +import org.junit.Test; +import org.junit.runner.RunWith; + +/** Unit test for {@link RtspMessageChannel}. */ +@RunWith(AndroidJUnit4.class) +public final class RtspMessageChannelTest { + + @Test + public void rtspMessageChannelReceive_threeRtspMessagesAndTwoInterleavedBinary_postsToListener() + throws Exception { + RtspResponse optionsResponse = + new RtspResponse( + 200, + new RtspHeaders.Builder() + .add(RtspHeaders.CSEQ, "2") + .add(RtspHeaders.PUBLIC, "OPTIONS") + .build(), + ""); + + RtspResponse describeResponse = + new RtspResponse( + 200, + new RtspHeaders.Builder() + .add(RtspHeaders.CSEQ, "3") + .add(RtspHeaders.CONTENT_TYPE, "application/sdp") + .add(RtspHeaders.CONTENT_LENGTH, "28") + .build(), + "v=安卓アンドロイド\r\n"); + + RtspResponse setupResponse = + new RtspResponse( + 200, + new RtspHeaders.Builder() + .add(RtspHeaders.CSEQ, "3") + .add(RtspHeaders.TRANSPORT, "RTP/AVP/TCP;unicast;interleaved=0-1") + .build(), + ""); + + // Channel: 0, size: 5, data: 01 02 03 04 05. + byte[] interleavedData1 = Util.getBytesFromHexString("0000050102030405"); + // Channel: 1, size: 4, data: AA BB CC DD. + byte[] interleavedData2 = Util.getBytesFromHexString("010004AABBCCDD"); + + AtomicBoolean receivingFinished = new AtomicBoolean(); + AtomicReference sendingException = new AtomicReference<>(); + List> receivedRtspResponses = new ArrayList<>(/* initialCapacity= */ 3); + Multimap> receivedInterleavedData = LinkedListMultimap.create(); + ServerSocket serverSocket = + new ServerSocket(/* port= */ 0, /* backlog= */ 1, InetAddress.getByName(/* host= */ null)); + Thread serverListenThread = + new Thread( + () -> { + try { + Socket socket = serverSocket.accept(); + OutputStream serverOutputStream = socket.getOutputStream(); + serverOutputStream.write( + convertMessageToByteArray(serializeResponse(optionsResponse))); + serverOutputStream.write( + convertMessageToByteArray(serializeResponse(describeResponse))); + serverOutputStream.write(Bytes.concat(new byte[] {'$'}, interleavedData1)); + serverOutputStream.write(Bytes.concat(new byte[] {'$'}, interleavedData2)); + serverOutputStream.write( + convertMessageToByteArray(serializeResponse(setupResponse))); + } catch (IOException e) { + sendingException.set(e); + } + }, + "RtspMessageChannelTest:ServerListenThread"); + serverListenThread.start(); + + int serverRtspPortNumber = serverSocket.getLocalPort(); + Uri connectionUri = + Uri.parse(Util.formatInvariant("rtsp://localhost:%d/test", serverRtspPortNumber)); + Socket clientSideSocket = + SocketFactory.getDefault().createSocket(connectionUri.getHost(), connectionUri.getPort()); + + RtspMessageChannel rtspMessageChannel = + new RtspMessageChannel( + new MessageListener() { + @Override + public void onRtspMessageReceived(List message) { + receivedRtspResponses.add(message); + if (receivedRtspResponses.size() == 3 && receivedInterleavedData.size() == 2) { + receivingFinished.set(true); + } + } + + @Override + public void onInterleavedBinaryDataReceived(byte[] data, int channel) { + receivedInterleavedData.put(channel, Bytes.asList(data)); + } + }); + rtspMessageChannel.openSocket(clientSideSocket); + + RobolectricUtil.runMainLooperUntil(receivingFinished::get); + Util.closeQuietly(rtspMessageChannel); + serverListenThread.join(); + serverSocket.close(); + + assertThat(sendingException.get()).isNull(); + assertThat(receivedRtspResponses) + .containsExactly( + /* optionsResponse */ + ImmutableList.of("RTSP/1.0 200 OK", "CSeq: 2", "Public: OPTIONS", ""), + /* describeResponse */ + ImmutableList.of( + "RTSP/1.0 200 OK", + "CSeq: 3", + "Content-Type: application/sdp", + "Content-Length: 28", + "", + "v=安卓アンドロイド"), + /* setupResponse */ + ImmutableList.of( + "RTSP/1.0 200 OK", "CSeq: 3", "Transport: RTP/AVP/TCP;unicast;interleaved=0-1", "")) + .inOrder(); + assertThat(receivedInterleavedData) + .containsExactly( + /* channel */ 0, + Bytes.asList(Util.getBytesFromHexString("0102030405")), + /* channel */ 1, + Bytes.asList(Util.getBytesFromHexString("AABBCCDD"))); + } +} diff --git a/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtilTest.java b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtilTest.java index a594558ebc..d608192c9d 100644 --- a/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtilTest.java +++ b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMessageUtilTest.java @@ -363,4 +363,28 @@ public final class RtspMessageUtilTest { assertThat(RtspMessageUtil.removeUserInfo(uri)) .isEqualTo(Uri.parse("rtsp://foo.bar:5050/foo.mkv")); } + + @Test + public void parseContentLengthHeader_withContentLengthOver31Bits_succeeds() throws Exception { + String line = "Content-Length: 1000000000000000"; + long contentLength = RtspMessageUtil.parseContentLengthHeader(line); + assertThat(contentLength).isEqualTo(1000000000000000L); + } + + @Test + public void isRtspStartLine_onValidRequestLine_succeeds() { + assertThat(RtspMessageUtil.isRtspStartLine("OPTIONS rtsp://localhost/test RTSP/1.0")).isTrue(); + } + + @Test + public void isRtspStartLine_onValidResponseLine_succeeds() { + assertThat(RtspMessageUtil.isRtspStartLine("RTSP/1.0 456 Header Field Not Valid for Resource")) + .isTrue(); + } + + @Test + public void isRtspStartLine_onValidHeaderLine_succeeds() { + assertThat(RtspMessageUtil.isRtspStartLine("Transport: RTP/AVP;unicast;client_port=1000-1001")) + .isFalse(); + } }