一つのアプローチは、第一延ByteStringのindexedSeqから摺動ペアを作成することで、次の例のように、区切り文字対の識別されたインデックスを使用して、延ByteStringを分割:
import akka.util.ByteString
val bs = ByteString("aa\nbb\n\nxyz")
// bs: akka.util.ByteString = ByteString(97, 97, 10, 98, 98, 10, 10, 120, 121, 122)
val delimiter = 10
// Create sliding pairs from indexedSeq of the ByteString
val slidingList = bs.zipWithIndex.sliding(2).toList
// slidingList: List[scala.collection.immutable.IndexedSeq[(Byte, Int)]] = List(
// Vector((97,0), (97,1)), Vector((97,1), (10,2)), Vector((10,2), (98,3)),
// Vector((98,3), (98,4)), Vector((98,4), (10,5)), Vector((10,5), (10,6)),
// Vector((10,6), (120,7)), Vector((120,7), (121,8)), Vector((121,8), (122,9))
//)
// Get indexes of the delimiter-pair
val dIndex = slidingList.filter{
case Vector(x, y) => x._1 == delimiter && y._1 == delimiter
}.flatMap{
case Vector(x, y) => Seq(x._2, y._2)
}
// Split the ByteString list
val (bs1, bs2) = (bs.splitAt(dIndex(0))._1, bs.splitAt(dIndex(1))._2.tail)
// bs1: akka.util.ByteString = ByteString(97, 97, 10, 98, 98)
// bs2: akka.util.ByteString = ByteString(120, 121, 122)
コピー/再利用できるakka-httpソースコードに(内部の)ストリーミングLineParserがあります。 https://github.com/akka/akka-http/blob/86c56ab41c595b739f36b30bbf3135cdb7e45bba/akka-http/src/main/scala/akka/http/scaladsl/unmarshalling/sse/LineParser.scala#L25 – jrudolph