Practicing 3D Point Cloud Processing


1. Given point cloud classification data (each sample is (N, 3) points, labels over K classes), implement the following.

  1. Preprocessing: centering + scale normalization + random sampling to a fixed number of points N
  2. Custom Dataset and DataLoader
  3. kNN-based local (edge) feature construction
  4. A simple DGCNN-style classification model
  5. Cross-entropy loss and a train/validation loop, printing validation accuracy

 


# Colab cell 1: imports, seed
import math
import random
from dataclasses import dataclass
from typing import Tuple

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

def set_seed(seed: int = 42) -> None:
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

set_seed(42)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device
# Colab cell 2: point cloud preprocessing utilities

def center_and_scale(points: torch.Tensor, eps: float = 1e-6) -> torch.Tensor:
    """
    points: (N, 3) or (B, N, 3)
    - centering: shift the centroid to the origin
    - scaling: divide by the max distance from the origin (per sample), so the cloud roughly fits in the unit sphere
    """
    if points.dim() == 2:
        # (N, 3)
        centroid = points.mean(dim=0, keepdim=True)             # (1, 3)
        centered = points - centroid                             # (N, 3)
        scale = centered.norm(dim=-1).max().clamp(min=eps)       # scalar
        return centered / scale
    elif points.dim() == 3:
        # (B, N, 3)
        centroid = points.mean(dim=1, keepdim=True)              # (B, 1, 3)
        centered = points - centroid                             # (B, N, 3)
        scale = centered.norm(dim=-1).max(dim=1, keepdim=True).values  # (B, 1)
        scale = scale.unsqueeze(-1).clamp(min=eps)               # (B, 1, 1)
        return centered / scale
    else:
        raise ValueError("points must have shape (N,3) or (B,N,3)")

def random_sample(points: torch.Tensor, n_points: int) -> torch.Tensor:
    """
    points: (N, 3)
    n_points: target fixed number of points
    - N >= n_points: pick n_points indices at random
    - N < n_points: pad by sampling with replacement (a case exams often require)
    """
    N = points.size(0)
    if N == n_points:
        return points
    if N > n_points:
        idx = torch.randperm(N)[:n_points]
        return points[idx]
    # N < n_points
    pad = n_points - N
    extra_idx = torch.randint(low=0, high=N, size=(pad,))
    return torch.cat([points, points[extra_idx]], dim=0)

@torch.no_grad()
def pairwise_sq_dist(x: torch.Tensor) -> torch.Tensor:
    """
    x: (B, N, C)
    return: (B, N, N) squared distance
    - memory usage is O(N^2), so this becomes a bottleneck as N grows
    - in exams N is usually limited to around 512~2048
    """
    # (x - y)^2 = x^2 + y^2 - 2xy
    # x2: (B, N, 1), y2: (B, 1, N), xy: (B, N, N)
    x2 = (x * x).sum(dim=-1, keepdim=True)
    y2 = x2.transpose(1, 2)
    xy = x @ x.transpose(1, 2)
    dist = x2 + y2 - 2.0 * xy
    return dist.clamp(min=0.0)

def knn_indices(x: torch.Tensor, k: int) -> torch.Tensor:
    """
    x: (B, N, C)
    return: idx (B, N, k)
    - the point itself (distance 0) is included, so take k+1 neighbors and drop the first
    """
    dist = pairwise_sq_dist(x)  # (B, N, N)
    # take the k+1 nearest (self included), then drop self below
    idx = dist.topk(k=k+1, dim=-1, largest=False).indices  # (B, N, k+1)
    return idx[..., 1:]  # (B, N, k)
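
A quick sanity check (not part of the required output): the squared distances above should match torch.cdist, and the kNN indices should come out as (B, N, k). The tiny tensor sizes below are arbitrary.

# (Optional) sanity check for pairwise_sq_dist / knn_indices
xb = torch.randn(2, 8, 3)                                   # (B=2, N=8, C=3) dummy points
assert torch.allclose(pairwise_sq_dist(xb), torch.cdist(xb, xb).pow(2), atol=1e-5)
print(knn_indices(xb, k=3).shape)                           # torch.Size([2, 8, 3]), self excluded
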
# Colab cell 3: edge feature (DGCNN style)

def batched_index_select(feat: torch.Tensor, idx: torch.Tensor) -> torch.Tensor:
    """
    feat: (B, N, C)
    idx: (B, N, k)
    return: gathered (B, N, k, C)

    torch.gather는 같은 rank에서 동작하므로 idx를 (B, N, k, C)로 확장해 사용
    """
    B, N, C = feat.shape
    k = idx.size(-1)

    idx_expand = idx.unsqueeze(-1).expand(B, N, k, C)  # (B, N, k, C)
    feat_expand = feat.unsqueeze(2).expand(B, N, k, C) # (B, N, k, C)

    # gather는 dim 기준으로 idx가 참조하는 축이 필요
    # 여기서는 N 축을 선택해야 하므로, feat를 (B, N, 1, C)로 만든 뒤
    # dim=1에서 gather하면 깔끔하지만 rank가 맞지 않음
    # 가장 안전한 방식: feat를 (B, N, C) -> (B, 1, N, C)로 바꾸고 dim=2에서 gather
    feat_ = feat.unsqueeze(1).expand(B, N, N, C)       # (B, N, N, C)
    # idx_expand는 (B, N, k, C)인데, gather할 dim=2에 맞춰야 하므로 그대로 사용 가능
    gathered = torch.gather(feat_, dim=2, index=idx_expand)  # (B, N, k, C)
    return gathered

def edge_feature(x: torch.Tensor, k: int) -> torch.Tensor:
    """
    x: (B, N, C)
    return: edge (B, N, k, 2C)
    - edge = concat( x_center, x_neighbor - x_center )
    """
    idx = knn_indices(x, k=k)               # (B, N, k)
    neigh = batched_index_select(x, idx)    # (B, N, k, C)
    center = x.unsqueeze(2).expand_as(neigh)  # (B, N, k, C)
    edge = torch.cat([center, neigh - center], dim=-1)       # (B, N, k, 2C)
    return edge
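
A minimal shape check for edge_feature (dummy input; the sizes here are just an example):

# (Optional) edge feature shape check
pts_demo = torch.randn(2, 16, 3)                  # (B, N, 3) dummy cloud
print(edge_feature(pts_demo, k=4).shape)          # torch.Size([2, 16, 4, 6]) = (B, N, k, 2C)
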
# Colab cell 4: synthetic dataset (classification)

def make_synthetic_cloud(label: int, n_raw: int = 1024) -> torch.Tensor:
    """
    label 0: sphere-like
    label 1: plane-like
    label 2: cylinder-like
    - with real data, just load from files; this is only for validating the pipeline.
    """
    if label == 0:
        # sphere: random directions with a slightly perturbed radius
        v = torch.randn(n_raw, 3)
        v = v / (v.norm(dim=-1, keepdim=True) + 1e-6)
        r = 1.0 + 0.05 * torch.randn(n_raw, 1)
        pts = v * r
    elif label == 1:
        # plane: z ~ 0
        xy = torch.randn(n_raw, 2)
        z = 0.02 * torch.randn(n_raw, 1)
        pts = torch.cat([xy, z], dim=-1)
    else:
        # cylinder: x^2 + y^2 ~ 1, random z
        theta = 2 * math.pi * torch.rand(n_raw, 1)
        x = torch.cos(theta)
        y = torch.sin(theta)
        z = 0.5 * torch.randn(n_raw, 1)
        pts = torch.cat([x, y, z], dim=-1)
        pts = pts + 0.02 * torch.randn_like(pts)

    return pts.float()

class PointCloudClsDataset(Dataset):
    def __init__(self, n_samples: int, n_points: int, n_classes: int = 3):
        self.n_samples = n_samples
        self.n_points = n_points
        self.n_classes = n_classes

        # pre-generate all samples (with real data, load them and store in a list)
        self.labels = torch.randint(low=0, high=n_classes, size=(n_samples,))
        self.clouds = [make_synthetic_cloud(int(y), n_raw=1024) for y in self.labels]

    def __len__(self) -> int:
        return self.n_samples

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        pts = self.clouds[idx]                     # (N_raw, 3)
        pts = center_and_scale(pts)                # (N_raw, 3)
        pts = random_sample(pts, self.n_points)    # (n_points, 3)
        y = self.labels[idx].long()                # ()
        return pts, y

def cls_collate(batch):
    # batch: list of (points (N,3), label ())
    pts = torch.stack([b[0] for b in batch], dim=0)   # (B, N, 3)
    y = torch.stack([b[1] for b in batch], dim=0)     # (B,)
    return pts, y
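
To make sure the Dataset / DataLoader plumbing works before training, one batch can be pulled and its shapes printed (a throwaway check with small, assumed sizes):

# (Optional) one-batch shape check for the classification pipeline
_ds = PointCloudClsDataset(n_samples=8, n_points=512)
_loader = DataLoader(_ds, batch_size=4, collate_fn=cls_collate)
_pts, _y = next(iter(_loader))
print(_pts.shape, _y.shape)                       # torch.Size([4, 512, 3]) torch.Size([4])
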
# Colab cell 5: DGCNN-style classifier (minimal)

class EdgeConvBlock(nn.Module):
    def __init__(self, in_channels: int, out_channels: int):
        super().__init__()
        self.mlp = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        )

    def forward(self, edge: torch.Tensor) -> torch.Tensor:
        """
        edge: (B, N, k, Cin) but Conv2d expects (B, Cin, N, k)
        return: (B, Cout, N, k)
        """
        x = edge.permute(0, 3, 1, 2).contiguous()
        return self.mlp(x)

class DGCNNClassifier(nn.Module):
    def __init__(self, k: int = 16, num_classes: int = 3, emb_dim: int = 128):
        super().__init__()
        self.k = k

        # input features are the 3-D xyz coordinates,
        # so the edge feature has 2C = 6 channels
        self.ec1 = EdgeConvBlock(in_channels=6, out_channels=64)
        self.ec2 = EdgeConvBlock(in_channels=128, out_channels=64)  # 2*64
        self.ec3 = EdgeConvBlock(in_channels=128, out_channels=emb_dim)

        self.cls_head = nn.Sequential(
            nn.Linear(64 + 64 + emb_dim, 256, bias=False),
            nn.BatchNorm1d(256),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.2),
            nn.Linear(256, num_classes),
        )

    def forward(self, pts: torch.Tensor) -> torch.Tensor:
        """
        pts: (B, N, 3)
        return logits: (B, num_classes)
        """
        B, N, _ = pts.shape

        # 1) edge conv block 1
        e1 = edge_feature(pts, k=self.k)                 # (B, N, k, 6)
        f1 = self.ec1(e1)                                # (B, 64, N, k)
        f1 = f1.max(dim=-1).values                       # neighbor max -> (B, 64, N)

        # 2) block 2 uses feature space knn (common in DGCNN)
        x1 = f1.permute(0, 2, 1).contiguous()            # (B, N, 64)
        e2 = edge_feature(x1, k=self.k)                  # (B, N, k, 128)
        f2 = self.ec2(e2)                                # (B, 64, N, k)
        f2 = f2.max(dim=-1).values                       # (B, 64, N)

        # 3) block 3
        x2 = f2.permute(0, 2, 1).contiguous()            # (B, N, 64)
        e3 = edge_feature(x2, k=self.k)                  # (B, N, k, 128)
        f3 = self.ec3(e3)                                # (B, emb, N, k)
        f3 = f3.max(dim=-1).values                       # (B, emb, N)

        # global pooling over points
        g1 = f1.max(dim=-1).values                       # (B, 64)
        g2 = f2.max(dim=-1).values                       # (B, 64)
        g3 = f3.max(dim=-1).values                       # (B, emb)

        g = torch.cat([g1, g2, g3], dim=-1)              # (B, 64+64+emb)
        logits = self.cls_head(g)                        # (B, num_classes)
        return logits
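
A quick forward-pass check on random input confirms the classifier's logits shape (toy batch; the names below are just for this check):

# (Optional) DGCNN forward shape check
_clf = DGCNNClassifier(k=8, num_classes=3)
print(_clf(torch.randn(2, 128, 3)).shape)         # torch.Size([2, 3])
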
# Colab cell 6: train / eval loop

@dataclass
class TrainConfig:
    n_points: int = 512
    k: int = 16
    num_classes: int = 3
    batch_size: int = 16
    lr: float = 1e-3
    epochs: int = 8

def accuracy(logits: torch.Tensor, y: torch.Tensor) -> float:
    pred = logits.argmax(dim=-1)
    return (pred == y).float().mean().item()

def train_one_epoch(model, loader, optim):
    model.train()
    total_loss = 0.0
    total_acc = 0.0

    for pts, y in loader:
        pts = pts.to(device)
        y = y.to(device)

        logits = model(pts)
        loss = F.cross_entropy(logits, y)

        optim.zero_grad()
        loss.backward()
        optim.step()

        total_loss += loss.item()
        total_acc += accuracy(logits.detach(), y)

    return total_loss / len(loader), total_acc / len(loader)

@torch.no_grad()
def eval_one_epoch(model, loader):
    model.eval()
    total_loss = 0.0
    total_acc = 0.0

    for pts, y in loader:
        pts = pts.to(device)
        y = y.to(device)

        logits = model(pts)
        loss = F.cross_entropy(logits, y)

        total_loss += loss.item()
        total_acc += accuracy(logits, y)

    return total_loss / len(loader), total_acc / len(loader)

cfg = TrainConfig()

train_ds = PointCloudClsDataset(n_samples=400, n_points=cfg.n_points, n_classes=cfg.num_classes)
val_ds   = PointCloudClsDataset(n_samples=120, n_points=cfg.n_points, n_classes=cfg.num_classes)

train_loader = DataLoader(train_ds, batch_size=cfg.batch_size, shuffle=True, collate_fn=cls_collate, num_workers=0)
val_loader   = DataLoader(val_ds, batch_size=cfg.batch_size, shuffle=False, collate_fn=cls_collate, num_workers=0)

model = DGCNNClassifier(k=cfg.k, num_classes=cfg.num_classes).to(device)
optim = torch.optim.Adam(model.parameters(), lr=cfg.lr)

for epoch in range(1, cfg.epochs + 1):
    tr_loss, tr_acc = train_one_epoch(model, train_loader, optim)
    va_loss, va_acc = eval_one_epoch(model, val_loader)
    print(f"epoch {epoch:02d} | train loss {tr_loss:.4f} acc {tr_acc:.3f} | val loss {va_loss:.4f} acc {va_acc:.3f}")

 

  • The DataLoader crashes when samples have different point counts (solved by fixed-N sampling)
  • Getting the gathered tensor into shape (B, N, k, C) after kNN
  • Channel-axis handling for edge features (Conv2d expects (B, C, N, k))
  • For segmentation, cross entropy needs the logits reshaped to (B, C, N) or flattened to (B·N, C) (see the sketch below)
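
Since that last point trips people up, here is a minimal sketch (dummy shapes, not tied to the model above) showing that the (B, C, N) layout and the flattened layout give the same cross entropy:

# (Sketch) per-point cross entropy: (B, C, N) layout vs flattened (B*N, C)
Bd, Nd, Cd = 4, 512, 2
lg = torch.randn(Bd, Nd, Cd)                                 # per-point logits (B, N, C)
lb = torch.randint(0, Cd, (Bd, Nd))                          # per-point labels (B, N)
loss_a = F.cross_entropy(lg.permute(0, 2, 1), lb)            # expects (B, C, N) + (B, N)
loss_b = F.cross_entropy(lg.reshape(Bd * Nd, Cd), lb.reshape(Bd * Nd))
print(torch.allclose(loss_a, loss_b))                        # True (both use mean reduction)
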

 


2. Given point cloud segmentation data (each sample (N, 3), per-point labels (N,)), implement the following.

  1. Preprocessing and fixed point-count handling
  2. Custom Dataset and DataLoader
  3. A PointNet-style segmentation model (concatenate the global feature onto each per-point feature)
  4. Per-point cross entropy loss (including the shape handling)
  5. Train, then print validation per-point accuracy
# Colab cell 7: synthetic segmentation dataset

def make_synthetic_seg_cloud(n_raw: int = 1024) -> Tuple[torch.Tensor, torch.Tensor]:
    """
    A simple segmentation task:
    - generate points on a sphere surface
    - class 1 if z > 0, else class 0 (hemisphere split)
    """
    v = torch.randn(n_raw, 3)
    v = v / (v.norm(dim=-1, keepdim=True) + 1e-6)
    r = 1.0 + 0.02 * torch.randn(n_raw, 1)
    pts = (v * r).float()

    labels = (pts[:, 2] > 0).long()  # (N_raw,)
    return pts, labels

class PointCloudSegDataset(Dataset):
    def __init__(self, n_samples: int, n_points: int):
        self.n_samples = n_samples
        self.n_points = n_points
        self.data = [make_synthetic_seg_cloud(1024) for _ in range(n_samples)]

    def __len__(self) -> int:
        return self.n_samples

    def __getitem__(self, idx: int):
        pts, y = self.data[idx]                    # pts (N_raw,3), y (N_raw,)
        pts = center_and_scale(pts)

        # when sampling to a fixed N, the labels must be indexed with the same idxs
        N = pts.size(0)
        if N >= self.n_points:
            idxs = torch.randperm(N)[:self.n_points]
        else:
            pad = self.n_points - N
            extra = torch.randint(low=0, high=N, size=(pad,))
            idxs = torch.cat([torch.arange(N), extra], dim=0)

        pts = pts[idxs]                            # (n_points,3)
        y = y[idxs]                                # (n_points,)
        return pts, y

def seg_collate(batch):
    pts = torch.stack([b[0] for b in batch], dim=0)  # (B,N,3)
    y = torch.stack([b[1] for b in batch], dim=0)    # (B,N)
    return pts, y
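
The same one-batch shape check for the segmentation side (small, assumed sizes):

# (Optional) one-batch shape check for the segmentation pipeline
_sds = PointCloudSegDataset(n_samples=8, n_points=512)
_sloader = DataLoader(_sds, batch_size=4, collate_fn=seg_collate)
_spts, _sy = next(iter(_sloader))
print(_spts.shape, _sy.shape)                     # torch.Size([4, 512, 3]) torch.Size([4, 512])
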

 

# Colab cell 8: PointNet segmentation model

class PointNetSeg(nn.Module):
    def __init__(self, num_classes: int = 2):
        super().__init__()
        # per-point feature extractor (shared MLP)
        self.mlp1 = nn.Sequential(
            nn.Conv1d(3, 64, 1, bias=False),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace=True),
            nn.Conv1d(64, 128, 1, bias=False),
            nn.BatchNorm1d(128),
            nn.ReLU(inplace=True),
        )
        # global feature
        self.mlp_global = nn.Sequential(
            nn.Conv1d(128, 256, 1, bias=False),
            nn.BatchNorm1d(256),
            nn.ReLU(inplace=True),
        )
        # segmentation head: concat (local 128 + global 256) = 384
        self.seg_head = nn.Sequential(
            nn.Conv1d(384, 256, 1, bias=False),
            nn.BatchNorm1d(256),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.2),
            nn.Conv1d(256, num_classes, 1),
        )

    def forward(self, pts: torch.Tensor) -> torch.Tensor:
        """
        pts: (B, N, 3)
        return logits: (B, N, num_classes)
        """
        x = pts.permute(0, 2, 1).contiguous()  # (B,3,N)

        local = self.mlp1(x)                   # (B,128,N)
        g = self.mlp_global(local)             # (B,256,N)

        # global max pooling over points
        gmax = g.max(dim=-1, keepdim=True).values     # (B,256,1)
        gmax = gmax.expand(-1, -1, local.size(-1))    # (B,256,N)

        feat = torch.cat([local, gmax], dim=1)        # (B,384,N)
        out = self.seg_head(feat)                     # (B,C,N)

        return out.permute(0, 2, 1).contiguous()      # (B,N,C)
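
Another quick forward check (dummy input) to confirm the per-point logits come out as (B, N, C):

# (Optional) PointNetSeg forward shape check
_seg = PointNetSeg(num_classes=2)
print(_seg(torch.randn(2, 256, 3)).shape)         # torch.Size([2, 256, 2])
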
# Colab cell 9: segmentation train / eval

def per_point_accuracy(logits: torch.Tensor, y: torch.Tensor) -> float:
    # logits (B,N,C), y (B,N)
    pred = logits.argmax(dim=-1)
    return (pred == y).float().mean().item()

def train_one_epoch_seg(model, loader, optim):
    model.train()
    total_loss = 0.0
    total_acc = 0.0

    for pts, y in loader:
        pts = pts.to(device)
        y = y.to(device)

        logits = model(pts)  # (B,N,C)

        # F.cross_entropy expects the class dim right after the batch dim (B, C, ...),
        # so either permute or flatten; option 1: flatten to (B*N, C)
        B, N, C = logits.shape
        loss = F.cross_entropy(logits.view(B * N, C), y.view(B * N))

        optim.zero_grad()
        loss.backward()
        optim.step()

        total_loss += loss.item()
        total_acc += per_point_accuracy(logits.detach(), y)

    return total_loss / len(loader), total_acc / len(loader)

@torch.no_grad()
def eval_one_epoch_seg(model, loader):
    model.eval()
    total_loss = 0.0
    total_acc = 0.0

    for pts, y in loader:
        pts = pts.to(device)
        y = y.to(device)

        logits = model(pts)
        B, N, C = logits.shape
        loss = F.cross_entropy(logits.view(B * N, C), y.view(B * N))

        total_loss += loss.item()
        total_acc += per_point_accuracy(logits, y)

    return total_loss / len(loader), total_acc / len(loader)

train_ds = PointCloudSegDataset(n_samples=300, n_points=512)
val_ds   = PointCloudSegDataset(n_samples=100, n_points=512)

train_loader = DataLoader(train_ds, batch_size=16, shuffle=True, collate_fn=seg_collate, num_workers=0)
val_loader   = DataLoader(val_ds, batch_size=16, shuffle=False, collate_fn=seg_collate, num_workers=0)

model = PointNetSeg(num_classes=2).to(device)
optim = torch.optim.Adam(model.parameters(), lr=1e-3)

for epoch in range(1, 7):
    tr_loss, tr_acc = train_one_epoch_seg(model, train_loader, optim)
    va_loss, va_acc = eval_one_epoch_seg(model, val_loader)
    print(f"epoch {epoch:02d} | train loss {tr_loss:.4f} acc {tr_acc:.3f} | val loss {va_loss:.4f} acc {va_acc:.3f}")

 
