LOADING

加载过慢请开启缓存 浏览器默认开启

Okabe's LAB

Kubeflow 笔记(1)—— 安装与 Hello World

Cloud Native 2024/3/23

Installation

步骤1,使用 kind 安装本地 k8s 集群

Kind 即 “Kubernetes in Docker”,在 docker 内部使用 kubeadm 来安装集群。

参考:Kubernetes教程(十五)—使用 kind 在本地快速部署一个 k8s集群

注意一下 KUBECONFIG 环境变量是不是正确,我之前用 kubeadm 在本地安装过 k8s,所以 KUBECONFIG 环境变量不对。kind 的配置文件应该在 ~/.kube/config(我这里是 root 用户)。

步骤2,安装 kubeflow

参考官方:https://github.com/kubeflow/manifests

阅读全文

Spark 笔记(1)——安装与基础使用

Big Data 2024/3/12

正文

该笔记将记录在 Ubuntu-20.04 上安装并使用 Spark 的历程。

环境

  • Ubuntu 20.04
  • Spring Boot 3.2.3
  • Spark 3.2.0
  • JDK 17

安装参考:在 Ubuntu 20.04 上安装 Apache Spark 教程

启动流程

  • 启动 master

    start-master.sh
    

    这个指令会在当前主机启动一个 master 节点,可以在 localhost:8080 访问到控制面板。

  • 启动 worker

    复制命令行中生成的 master 节点的 URL,启动 worker

    start-worker.sh spark://xxx
    

    启动成功后长这样:

配置 Java Spark 环境

遇到的问题:

这一套 pom.xml 是可以用的:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.reins</groupId>
    <artifactId>spark</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spark</name>
    <description>spark</description>
    <properties>
        <java.version>17</java.version>
        <jakarta-servlet.version>4.0.3</jakarta-servlet.version>
        <jersey.version>2.36</jersey.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.43</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.github.shyiko</groupId>
            <artifactId>mysql-binlog-connector-java</artifactId>
            <version>0.21.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.6.6</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.13</artifactId>
            <version>3.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.13</artifactId>
            <version>3.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>janino</artifactId>
            <version>3.0.9</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>4.0.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

注意:在我的环境中,worker 占用了 8081 端口,而我的 Spring 服务启动在 8081 端口,所以产生问题。按照上面的链接能够解决剩余的版本冲突问题。

简单的例子

  • 经典 word count:

    JavaRDD<String> file = session.read().textFile(filePath).cache().toJavaRDD();
    JavaRDD<String> words = file.flatMap((FlatMapFunction<String, String>) s -> Arrays.asList(s.split(" ")).iterator());
    JavaPairRDD<String, Integer> wordToCountMap = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1));
    JavaPairRDD<String, Integer> wordCounts = wordToCountMap.reduceByKey((Function2<Integer, Integer, Integer>) Integer::sum);
    wordCounts.saveAsTextFile("./word_count");
    

参考:

  • spark中map()和flatmap()的区别

  • 生成 parquet:

    SparkSession session = sparkService.getSparkSession();
    Properties properties = new Properties();
    properties.setProperty("user", "root");
    properties.setProperty("password", "123456");
    Dataset<Row> dataset = session.read().jdbc("jdbc:mysql://localhost:3306/spark", "person", properties);
    // 输出表格
    dataset.show();
    dataset.coalesce(1).write().mode(SaveMode.Overwrite).option("header", true).parquet("./test.parquet");
    

Pandas 也支持生成 parquet:pandas.DataFrame.to_parquet

Spark 中的三种数据结构

  • Spark RDD

    RDD 是一种弹性分布式数据集,是一种只读分区数据。它是 Spark 的基础数据结构,具有内存计算能力、数据容错性以及数据不可修改特性。

  • Spark Dataframe

    Dataframe 也是一种不可修改的分布式数据集合,它可以按列查询数据,类似于关系数据库里面的表结构。可以对数据指定数据模式(schema)。

  • Spark Dataset

    DatasetDataFrame 的扩展,它提供了类型安全,面向对象的编程接口。也就是说 DataFrameDataset 的一种特殊形式。

