implement simple work stealing multithreading on a per pixel basis
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Patrick Michl 2021-03-15 21:53:23 +01:00
parent 75b025d2ac
commit 0d813cfb6f
4 changed files with 434 additions and 24 deletions

325
Cargo.lock generated
View File

@ -1,17 +1,185 @@
# This file is automatically @generated by Cargo. # This file is automatically @generated by Cargo.
# It is not intended for manual editing. # It is not intended for manual editing.
[[package]]
name = "adler"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "adler32"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234"
[[package]]
name = "autocfg"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]] [[package]]
name = "bitflags" name = "bitflags"
version = "1.2.1" version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "bytemuck"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bed57e2090563b83ba8f83366628ce535a7584c9afa4c9fc0612a03925c6df58"
[[package]]
name = "byteorder"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]] [[package]]
name = "cfg-if" name = "cfg-if"
version = "1.0.0" version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "color_quant"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d7b894f5411737b7867f4827955924d7c254fc9f4d91a6aad6b097804b1018b"
[[package]]
name = "crc32fast"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81156fece84ab6a9f2afdb109ce3ae577e42b1228441eded99bd77f627953b1a"
dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd01a6eb3daaafa260f6fc94c3a6c36390abc2080e38e3e34ced87393fb77d80"
dependencies = [
"cfg-if",
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dca26ee1f8d361640700bde38b2c37d8c22b3ce2d360e1fc1c74ea4b0aa7d775"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9"
dependencies = [
"cfg-if",
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2584f639eb95fea8c798496315b297cf81b9b58b6d30ab066a75455333cf4b12"
dependencies = [
"cfg-if",
"crossbeam-utils",
"lazy_static",
"memoffset",
"scopeguard",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f6cb3c7f5b8e51bc3ebb73a2327ad4abdbd119dc13223f14f961d2f38486756"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7e9d99fa91428effe99c5c6d4634cdeba32b8cf784fc428a2a687f61a952c49"
dependencies = [
"autocfg",
"cfg-if",
"lazy_static",
]
[[package]]
name = "deflate"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73770f8e1fe7d64df17ca66ad28994a0a623ea497fa69486e14984e715c5d174"
dependencies = [
"adler32",
"byteorder",
]
[[package]]
name = "either"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]]
name = "gif"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02efba560f227847cb41463a7395c514d127d4f74fff12ef0137fff1b84b96c4"
dependencies = [
"color_quant",
"weezl",
]
[[package]]
name = "hermit-abi"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "322f4de77956e22ed0e5032c359a0f1273f1f7f0d79bfa3b8ffbc730d7fbcc5c"
dependencies = [
"libc",
]
[[package]]
name = "image"
version = "0.23.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24ffcb7e7244a9bf19d35bf2883b9c080c4ced3c07a9895572178cdb8f13f6a1"
dependencies = [
"bytemuck",
"byteorder",
"color_quant",
"gif",
"jpeg-decoder",
"num-iter",
"num-rational",
"num-traits",
"png",
"scoped_threadpool",
"tiff",
]
[[package]] [[package]]
name = "instant" name = "instant"
version = "0.1.9" version = "0.1.9"
@ -21,6 +189,21 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "jpeg-decoder"
version = "0.1.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "229d53d58899083193af11e15917b5640cd40b29ff475a1fe4ef725deb02d0f2"
dependencies = [
"rayon",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.88" version = "0.2.88"
@ -45,6 +228,85 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "memoffset"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "157b4208e3059a8f9e78d559edc658e13df41410cb3ae03979c83130067fdd87"
dependencies = [
"autocfg",
]
[[package]]
name = "miniz_oxide"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "791daaae1ed6889560f8c4359194f56648355540573244a5448a83ba1ecc7435"
dependencies = [
"adler32",
]
[[package]]
name = "miniz_oxide"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b"
dependencies = [
"adler",
"autocfg",
]
[[package]]
name = "num-integer"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-iter"
version = "0.1.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2021c8337a54d21aca0d59a92577a029af9431cb59b909b03252b9c164fad59"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]]
name = "num-rational"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12ac428b1cb17fce6f731001d307d351ec70a6d202fc2e60f7d4c5e42d8f4f07"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290"
dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3"
dependencies = [
"hermit-abi",
"libc",
]
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.11.1" version = "0.11.1"
@ -74,7 +336,22 @@ dependencies = [
name = "pixelrush" name = "pixelrush"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"crossbeam",
"image",
"r2d2", "r2d2",
"rayon",
]
[[package]]
name = "png"
version = "0.16.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c3287920cb847dee3de33d301c463fba14dda99db24214ddf93f83d3021f4c6"
dependencies = [
"bitflags",
"crc32fast",
"deflate",
"miniz_oxide 0.3.7",
] ]
[[package]] [[package]]
@ -88,6 +365,31 @@ dependencies = [
"scheduled-thread-pool", "scheduled-thread-pool",
] ]
[[package]]
name = "rayon"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b0d8e0819fadc20c74ea8373106ead0600e3a67ef1fe8da56e39b9ae7275674"
dependencies = [
"autocfg",
"crossbeam-deque",
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ab346ac5921dc62ffa9f89b7a773907511cdfa5490c572ae9be1be33e8afa4a"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils",
"lazy_static",
"num_cpus",
]
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.2.5" version = "0.2.5"
@ -106,6 +408,12 @@ dependencies = [
"parking_lot", "parking_lot",
] ]
[[package]]
name = "scoped_threadpool"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d51f5df5af43ab3f1360b429fa5e0152ac5ce8c0bd6485cae490332e96846a8"
[[package]] [[package]]
name = "scopeguard" name = "scopeguard"
version = "1.1.0" version = "1.1.0"
@ -118,6 +426,23 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
[[package]]
name = "tiff"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a53f4706d65497df0c4349241deddf35f84cee19c87ed86ea8ca590f4464437"
dependencies = [
"jpeg-decoder",
"miniz_oxide 0.4.4",
"weezl",
]
[[package]]
name = "weezl"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a32b378380f4e9869b22f0b5177c68a5519f03b3454fde0b291455ddbae266c"
[[package]] [[package]]
name = "winapi" name = "winapi"
version = "0.3.9" version = "0.3.9"

