scservers(十一)监控管理-日志调用链跟踪管理

前言

微服务会讲业务根据实际需要按照一定的粒度将业务拆分成一个个服务,相互之间也会存在调用关系,当系统庞大复杂到一定程度后,调用关系的复杂导致请求变慢或者不可用的时候,查找问题也变得复杂困难,不像单体应用,在一个地方查看下日志就ok.这时我们就需要将相应的调用链成串,就需要分布式系统日志调用跟踪管理来进行日志数据的治理。

现今业界分布式服务跟踪的理论基础主要来自于 Google 的一篇论文《Dapper, a Large-Scale Distributed Systems Tracing Infrastructure》,使用最为广泛的开源实现是 Twitter 的 Zipkin,为了实现平台无关、厂商无关的分布式服务跟踪,CNCF 发布了布式服务跟踪标准 Open Tracing。

Open Tracing标准

  • Span: 基本工作单元,例如,在一个新建的span中发送一个RPC等同于发送一个回应请求给RPC,span通过一个64位ID唯一标识,trace以另一个64位ID表示,span还有其他数据信息,比如摘要、时间戳事件、关键值注释(tags)、span的ID、以及进度ID(通常是IP地址)

span在不断的启动和停止,同时记录了时间信息,当你创建了一个span,你必须在未来的某个时刻停止它。

  • Trace: 一系列spans组成的一个树状结构,例如,如果你正在跑一个分布式大数据工程,你可能需要创建一个trace。

  • Annotation: 用来及时记录一个事件的存在,一些核心annotations用来定义一个请求的开始和结束

  • cs – Client Sent -客户端发起一个请求,这个annotion描述了这个span的开始 sr – Server Received -服务端获得请求并准备开始处理它,如果将其sr减去cs时间戳便可得到网络延迟 ss – Server Sent -注解表明请求处理的完成(当请求返回客户端),如果ss减去sr时间戳便可得到服务端需要的处理请求时间 cr – Client Received -表明span的结束,客户端成功接收到服务端的回复,如果cr减去cs时间戳便可得到客户端从服务端获取回复的所有所需时间

概念

一般来说,一个分布式服务跟踪系统主要由三部分构成:

  • 数据收集
  • 数据存储
  • 数据展示

    ### 组成
  • Rabbitmq: 消息队列,主要用于传输日志
  • Zipkin: 服务调用链路追踪系统,聚合各业务系统调用延迟数据,达到链路调用监控与跟踪。

    服务调用链路
  • Sleuth 作为Zipkin客户端按照Open Tracing标准收集日志信息,可以通过http或者mq将日志传递到Zipkin Server 并存储起来
  • ES + Kibana提供搜索、查看和与存储在 Elasticsearch 索引中的数据进行交互的功能。开发者或运维人员可以轻松地执行高级数据分析,并在各种图表、表格和地图中可视化数据。
  • Grafana可视化图表监控工具,按照规则将数据库信息进行可视化处理,进行统计图表等的展现。

    Zipkin

    Zipkin 是 Twitter 的一个开源项目,它基于 Google Dapper 实现,它致力于收集服务的定时数据,以解决微服务架构中的延迟问题,包括数据的收集、存储、查找和展现。

    我们可以使用它来收集各个服务器上请求链路的跟踪数据,并通过它提供的 REST API 接口来辅助我们查询跟踪数据以实现对分布式系统的监控程序,从而及时地发现系统中出现的延迟升高问题并找出系统性能瓶颈的根源。除了面向开发的 API 接口之外,它也提供了方便的 UI 组件来帮助我们直观的搜索跟踪信息和分析请求链路明细,比如:可以查询某段时间内各用户请求的处理时间等。

    Zipkin 提供了可插拔数据存储方式:In-Memory、MySql、Cassandra 以及 Elasticsearch。接下来的测试为方便直接采用 In-Memory 方式进行存储,生产推荐 Elasticsearch。

    上图展示了 Zipkin 的基础架构,它主要由 4 个核心组件构成:

    • Collector:收集器组件,它主要用于处理从外部系统发送过来的跟踪信息,将这些信息转换为 Zipkin 内部处理的 Span 格式,以支持后续的存储、分析、展示等功能。
    • Storage:存储组件,它主要对处理收集器接收到的跟踪信息,默认会将这些信息存储在内存中,我们也可以修改此存储策略,通过使用其他存储组件将跟踪信息存储到数据库中。
    • RESTful API:API 组件,它主要用来提供外部访问接口。比如给客户端展示跟踪信息,或是外接系统访问以实现监控等。
    • Web UI:UI 组件,基于 API 组件实现的上层应用。通过 UI 组件用户可以方便而有直观地查询和分析跟踪信息。

    Spring Cloud Sleuth

    Spring Cloud Sleuth 为服务之间调用提供链路追踪。通过 Sleuth 可以很清楚的了解到一个服务请求经过了哪些服务,每个服务处理花费了多长。从而让我们可以很方便的理清各微服务间的调用关系。此外 Sleuth 可以帮助我们:

    • 耗时分析: 通过 Sleuth 可以很方便的了解到每个采样请求的耗时,从而分析出哪些服务调用比较耗时;
    • 可视化错误: 对于程序未捕捉的异常,可以通过集成 Zipkin 服务界面上看到;
    • 链路优化: 对于调用比较频繁的服务,可以针对这些服务实施一些优化措施。

      Spring Cloud Sleuth 可以结合 Zipkin,将信息发送到 Zipkin,利用 Zipkin 的存储来存储信息,利用 Zipkin UI 来展示数据。

    这是 Spring Cloud Sleuth 的概念图:

使用

Zipkin 服务端

我们使用rabbitmq 作为传输信息的方式使用日志跟踪系统

springcloud 2.0 之后不再简易自定义Zipkin服务端,所以我们直接使用Zipkin-server jar包进行。

如下方式启动,就可以从rabbit_address获取日志消息了

RABBIT_ADDRESSES=localhost java -jar zipkin.jar
  • docker的使用配置

    • 依赖消息通信
    version: '3'
    services:
    rabbitmq:
    image: rabbitmq:alpine
    container_name: sc-rabbitmq
    restart: always
    volumes:
    - ./data/rabbitmq:/var/lib/rabbitmq
    networks:
    - sc-net
    ports:
    - 5672:5672
    • zipkin-server
    zipkin-server:
    image: openzipkin/zipkin
    container_name: sc-zipkin-server
    restart: always
    volumes:
    - ./data/logs/zipkin-server:/logs
    networks:
    - sc-net
    ports:
    - 9411:9411
    environment:
    - RABBIT_ADDRESSES=rabbitmq:5672
    - RABBIT_MQ_PORT=5672
    - RABBIT_PASSWORD=guest
    - RABBIT_USER=guest
    • ES + Grafana
    version: '3'
    services:
    elasticsearch:
    image: elasticsearch:alpine
    container_name: sc-elasticsearch
    restart: always
    volumes:
    - ./data/elasticsearch/logs:/var/logs/elasticsearch
    networks:
    - sc-net
    ports:
    - 9200:9200
    kibana:
    image: kibana
    container_name: sc-kibana
    restart: always
    volumes:
    - ./data/kibana/logs:/var/logs/kibana
    networks:
    - sc-net
    ports:
    - 5601:5601
    environment:
    - ELASTICSEARCH_URL=http://elasticsearch:9200
    depends_on:
    - elasticsearch
    grafana:
    image: grafana/grafana
    container_name: sc-grafana
    restart: always
    volumes:
    - ./data/grafana/logs:/var/logs/grafana
    networks:
    - sc-net
    ports:
    - 3000:3000

客户端

  • 依赖
<!--自省和监控的集成功能-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--注册中心-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        </dependency>
        <!--日志跟踪-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-zipkin</artifactId>
        </dependency>
        <!--消息总线-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>
  • 配置
spring:
  application:
    name: ribbon-consumer
  rabbitmq:
    host: ${RABBIT_MQ_HOST:localhost}
    port: ${RABBIT_MQ_PORT:5672}
    username: ${RABBIT_MQ_USERNAME:guest}
    password: ${RABBIT_MQ_PASSWORD:guest}

  zipkin:
    enabled: true
    sender:
      type: rabbit
  sleuth:
    sampler:
      probability: 1.0

scservers(十)监控管理-应用管理

前言

