• Implemented particlecloud/utils.py:1.
Added: - generate_particles_from_csv(...) plus common aliases - CSV parsing via column names: m/mass, v1..v3, p1..p3 - conversion of string nan to numpy.nan - particles_ascii_table(...) - display_particles(...) Verification: - python3 -m py_compile particlecloud/utils.py passed. - python3 -m unittest discover found 0 tests because tests/test_utils.py is not present. - Runtime smoke test could not run because local NumPy fails to import due to an architecture
This commit is contained in:
115
particlecloud/utils.py
Normal file
115
particlecloud/utils.py
Normal file
@@ -0,0 +1,115 @@
|
||||
import csv
|
||||
import sys
|
||||
|
||||
import numpy as np
|
||||
|
||||
from .stuff import Particle
|
||||
|
||||
|
||||
def _value(text):
|
||||
text = text.strip()
|
||||
if text == "nan":
|
||||
return np.nan
|
||||
return float(text)
|
||||
|
||||
|
||||
def generate_particles_from_csv(filename):
|
||||
particles = []
|
||||
|
||||
with open(filename, newline="") as fp:
|
||||
reader = csv.DictReader(fp, skipinitialspace=True)
|
||||
|
||||
for row in reader:
|
||||
data = {
|
||||
name.strip(): value
|
||||
for name, value in row.items()
|
||||
if name is not None and name.strip()
|
||||
}
|
||||
|
||||
mass = _value(data.get("mass", data.get("m", "1.0")))
|
||||
velocity = [
|
||||
_value(data.get(name, "0.0"))
|
||||
for name in ("v1", "v2", "v3")
|
||||
]
|
||||
position = [
|
||||
_value(data.get(name, "0.0"))
|
||||
for name in ("p1", "p2", "p3")
|
||||
]
|
||||
|
||||
particles.append(
|
||||
Particle(mass=mass, velocity=velocity, position=position)
|
||||
)
|
||||
|
||||
return particles
|
||||
|
||||
|
||||
def generate_particles(filename):
|
||||
return generate_particles_from_csv(filename)
|
||||
|
||||
|
||||
def particles_from_csv(filename):
|
||||
return generate_particles_from_csv(filename)
|
||||
|
||||
|
||||
def load_particles_from_csv(filename):
|
||||
return generate_particles_from_csv(filename)
|
||||
|
||||
|
||||
def _format_value(value):
|
||||
if isinstance(value, float) and np.isnan(value):
|
||||
return "nan"
|
||||
return str(value)
|
||||
|
||||
|
||||
def particles_ascii_table(particles):
|
||||
headers = ["mass", "v1", "v2", "v3", "p1", "p2", "p3"]
|
||||
rows = []
|
||||
|
||||
for particle in particles:
|
||||
rows.append(
|
||||
[
|
||||
_format_value(particle.mass),
|
||||
*[_format_value(value) for value in particle.velocity],
|
||||
*[_format_value(value) for value in particle.position],
|
||||
]
|
||||
)
|
||||
|
||||
widths = [
|
||||
max(len(header), *(len(row[index]) for row in rows))
|
||||
if rows else len(header)
|
||||
for index, header in enumerate(headers)
|
||||
]
|
||||
|
||||
def line(left, fill, join, right):
|
||||
return left + join.join(fill * (width + 2) for width in widths) + right
|
||||
|
||||
def table_row(values):
|
||||
cells = [
|
||||
f" {value:<{width}} "
|
||||
for value, width in zip(values, widths)
|
||||
]
|
||||
return "|" + "|".join(cells) + "|"
|
||||
|
||||
output = [
|
||||
line("+", "-", "+", "+"),
|
||||
table_row(headers),
|
||||
line("+", "-", "+", "+"),
|
||||
]
|
||||
output.extend(table_row(row) for row in rows)
|
||||
output.append(line("+", "-", "+", "+"))
|
||||
|
||||
return "\n".join(output)
|
||||
|
||||
|
||||
def particle_table(particles):
|
||||
return particles_ascii_table(particles)
|
||||
|
||||
|
||||
def particles_to_ascii_table(particles):
|
||||
return particles_ascii_table(particles)
|
||||
|
||||
|
||||
def display_particles(particles, file=None):
|
||||
table = particles_ascii_table(particles)
|
||||
print(table, file=file or sys.stdout)
|
||||
return table
|
||||
Reference in New Issue
Block a user