View File

@ -8,3 +8,6 @@ edition = "2018"
[dependencies] [dependencies]
r2d2 = "*" r2d2 = "*"
image = "*"
rayon = "*"
crossbeam = "*"

View File

@ -1,11 +1,59 @@
mod pixelflut; mod pixelflut;
use std::{
io::{self, BufReader, Read},
thread::JoinHandle,
};
use crossbeam::channel::*;
use pixelflut::*; use pixelflut::*;
enum Message {
Pixel(u32, u32, image::Rgba<u8>),
Exit,
}
fn main() -> Result<(), std::io::Error> { fn main() -> Result<(), std::io::Error> {
let mut stdin_reader = BufReader::new(io::stdin());
let mut image_buffer: Vec<u8> = Vec::new();
let _ = stdin_reader.read_to_end(&mut image_buffer)?;
let mut image = image::load_from_memory(&image_buffer).expect("Unable to read image format");
let manager = PixelflutConnectionManager::new("localhost:1234")?; let manager = PixelflutConnectionManager::new("localhost:1234")?;
let pool = r2d2::Pool::builder() let pool = r2d2::Pool::builder()
.max_size(15) .max_size(16)
.build(manager).expect("Could not initialize connection pool"); .build(manager)
.expect("Could not initialize connection pool");
let image = image.as_rgba8().expect("Not a valid Image");
let (sender, receiver) = unbounded::<Message>();
let mut handles: Vec<JoinHandle<()>> = Vec::new();
for _ in 0..16 {
let r2 = receiver.clone();
let p2 = pool.clone();
let handle = std::thread::spawn(move || {
let mut connection = match p2.get() {
Ok(c) => c,
Err(_) => return,
};
loop {
match r2.recv().unwrap() {
Message::Pixel(x, y, c) => {
let _ = connection.set(x, y, c);
}
Message::Exit => return,
}
}
});
handles.push(handle);
}
for (x, y, p) in image.enumerate_pixels() {
let _ = sender.send(Message::Pixel(x, y, p.clone()));
}
for thread in handles {
let _ = thread.join();
}
Ok(()) Ok(())
} }