参考:

阅读全文

纪念一次成功的开源贡献

Life 2024/2/8

https://github.com/qdrant/qdrant/pull/3549

没想到还能赚美刀~

阅读全文

Rust 笔记(1)—— 关于 tokio spawn

Language 2024/2/5

正文

假如说有这样一个异步函数 async_func,如果在同步函数中不用 await 调用它,它是不会被执行的,除非使用 tokio::task::spawn 函数开启一个异步任务。该函数接受一个 Future 参数,会返回一个 tokio::task::JoinHandle<T>,其中范型 T 是异步任务的返回值。

JoinHandle类型可以通过await来等待异步任务的完成,也可以通过abort()来中断异步任务,异步任务被中断后返回JoinError类型。

举个例子:

async fn async_func() -> Result<(), Box<dyn std::error::Error>> {
    ...
    Ok(())
}

fn sync_func() {
    // handle 的类型为 JoinHandle<Result<(), Box<dyn std::error::Error>>>
    let handle = spawn(async_func());
    // 中断
    handle.abort();
    // 等待完成(需要在异步函数中)
    // handle.await
}

可以使用 tokio::join! 宏来等待多个 JoinHandle 执行完成。

这里的 spawn 如果不等待的话,就会自己独立执行,所以会要求引用数据的生命周期是 'static,也就是活到程序结束(因为异步任务也可能执行到程序结束)。这就会带来一些编码上的困难,例如下面的例子:

struct Manager;
impl Manager {
    async fn do_something_async(&self) {}

    fn start(&self) {
        spawn(self.do_something_async());
    }
}

上述的代码会报错:

error[E0521]: borrowed data escapes outside of method
  --> src/test.rs:22:15
   |
