Storage chains: indexing, renewals and reference counting (#8265)

* Transaction indexing

* Tests and fixes

* Fixed a comment

* Style

* Build

* Style

* Apply suggestions from code review

Co-authored-by: cheme <emericchevalier.pro@gmail.com>

* Code review suggestions

* Add missing impl

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* impl JoinInput

* Don't store empty slices

* JoinInput operates on slices

Co-authored-by: cheme <emericchevalier.pro@gmail.com>
Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
Arkadiy Paronyan
2021-03-18 12:46:27 +01:00
committed by GitHub
parent f69f79cc20
commit 4a0d6d9490
22 changed files with 600 additions and 246 deletions
+295 -84
View File
@@ -78,7 +78,7 @@ use sp_runtime::traits::{
use sp_state_machine::{
DBValue, ChangesTrieTransaction, ChangesTrieCacheAction, UsageInfo as StateUsageInfo,
StorageCollection, ChildStorageCollection, OffchainChangesCollection,
backend::Backend as StateBackend, StateMachineStats,
backend::Backend as StateBackend, StateMachineStats, IndexOperation,
};
use crate::utils::{DatabaseType, Meta, meta_keys, read_db, read_meta};
use crate::changes_tries_storage::{DbChangesTrieStorage, DbChangesTrieStorageTransaction};
@@ -107,7 +107,16 @@ pub type DbState<B> = sp_state_machine::TrieBackend<
const DB_HASH_LEN: usize = 32;
/// Hash type that this backend uses for the database.
pub type DbHash = [u8; DB_HASH_LEN];
pub type DbHash = sp_core::H256;
/// This is used as block body when storage-chain mode is enabled.
#[derive(Debug, Encode, Decode)]
struct ExtrinsicHeader {
/// Hash of the indexed part
indexed_hash: DbHash, // Zero hash if there's no indexed data
/// The rest of the data.
data: Vec<u8>,
}
/// A reference tracking state.
///
@@ -506,33 +515,47 @@ impl<Block: BlockT> sc_client_api::blockchain::HeaderBackend<Block> for Blockcha
impl<Block: BlockT> sc_client_api::blockchain::Backend<Block> for BlockchainDb<Block> {
fn body(&self, id: BlockId<Block>) -> ClientResult<Option<Vec<Block::Extrinsic>>> {
match read_db(&*self.db, columns::KEY_LOOKUP, columns::BODY, id)? {
Some(body) => {
match self.transaction_storage {
TransactionStorageMode::BlockBody => match Decode::decode(&mut &body[..]) {
Ok(body) => Ok(Some(body)),
Err(err) => return Err(sp_blockchain::Error::Backend(
format!("Error decoding body: {}", err)
)),
},
TransactionStorageMode::StorageChain => {
match Vec::<Block::Hash>::decode(&mut &body[..]) {
Ok(hashes) => {
let extrinsics: ClientResult<Vec<Block::Extrinsic>> = hashes.into_iter().map(
|h| self.extrinsic(&h).and_then(|maybe_ex| maybe_ex.ok_or_else(
|| sp_blockchain::Error::Backend(
format!("Missing transaction: {}", h))))
).collect();
Ok(Some(extrinsics?))
let body = match read_db(&*self.db, columns::KEY_LOOKUP, columns::BODY, id)? {
Some(body) => body,
None => return Ok(None),
};
match self.transaction_storage {
TransactionStorageMode::BlockBody => match Decode::decode(&mut &body[..]) {
Ok(body) => Ok(Some(body)),
Err(err) => return Err(sp_blockchain::Error::Backend(
format!("Error decoding body: {}", err)
)),
},
TransactionStorageMode::StorageChain => {
match Vec::<ExtrinsicHeader>::decode(&mut &body[..]) {
Ok(index) => {
let extrinsics: ClientResult<Vec<Block::Extrinsic>> = index.into_iter().map(
| ExtrinsicHeader { indexed_hash, data } | {
let decode_result = if indexed_hash != Default::default() {
match self.db.get(columns::TRANSACTION, indexed_hash.as_ref()) {
Some(t) => {
let mut input = utils::join_input(data.as_ref(), t.as_ref());
Block::Extrinsic::decode(&mut input)
},
None => return Err(sp_blockchain::Error::Backend(
format!("Missing indexed transaction {:?}", indexed_hash))
)
}
} else {
Block::Extrinsic::decode(&mut data.as_ref())
};
decode_result.map_err(|err| sp_blockchain::Error::Backend(
format!("Error decoding extrinsic: {}", err))
)
}
Err(err) => return Err(sp_blockchain::Error::Backend(
format!("Error decoding body list: {}", err)
)),
}
).collect();
Ok(Some(extrinsics?))
}
Err(err) => return Err(sp_blockchain::Error::Backend(
format!("Error decoding body list: {}", err)
)),
}
}
None => Ok(None),
}
}
@@ -564,21 +587,11 @@ impl<Block: BlockT> sc_client_api::blockchain::Backend<Block> for BlockchainDb<B
children::read_children(&*self.db, columns::META, meta_keys::CHILDREN_PREFIX, parent_hash)
}
fn extrinsic(&self, hash: &Block::Hash) -> ClientResult<Option<Block::Extrinsic>> {
match self.db.get(columns::TRANSACTION, hash.as_ref()) {
Some(ex) => {
match Decode::decode(&mut &ex[..]) {
Ok(ex) => Ok(Some(ex)),
Err(err) => Err(sp_blockchain::Error::Backend(
format!("Error decoding extrinsic {}: {}", hash, err)
)),
}
},
None => Ok(None),
}
fn indexed_transaction(&self, hash: &Block::Hash) -> ClientResult<Option<Vec<u8>>> {
Ok(self.db.get(columns::TRANSACTION, hash.as_ref()))
}
fn have_extrinsic(&self, hash: &Block::Hash) -> ClientResult<bool> {
fn has_indexed_transaction(&self, hash: &Block::Hash) -> ClientResult<bool> {
Ok(self.db.contains(columns::TRANSACTION, hash.as_ref()))
}
}
@@ -681,6 +694,7 @@ pub struct BlockImportOperation<Block: BlockT> {
finalized_blocks: Vec<(BlockId<Block>, Option<Justification>)>,
set_head: Option<BlockId<Block>>,
commit_state: bool,
index_ops: Vec<IndexOperation>,
}
impl<Block: BlockT> BlockImportOperation<Block> {
@@ -823,6 +837,11 @@ impl<Block: BlockT> sc_client_api::backend::BlockImportOperation<Block> for Bloc
self.set_head = Some(block);
Ok(())
}
fn update_transaction_index(&mut self, index_ops: Vec<IndexOperation>) -> ClientResult<()> {
self.index_ops = index_ops;
Ok(())
}
}
struct StorageDb<Block: BlockT> {
@@ -1155,21 +1174,21 @@ impl<Block: BlockT> Backend<Block> {
if new_canonical <= self.storage.state_db.best_canonical().unwrap_or(0) {
return Ok(())
}
let hash = if new_canonical == number_u64 {
hash
} else {
::sc_client_api::blockchain::HeaderBackend::hash(&self.blockchain, new_canonical.saturated_into())?
.expect("existence of block with number `new_canonical` \
implies existence of blocks with all numbers before it; qed")
sc_client_api::blockchain::HeaderBackend::hash(
&self.blockchain,
new_canonical.saturated_into(),
)?.expect("existence of block with number `new_canonical` \
implies existence of blocks with all numbers before it; qed")
};
trace!(target: "db", "Canonicalize block #{} ({:?})", new_canonical, hash);
let commit = self.storage.state_db.canonicalize_block(&hash)
.map_err(|e: sc_state_db::Error<io::Error>| sp_blockchain::Error::from_state_db(e))?;
apply_state_commit(transaction, commit);
};
}
Ok(())
}
@@ -1225,20 +1244,14 @@ impl<Block: BlockT> Backend<Block> {
)?;
transaction.set_from_vec(columns::HEADER, &lookup_key, pending_block.header.encode());
if let Some(body) = &pending_block.body {
if let Some(body) = pending_block.body {
match self.transaction_storage {
TransactionStorageMode::BlockBody => {
transaction.set_from_vec(columns::BODY, &lookup_key, body.encode());
},
TransactionStorageMode::StorageChain => {
let mut hashes = Vec::with_capacity(body.len());
for extrinsic in body {
let extrinsic = extrinsic.encode();
let hash = HashFor::<Block>::hash(&extrinsic);
transaction.set(columns::TRANSACTION, &hash.as_ref(), &extrinsic);
hashes.push(hash);
}
transaction.set_from_vec(columns::BODY, &lookup_key, hashes.encode());
let body = apply_index_ops::<Block>(&mut transaction, body, operation.index_ops);
transaction.set_from_vec(columns::BODY, &lookup_key, body);
},
}
}
@@ -1491,8 +1504,8 @@ impl<Block: BlockT> Backend<Block> {
}
}
self.prune_blocks(transaction, f_num)?;
let new_displaced = self.blockchain.leaves.write().finalize_height(f_num);
self.prune_blocks(transaction, f_num, &new_displaced)?;
match displaced {
x @ &mut None => *x = Some(new_displaced),
&mut Some(ref mut displaced) => displaced.merge(new_displaced),
@@ -1505,47 +1518,83 @@ impl<Block: BlockT> Backend<Block> {
&self,
transaction: &mut Transaction<DbHash>,
finalized: NumberFor<Block>,
displaced: &FinalizationDisplaced<Block::Hash, NumberFor<Block>>,
) -> ClientResult<()> {
if let KeepBlocks::Some(keep_blocks) = self.keep_blocks {
// Always keep the last finalized block
let keep = std::cmp::max(keep_blocks, 1);
if finalized < keep.into() {
return Ok(())
if finalized >= keep.into() {
let number = finalized.saturating_sub(keep.into());
self.prune_block(transaction, BlockId::<Block>::number(number))?;
}
let number = finalized.saturating_sub(keep.into());
match read_db(&*self.storage.db, columns::KEY_LOOKUP, columns::BODY, BlockId::<Block>::number(number))? {
Some(body) => {
debug!(target: "db", "Removing block #{}", number);
utils::remove_from_db(
transaction,
&*self.storage.db,
columns::KEY_LOOKUP,
columns::BODY,
BlockId::<Block>::number(number),
)?;
match self.transaction_storage {
TransactionStorageMode::BlockBody => {},
TransactionStorageMode::StorageChain => {
match Vec::<Block::Hash>::decode(&mut &body[..]) {
Ok(hashes) => {
for h in hashes {
transaction.remove(columns::TRANSACTION, h.as_ref());
}
}
Err(err) => return Err(sp_blockchain::Error::Backend(
format!("Error decoding body list: {}", err)
)),
}
}
// Also discard all blocks from displaced branches
for h in displaced.leaves() {
let mut number = finalized;
let mut hash = h.clone();
// Follow displaced chains back until we reach a finalized block.
// Since leaves are discarded due to finality, they can't have parents
// that are canonical, but not yet finalized. So we stop deletig as soon as
// we reach canonical chain.
while self.blockchain.hash(number)? != Some(hash.clone()) {
let id = BlockId::<Block>::hash(hash.clone());
match self.blockchain.header(id)? {
Some(header) => {
self.prune_block(transaction, id)?;
number = header.number().saturating_sub(One::one());
hash = header.parent_hash().clone();
},
None => break,
}
}
None => return Ok(()),
}
}
Ok(())
}
fn prune_block(
&self,
transaction: &mut Transaction<DbHash>,
id: BlockId<Block>,
) -> ClientResult<()> {
match read_db(&*self.storage.db, columns::KEY_LOOKUP, columns::BODY, id)? {
Some(body) => {
debug!(target: "db", "Removing block #{}", id);
utils::remove_from_db(
transaction,
&*self.storage.db,
columns::KEY_LOOKUP,
columns::BODY,
id,
)?;
match self.transaction_storage {
TransactionStorageMode::BlockBody => {},
TransactionStorageMode::StorageChain => {
match Vec::<ExtrinsicHeader>::decode(&mut &body[..]) {
Ok(body) => {
for ExtrinsicHeader { indexed_hash, .. } in body {
if indexed_hash != Default::default() {
transaction.release(
columns::TRANSACTION,
indexed_hash,
);
}
}
}
Err(err) => return Err(sp_blockchain::Error::Backend(
format!("Error decoding body list: {}", err)
)),
}
}
}
}
None => return Ok(()),
}
Ok(())
}
}
fn apply_state_commit(transaction: &mut Transaction<DbHash>, commit: sc_state_db::CommitSet<Vec<u8>>) {
for (key, val) in commit.data.inserted.into_iter() {
transaction.set_from_vec(columns::STATE, &key[..], val);
@@ -1561,6 +1610,67 @@ fn apply_state_commit(transaction: &mut Transaction<DbHash>, commit: sc_state_db
}
}
fn apply_index_ops<Block: BlockT>(
transaction: &mut Transaction<DbHash>,
body: Vec<Block::Extrinsic>,
ops: Vec<IndexOperation>,
) -> Vec<u8> {
let mut extrinsic_headers: Vec<ExtrinsicHeader> = Vec::with_capacity(body.len());
let mut index_map = HashMap::new();
let mut renewed_map = HashMap::new();
for op in ops {
match op {
IndexOperation::Insert { extrinsic, offset } => {
index_map.insert(extrinsic, offset);
}
IndexOperation::Renew { extrinsic, hash, .. } => {
renewed_map.insert(extrinsic, DbHash::from_slice(hash.as_ref()));
}
}
}
for (index, extrinsic) in body.into_iter().enumerate() {
let extrinsic = extrinsic.encode();
let extrinsic_header = if let Some(hash) = renewed_map.get(&(index as u32)) {
// Bump ref counter
transaction.reference(columns::TRANSACTION, DbHash::from_slice(hash.as_ref()));
ExtrinsicHeader {
indexed_hash: hash.clone(),
data: extrinsic,
}
} else {
match index_map.get(&(index as u32)) {
Some(offset) if *offset as usize <= extrinsic.len() => {
let offset = *offset as usize;
let hash = HashFor::<Block>::hash(&extrinsic[offset..]);
transaction.store(
columns::TRANSACTION,
DbHash::from_slice(hash.as_ref()),
extrinsic[offset..].to_vec(),
);
ExtrinsicHeader {
indexed_hash: DbHash::from_slice(hash.as_ref()),
data: extrinsic[..offset].to_vec(),
}
},
_ => {
ExtrinsicHeader {
indexed_hash: Default::default(),
data: extrinsic,
}
}
}
};
extrinsic_headers.push(extrinsic_header);
}
debug!(
target: "db",
"DB transaction index: {} inserted, {} renewed",
index_map.len(),
renewed_map.len()
);
extrinsic_headers.encode()
}
impl<Block> sc_client_api::backend::AuxStore for Backend<Block> where Block: BlockT {
fn insert_aux<
'a,
@@ -1609,6 +1719,7 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
finalized_blocks: Vec::new(),
set_head: None,
commit_state: false,
index_ops: Default::default(),
})
}
@@ -1998,7 +2109,7 @@ pub(crate) mod tests {
changes: Option<Vec<(Vec<u8>, Vec<u8>)>>,
extrinsics_root: H256,
) -> H256 {
insert_block(backend, number, parent_hash, changes, extrinsics_root, Vec::new())
insert_block(backend, number, parent_hash, changes, extrinsics_root, Vec::new(), None)
}
pub fn insert_block(
@@ -2008,6 +2119,7 @@ pub(crate) mod tests {
changes: Option<Vec<(Vec<u8>, Vec<u8>)>>,
extrinsics_root: H256,
body: Vec<ExtrinsicWrapper<u64>>,
transaction_index: Option<Vec<IndexOperation>>,
) -> H256 {
use sp_runtime::testing::Digest;
@@ -2035,6 +2147,9 @@ pub(crate) mod tests {
let mut op = backend.begin_operation().unwrap();
backend.begin_state_operation(&mut op, block_id).unwrap();
op.set_block_data(header, Some(body), None, NewBlockState::Best).unwrap();
if let Some(index) = transaction_index {
op.update_transaction_index(index).unwrap();
}
op.update_changes_trie((changes_trie_update, ChangesTrieCacheAction::Clear)).unwrap();
backend.commit_operation(op).unwrap();
@@ -2676,7 +2791,7 @@ pub(crate) mod tests {
let mut blocks = Vec::new();
let mut prev_hash = Default::default();
for i in 0 .. 5 {
let hash = insert_block(&backend, i, prev_hash, None, Default::default(), vec![i.into()]);
let hash = insert_block(&backend, i, prev_hash, None, Default::default(), vec![i.into()], None);
blocks.push(hash);
prev_hash = hash;
}
@@ -2697,4 +2812,100 @@ pub(crate) mod tests {
assert_eq!(Some(vec![4.into()]), bc.body(BlockId::hash(blocks[4])).unwrap());
}
}
#[test]
fn prune_blocks_on_finalize_with_fork() {
let backend = Backend::<Block>::new_test_with_tx_storage(
2,
10,
TransactionStorageMode::StorageChain
);
let mut blocks = Vec::new();
let mut prev_hash = Default::default();
for i in 0 .. 5 {
let hash = insert_block(&backend, i, prev_hash, None, Default::default(), vec![i.into()], None);
blocks.push(hash);
prev_hash = hash;
}
// insert a fork at block 2
let fork_hash_root = insert_block(
&backend,
2,
blocks[1],
None,
sp_core::H256::random(),
vec![2.into()],
None
);
insert_block(&backend, 3, fork_hash_root, None, H256::random(), vec![3.into(), 11.into()], None);
let mut op = backend.begin_operation().unwrap();
backend.begin_state_operation(&mut op, BlockId::Hash(blocks[4])).unwrap();
op.mark_head(BlockId::Hash(blocks[4])).unwrap();
backend.commit_operation(op).unwrap();
for i in 1 .. 5 {
let mut op = backend.begin_operation().unwrap();
backend.begin_state_operation(&mut op, BlockId::Hash(blocks[4])).unwrap();
op.mark_finalized(BlockId::Hash(blocks[i]), None).unwrap();
backend.commit_operation(op).unwrap();
}
let bc = backend.blockchain();
assert_eq!(None, bc.body(BlockId::hash(blocks[0])).unwrap());
assert_eq!(None, bc.body(BlockId::hash(blocks[1])).unwrap());
assert_eq!(None, bc.body(BlockId::hash(blocks[2])).unwrap());
assert_eq!(Some(vec![3.into()]), bc.body(BlockId::hash(blocks[3])).unwrap());
assert_eq!(Some(vec![4.into()]), bc.body(BlockId::hash(blocks[4])).unwrap());
}
#[test]
fn renew_transaction_storage() {
let backend = Backend::<Block>::new_test_with_tx_storage(
2,
10,
TransactionStorageMode::StorageChain
);
let mut blocks = Vec::new();
let mut prev_hash = Default::default();
let x1 = ExtrinsicWrapper::from(0u64).encode();
let x1_hash = <HashFor::<Block> as sp_core::Hasher>::hash(&x1[1..]);
for i in 0 .. 10 {
let mut index = Vec::new();
if i == 0 {
index.push(IndexOperation::Insert { extrinsic: 0, offset: 1 });
} else if i < 5 {
// keep renewing 1st
index.push(IndexOperation::Renew {
extrinsic: 0,
hash: x1_hash.as_ref().to_vec(),
size: (x1.len() - 1) as u32,
});
} // else stop renewing
let hash = insert_block(
&backend,
i,
prev_hash,
None,
Default::default(),
vec![i.into()],
Some(index)
);
blocks.push(hash);
prev_hash = hash;
}
for i in 1 .. 10 {
let mut op = backend.begin_operation().unwrap();
backend.begin_state_operation(&mut op, BlockId::Hash(blocks[4])).unwrap();
op.mark_finalized(BlockId::Hash(blocks[i]), None).unwrap();
backend.commit_operation(op).unwrap();
let bc = backend.blockchain();
if i < 6 {
assert!(bc.indexed_transaction(&x1_hash).unwrap().is_some());
} else {
assert!(bc.indexed_transaction(&x1_hash).unwrap().is_none());
}
}
}
}
+1 -1
View File
@@ -756,7 +756,7 @@ pub(crate) mod tests {
#[test]
fn finalized_ancient_headers_are_replaced_with_cht() {
fn insert_headers<F: Fn(&Hash, u64) -> Header>(header_producer: F) ->
(Arc<sp_database::MemDb<DbHash>>, LightStorage<Block>)
(Arc<sp_database::MemDb>, LightStorage<Block>)
{
let raw_db = Arc::new(sp_database::MemDb::default());
let db = LightStorage::from_kvdb(raw_db.clone()).unwrap();
+8 -4
View File
@@ -33,7 +33,7 @@ fn handle_err<T>(result: parity_db::Result<T>) -> T {
}
/// Wrap parity-db database into a trait object that implements `sp_database::Database`
pub fn open<H: Clone>(path: &std::path::Path, db_type: DatabaseType)
pub fn open<H: Clone + AsRef<[u8]>>(path: &std::path::Path, db_type: DatabaseType)
-> parity_db::Result<std::sync::Arc<dyn Database<H>>>
{
let mut config = parity_db::Options::with_columns(path, NUM_COLUMNS as u8);
@@ -48,7 +48,7 @@ pub fn open<H: Clone>(path: &std::path::Path, db_type: DatabaseType)
Ok(std::sync::Arc::new(DbAdapter(db)))
}
impl<H: Clone> Database<H> for DbAdapter {
impl<H: Clone + AsRef<[u8]>> Database<H> for DbAdapter {
fn commit(&self, transaction: Transaction<H>) -> Result<(), DatabaseError> {
handle_err(self.0.commit(transaction.0.into_iter().map(|change|
match change {
@@ -65,7 +65,11 @@ impl<H: Clone> Database<H> for DbAdapter {
handle_err(self.0.get(col as u8, key))
}
fn lookup(&self, _hash: &H) -> Option<Vec<u8>> {
unimplemented!();
fn contains(&self, col: ColumnId, key: &[u8]) -> bool {
handle_err(self.0.get_size(col as u8, key)).is_some()
}
fn value_size(&self, col: ColumnId, key: &[u8]) -> Option<usize> {
handle_err(self.0.get_size(col as u8, key)).map(|s| s as usize)
}
}
+47 -1
View File
@@ -278,7 +278,7 @@ pub fn open_database<Block: BlockT>(
#[cfg(feature = "with-parity-db")]
DatabaseSettingsSrc::ParityDb { path } => {
crate::parity_db::open(&path, db_type)
.map_err(|e| sp_blockchain::Error::Backend(format!("{:?}", e)))?
.map_err(|e| sp_blockchain::Error::Backend(format!("{}", e)))?
},
#[cfg(not(feature = "with-parity-db"))]
DatabaseSettingsSrc::ParityDb { .. } => {
@@ -449,10 +449,35 @@ impl DatabaseType {
}
}
pub(crate) struct JoinInput<'a, 'b>(&'a [u8], &'b [u8]);
pub(crate) fn join_input<'a, 'b>(i1: &'a[u8], i2: &'b [u8]) -> JoinInput<'a, 'b> {
JoinInput(i1, i2)
}
impl<'a, 'b> codec::Input for JoinInput<'a, 'b> {
fn remaining_len(&mut self) -> Result<Option<usize>, codec::Error> {
Ok(Some(self.0.len() + self.1.len()))
}
fn read(&mut self, into: &mut [u8]) -> Result<(), codec::Error> {
let mut read = 0;
if self.0.len() > 0 {
read = std::cmp::min(self.0.len(), into.len());
self.0.read(&mut into[..read])?;
}
if read < into.len() {
self.1.read(&mut into[read..])?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use sp_runtime::testing::{Block as RawBlock, ExtrinsicWrapper};
use codec::Input;
type Block = RawBlock<ExtrinsicWrapper<u32>>;
#[test]
@@ -469,4 +494,25 @@ mod tests {
assert_eq!(DatabaseType::Full.as_str(), "full");
assert_eq!(DatabaseType::Light.as_str(), "light");
}
#[test]
fn join_input_works() {
let buf1 = [1, 2, 3, 4];
let buf2 = [5, 6, 7, 8];
let mut test = [0, 0, 0];
let mut joined = join_input(buf1.as_ref(), buf2.as_ref());
assert_eq!(joined.remaining_len().unwrap(), Some(8));
joined.read(&mut test).unwrap();
assert_eq!(test, [1, 2, 3]);
assert_eq!(joined.remaining_len().unwrap(), Some(5));
joined.read(&mut test).unwrap();
assert_eq!(test, [4, 5, 6]);
assert_eq!(joined.remaining_len().unwrap(), Some(2));
joined.read(&mut test[0..2]).unwrap();
assert_eq!(test, [7, 8, 6]);
assert_eq!(joined.remaining_len().unwrap(), Some(0));
}
}