View File

@ -1,52 +1,87 @@
use core::fmt;
use r2d2::ManageConnection; use r2d2::ManageConnection;
use std::{io::Error, net::{SocketAddr, TcpStream}}; use std::io::{Read, Write};
use std::io::Write; use std::{
io::Error,
net::{SocketAddr, TcpStream},
};
pub struct PixelflutConnectionManager { pub struct PixelflutConnectionManager {
addr: SocketAddr addr: SocketAddr,
} }
pub struct PixelflutConnection { pub struct PixelflutConnection {
stream: TcpStream stream: TcpStream,
} }
impl PixelflutConnectionManager { impl PixelflutConnectionManager {
pub fn new(addr: &str) -> Result<Self, Error> { pub fn new(addr: &str) -> Result<Self, Error> {
let addr: SocketAddr = match addr.parse() { let addr: SocketAddr = match addr.parse() {
Ok(addr) => addr, Ok(addr) => addr,
Err(e) => panic!("Error in parsing the socket address: {}", e) Err(e) => panic!("Error in parsing the socket address: {}", e),
}; };
Ok(Self { addr }) Ok(Self { addr })
} }
fn get_connection(&self) -> Result<PixelflutConnection, Error> { fn get_connection(&self) -> Result<PixelflutConnection, Error> {
let stream = TcpStream::connect(self.addr)?; let stream = TcpStream::connect(self.addr)?;
stream.set_nodelay(true)?;
Ok(PixelflutConnection { stream }) Ok(PixelflutConnection { stream })
} }
} }
pub struct Rgba(pub u32); pub struct Rgba(u8, u8, u8, u8);
impl fmt::Display for Rgba {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{:02X}{:02X}{:02X}{:02X}",
self.0, self.1, self.2, self.3
)
}
}
impl From<u32> for Rgba { impl From<u32> for Rgba {
fn from(a: u32) -> Self { fn from(a: u32) -> Self {
Self(a) Self((a >> 24) as u8, (a >> 16) as u8, (a >> 8) as u8, a as u8)
}
}
impl From<image::Rgba<u8>> for Rgba {
fn from(c: image::Rgba<u8>) -> Self {
Self(c.0[0], c.0[1], c.0[2], c.0[3])
} }
} }
impl PixelflutConnection { impl PixelflutConnection {
#[inline] pub fn set(&mut self, x: u32, y: u32, c: impl Into<Rgba>) -> Result<(), Error> {
pub fn new(addr: SocketAddr) -> Result<Self, Error> { let msg = format!("PX {} {} {}\n", x, y, c.into());
let stream = TcpStream::connect(addr)?; self.write_command(&msg)
Ok(Self {stream})
} }
pub fn set(&mut self, x: usize, y: usize, c: impl Into<Rgba>) { pub fn get(&mut self, x: u32, y: u32) -> Result<Rgba, Error> {
let msg = format!("PX {} {} {}\n", x,y,12); let msg = format!("PX {} {}\n", x, y);
self.stream.write(&msg[..].as_bytes()); let mut out = String::new();
}
self.write_command(&msg)?;
self.stream.read_to_string(&mut out);
todo!()
} }
pub fn size(&mut self) -> Result<(usize, usize), Error> {
let mut out = String::new();
self.write_command("SIZE\n")?;
self.stream.read_to_string(&mut out);
todo!()
}
fn write_command(&mut self, cmd: &str) -> Result<(), Error> {
let _ = self.stream.write(&cmd[..].as_bytes())?;
Ok(())
}
}
impl ManageConnection for PixelflutConnectionManager { impl ManageConnection for PixelflutConnectionManager {
type Connection = PixelflutConnection; type Connection = PixelflutConnection;
@ -57,11 +92,10 @@ impl ManageConnection for PixelflutConnectionManager {
} }
fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
todo!() Ok(())
} }
fn has_broken(&self, conn: &mut Self::Connection) -> bool { fn has_broken(&self, _: &mut Self::Connection) -> bool {
false false
} }
} }