说到监控管理,springcloud 不应该把springboot admin落下,与actuator 结合提供日志,应用运行参数等的环境监控。

  • 显示 name/id 和版本号
  • 显示在线状态
  • Logging 日志级别管理
  • JMX beans 管理
  • Threads 会话和线程管理
  • Trace 应用请求跟踪
  • 应用运行参数信息,如:
    • Java 系统属性
    • Java 环境变量属性
    • 内存信息
    • Spring 环境属性

      ## admin 服务端
  • 依赖
<dependency>
            <groupId>de.codecentric</groupId>
            <artifactId>spring-boot-admin-starter-server</artifactId>
            <version>${admin-server.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
        </dependency>
  • 注解
@EnableAdminServer
public class AdminApplication {
    public static void main(String[] args) {
        SpringApplication.run(AdminApplication.class, args);
    }
}
  • 属性设置
 #spring boot admin的登陆账号和密码配置
  spring:
    security:
     user:
        name: admin
        password: 123456

admin 客户端

  • 依赖
<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
  • 注解

  • 属性设置

#Actuator 在 spring boot 2.0 版本后,只暴露了两个端点,下面开启所有端点
management:
  endpoints:
    web:
      exposure:
        include: '*'

即admin 通过actuator,mq 收集监控信息。

scservers(九)监控管理-断路器管理

前言

作为一个能上生产的系统,我觉得需要两个指标

  • 性能稳定
  • 安全运行

    前者考量设计编码能力,后者更加侧重后续监控管理,这往往被人忽略却异常重要。性能稳定需要性能优化,这个下次再讲,监控管理,我们需要对机器,应用,日志等各方面进行。这是框架应用运行良好生态系统。

    springcloud 在这方面做得很好,如springcloud admin,日志跟踪管理,甚至断路器监控管理。我们首先先补上断路器的监控管理,在前面讲熔断的时候就已经讲过部分,现在把它补全。

    ## Hystrix Dashboard

    在熔断篇已经有过,当时直接讲面板集成到了应用中,这里再说下其使用并讲面板应用独立出来。
  • 配置

    dashboard 为ui,不然不需要
    xml
    <dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-actuator</artifactId>

    </dependency>

    <dependency>

    <groupId>org.springframework.cloud</groupId>

    <artifactId>spring-cloud-starter-hystrix-dashboard</artifactId>

    </dependency>

    添加注解
@EnableCircuitBreaker
@EnableHystrixDashboard

手动开启

feign: 
  hystrix:
    enabled: true
    
management:
  endpoints:
    web:
      exposure:
        include: hystrix.stream
  • 使用

    通过/hystrix访问dashboard界面,在这里可以通过输入各个应用的hystrix.stream地址来访问监控各应用的熔断情况。

Turbine

通过hystrix dashboard 可以监控各应用熔断情况,而turbine补充了对集群进行监控。我们通过mq 收集数据流的方式来继承。

  • 配置

    依赖
 <!--集群监控-->
        <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-turbine-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
注解
@EnableTurbineStream

属性

turbine:
  stream:
    port: 8030
  • 服务调用者设置

    添加依赖
    xml
    <dependency>

    <groupId>org.springframework.cloud</groupId>

    <artifactId>spring-cloud-netflix-hystrix-stream</artifactId>

    </dependency>

    <dependency>

    <groupId>org.springframework.cloud</groupId>

    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>

    </dependency>

scservers(八)网关-动态路由及管理

前言

前面我们已经弄了网关的基本路由鉴权等,初步的是个可运行的poc验证demo了,那么在上实际生产之前我们还要解决一个问题——静态路由,是的,之前我们在yml文件里面配置了静态的路由,在实际生产中,不可能每次增删改路由信息都要从新启动,那么我们就得做到路由是可以在运行时动态管理的。

我们将路由信息改为从Redis里面动态加载,并且增删动作放到一个独立的应用中,gateway-admin,而不是gateway-server,gateway-admin只需要内部使用。

动态路由实现

springcloud gateway支持这个动态路由的扩展,我们只需要实现RouteDefinitionRepository 这个接口,这个接口表示从存储设备中获取路由信息,我们这里选择实现从Redis获取。

  • gateway-server 路由动态获取
@Component
@Slf4j
public class RedisRouteDefinitionRepository implements RouteDefinitionRepository {
    @Autowired
    private IRouteService routeService;
    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {
        return routeService.getRouteDefinitions();
    }
    @Override
    public Mono<Void> save(Mono<RouteDefinition> route) {
        return routeService.save(route);
    }
    @Override
    public Mono<Void> delete(Mono<String> routeId) {
        return routeService.delete(routeId);
    }
}
@Service
@Slf4j
public class RouteService implements IRouteService {

    private static final String GATEWAY_ROUTES = "gateway_routes::";

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    private Map<String, RouteDefinition> routeDefinitionMaps = new HashMap<>();

    private void loadRouteDefinition() {
        Set<String> gatewayKeys = stringRedisTemplate.keys(GATEWAY_ROUTES + "*");

        if (CollectionUtils.isEmpty(gatewayKeys)) {
            return;
        }

        List<String> gatewayRoutes = Optional.ofNullable(stringRedisTemplate.opsForValue().multiGet(gatewayKeys)).orElse(Lists.newArrayList());
        gatewayRoutes.forEach(value -> {
            try {
                RouteDefinition routeDefinition = new ObjectMapper().readValue(value, RouteDefinition.class);
                routeDefinitionMaps.put(routeDefinition.getId(), routeDefinition);
            } catch (IOException e) {
                log.error(e.getMessage());
            }
        });
    }

    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {
        loadRouteDefinition();
        return Flux.fromIterable(routeDefinitionMaps.values());
    }

    @Override
    public Mono<Void> save(Mono<RouteDefinition> routeDefinitionMono) {
        return routeDefinitionMono.flatMap(routeDefinition -> {
            routeDefinitionMaps.put(routeDefinition.getId(), routeDefinition);
            return Mono.empty();
        });
    }

    @Override
    public Mono<Void> delete(Mono<String> routeId) {
        return routeId.flatMap(id -> {
            routeDefinitionMaps.remove(id);
            return Mono.empty();
        });
    }
}
  • gateway-admin 管理动态路由

    动态路由管理我们将初始路由设置到关系型数据库,并且加载到缓存Redis中,增删改的时候同时修改落地数据库及缓存。
@RestController
@RequestMapping("/gateway/routes")
@Api("gateway routes")
@Slf4j
public class GatewayRouteController {

    @Autowired
    private IGatewayRouteService gatewayRoutService;

    @ApiOperation(value = "新增网关路由", notes = "新增一个网关路由")
    @ApiImplicitParam(name = "gatewayRoutForm", value = "新增网关路由form表单", required = true, dataType = "GatewayRouteForm")
    @PostMapping
    public Result add(@Valid @RequestBody GatewayRouteForm gatewayRoutForm) {
        log.info("name:", gatewayRoutForm);
        GatewayRoute gatewayRout = gatewayRoutForm.toPo(GatewayRoute.class);
        return Result.success(gatewayRoutService.add(gatewayRout));
    }

    @ApiOperation(value = "删除网关路由", notes = "根据url的id来指定删除对象")
    @ApiImplicitParam(paramType = "path", name = "id", value = "网关路由ID", required = true, dataType = "long")
    @DeleteMapping(value = "/{id}")
    public Result delete(@PathVariable long id) {
        gatewayRoutService.delete(id);
        return Result.success();
    }

    @ApiOperation(value = "修改网关路由", notes = "修改指定网关路由信息")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "id", value = "网关路由ID", required = true, dataType = "long"),
            @ApiImplicitParam(name = "gatewayRoutForm", value = "网关路由实体", required = true, dataType = "GatewayRouteForm")
    })
    @PutMapping(value = "/{id}")
    public Result update(@PathVariable long id, @Valid @RequestBody GatewayRouteForm gatewayRoutForm) {
        GatewayRoute gatewayRout = gatewayRoutForm.toPo(GatewayRoute.class);
        gatewayRout.setId(id);
        gatewayRoutService.update(gatewayRout);
        return Result.success();
    }

    @ApiOperation(value = "获取网关路由", notes = "根据id获取指定网关路由信息")
    @ApiImplicitParam(paramType = "path", name = "id", value = "网关路由ID", required = true, dataType = "long")
    @GetMapping(value = "/{id}")
    public Result get(@PathVariable long id) {
        log.info("get with id:{}", id);
        return Result.success(new GatewayRouteVo(gatewayRoutService.get(id)));
    }

    @ApiOperation(value = "根据uri获取网关路由", notes = "根据uri获取网关路由信息,简单查询")
    @ApiImplicitParam(paramType = "query", name = "name", value = "网关路由路径", required = true, dataType = "string")
    @ApiResponses(
            @ApiResponse(code = 200, message = "处理成功", response = Result.class)
    )
    @GetMapping
    public Result get(@RequestParam String uri) {
        List<GatewayRouteVo> gatewayRoutesVo = gatewayRoutService.query(new GatewayRouteQueryParam(uri)).stream().map(GatewayRouteVo::new).collect(Collectors.toList());
        return Result.success(gatewayRoutesVo.stream().findFirst());
    }

    @ApiOperation(value = "搜索网关路由", notes = "根据条件查询网关路由信息")
    @ApiImplicitParam(name = "gatewayRoutQueryForm", value = "网关路由查询参数", required = true, dataType = "GatewayRouteQueryForm")
    @ApiResponses(
            @ApiResponse(code = 200, message = "处理成功", response = Result.class)
    )
    @PostMapping(value = "/conditions")
    public Result search(@Valid @RequestBody GatewayRouteQueryForm gatewayRouteQueryForm) {
        List<GatewayRoute> gatewayRoutes = gatewayRoutService.query(gatewayRouteQueryForm.toParam(GatewayRouteQueryParam.class));
        List<GatewayRouteVo> gatewayRoutesVo = gatewayRoutes.stream().map(GatewayRouteVo::new).collect(Collectors.toList());
        return Result.success(gatewayRoutesVo);
    }

    @ApiOperation(value = "重载网关路由", notes = "将所以网关的路由全部重载到redis中")
    @ApiResponses(
            @ApiResponse(code = 200, message = "处理成功", response = Result.class)
    )
    @PostMapping(value = "/overload")
    public Result overload() {
        return Result.success(gatewayRoutService.overload());
    }

}
@Service
@Slf4j
public class GatewayRouteService implements IGatewayRouteService {

