반응형
Notice
Recent Posts
Recent Comments
Link
| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | 4 | 5 | 6 | |
| 7 | 8 | 9 | 10 | 11 | 12 | 13 |
| 14 | 15 | 16 | 17 | 18 | 19 | 20 |
| 21 | 22 | 23 | 24 | 25 | 26 | 27 |
| 28 | 29 | 30 | 31 |
Tags
- C++
- 백준알고리즘
- 사칙연산
- 문자열
- 백준
- 프로그래머스
- 논문구현
- 논문리뷰
- 수학
- 그리디알고리즘
- 그리디
- 큐
- 구현
- 소수판정
- 다이나믹프로그래밍
- 프로그래머스sql
- 정렬
- 프로그래머스코딩테스트
- C
- 이분탐색
- 정수론
- SQL
- Image Classification
- 해시를사용한집합과맵
- 프로그래머스연습문제
- MySQL
- 자료구조
- 이진탐색
- 브루트포스알고리즘
- C언어
Archives
- Today
- Total
초보 개발자의 이야기, 릿허브
3D Point Cloud Processing 익히기 본문
728x90
반응형
1. 주어진 point cloud 분류 데이터 (각 샘플은 (N, 3) 포인트, 라벨은 K 클래스)로 다음을 구현하라.
- 전처리: 중심화 (centering) + 스케일 정규화 (scale normalization) + 고정 포인트 수 N으로 샘플링 (random sampling)
- Custom Dataset 및 DataLoader
- kNN 기반 local feature (edge feature) 생성
- 간단한 DGCNN 스타일 분류 모델 구현
- loss (cross entropy) 및 학습/검증 루프 구현 후, validation accuracy 출력
# Colab cell 1: imports, seed
import math
import random
from dataclasses import dataclass
from typing import Tuple
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
def set_seed(seed: int = 42) -> None:
    """Seed Python's and PyTorch's random number generators.

    Args:
        seed: Value passed, in this order, to ``random.seed``,
            ``torch.manual_seed`` and ``torch.cuda.manual_seed_all``.
    """
    for seeder in (random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
        seeder(seed)
# Seed the generators before any data or model is created.
set_seed(42)
# Use CUDA when PyTorch reports it as available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Bare expression: in a notebook cell this displays the selected device.
device
# Colab cell 2: point cloud preprocessing utilities
def center_and_scale(points: torch.Tensor, eps: float = 1e-6) -> torch.Tensor:
    """Centre a point cloud on its mean and scale it into the unit ball.

    Each cloud is shifted so that its centroid sits at the origin and is then
    divided by the largest distance of any of its points from the origin.

    Args:
        points: A single cloud of shape (N, 3) or a batch of shape (B, N, 3).
        eps: Lower bound applied to the scale so that a cloud whose points all
            coincide does not cause a division by zero.

    Returns:
        A tensor of the same shape as ``points``.

    Raises:
        ValueError: If ``points`` is neither 2-D nor 3-D.
    """
    if points.dim() not in (2, 3):
        raise ValueError("points must have shape (N,3) or (B,N,3)")

    # Negative axes address the point axis (-2) and the coordinate axis (-1)
    # for both the single-cloud and the batched layout.
    shifted = points - points.mean(dim=-2, keepdim=True)
    radius = shifted.norm(dim=-1).max(dim=-1, keepdim=True).values
    radius = radius.unsqueeze(-1).clamp(min=eps)  # (1, 1) or (B, 1, 1)
    return shifted / radius
def random_sample(points: torch.Tensor, n_points: int) -> torch.Tensor:
    """Return exactly ``n_points`` rows drawn from a point cloud.

    * ``N == n_points``: the input tensor is returned unchanged.
    * ``N > n_points``: ``n_points`` distinct rows are chosen at random.
    * ``N < n_points``: all rows are kept and the shortfall is filled with
      rows re-drawn at random (with replacement) from the same cloud.

    Args:
        points: Tensor whose first axis indexes points, e.g. (N, 3).
        n_points: Required number of output points; must be non-negative.

    Returns:
        Tensor with ``n_points`` rows.

    Raises:
        ValueError: If ``n_points`` is negative, or if the cloud is empty
            while ``n_points`` is positive (there is nothing to duplicate).
    """
    if n_points < 0:
        # A negative count would otherwise be used as a negative slice bound
        # and silently return the wrong number of points.
        raise ValueError(f"n_points must be non-negative, got {n_points}")

    n_available = points.size(0)
    if n_available == n_points:
        return points
    if n_available > n_points:
        keep = torch.randperm(n_available)[:n_points]
        return points[keep]

    if n_available == 0:
        raise ValueError("cannot up-sample an empty point cloud")
    shortfall = n_points - n_available
    repeats = torch.randint(low=0, high=n_available, size=(shortfall,))
    return torch.cat([points, points[repeats]], dim=0)
@torch.no_grad()
def pairwise_sq_dist(x: torch.Tensor) -> torch.Tensor:
    """Compute squared Euclidean distances between all point pairs.

    Uses the expansion ``|a - b|^2 = |a|^2 + |b|^2 - 2 a.b``. The result is a
    full (B, N, N) matrix, so memory grows quadratically with N. The function
    runs under ``torch.no_grad()``, so the result is not tracked by autograd.

    Args:
        x: Point features of shape (B, N, C).

    Returns:
        Tensor of shape (B, N, N); negative values produced by round-off are
        clamped to zero.
    """
    sq_norm = x.square().sum(dim=-1, keepdim=True)   # (B, N, 1)
    gram = torch.matmul(x, x.transpose(1, 2))        # (B, N, N)
    sq_dist = sq_norm + sq_norm.transpose(1, 2) - 2.0 * gram
    return sq_dist.clamp(min=0.0)
def knn_indices(x: torch.Tensor, k: int) -> torch.Tensor:
    """Return the indices of the ``k`` nearest neighbours of every point.

    A point is never reported as its own neighbour. The self-distance is
    masked out explicitly instead of assuming that each point is the first
    hit of ``topk``: with duplicated points, or with round-off in the
    distance computation, another index can rank ahead of the point itself,
    and dropping "the first column" would then keep the point in its own
    neighbour list.

    Args:
        x: Point features of shape (B, N, C).
        k: Number of neighbours per point; must satisfy ``0 <= k <= N - 1``.

    Returns:
        Long tensor of shape (B, N, k), ordered from nearest to farthest.

    Raises:
        ValueError: If ``k`` is outside ``[0, N - 1]``.
    """
    n_points = x.size(1)
    if not 0 <= k < n_points:
        raise ValueError(
            f"k must be in [0, {n_points - 1}] for {n_points} points, got {k}"
        )

    dist = pairwise_sq_dist(x)  # (B, N, N)
    # Push every point's distance to itself to +inf so it can never be picked.
    self_mask = torch.eye(n_points, dtype=torch.bool, device=dist.device)
    dist = dist.masked_fill(self_mask, float("inf"))
    return dist.topk(k=k, dim=-1, largest=False).indices  # (B, N, k)
# Colab cell 3: edge feature (DGCNN style)
def batched_index_select(feat: torch.Tensor, idx: torch.Tensor) -> torch.Tensor:
    """Gather per-point neighbour features for every batch element.

    ``out[b, i, j] = feat[b, idx[b, i, j]]``.

    Implemented with integer-array indexing on the original (B, N, C) tensor.
    This avoids gathering from a (B, N, N, C) expanded view of ``feat``, for
    which autograd would have to build a gradient of that expanded shape.

    Args:
        feat: Point features of shape (B, N, C).
        idx: Integer indices into the point axis of ``feat``, shape (B, N, k).

    Returns:
        Tensor of shape (B, N, k, C).
    """
    batch_size = feat.size(0)
    # (B, 1, 1) batch ids broadcast against idx so each entry reads from its
    # own batch element.
    batch_ids = torch.arange(batch_size, device=idx.device).view(batch_size, 1, 1)
    return feat[batch_ids, idx]
def edge_feature(x: torch.Tensor, k: int) -> torch.Tensor:
    """Build DGCNN-style edge features from each point's k nearest neighbours.

    For every point and each of its neighbours the feature is the
    concatenation ``[x_center, x_neighbor - x_center]``.

    Args:
        x: Point features of shape (B, N, C).
        k: Number of neighbours per point.

    Returns:
        Tensor of shape (B, N, k, 2C).
    """
    neighbor_idx = knn_indices(x, k=k)                    # (B, N, k)
    neighbors = batched_index_select(x, neighbor_idx)     # (B, N, k, C)
    anchors = x[:, :, None, :].expand_as(neighbors)       # (B, N, k, C)
    offsets = neighbors - anchors
    return torch.cat((anchors, offsets), dim=-1)          # (B, N, k, 2C)
# Colab cell 4: synthetic dataset (classification)
def make_synthetic_cloud(label: int, n_raw: int = 1024) -> torch.Tensor:
    """Generate a noisy synthetic point cloud for a class label.

    Stand-in for clouds loaded from files, used to exercise the pipeline.

    * label 0: sphere-like shell (radius about 1)
    * label 1: plane-like sheet (z about 0)
    * any other label: cylinder-like surface (x^2 + y^2 about 1, random z)

    Args:
        label: Class id selecting the shape.
        n_raw: Number of points to generate.

    Returns:
        Float tensor of shape (n_raw, 3).
    """
    if label == 0:
        # Random directions on the unit sphere with a slightly perturbed radius.
        direction = torch.randn(n_raw, 3)
        direction = direction / (direction.norm(dim=-1, keepdim=True) + 1e-6)
        radius = 1.0 + 0.05 * torch.randn(n_raw, 1)
        cloud = direction * radius
    elif label == 1:
        # Gaussian spread in x/y, thin in z.
        planar = torch.randn(n_raw, 2)
        height = 0.02 * torch.randn(n_raw, 1)
        cloud = torch.cat([planar, height], dim=-1)
    else:
        # Uniform angle around the z axis, Gaussian height.
        angle = 2 * math.pi * torch.rand(n_raw, 1)
        height = 0.5 * torch.randn(n_raw, 1)
        cloud = torch.cat([angle.cos(), angle.sin(), height], dim=-1)

    jitter = 0.02 * torch.randn_like(cloud)
    return (cloud + jitter).float()
class PointCloudClsDataset(Dataset):
    """Synthetic point-cloud classification dataset.

    Labels and raw clouds (1024 points each) are generated once at
    construction time; with real data they would be loaded and kept in the
    same lists. Normalisation and fixed-size sampling happen on every access.
    """

    def __init__(self, n_samples: int, n_points: int, n_classes: int = 3):
        """
        Args:
            n_samples: Number of clouds in the dataset.
            n_points: Number of points returned per cloud.
            n_classes: Labels are drawn uniformly from ``[0, n_classes)``.
        """
        self.n_samples = n_samples
        self.n_points = n_points
        self.n_classes = n_classes
        self.labels = torch.randint(low=0, high=n_classes, size=(n_samples,))
        self.clouds = [
            make_synthetic_cloud(int(label), n_raw=1024) for label in self.labels
        ]

    def __len__(self) -> int:
        return self.n_samples

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return ``(points (n_points, 3), label ())`` for sample ``idx``."""
        normalized = center_and_scale(self.clouds[idx])      # (N_raw, 3)
        sampled = random_sample(normalized, self.n_points)   # (n_points, 3)
        return sampled, self.labels[idx].long()
def cls_collate(batch):
    """Stack a list of ``(points (N, 3), label ())`` samples into a batch.

    Returns:
        Tuple ``(points (B, N, 3), labels (B,))``.
    """
    clouds = [sample[0] for sample in batch]
    targets = [sample[1] for sample in batch]
    return torch.stack(clouds, dim=0), torch.stack(targets, dim=0)
# Colab cell 5: DGCNN-style classifier (minimal)
class EdgeConvBlock(nn.Module):
    """Shared per-edge MLP: 1x1 Conv2d (no bias) -> BatchNorm2d -> ReLU."""

    def __init__(self, in_channels: int, out_channels: int):
        super().__init__()
        pointwise = nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False)
        norm = nn.BatchNorm2d(out_channels)
        self.mlp = nn.Sequential(pointwise, norm, nn.ReLU(inplace=True))

    def forward(self, edge: torch.Tensor) -> torch.Tensor:
        """Apply the MLP to every edge.

        Args:
            edge: Edge features of shape (B, N, k, Cin).

        Returns:
            Tensor of shape (B, Cout, N, k).
        """
        # Conv2d wants channels on axis 1, i.e. (B, Cin, N, k).
        channels_first = edge.permute((0, 3, 1, 2)).contiguous()
        return self.mlp(channels_first)
class DGCNNClassifier(nn.Module):
    """Minimal DGCNN-style point-cloud classifier.

    Three EdgeConv stages are applied in sequence. The first builds its kNN
    graph on the xyz coordinates, the later ones on the features of the
    previous stage. The per-stage features are max-pooled over all points,
    concatenated and fed to an MLP head.
    """

    def __init__(self, k: int = 16, num_classes: int = 3, emb_dim: int = 128):
        """
        Args:
            k: Number of neighbours used for every kNN graph.
            num_classes: Number of output logits.
            emb_dim: Output channels of the last EdgeConv stage.
        """
        super().__init__()
        self.k = k
        # Edge features have 2C channels: xyz input -> 6, 64-d features -> 128.
        self.ec1 = EdgeConvBlock(in_channels=6, out_channels=64)
        self.ec2 = EdgeConvBlock(in_channels=128, out_channels=64)
        self.ec3 = EdgeConvBlock(in_channels=128, out_channels=emb_dim)
        self.cls_head = nn.Sequential(
            nn.Linear(64 + 64 + emb_dim, 256, bias=False),
            nn.BatchNorm1d(256),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.2),
            nn.Linear(256, num_classes),
        )

    def _edge_conv(self, feats: torch.Tensor, block: nn.Module) -> torch.Tensor:
        """Run one EdgeConv stage: (B, N, C) -> max over neighbours -> (B, C', N)."""
        edges = edge_feature(feats, k=self.k)     # (B, N, k, 2C)
        return block(edges).max(dim=-1).values    # (B, C', N)

    def forward(self, pts: torch.Tensor) -> torch.Tensor:
        """
        Args:
            pts: Point coordinates of shape (B, N, 3).

        Returns:
            Logits of shape (B, num_classes).
        """
        _batch, _n_points, _ = pts.shape  # fails early unless pts is 3-D

        f1 = self._edge_conv(pts, self.ec1)                                # (B, 64, N)
        f2 = self._edge_conv(f1.transpose(1, 2).contiguous(), self.ec2)    # (B, 64, N)
        f3 = self._edge_conv(f2.transpose(1, 2).contiguous(), self.ec3)    # (B, emb, N)

        # Global max pooling over the point axis for every stage.
        pooled = [stage.max(dim=-1).values for stage in (f1, f2, f3)]
        global_feat = torch.cat(pooled, dim=-1)                            # (B, 64+64+emb)
        return self.cls_head(global_feat)
# Colab cell 6: train / eval loop
@dataclass
class TrainConfig:
    """Hyper-parameters for the classification training run."""

    # Number of points sampled from every cloud.
    n_points: int = 512
    # Neighbours per point in the kNN graphs.
    k: int = 16
    # Number of target classes.
    num_classes: int = 3
    # Samples per mini-batch.
    batch_size: int = 16
    # Learning rate handed to the optimiser.
    lr: float = 1e-3
    # Number of passes over the training set.
    epochs: int = 8
def accuracy(logits: torch.Tensor, y: torch.Tensor) -> float:
    """Return the fraction of entries whose arg-max class equals the label.

    Args:
        logits: Class scores with the class axis last, e.g. (B, num_classes).
        y: Integer labels matching ``logits`` without its last axis.
    """
    hits = torch.eq(logits.argmax(dim=-1), y)
    return hits.float().mean().item()
def train_one_epoch(model, loader, optim):
    """Train a classifier for one pass over ``loader``.

    Loss and accuracy are averaged over samples rather than over batches, so
    a smaller final batch does not get a disproportionate weight. Batches are
    moved to the device of the model's parameters, which keeps the function
    independent of any module-level ``device`` variable.

    Args:
        model: ``nn.Module`` mapping a batch of inputs to logits (B, K).
        loader: Iterable of ``(inputs, labels (B,))`` batches.
        optim: Optimiser updating ``model``'s parameters.

    Returns:
        Tuple ``(mean loss per sample, accuracy)`` as Python floats.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.train()
    first_param = next(model.parameters(), None)

    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    for pts, y in loader:
        if first_param is not None:
            pts = pts.to(first_param.device)
            y = y.to(first_param.device)

        logits = model(pts)
        loss = F.cross_entropy(logits, y)

        optim.zero_grad()
        loss.backward()
        optim.step()

        batch_size = y.size(0)
        # cross_entropy returns the batch mean; undo it to sum per-sample losses.
        loss_sum += loss.item() * batch_size
        n_correct += (logits.detach().argmax(dim=-1) == y).sum().item()
        n_seen += batch_size

    if n_seen == 0:
        raise ValueError("loader produced no samples")
    return loss_sum / n_seen, n_correct / n_seen
@torch.no_grad()
def eval_one_epoch(model, loader):
    """Evaluate a classifier on every batch of ``loader`` without gradients.

    Loss and accuracy are averaged over samples rather than over batches, so
    a smaller final batch does not get a disproportionate weight. Batches are
    moved to the device of the model's parameters, which keeps the function
    independent of any module-level ``device`` variable.

    Args:
        model: ``nn.Module`` mapping a batch of inputs to logits (B, K).
        loader: Iterable of ``(inputs, labels (B,))`` batches.

    Returns:
        Tuple ``(mean loss per sample, accuracy)`` as Python floats.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.eval()
    first_param = next(model.parameters(), None)

    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    for pts, y in loader:
        if first_param is not None:
            pts = pts.to(first_param.device)
            y = y.to(first_param.device)

        logits = model(pts)
        batch_size = y.size(0)
        # cross_entropy returns the batch mean; undo it to sum per-sample losses.
        loss_sum += F.cross_entropy(logits, y).item() * batch_size
        n_correct += (logits.argmax(dim=-1) == y).sum().item()
        n_seen += batch_size

    if n_seen == 0:
        raise ValueError("loader produced no samples")
    return loss_sum / n_seen, n_correct / n_seen
# Classification experiment: configuration, synthetic data, model, optimiser.
cfg = TrainConfig()

train_ds = PointCloudClsDataset(
    n_samples=400,
    n_points=cfg.n_points,
    n_classes=cfg.num_classes,
)
val_ds = PointCloudClsDataset(
    n_samples=120,
    n_points=cfg.n_points,
    n_classes=cfg.num_classes,
)

# Only the training loader shuffles; both stack samples with cls_collate.
train_loader = DataLoader(
    train_ds,
    batch_size=cfg.batch_size,
    shuffle=True,
    collate_fn=cls_collate,
    num_workers=0,
)
val_loader = DataLoader(
    val_ds,
    batch_size=cfg.batch_size,
    shuffle=False,
    collate_fn=cls_collate,
    num_workers=0,
)

model = DGCNNClassifier(k=cfg.k, num_classes=cfg.num_classes).to(device)
optim = torch.optim.Adam(model.parameters(), lr=cfg.lr)

# One training pass followed by one validation pass per epoch.
for epoch in range(1, cfg.epochs + 1):
    tr_loss, tr_acc = train_one_epoch(model, train_loader, optim)
    va_loss, va_acc = eval_one_epoch(model, val_loader)
    print(f"epoch {epoch:02d} | train loss {tr_loss:.4f} acc {tr_acc:.3f} | val loss {va_loss:.4f} acc {va_acc:.3f}")
- 샘플마다 포인트 수가 다르면 collate 단계의 torch.stack에서 shape 불일치 오류가 발생함 (고정 포인트 수 N으로 샘플링해 해결)
- knn 이후 gather shape 정리 (B, N, k, C) 만들기
- edge feature 차원 변환 (Conv2d 입력은 (B, C, N, k))
- segmentation에서는 cross entropy 입력을 logits (B, C, N) + 라벨 (B, N) 형태로 permute 하거나, logits (B·N, C) + 라벨 (B·N,) 형태로 flatten 해야 함
2. 주어진 point cloud segmentation 데이터 (각 샘플 (N, 3), 각 포인트 라벨 (N,) )에 대해 다음을 구현하라.
- 전처리 및 고정 포인트 수 처리
- Custom Dataset 및 DataLoader
- PointNet 스타일 segmentation 모델 구현 (global feature를 per-point feature에 concat)
- per-point cross entropy loss 구현 (shape 정리 포함)
- 학습 후 validation per-point accuracy 출력
# Colab cell 7: synthetic segmentation dataset
def make_synthetic_seg_cloud(n_raw: int = 1024) -> Tuple[torch.Tensor, torch.Tensor]:
    """Generate a toy segmentation sample: a sphere split into hemispheres.

    Points lie on a slightly noisy unit sphere; a point is labelled 1 when
    its z coordinate is positive and 0 otherwise.

    Args:
        n_raw: Number of points to generate.

    Returns:
        Tuple ``(points (n_raw, 3) float, labels (n_raw,) long)``.
    """
    direction = torch.randn(n_raw, 3)
    unit = direction / (direction.norm(dim=-1, keepdim=True) + 1e-6)
    radius = 1.0 + 0.02 * torch.randn(n_raw, 1)
    pts = (unit * radius).float()
    upper_half = pts[:, 2].gt(0).long()   # (n_raw,)
    return pts, upper_half
class PointCloudSegDataset(Dataset):
    """Synthetic per-point segmentation dataset (hemisphere split).

    All raw samples (1024 points each) are generated at construction time;
    normalisation and fixed-size sampling happen on every access.
    """

    def __init__(self, n_samples: int, n_points: int):
        """
        Args:
            n_samples: Number of clouds in the dataset.
            n_points: Number of points returned per cloud.
        """
        self.n_samples = n_samples
        self.n_points = n_points
        self.data = [make_synthetic_seg_cloud(1024) for _ in range(n_samples)]

    def __len__(self) -> int:
        return self.n_samples

    def __getitem__(self, idx: int):
        """Return ``(points (n_points, 3), labels (n_points,))`` for sample ``idx``."""
        raw_pts, raw_labels = self.data[idx]   # (N_raw, 3), (N_raw,)
        pts = center_and_scale(raw_pts)

        # Points and labels must be selected with the same indices so that
        # every point keeps its own label.
        n_raw = pts.size(0)
        shortfall = self.n_points - n_raw
        if shortfall > 0:
            # Too few points: keep all of them and repeat random ones.
            repeats = torch.randint(low=0, high=n_raw, size=(shortfall,))
            chosen = torch.cat([torch.arange(n_raw), repeats], dim=0)
        else:
            chosen = torch.randperm(n_raw)[: self.n_points]

        return pts[chosen], raw_labels[chosen]
def seg_collate(batch):
    """Stack ``(points (N, 3), labels (N,))`` samples into a batch.

    Returns:
        Tuple ``(points (B, N, 3), labels (B, N))``.
    """
    clouds = [sample[0] for sample in batch]
    point_labels = [sample[1] for sample in batch]
    return torch.stack(clouds, dim=0), torch.stack(point_labels, dim=0)
# Colab cell 8: PointNet segmentation model
class PointNetSeg(nn.Module):
    """PointNet-style segmentation network.

    A shared per-point MLP produces 128-d local features, a further layer
    lifts them to 256-d, and a max over all points yields one global vector
    per cloud. That vector is appended to every point's local feature
    (128 + 256 = 384 channels) before the per-point classification head.
    """

    @staticmethod
    def _shared_layer(in_channels: int, out_channels: int) -> list:
        """Return the layers of one shared-MLP step: 1x1 Conv1d (no bias), BN, ReLU."""
        return [
            nn.Conv1d(in_channels, out_channels, 1, bias=False),
            nn.BatchNorm1d(out_channels),
            nn.ReLU(inplace=True),
        ]

    def __init__(self, num_classes: int = 2):
        super().__init__()
        layer = self._shared_layer
        # Per-point feature extractor.
        self.mlp1 = nn.Sequential(*layer(3, 64), *layer(64, 128))
        # Features that are pooled into the global descriptor.
        self.mlp_global = nn.Sequential(*layer(128, 256))
        # Per-point head on the concatenated local + global features.
        self.seg_head = nn.Sequential(
            *layer(384, 256),
            nn.Dropout(p=0.2),
            nn.Conv1d(256, num_classes, 1),
        )

    def forward(self, pts: torch.Tensor) -> torch.Tensor:
        """
        Args:
            pts: Point coordinates of shape (B, N, 3).

        Returns:
            Per-point logits of shape (B, N, num_classes).
        """
        channels_first = pts.transpose(1, 2).contiguous()            # (B, 3, N)
        local = self.mlp1(channels_first)                            # (B, 128, N)
        lifted = self.mlp_global(local)                              # (B, 256, N)

        # Max over the point axis, then repeat the result for every point.
        pooled = torch.max(lifted, dim=-1, keepdim=True)[0]          # (B, 256, 1)
        tiled = pooled.expand(-1, -1, local.size(-1))                # (B, 256, N)

        scores = self.seg_head(torch.cat([local, tiled], dim=1))     # (B, C, N)
        return scores.transpose(1, 2).contiguous()                   # (B, N, C)
# Colab cell 9: segmentation train / eval
def per_point_accuracy(logits: torch.Tensor, y: torch.Tensor) -> float:
    """Return the fraction of points whose arg-max class equals the label.

    Args:
        logits: Per-point class scores of shape (B, N, C).
        y: Per-point integer labels of shape (B, N).
    """
    hits = torch.eq(logits.argmax(dim=-1), y)
    return hits.float().mean().item()
def train_one_epoch_seg(model, loader, optim):
    """Train a per-point segmentation model for one pass over ``loader``.

    Cross entropy is computed on flattened tensors: logits (B*N, C) against
    labels (B*N,). ``reshape`` is used for the flattening so that models
    returning non-contiguous logits are handled as well. Loss and accuracy
    are averaged over points rather than over batches, so a smaller final
    batch does not get a disproportionate weight. Batches are moved to the
    device of the model's parameters, which keeps the function independent
    of any module-level ``device`` variable.

    Args:
        model: ``nn.Module`` mapping points (B, N, 3) to logits (B, N, C).
        loader: Iterable of ``(points (B, N, 3), labels (B, N))`` batches.
        optim: Optimiser updating ``model``'s parameters.

    Returns:
        Tuple ``(mean loss per point, per-point accuracy)`` as Python floats.

    Raises:
        ValueError: If ``loader`` yields no points.
    """
    model.train()
    first_param = next(model.parameters(), None)

    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    for pts, y in loader:
        if first_param is not None:
            pts = pts.to(first_param.device)
            y = y.to(first_param.device)

        logits = model(pts)                               # (B, N, C)
        flat_logits = logits.reshape(-1, logits.size(-1))  # (B*N, C)
        flat_labels = y.reshape(-1)                        # (B*N,)
        loss = F.cross_entropy(flat_logits, flat_labels)

        optim.zero_grad()
        loss.backward()
        optim.step()

        n_batch_points = flat_labels.numel()
        # cross_entropy returns the mean over points; undo it to sum them.
        loss_sum += loss.item() * n_batch_points
        n_correct += (flat_logits.detach().argmax(dim=-1) == flat_labels).sum().item()
        n_seen += n_batch_points

    if n_seen == 0:
        raise ValueError("loader produced no points")
    return loss_sum / n_seen, n_correct / n_seen
@torch.no_grad()
def eval_one_epoch_seg(model, loader):
    """Evaluate a per-point segmentation model on ``loader`` without gradients.

    Cross entropy is computed on flattened tensors: logits (B*N, C) against
    labels (B*N,). ``reshape`` is used for the flattening so that models
    returning non-contiguous logits are handled as well. Loss and accuracy
    are averaged over points rather than over batches, so a smaller final
    batch does not get a disproportionate weight. Batches are moved to the
    device of the model's parameters, which keeps the function independent
    of any module-level ``device`` variable.

    Args:
        model: ``nn.Module`` mapping points (B, N, 3) to logits (B, N, C).
        loader: Iterable of ``(points (B, N, 3), labels (B, N))`` batches.

    Returns:
        Tuple ``(mean loss per point, per-point accuracy)`` as Python floats.

    Raises:
        ValueError: If ``loader`` yields no points.
    """
    model.eval()
    first_param = next(model.parameters(), None)

    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    for pts, y in loader:
        if first_param is not None:
            pts = pts.to(first_param.device)
            y = y.to(first_param.device)

        logits = model(pts)                               # (B, N, C)
        flat_logits = logits.reshape(-1, logits.size(-1))  # (B*N, C)
        flat_labels = y.reshape(-1)                        # (B*N,)

        n_batch_points = flat_labels.numel()
        # cross_entropy returns the mean over points; undo it to sum them.
        loss_sum += F.cross_entropy(flat_logits, flat_labels).item() * n_batch_points
        n_correct += (flat_logits.argmax(dim=-1) == flat_labels).sum().item()
        n_seen += n_batch_points

    if n_seen == 0:
        raise ValueError("loader produced no points")
    return loss_sum / n_seen, n_correct / n_seen
# Segmentation experiment: synthetic data, loaders, model, optimiser.
train_ds = PointCloudSegDataset(n_samples=300, n_points=512)
val_ds = PointCloudSegDataset(n_samples=100, n_points=512)

# Only the training loader shuffles; both stack samples with seg_collate.
train_loader = DataLoader(
    train_ds,
    batch_size=16,
    shuffle=True,
    collate_fn=seg_collate,
    num_workers=0,
)
val_loader = DataLoader(
    val_ds,
    batch_size=16,
    shuffle=False,
    collate_fn=seg_collate,
    num_workers=0,
)

model = PointNetSeg(num_classes=2).to(device)
optim = torch.optim.Adam(model.parameters(), lr=1e-3)

# Six epochs, each a training pass followed by a validation pass.
for epoch in range(1, 7):
    tr_loss, tr_acc = train_one_epoch_seg(model, train_loader, optim)
    va_loss, va_acc = eval_one_epoch_seg(model, val_loader)
    print(f"epoch {epoch:02d} | train loss {tr_loss:.4f} acc {tr_acc:.3f} | val loss {va_loss:.4f} acc {va_acc:.3f}")
728x90
반응형
'코딩테스트 > 📕 기타(ETC)' 카테고리의 다른 글
| [명품 JAVA Programming] 6. 모듈과 패키지 개념 실습문제 풀이 (0) | 2022.11.15 |
|---|