21 |     fn start(&self) {
   |              -----
   |              |
   |              `self` is a reference that is only valid in the method body
   |              let's call the lifetime of this reference `'1`
22 |         spawn(self.do_something_async());
   |               ^^^^^^^^^^^^^^^^^^^^^^^^^
   |               |
   |               `self` escapes the method body here
   |               argument requires that `'1` must outlive `'static`

spawn 要求 &self 的生命周期必须是 'static,我查阅了一些资料,从 Rust 论坛中看到了相关问题。解决方案是使用 ArcArc/Mutex(如果有可变引用)。

struct Manager {
    inner: Arc<ManagerInner>,
}
struct ManagerInner;
impl ManagerInner {
    async fn do_something_async(&self) {}
}

impl Manager {
    fn start(&self) {
        let inner_cloned = self.inner.clone();
        spawn(async move {
            inner_cloned.do_something_async().await;
        });
    }
}

通过这一层包裹,就不报错了。Arc 保证了引用可以活到程序结束(只要还有引用,就不会被回收),即使 Manager 被回收,只要异步任务还在进行,ManagerInner 还是存在一份。这边必须使用 move 告诉编译器移动 inner_cloned 而不是捕获它的引用(捕获引用是默认行为)。

参考

阅读全文

Kubernetes 笔记(2)—— 记一次 Kubernetes 小练习

2023/4/6

Lab 1

该 lab 基于 Minikube,用于练习 K8s 最基本的 Api Object,如 ConfigMap,Secret,Pod,Service 等等。

使用 Secret 指定 Mysql 密码

kubectl apply -f lab1/mysql_secret.yml

该 YAML 文件中制定了数据库的密码。

stringData:
  db_password: '123456'

在创建 Mysql 对应的 Pod 的时候,可以使用这个 Secret 来指定 Mysql 的密码。
如下所示,使用环境变量指定 Mysql root 用户的密码,这个密码源自 Secret 中定义的 db_password

env:
    - name: MYSQL_ROOT_PASSWORD
    valueFrom:
      secretKeyRef:
        name: mysql-secret
        key: db_password

创建 Mysql 对应的 Pod:

kubectl apply -f lab1/mysql_pod.yml

验证 Mysql 正确指定了密码:

kubectl exec -it mysql -- mysql -uroot -p123456

有关 Secret 的更多声明方式:https://github.com/omerbsezer/Fast-Kubernetes/blob/main/K8s-Secret.md

创建 Mysql 对应的 Service

kubectl apply -f lab1/mysql_service.yml

该 YAML 文件中指定了选中的 Pod 以及端口。使用 NodePort 的方式会在每个 Node 的 ip 上暴露一个端口,
来访问对应的 Service 服务。
同时,可以随机暴露出一个端口外部访问的端口(默认值:30000-32767)。

一般而言,对于数据库这种服务,应该使用 ClusterIp,只在集群内部使用,这边为了测试服务连接,故暴露给外部。

spec:
  type: NodePort
  selector:
    app: db
  ports:
    - protocol: TCP
      port: 3306
      targetPort: 3306

这里的 selector 对应了 lab1/mysql_pod.yml 中指定的 labels:

labels:
  app: db

创建该 Service:

kubectl apply -f lab1/mysql_service.yml

查看当前的 Service:

$ kubectl get svc
NAME            TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)          AGE
kubernetes      ClusterIP   10.96.0.1        <none>        443/TCP          25m
mysql-service   NodePort    10.108.143.219   <none>        3306:30160/TCP   14s

其中 3306 是集群内的端口,pod 可以通过这个端口进行访问:mysql-service:3306(通过 service 名直接访问该服务)。30160 是对外可见的端口,在集群外部可以通过 nodeIp:30160 访问。

可以看到集群暴露了 30160 的端口给外部使用,不过我们使用的是 Minikube,所有的 k8s 组件都跑在 Docker 容器里,
所以我们必须要用 tunnel 才能真正访问该 Service:

minikube service --url mysql-service

该指令会开启一个 tunnel,并且提供一个可访问的 url,相当于让我们能够去访问 30160 端口,并最终访问到内部的 Service。

$ minikube service --url mysql-service
http://127.0.0.1:51634
❗  Because you are using a Docker driver on windows, the terminal needs to be open to run it.

可以用一些数据库工具来验证是否能够连接:

使用 ConfigMap 配置数据库信息

ConfigMap 和 Secret 基本一致,只不过后者用于存储密文信息。但是注意,单纯使用 Secret 仍然存在风险,
因为其使用的 base64 并不能保证安全性,应该配合 k8s 提供的 RBAC 机制使用。

这边指定了使用的 mysql-server。

data:
  db_server: "mysql-service"

添加该 ConfigMap:

kubectl apply -f lab1/mysql_configmap.yml

我们启动一个测试 Pod 查看效果:

kubectl apply -f lab1/test_pod.yml

在该 Pod 内部查看环境变量是否正确:

kubectl exec -it test -- bin/sh
echo $MYSQL_SERVER
echo $MYSQL_ROOT_PASSWORD

应该得到如下的输出:

$ kubectl exec -it test -- bin/sh
/ # echo $MYSQL_SERVER
mysql-service
/ # echo $MYSQL_ROOT_PASSWORD
123456

注意,k8s 里的 Service 是 ping 不通的,以下摘自:https://kuboard.cn/learning/faq/ping-service.html

因为 Kubernetes 只是为 Service 生成了一个虚拟 IP 地址,实现的方式有:

  • User space 代理模式
  • Iptables 代理模式
  • IPVS 代理模式

不管是哪种代理模式,Kubernetes Service 的 IP 背后都没有任何实体可以响应「ICMP」,全称为 Internet 控制报文协议(Internet Control Message Protocol)。

我们可以在 test pod 里面安装 telnet 指令,查看连接情况:

apk update
apk add busybox-extras

注意,可以直接用 Service 的名字,依靠 DNS 访问服务,但是这里只是 hostname,还需要指定端口号

telnet "$MYSQL_SERVER:3306"

修改 ConfigMap 之后更新 Pod

我们可以通过以下方式对 Api Object 的配置进行修改,以之前的 ConfigMap 为例:

kubectl edit configmap mysql-config

这会启动一个文本编辑器让你进行修改。这里我们把 db_server 修改成了 dummy
我们再次进入 test Pod,看看环境变量是否改变:

kubectl exec -it test -- bin/sh
echo $MYSQL_SERVER

输出依旧是之前的 mysql-service,也就是说修改 ConfigMap 不会导致引用它的 Pod 的自动更新。我们需要一些其他手段让 Pod 在 ConfigMap 更新的时候也进行更新。

使用 Deployment

Deployment 可以支持滚动升级,当我们的 ConfigMap 修改的时候,可以认为是一次版本变动,我们可以通过 Deployment 更新对应的 Pod。

我们先删除之前的 test Pod,

kubectl delete -f lab1/test_pod.yml

接下来我们要用 Deployment 来管理这个 Pod。Deployment 的 Template 对应了 Pod 的 Spec。

创建对应的 Deployment:

kubectl apply -f lab1/test_deployment.yml

查看生成的 Pod(s):

$ kubectl get pods
NAME                   READY   STATUS    RESTARTS   AGE
mysql                  1/1     Running   0          102m
test-7775f744b-c7sgs   1/1     Running   0          25s

进入该 Pod 查看环境变量:

kubectl exec -it test-7775f744b-c7sgs -- bin/sh
echo $MYSQL_SERVER

输出为 dummy,现在我们把 configmap 修改为之前的版本。

然后,我们使用如下方法( https://www.qttc.net/504-how-update-latest-configmap-in-pods.html )更新 Pod:

kubectl rollout restart deploy/test

我们可以在另外两个终端,通过:

kubectl get rs -w

以及

kubectl get pods -w

查看发生的变化:

$ kubectl get rs -w
NAME             DESIRED   CURRENT   READY   AGE
test-7775f744b   1         1         1       9m32s
test-595fb97b87   1         0         0       0s
test-595fb97b87   1         0         0       0s
test-595fb97b87   1         1         0       0s
test-595fb97b87   1         1         1       5s
test-7775f744b    0         1         1       9m58s
test-7775f744b    0         1         1       9m58s
test-7775f744b    0         0         0       9m58s
$ kubectl get pods -w
NAME                   READY   STATUS    RESTARTS   AGE  
mysql                  1/1     Running   0          112m 
test-7775f744b-c7sgs   1/1     Running   0          9m46s
test-595fb97b87-mqb2m   0/1     Pending   0          0s
test-595fb97b87-mqb2m   0/1     Pending   0          0s
test-595fb97b87-mqb2m   0/1     ContainerCreating   0          0s
test-595fb97b87-mqb2m   1/1     Running             0          5s
test-7775f744b-c7sgs    1/1     Terminating         0          9m58s
test-7775f744b-c7sgs    0/1     Terminating         0          10m
test-7775f744b-c7sgs    0/1     Terminating         0          10m
test-7775f744b-c7sgs    0/1     Terminating         0          10m

可以通过如下指令查看 Deployment 的历史版本:

kubectl rollout history deploy/test

我们再次进入 test Pod,查看环境变量:

kubectl exec -it test-595fb97b87-mqb2m -- bin/sh
echo $MYSQL_SERVER

更新成功:mysql-service。具体的,还可以根据 https://github.com/omerbsezer/Fast-Kubernetes/blob/main/K8s-Rollout-Rollback.md 中提到的两种策略,指定更新策略。
Recreate 是全部删除,然后新建(显然服务会有一段时间 Down),而 RollingUpdate 也就是滚动升级,两个版本的 Pod 将同时存在,慢慢将所有 Pod 变为最新版本(关闭一部分旧的,开启一部分新的)。

使用 Kustomize

项目网址:https://github.com/kubernetes-sigs/kustomize

kustomize lets you customize raw, template-free YAML files for multiple purposes, leaving the original YAML untouched and usable as is.

推荐阅读:

我们先删除之前测试的残留:

kubectl delete -f lab1/mysql_configmap.yml
kubectl delete -f lab1/test_deployment.yml

创建如下的 Kustomization.yml:

resources:
  - test_deployment.yml
configMapGenerator:
  - name: mysql-config
    literals:
      - db_server=mysql-service

查看对应的生成结果(只是打印,没有创建):

kubectl kustomize lab1/base
kubectl kustomize lab1/stagging

应用到 k8s:

kubectl apply -k lab1/base

查看生成的 ConfigMap:

$ kubectl get configmap
NAME                      DATA   AGE
kube-root-ca.crt          1      164m
mysql-config-5bhm7k67gb   1      38s

进入 Pod,查看环境变量:

kubectl exec -it test-6fc4d8f9cc-f6zwr -- bin/sh
echo $MYSQL_SERVER

结果为 mysql-service,和预期一致。

接下来修改为 dummy

kubectl apply -k lab1/stagging

可以发现自动创建了新的 Pod:

$ kubectl get pods
NAME                    READY   STATUS        RESTARTS   AGE
mysql                   1/1     Running       0          166m
test-6fc4d8f9cc-f6zwr   1/1     Terminating   0          2m13s
test-85d74d7669-d9ft8   1/1     Running       0          26s

进入 Pod,查看环境变量:

kubectl exec -it test-85d74d7669-d9ft8 -- bin/sh
echo $MYSQL_SERVER

输出为 dummy,成功!

注意,原来的 configmap 还是存在的(这种方式比较好,删除总是一个比较危险的行为)。

$ kubectl get configmap
NAME                      DATA   AGE
kube-root-ca.crt          1      174m
mysql-config-2h6ddfhh59   1      4m48s
mysql-config-5bhm7k67gb   1      6m35s
阅读全文

Kubernetes 笔记(1)—— Informer 与 Controller

2023/4/6

Informer

组成部分

  • Controller:Informer 的实施载体,可以创建 reflector 及控制 processLoop。processLoop 将DeltaFIFO 队列中的数据 pop 出,首先调用Indexer进行缓存并建立索引,然后分发给 processor 进行处理。

  • Reflector:Informer 并没有直接访问 api-server,而是通过一个叫 Reflector 的对象进行 api-server 的访问。Reflector 通过 ListAndWatch 监控指定的 kubernetes 资源,当资源发生变化的时候,例如发生了 Added 资源添加等事件,会将其资源对象存放在本地缓存 DeltaFIFO 中。

  • DeltaFIFO:是一个先进先出的缓存队列,用来存储 Watch API 返回的各种事件,如Added、Updated、Deleted。

  • Indexer:Indexer 使用一个线程安全的数据存储来存储对象和它们的键值。需要注意的是,Indexer 中的数据与 etcd中 的数据是完全一致的,这样 client-go 需要数据时,无须每次都从 api-server 获取,从而减少了请求过多造成对 api-server 的压力。一句话总结:Indexer 是用于存储+快速查找资源。

  • Processor:记录了所有的回调函数(即 ResourceEventHandler)的实例,并负责触发回调函数

使用 client-go 自定义 controller

controller 可以使用 informer 来绑定事件的回调函数,以实现对对象的控制。比如对于 replicaset 而言,监听三种事件,判断 replicaset 实际的 replicas 数量是否符合预期,若不符合预期,则进行相应的扩缩容。

最佳实践

源自:https://raw.githubusercontent.com/HanFa/learn-k8s/master/lesson3/lesson3_slides.pdf

阅读全文

Kubernetes 笔记(0)—— K8s 实践报告

2023/3/23
阅读全文
1 ... 2
avatar
Zihong Lin

What I can’t create, I don’t understand.