    private static final String GATEWAY_ROUTES = "gateway_routes::";

    @Autowired
    private GatewayRouteMapper gatewayRouteMapper;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public long add(GatewayRoute gatewayRoute) {
        long gatewayId = gatewayRouteMapper.insert(gatewayRoute);
        stringRedisTemplate.opsForValue().set(GATEWAY_ROUTES + gatewayRoute.getId(), toJson(new GatewayRouteVo(gatewayRoute)));
        return gatewayId;
    }

    @Override
    public void delete(long id) {
        gatewayRouteMapper.delete(id);
        stringRedisTemplate.delete(GATEWAY_ROUTES + id);
    }

    @Override
    public void update(GatewayRoute gatewayRoute) {
        stringRedisTemplate.delete(GATEWAY_ROUTES + gatewayRoute.getId());
        stringRedisTemplate.opsForValue().set(GATEWAY_ROUTES, toJson(new GatewayRouteVo(get(gatewayRoute.getId()))));
    }

    @Override
    public GatewayRoute get(long id) {
        return gatewayRouteMapper.select(id);
    }

    @Override
    public List<GatewayRoute> query(GatewayRouteQueryParam gatewayRouteQueryParam) {
        return gatewayRouteMapper.query(gatewayRouteQueryParam);
    }

    @Override
    public boolean overload() {
        List<GatewayRoute> gatewayRoutes = gatewayRouteMapper.query(new GatewayRouteQueryParam());
        ValueOperations<String, String> opsForValue = stringRedisTemplate.opsForValue();
        gatewayRoutes.forEach(gatewayRoute ->
                opsForValue.set(GATEWAY_ROUTES + gatewayRoute.getId(), toJson(new GatewayRouteVo(gatewayRoute)))
        );
        return true;
    }

    /**
     * GatewayRoute转换为json
     *
     * @param gatewayRouteVo redis需要的vo
     * @return json string
     */
    private String toJson(GatewayRouteVo gatewayRouteVo) {
        String routeDefinitionJson = Strings.EMPTY;
        try {
            routeDefinitionJson = new ObjectMapper().writeValueAsString(gatewayRouteVo);
        } catch (JsonProcessingException e) {
            log.error("网关对象序列化为json String", e);
        }
        return routeDefinitionJson;
    }
}